Get started developing locally with Quix Streams

Quix Streams is an open-source library for processing data in Kafka using pure Python. It’s designed to give you the power of a distributed system in a lightweight library by combining the low-level scalability and resiliency features of Kafka with an easy-to-use Python interface.

Step 1

Install Quix Streams

python3 -m pip install quixstreams

Step 2

Connect to Kafka and process streaming data with DataFrames

from quixstreams import Application
import uuid

# Connect to the public Quix hosted broker to consume data
app = Application(
    broker_address="publickafka.quix.io:9092",  # Kafka broker address
    consumer_group=str(uuid.uuid4()),  # Kafka consumer group
    auto_offset_reset='latest',  # Read topic from the end
    producer_extra_config={'enable.idempotence': False}  # Extra producer settings: turn off idempotent writes
)

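# Read JSON-encoded messages from the demo chat topic
input_topic = app.topic("demo-onboarding-prod-chat", value_deserializer='json')

# Create a StreamingDataFrame over the input topic
sdf = app.dataframe(input_topic)

# Count the space-separated tokens in each message
sdf["tokens_count"] = sdf["message"].apply(lambda message: len(message.split(" ")))
# Keep only the "role" and "tokens_count" columns
sdf = sdf[["role", "tokens_count"]]

# Print each processed row to the console
sdf = sdf.update(lambda row: print(row))

# Start consuming and processing messages
app.run(sdf)

Learn more about Quix Streams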

Quix CLI is a free terminal tool that enables you to create, debug, and run data pipelines on your local machine. Start building with Quix Streams, Docker, and Git in your preferred IDE.

Build your pipeline

Quix's open-source stream processing library is intuitive for engineers coming from batch tools like Pandas and Spark. It's as simple as:

Step 1

Install Quix Streams

python3 -m pip install quixstreams

Step 2

Connect to Kafka and process streaming data with DataFrames

# Library requirements
# Python 3.8+, Apache Kafka 0.10+

from quixstreams import Application
import uuid

app = Application(
    broker_address="publickafka.quix.io:9092",  # Kafka broker address
    consumer_group=str(uuid.uuid4())  # Kafka consumer group
)
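
# Read JSON-encoded messages from the demo chat topic
input_topic = app.topic("demo-onboarding-prod-chat", value_deserializer='json')

sdf = app.dataframe(input_topic)

# Count the space-separated tokens in each message, then keep two columns
sdf["tokens_count"] = sdf["message"].apply(lambda message: len(message.split(" ")))
sdf = sdf[["role", "tokens_count"]]

# Print each processed row to the console
sdf = sdf.update(lambda row: print(row))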

app.run(sdf)
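
To see what the Step 2 transformation does to a single message without connecting to Kafka, here is a minimal plain-Python sketch of the same three operations: add a token count, keep two fields, and print. The helper names `count_tokens` and `process` and the sample rows are hypothetical and are not part of Quix Streams; the real topic's messages may carry other fields.

```python
from typing import Any, Dict


def count_tokens(message: str) -> int:
    """Mirror the lambda from Step 2: split on single spaces and count the pieces."""
    return len(message.split(" "))


def process(row: Dict[str, Any]) -> Dict[str, Any]:
    """Apply the same steps as the pipeline to one message value."""
    enriched = dict(row)  # copy so the input row is left untouched
    enriched["tokens_count"] = count_tokens(enriched["message"])
    # Keep only the two fields the pipeline selects
    return {key: enriched[key] for key in ("role", "tokens_count")}


# Hypothetical sample rows, made up for illustration only
sample_rows = [
    {"role": "customer", "message": "my order has not arrived"},
    {"role": "agent", "message": "sorry to hear that"},
]

results = [process(row) for row in sample_rows]
for result in results:
    print(result)
```

Note that `split(" ")` splits on every single space, so consecutive spaces produce empty strings that are counted as tokens. Calling `split()` with no argument would collapse runs of whitespace instead, if that is the behavior you want.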