Quix Streams

The open source framework for processing data on Apache Kafka with Streaming DataFrames.

from quixstreams import Application

# Downsample temperature sensor data to a fixed frequency
# using tumbling windows
app = Application(broker_address="localhost:9092")

input_topic = app.topic("temperature-data-raw")
output_topic = app.topic("temperature-data-5hz")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Calculate the average temperature over tumbling windows of 200ms
sdf = (
    sdf.apply(lambda row: row["temperature_C"])
    .tumbling_window(duration_ms=200)
    .mean()
    .final()
)

# Send rows from SDF back to the output topic as JSON messages
sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run()
Learn more about windowing in Docs
from quixstreams import Application

# Filter events with a temperature higher than a specific value
app = Application(broker_address="localhost:9092")

input_topic = app.topic("temperature-data-raw")
output_topic = app.topic("temperature-data-filtered")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Use a DataFrame expression to filter events
sdf = sdf[sdf["temperature_C"] >= 100]

# Send rows from SDF back to the output topic as JSON messages
sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run()
Learn more about filtering in Docs
from quixstreams import Application

app = Application(broker_address="localhost:9092")

input_topic = app.topic("temperature-data-raw")
output_topic = app.topic("temperature-data-projected")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Project sensor GPS data to a new column
sdf["gps"] = sdf.apply(
    lambda row: {"lat": row["sensor_latitude"], "long": row["sensor_longitude"]}
)

# Derive a new column based on the existing one
sdf["is_freezing"] = sdf["temperature_C"] < 0

# Extract columns from the rows before sending them to the output topic
sdf = sdf[["temperature_C", "sensor_id", "gps", "is_freezing"]]

# Send rows from SDF back to the output topic as JSON messages
sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run()
Learn more about projection in Docs
from quixstreams import Application
from quixstreams.sinks.core.influxdb3 import InfluxDB3Sink
from quixstreams.sources import CSVSource

app = Application(broker_address="localhost:9092")

# Define a CSV source connector to read data from the file
csv_source = CSVSource(path="temperature_data_test.csv", name="temperature-csv-source")

# Pass the source to a StreamingDataFrame (SDF)
sdf = app.dataframe(source=csv_source)

# Define a sink connector for InfluxDB
influxdb_sink = InfluxDB3Sink(
    token="<token>",
    host="<host>",
    organization_id="<organization_id>",
    database="<db name>",
    measurement="temperature_F",
)

# Add a new column with temperature values converted from °C to °F
sdf["temperature_F"] = sdf["temperature_C"].apply(lambda temp: temp * 9 / 5 + 32.0)

# Send rows from the SDF back to the InfluxDB database
sdf.sink(influxdb_sink)

if __name__ == "__main__":
    app.run()
Learn more about connectors in Docs

Python-native processing and connectors

Pure Python, meaning no wrappers around Java and no cross-language debugging.

Sources & Sinks API for building custom connectors that integrate data with Kafka whilst handling retries and backpressure. JSON, Avro, Protobuf and schema registry support to keep your ever-changing data valid and clean.
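
For example, a custom source only needs to subclass Source and implement run(). Here is a minimal sketch, assuming a hypothetical counter source and a local broker; check the Sources API docs for the exact base-class contract in your version:

import time

from quixstreams import Application
from quixstreams.sources import Source


class CounterSource(Source):
    # Hypothetical source emitting an incrementing counter once per second
    def run(self):
        count = 0
        # self.running flips to False when the application shuts down
        while self.running:
            event = self.serialize(key="counter", value={"count": count})
            self.produce(key=event.key, value=event.value)
            count += 1
            time.sleep(1)


app = Application(broker_address="localhost:9092")

# Pass the custom source to a StreamingDataFrame, just like a topic
sdf = app.dataframe(source=CounterSource(name="counter-source"))

if __name__ == "__main__":
    app.run()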

Process streaming data using DataFrames API

Treat real-time data streams as continuously updating tables via a Streaming DataFrame API. Ideal for transitioning projects from Pandas or PySpark.

Use built-in operators for aggregation, grouping, windowing, filtering, branching, merging, and more to build stateful applications in fewer lines of Python code.
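
Here is a minimal sketch combining grouping and branching, assuming the hypothetical topic names below; branching a single StreamingDataFrame into several pipelines requires Quix Streams v3.0 or later:

from quixstreams import Application

app = Application(broker_address="localhost:9092")

sdf = app.dataframe(app.topic("temperature-data-raw"))

# Re-key the stream so stateful operations are partitioned per sensor
sdf = sdf.group_by("sensor_id")

# Branch the stream: each branch carries its own downstream operations
alerts = sdf[sdf["temperature_C"] >= 100]
alerts.to_topic(app.topic("temperature-alerts"))

readings = sdf[sdf["temperature_C"] < 100]
readings.to_topic(app.topic("temperature-readings"))

if __name__ == "__main__":
    app.run()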

Flexible, scalable and fault tolerant

Checkpointing and exactly-once processing guarantees keep your data pipelines durable and fault tolerant through unpredictable infrastructure failures.

By leveraging Kafka, your services scale horizontally: add consumers to the group and Kafka rebalances partitions across them. Data replication ensures redundancy and high availability for your data consumers.
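
Exactly-once delivery is a per-application setting. Here is a minimal sketch of a stateful counter under that guarantee, assuming a local broker (the default guarantee is at-least-once):

from quixstreams import Application

# Exactly-once uses Kafka transactions so produced messages and state
# changes commit atomically at each checkpoint
app = Application(
    broker_address="localhost:9092",
    processing_guarantee="exactly-once",
)

sdf = app.dataframe(app.topic("temperature-data-raw"))

# Count messages per key; the counter lives in local state and is
# committed together with the processed offsets
def count_messages(value, state):
    total = state.get("total", default=0) + 1
    state.set("total", total)
    value["message_number"] = total
    return value

sdf = sdf.apply(count_messages, stateful=True)

if __name__ == "__main__":
    app.run()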

Connect your Kafka to any source or destination

Quix Streams has out-of-the-box source and sink connectors for popular technologies. The built-in Sources & Sinks API also makes it easy to build custom Kafka connectors for any technology.

Explore connectors

Get started with streaming DataFrames

Learn how the Quix Streams library simplifies building real-time apps with Kafka and Python in our video walkthroughs, and get up and running with streaming DataFrames in minutes.

Check out the playlist on our YouTube account

Developer love for Quix

Open source Python client library

Support the project by starring the repo, or submit a PR to become a contributor.

Check out the repo

Start building your pipeline

Create, debug and run streaming data pipelines on your local machine using the Quix CLI.

Quickstart guide