Get started with Quix Streams in less than a minute

Quix Streams is an open-source library for processing data in Kafka using pure Python. It's designed to give you the power of a distributed system in a lightweight library by combining the low-level scalability and resiliency features of Kafka with an easy-to-use Python interface.

Step 1

Install Quix Streams

python3 -m pip install quixstreams
Step 2

Connect to Kafka and process streaming data with DataFrames

from quixstreams import Application

app = Application(broker_address="localhost:9092")

input_topic = app.topic("cardata")
output_topic = app.topic("speed-hopping-windows")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Calculate a hopping window of 1 s with 200 ms steps
sdf = (
    sdf.apply(lambda row: row["Speed"])
    .hopping_window(1000, 200)
    .mean()
    .final()
)

# Send rows from SDF back to the output topic as JSON messages
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)
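
The hopping window above groups speed readings into overlapping 1-second windows that advance every 200 ms, emitting a mean per window. A minimal plain-Python sketch of the same aggregation, with hypothetical timestamps and speeds (window alignment simplified to start at 0), shows what the windowing computes:

```python
# Each event: (timestamp_ms, speed). Hypothetical sample data.
events = [(0, 60.0), (150, 62.0), (400, 70.0), (900, 80.0), (1100, 90.0)]

WINDOW_MS = 1000  # window duration
STEP_MS = 200     # hop size

def hopping_means(events, window_ms, step_ms):
    """Mean of values for each hopping window [start, start + window_ms)."""
    if not events:
        return []
    last_ts = max(ts for ts, _ in events)
    results = []
    start = 0
    while start <= last_ts:
        values = [v for ts, v in events if start <= ts < start + window_ms]
        if values:
            results.append((start, sum(values) / len(values)))
        start += step_ms
    return results

for start, mean in hopping_means(events, WINDOW_MS, STEP_MS):
    print(f"window [{start}, {start + WINDOW_MS}): mean speed {mean:.1f}")
```

Because the windows overlap, a single reading contributes to several consecutive means; `.final()` in the Quix Streams example emits each window's result only once the window closes.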

Quix CLI is a free tool for creating, debugging, and running pipelines locally with Quix Streams. Get the best of Quix by developing applications in Python with Quix Streams, the Quix CLI, and your preferred IDE.

Start building

Quix Cloud offers an intuitive user interface for building and visualizing your pipelines, and a CLI that helps you manage your infrastructure while developing locally.

Try Quix Cloud free for 30 days with $300 of free credit in computing resources.

Quix's open-source stream processing library is intuitive for engineers coming from batch tools like Pandas and Spark. It's as simple as:

Step 1

Install Quix Streams

python3 -m pip install quixstreams
Step 2

Connect to Kafka and process streaming data with DataFrames

# Library requirements
# Python 3.8+, Apache Kafka 0.10+

from quixstreams import Application
import uuid

app = Application(
    broker_address="publickafka.quix.io:9092",  # Kafka broker address
    consumer_group=str(uuid.uuid4())  # Kafka consumer group
)
input_topic = app.topic("demo-onboarding-prod-chat", value_deserializer='json')

sdf = app.dataframe(input_topic)

sdf["tokens_count"] = sdf["message"].apply(lambda message: len(message.split(" ")))
sdf = sdf[["role", "tokens_count"]]

sdf = sdf.update(lambda row: print(row))

app.run(sdf)
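
The per-row transform above is just a whitespace split followed by a column projection. The same logic on a plain dict (field names taken from the example; the payload itself is a hypothetical sample) makes the row-by-row behavior concrete:

```python
# A chat message row as the SDF would see it (hypothetical sample payload)
row = {"role": "user", "message": "How do hopping windows work?"}

# Same logic as sdf["message"].apply(...): count whitespace-separated tokens
row["tokens_count"] = len(row["message"].split(" "))

# Mirror the projection sdf[["role", "tokens_count"]]
projected = {k: row[k] for k in ("role", "tokens_count")}
print(projected)  # {'role': 'user', 'tokens_count': 5}
```

Each Kafka message flows through these steps independently, so the pipeline needs no state for this transform; `sdf.update(lambda row: print(row))` then prints each processed row without modifying it.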