Process - threshold detection
In this part of the tour you'll learn how to create a threshold detection transform. The transform detects if CPU load exceeds a certain threshold, and if so, sends a message to its output topic.
Create the threshold detection transform
To create the threshold detection transform:
- In your
Developenvironment, click onCode Samplesin the main left-hand navigation. - Select the
Python,Transformation, andBasic templatesfilters. - For
Starter transformationclickPreview code. - Click
Edit code. - Name the transform "CPU Threshold".
- Select the input topic
cpu-load. - For the output topic, add a new topic called
cpu-spike. - In the application view, click on
main.pyto edit it. -
Replace all the code in
main.pywith the following:from quixstreams import Application import os # Create an Application # It will get the SDK token from environment variables to connect to Quix Kafka app = Application() # Define input and output topics input_topic = app.topic(os.environ["input"]) output_topic = app.topic(os.environ["output"]) # Create a StreamingDataFrame to process data sdf = app.dataframe(input_topic) # Filter in all rows where CPU load is over 25. sdf = sdf.filter(lambda row: row["cpu_load"] > 25) # Produce message payload with alert. sdf = sdf.apply(lambda row: "CPU value is " + str(row["cpu_load"])) # Print messages to the console sdf = sdf.update(lambda row: print(row)) # Send messages to the output topic sdf = sdf.to_topic(output_topic) if __name__ == "__main__": # Run the Application app.run(sdf) -
Tag the project as
process-v1and deploy as a service. - Monitor the logs for the deployed process.
If CPU load exceeds the threshold the message is published to the output topic, for further processing in the next stage of the pipeline.