March 2, 2023
Announcements

Introducing Quix Streams, an open source library for telemetry data streaming

Lightweight, powerful, no JVM and no need for separate clusters or orchestrators. Here’s a look at our next-gen streaming library for C# and Python developers including feature summaries, code samples, and a sneak peek into our roadmap.

Quix Streams is a fast and general-purpose processing framework for streaming data. Build real-time applications and analytics systems on data streams using Python DataFrames and stateful operators, all without having to install a server-side engine.

An open source streaming library for data-intensive applications

It’s our third birthday today, and to celebrate we decided to open source a project we’ve been working on since 2nd March 2020.

After three years in development, we’re pleased to release Quix Streams, a new library designed for building real-time applications that process high volumes of telemetry data when developers need a quick response and guaranteed reliability at scale.

Quix Streams is a lightweight library like Kafka Streams, with next-gen blob-backed state management like Flink and Python-native support like Faust. It's written in C# and designed to be easily extended to other programming languages with an included interop generator project. Quix Streams is JVM-free, does not require separate clusters or orchestrators, and includes enhanced features for working with time-series and binary telemetry data at the edge, on-prem or in the cloud.

Quix Streams is available to everyone for free under the Apache 2.0 licence. It is aimed at people who work with time-series telemetry data streams, from software, data and ML engineers to data scientists and mechanical engineers.

Why we’re open sourcing Quix Streams

We’re excited by the potential of real-time telemetry data streams to create a new wave of next-gen applications at the intersection of the physical and digital worlds.

However, we know this won’t happen unless we free developers from the drudgery of event streaming on the JVM and the hard choices they have to make between simplicity, power and reliability.

We want to provide the data community with a tool that is both powerful and easy to use, giving more developers the freedom to build previously unimaginable applications. Our hope is that open sourcing Quix Streams will encourage the wider data community to help make it even better.

What makes Quix Streams unique

Quix Streams is a little different from other stream processing libraries and Kafka clients for a few key reasons:

  • It treats time as a “first-class citizen”

    The data in each message is given a timestamp that is automatically set as the primary key. This simplifies operations that involve time, such as ordering and buffering messages. Quix Streams also includes special handling for time-series data, such as window operations where the true timestamp from the source is used rather than the consumer clock.
  • It provides a “stream context”, which lets you group messages by source

    When dealing with topic partitions, it can be difficult to ensure that the right data is sent to the right partition. To solve this issue, we created the concept of a stream context. A stream binds all of its messages to one partition and creates an envelope for all the different types of data in your topic. This means you can always be sure that the messages from each stream are ordered correctly and that the integrity of the data is retained, even if there are outages or disruptions.

  • It has built-in support for time-series data, event data, and large binary blobs

    When appending data to a message, you would normally create a Python object and serialize it into bytes. In Quix Streams, the process is simpler: you append column values to rows, and the library merges the data into DataFrames and sends them over the wire. This is not only more efficient for CPU and bandwidth but also easier to process further down the pipeline.
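To make the point about using the true timestamp from the source more concrete, here is a minimal, library-independent sketch of tumbling windows keyed on event time. It is not Quix Streams code: the `(timestamp_ns, value)` message shape and the `tumbling_window_sums` helper are hypothetical. A message that arrives late still lands in the window its own timestamp belongs to.

```python
from collections import defaultdict

NS_PER_SECOND = 1_000_000_000


def tumbling_window_sums(messages, window_ns):
    """Sum values per tumbling window, keyed by the source (event) timestamp.

    `messages` is an iterable of (timestamp_ns, value) pairs in arrival order,
    which may differ from the order in which the events actually happened.
    """
    windows = defaultdict(float)
    for timestamp_ns, value in messages:
        # The window is derived from the event's own timestamp, never from the
        # consumer's clock, so reordered messages are counted where they belong.
        window_start = timestamp_ns - (timestamp_ns % window_ns)
        windows[window_start] += value
    return dict(sorted(windows.items()))


# The third message arrives last but happened during the first second.
arrivals = [
    (0, 1.0),
    (1_500_000_000, 2.0),
    (900_000_000, 4.0),
]
print(tumbling_window_sums(arrivals, window_ns=NS_PER_SECOND))
```

Had the window been chosen by arrival time, the late `4.0` reading would have been counted in the second window instead of the first.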

What you can do with Quix Streams

To learn more about what you can do with this library, check out the core features with simplified code samples to demonstrate how they work:

Use stream contexts for horizontal scaling

Stream contexts allow you to bundle data from one data source into the same scope with supplementary metadata—thus enabling workloads to be horizontally scaled with multiple replicas.

  • In the following sample, the create_stream function is used to create a stream called bus-123AAAV which gets assigned to one particular consumer and will receive messages in the correct order:

import datetime
import math
import time

import quixstreams as qx

# Connect to a Kafka broker (assumes a local broker; adjust the address for your setup).
client = qx.KafkaStreamingClient("127.0.0.1:9092")
topic_producer = client.get_topic_producer("data")
index = 0  # sample counter used to generate demo coordinates

stream = topic_producer.create_stream("bus-123AAAV")
# Message 1 sent (the stream context)

stream.properties.name = "BUS 123 AAAV"
# Message 2 sent (the human-readable identifier of the bus)

stream.timeseries \
    .buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("Lat", math.sin(index / 100.0) + math.sin(index) / 5.0) \
    .add_value("Long", math.sin(index / 200.0) + math.sin(index) / 5.0) \
    .publish()
# Message 3 sent (the time-series telemetry data from the bus)

stream.events \
    .add_timestamp_in_nanoseconds(time.time_ns()) \
    .add_value("driver_bell", "Doors 3 bell activated by passenger") \
    .publish()
# Message 4 sent (an event related to something that happened on the bus)

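Binding a stream to one partition is, at heart, keyed partitioning. The sketch below is illustrative only: it uses `zlib.crc32` as a stand-in for whatever hash the client's partitioner actually uses, and the `partition_for` and `assign_partitions` helpers are hypothetical, not Quix Streams or Kafka APIs. It shows why every message from `bus-123AAAV` reaches the same partition, and therefore the same replica, in the order it was produced.

```python
import zlib
from collections import defaultdict


def partition_for(stream_id: str, num_partitions: int) -> int:
    """Map a stream ID to a partition with a stable hash (stand-in for the real partitioner)."""
    return zlib.crc32(stream_id.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions: int, replicas: list) -> dict:
    """Spread partitions round-robin across consumer replicas (one owner per partition)."""
    return {p: replicas[p % len(replicas)] for p in range(num_partitions)}


NUM_PARTITIONS = 6
owners = assign_partitions(NUM_PARTITIONS, ["replica-a", "replica-b"])

# Messages from two buses, interleaved as they are produced: (stream_id, sequence number).
produced = [
    ("bus-123AAAV", 1), ("bus-456BBBX", 1), ("bus-123AAAV", 2),
    ("bus-123AAAV", 3), ("bus-456BBBX", 2),
]

received = defaultdict(list)  # replica -> messages it consumed, in order
for stream_id, seq in produced:
    replica = owners[partition_for(stream_id, NUM_PARTITIONS)]
    received[replica].append((stream_id, seq))

for replica, messages in sorted(received.items()):
    print(replica, messages)
```

Because the partition depends only on the stream ID, adding or removing replicas changes which replica owns a partition, but never splits one stream across partitions.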

Produce time-series data without worrying about serialization or deserialization

Quix Streams serializes and deserializes time-series data using different codecs and optimizations to minimize payloads in order to increase throughput and reduce latency.

  • The following example shows data being appended to a stream with the add_value method:

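import datetime
import math
import time

import quixstreams as qx

# Connect to a Kafka broker (assumes a local broker; adjust the address for your setup).
client = qx.KafkaStreamingClient("127.0.0.1:9092")

# Open the producer topic to publish data to.
topic_producer = client.get_topic_producer("data")

# Create a new stream for each device.
stream = topic_producer.create_stream("bus-123AAAV")
stream.properties.name = "BUS 123 AAAV"
# Buffer rows and release them in batches spanning 100 ms of data time.
stream.timeseries.buffer.time_span_in_milliseconds = 100
print("Sending values for 30 seconds.")

for index in range(0, 3000):
    stream.timeseries \
        .buffer \
        .add_timestamp(datetime.datetime.utcnow()) \
        .add_value("Lat", math.sin(index / 100.0) + math.sin(index) / 5.0) \
        .add_value("Long", math.sin(index / 200.0) + math.sin(index) / 5.0) \
        .publish()

    time.sleep(0.01)

# Close the stream so consumers know no more data is coming.
stream.close()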

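Much of the payload saving comes from not repeating structure for every sample. The following rough illustration is not Quix Streams' actual codec: it encodes the same 100 bus positions once as one JSON document per row and once as a single columnar document, then compares byte counts. The `row_payload` and `columnar_payload` helpers and the use of JSON are assumptions made for the comparison.

```python
import json
import math


def make_samples(count):
    """Generate (timestamp_ns, lat, long) samples shaped like the bus example above."""
    return [
        (1_677_715_200_000_000_000 + i * 10_000_000,
         round(math.sin(i / 100.0) + math.sin(i) / 5.0, 6),
         round(math.sin(i / 200.0) + math.sin(i) / 5.0, 6))
        for i in range(count)
    ]


def row_payload(samples):
    """One self-describing JSON message per sample: keys are repeated every time."""
    return [
        json.dumps({"timestamp": t, "Lat": lat, "Long": lng}).encode()
        for t, lat, lng in samples
    ]


def columnar_payload(samples):
    """One JSON message for the whole batch: each column name appears once."""
    timestamps, lats, longs = zip(*samples)
    return json.dumps({"timestamp": timestamps, "Lat": lats, "Long": longs}).encode()


samples = make_samples(100)
row_bytes = sum(len(message) for message in row_payload(samples))
columnar_bytes = len(columnar_payload(samples))
print(f"row-oriented: {row_bytes} bytes in 100 messages, columnar: {columnar_bytes} bytes in 1 message")
```

A binary codec would shrink both numbers further; the point here is only that batching samples into columns removes per-message overhead.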

Leverage built-in buffers to optimize processing operations for windows of time-series data

If you’re sending data at high frequency, processing each message can be costly. The library provides built-in time-series buffers for producing and consuming, allowing several configurations for balancing between latency and cost.

  • For example, you can configure the library to release a packet from the buffer whenever 100 items of timestamped data are collected, or when a certain span of data time (in milliseconds) has elapsed. Note that this uses time in the data, not the consumer clock.

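import pandas as pd
import quixstreams as qx

def on_read_dataframe(stream: qx.StreamConsumer, df: pd.DataFrame):
    df["total"] = df["price"] * df["items_count"]
    topic_producer.get_or_create_stream(stream.stream_id).timeseries.publish(df)

# stream_consumer and topic_producer are assumed to exist, as in the other samples.
buffer = stream_consumer.timeseries.create_buffer()
buffer.packet_size = 100  # release after 100 timestamps, or...
buffer.time_span_in_milliseconds = 100  # ...after 100 ms of data time
buffer.on_dataframe_released = on_read_dataframe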

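The release rule described above (whichever comes first, a packet size or a span of data time) can be sketched in plain Python. `TimeseriesBufferSketch` is a hypothetical class, not the library's buffer implementation; it only illustrates that the time condition is measured between timestamps in the data rather than by a wall clock.

```python
class TimeseriesBufferSketch:
    """Collects (timestamp_ns, value) rows and releases them as one batch when
    either `packet_size` rows are buffered or the data spans `time_span_ms`."""

    def __init__(self, packet_size, time_span_ms, on_release):
        self.packet_size = packet_size
        self.time_span_ns = time_span_ms * 1_000_000
        self.on_release = on_release
        self.rows = []

    def add(self, timestamp_ns, value):
        self.rows.append((timestamp_ns, value))
        # The span is measured between timestamps in the data, not the consumer clock.
        span_ns = self.rows[-1][0] - self.rows[0][0]
        if len(self.rows) >= self.packet_size or span_ns >= self.time_span_ns:
            self.flush()

    def flush(self):
        if self.rows:
            batch, self.rows = self.rows, []
            self.on_release(batch)


released = []
buffer = TimeseriesBufferSketch(packet_size=100, time_span_ms=100, on_release=released.append)

# Samples 10 ms apart: the 100 ms data-time condition triggers long before 100 rows.
for i in range(25):
    buffer.add(i * 10_000_000, float(i))
buffer.flush()  # release whatever is left at the end of the stream

print([len(batch) for batch in released])
```

Whether the row that crosses the time boundary closes the current batch or opens the next one is a design choice; this sketch includes it in the released batch.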

Use Pandas DataFrames to produce data more efficiently

Time-series parameters are often emitted at the same time, so they share one timestamp, and handling each of them independently is wasteful. The library uses a tabular format that works natively with Pandas DataFrames. Each row has a timestamp and user-defined tags as indexes.


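import pandas as pd
import quixstreams as qx

# Callback triggered for each new data frame received on a stream.
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):

    # If the braking force applied is more than 50%, mark HardBraking as "True".
    df["HardBraking"] = (df["Brake"] > 0.5).astype(str)

    # stream_producer is assumed to be a stream opened on the output topic.
    stream_producer.timeseries.publish(df)  # Send data back to the stream

stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler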

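To see why a shared timestamp matters, the sketch below pivots individual `(timestamp, tags, parameter, value)` samples into rows indexed by timestamp and tags, the tabular shape described above: five separate samples collapse into two rows. It uses only the standard library, and the sample layout and `to_rows` helper are hypothetical rather than the library's internal format.

```python
def to_rows(samples):
    """Group (timestamp_ns, tags, parameter, value) samples into one row per
    (timestamp, tags) pair, so parameters emitted together share one index."""
    rows = {}  # dicts keep insertion order, so rows come out in arrival order
    for timestamp_ns, tags, parameter, value in samples:
        # Tags are part of the index, so freeze them into a hashable, ordered form.
        key = (timestamp_ns, tuple(sorted(tags.items())))
        rows.setdefault(key, {})[parameter] = value
    return [
        {"timestamp": ts, **dict(tag_items), **values}
        for (ts, tag_items), values in rows.items()
    ]


samples = [
    (1000, {"driver": "A"}, "Speed", 112.0),
    (1000, {"driver": "A"}, "Brake", 0.7),
    (1000, {"driver": "A"}, "Throttle", 0.0),
    (2000, {"driver": "A"}, "Speed", 98.5),
    (2000, {"driver": "A"}, "Brake", 0.2),
]
for row in to_rows(samples):
    print(row)
```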

Produce and consume different types of mixed data

This library allows you to produce and consume different types of mixed data at the same timestamp, like numbers, strings or binary data.

  • For example, you can produce both time-series data and large binary blobs together.

    Often, you’ll want to combine time-series data with binary data. In the following example, we combine frames from a bus's onboard camera with telemetry from its ECU so we can analyze the camera feed in context.

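import datetime

import quixstreams as qx

# Connect to a Kafka broker (assumes a local broker; adjust the address for your setup).
client = qx.KafkaStreamingClient("127.0.0.1:9092")

# Open the producer topic to publish data to.
topic_producer = client.get_topic_producer("data")

# Create a new stream for each device.
stream = topic_producer.create_stream("bus-123AAAV")

# BusVehicle is a placeholder for your own vehicle telemetry source.
telemetry = BusVehicle.get_vehicle_telemetry("bus-123AAAV")

def on_new_camera_frame(frame_bytes: bytes):
    # Publish the camera frame (binary) and the current speed (numeric) under one timestamp.
    stream.timeseries \
        .buffer \
        .add_timestamp(datetime.datetime.utcnow()) \
        .add_value("camera_frame", frame_bytes) \
        .add_value("speed", telemetry.get_speed()) \
        .publish()

telemetry.on_new_camera_frame = on_new_camera_frame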

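A single timestamp can therefore carry numbers, strings and bytes side by side. The sketch below shows one way to keep such a row typed, by routing each value into a numeric, string or binary group; the `MixedRow` class is hypothetical and only mirrors the idea, not the library's wire format.

```python
from dataclasses import dataclass, field


@dataclass
class MixedRow:
    """One timestamp with values split by type, so each group can be encoded efficiently."""
    timestamp_ns: int
    numeric: dict = field(default_factory=dict)
    strings: dict = field(default_factory=dict)
    binary: dict = field(default_factory=dict)

    def add_value(self, name, value):
        # bool is a subclass of int; reject it explicitly to keep the groups unambiguous.
        if isinstance(value, bool):
            raise TypeError(f"{name}: booleans are not supported in this sketch")
        if isinstance(value, (int, float)):
            self.numeric[name] = float(value)
        elif isinstance(value, str):
            self.strings[name] = value
        elif isinstance(value, (bytes, bytearray)):
            self.binary[name] = bytes(value)
        else:
            raise TypeError(f"{name}: unsupported type {type(value).__name__}")
        return self  # allow chaining, in the same fluent style as the samples above


row = (
    MixedRow(timestamp_ns=1_000)
    .add_value("speed", 87.5)
    .add_value("route", "123")
    .add_value("camera_frame", b"\xff\xd8\xff\xe0")
)
print(row.numeric, row.strings, len(row.binary["camera_frame"]))
```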

  • You can also produce events that include payloads:

    For example, you might need to listen for changes in time-series or binary streams and produce an event (such as "speed limit exceeded"). These might require some kind of document to send along with the event message (e.g. transaction invoices, or a speeding ticket with photographic proof). Here's an example for a speeding camera:

import base64
import json

import pandas as pd
import quixstreams as qx

SPEED_LIMIT = 130

# Callback triggered for each new data frame.
# topic_producer is assumed to be open on the output topic, as in the earlier samples.
def on_dataframe_received_handler(stream: qx.StreamConsumer, df: pd.DataFrame):

    # We filter rows where the driver was speeding.
    above_speed_limit = df[df["speed"] > SPEED_LIMIT]

    # If there is a record of speeding, we send a ticket.
    if above_speed_limit.shape[0] > 0:

        # We find the moment with the highest speed.
        max_speed_moment = above_speed_limit["speed"].idxmax()
        row = above_speed_limit.loc[max_speed_moment]
        speed = float(row["speed"])
        timestamp_ns = int(row["timestamp"])  # nanosecond timestamp column of the data frame

        # We create a document that will be consumed by the ticket service.
        speeding_ticket = {
            "vehicle": stream.stream_id,
            "time": timestamp_ns,
            "speed": speed,
            "fine": (speed - SPEED_LIMIT) * 100,
            # Binary frames are base64-encoded so the ticket can be serialized as JSON.
            "photo_proof": base64.b64encode(row["camera_frame"]).decode("ascii"),
        }

        topic_producer.get_or_create_stream(stream.stream_id) \
            .events \
            .add_timestamp_in_nanoseconds(timestamp_ns) \
            .add_value("ticket", json.dumps(speeding_ticket)) \
            .publish()


Leverage built-in stateful processing for greater resiliency

Quix Streams includes an easy-to-use state store combining blob storage and Kubernetes persistent volumes that ensures quick recovery from any outages or disruptions. To use it, you can create an instance of LocalFileStorage or use one of our helper classes to manage the state, such as InMemoryStorage.

Here's an example of a stateful operation that sums the values of a selected column.


import quixstreams as qx

# InMemoryStorage and LocalFileStorage are the state classes named above;
# import them from your installed quixstreams version.
state = InMemoryStorage(LocalFileStorage())

def on_g_force_x(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):

    for row in data.timestamps:
        # Append G-Force sensor value to accumulated state (SUM).
        state[stream_consumer.stream_id] += abs(row.parameters["gForceX"].numeric_value)

        # Attach new column with aggregated values.
        row.add_value("gForceX_sum", state[stream_consumer.stream_id])

    # Send updated rows to the producer topic (topic_producer opened as in earlier samples).
    topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(data)

# Called for each new stream on the topic.
def read_stream(stream_consumer: qx.StreamConsumer):

    # If there is no record for this stream, create a default value.
    if stream_consumer.stream_id not in state:
        state[stream_consumer.stream_id] = 0

    # Subscribe to the gForceX column, also after a restart when state already exists.
    buffer = stream_consumer.timeseries.create_buffer("gForceX")
    buffer.on_data_released = on_g_force_x

topic_consumer.on_stream_received = read_stream

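The recovery idea behind local file storage can be illustrated with a small file-backed dictionary: every write is persisted, so a restarted process picks up the running sum where it left off. `FileBackedState` is a hypothetical stand-in for `LocalFileStorage`, not its implementation; it writes JSON to a temporary file and swaps it in with `os.replace`, so a crash mid-write never leaves a half-written state file.

```python
import json
import os
import tempfile


class FileBackedState:
    """A dict-like state store that persists every write to a JSON file."""

    def __init__(self, path):
        self.path = path
        self.data = {}
        if os.path.exists(path):
            # Recover the previous state after a restart.
            with open(path, "r", encoding="utf-8") as f:
                self.data = json.load(f)

    def __contains__(self, key):
        return key in self.data

    def __getitem__(self, key):
        return self.data[key]

    def __setitem__(self, key, value):
        self.data[key] = value
        # Write to a temporary file first, then atomically replace the old file.
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.data, f)
        os.replace(tmp_path, self.path)


state_path = os.path.join(tempfile.mkdtemp(), "gforce_state.json")

# First "run": accumulate absolute gForceX values for one stream.
state = FileBackedState(state_path)
state["bus-123AAAV"] = 0.0
for g_force_x in [0.5, -1.25, 0.25]:
    state["bus-123AAAV"] += abs(g_force_x)

# Simulated restart: a new instance reloads the persisted sum from disk.
recovered = FileBackedState(state_path)
print(recovered["bus-123AAAV"])
```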

Take advantage of performance and usability enhancements

The library also includes a number of other enhancements that are designed to simplify the process of managing configuration and performance when interacting with Kafka:

  • No schema registry required: Quix Streams doesn't need a schema registry to send different sets of types or parameters; this is handled internally by the protocol. This means that you can send more than one schema per topic.
  • Message splitting: Quix Streams automatically handles large messages on the producer side, splitting them up if required, so you no longer need to worry about Kafka message size limits. On the consumer side, those messages are automatically merged back.
  • Message broker configuration: Many configuration settings are needed to use Kafka at its best, and finding the ideal configuration takes time. Quix Streams takes care of Kafka configuration by default but also supports custom configurations.
  • Checkpointing: Quix Streams supports manual or automatic checkpointing when you consume data from a Kafka topic. This lets you inform the message broker that you have already processed messages up to a given point.
  • Horizontal scaling: Quix Streams handles horizontal scaling using the stream context feature. You can scale the processing services from one replica to many and back to one, and the library ensures that the data load is always shared between your replicas reliably.
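Message splitting, from the list above, is easy to picture in code. This is not the Quix Streams protocol: the 12-byte header (message ID, chunk index, chunk count) and the `split_message` and `merge_chunks` helpers are hypothetical, chosen only to show how a payload larger than a broker's message limit can be cut into chunks and reassembled by the consumer.

```python
import struct

# Hypothetical header layout: message_id, chunk_index, chunk_count as big-endian uint32s.
HEADER = struct.Struct(">III")


def split_message(message_id, payload, max_message_bytes):
    """Split `payload` into chunks that each fit in `max_message_bytes`, header included."""
    chunk_size = max_message_bytes - HEADER.size
    if chunk_size <= 0:
        raise ValueError("max_message_bytes must be larger than the header")
    pieces = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)] or [b""]
    return [
        HEADER.pack(message_id, index, len(pieces)) + piece
        for index, piece in enumerate(pieces)
    ]


def merge_chunks(chunks):
    """Reassemble one message from its chunks, whatever order they arrive in."""
    parts = {}
    expected = None
    for chunk in chunks:
        _message_id, index, count = HEADER.unpack_from(chunk)
        expected = count
        parts[index] = chunk[HEADER.size:]
    if expected is None or len(parts) != expected:
        raise ValueError("incomplete message")
    return b"".join(parts[i] for i in range(expected))


payload = bytes(range(256)) * 40  # 10,240 bytes
chunks = split_message(message_id=7, payload=payload, max_message_bytes=1024)
print(len(chunks), max(len(c) for c in chunks))
```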

For a detailed overview of features, see our library documentation.
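The checkpointing bullet above boils down to recording, per partition, the offset of the next message to process. The sketch below is a hypothetical, broker-free model of at-least-once processing with periodic checkpoints; `PartitionLog`, `consume` and the `commit_every` parameter are assumptions, not Quix Streams or Kafka APIs. After a simulated crash, the consumer resumes from the last checkpoint, so a few messages are processed twice but none are lost.

```python
class PartitionLog:
    """An in-memory stand-in for one topic partition plus its committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset of the next message to read after a restart


def consume(log, handler, commit_every, crash_after=None):
    """Process from the committed offset, checkpointing every `commit_every` messages."""
    processed = 0
    for offset in range(log.committed, len(log.messages)):
        if crash_after is not None and processed == crash_after:
            return  # simulate the service dying before its next checkpoint
        handler(log.messages[offset])
        processed += 1
        if processed % commit_every == 0:
            log.committed = offset + 1  # checkpoint: everything before this is done
    log.committed = len(log.messages)  # final checkpoint on a clean finish


log = PartitionLog(range(10))
seen = []
consume(log, seen.append, commit_every=3, crash_after=5)  # crashes after handling 0-4
consume(log, seen.append, commit_every=3)  # restarts from the last checkpoint (offset 3)
print(seen)
```

Checkpointing more often shrinks the number of replayed messages at the cost of more commits, which is the trade-off that manual versus automatic checkpointing lets you tune.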

What's next?

This is the first iteration of Quix Streams, and we’re already working on the next release. The main highlight is a new feature called "streaming data frames" that simplifies stateful stream processing for users coming from a batch processing environment. It eliminates the need for users to manage state in memory, update rolling windows, deal with checkpointing and state persistence, and manage state recovery after a service unexpectedly restarts. By offering an interface modelled on Pandas DataFrames, we hope to make stream processing even more accessible to data professionals who are new to streaming data.

The following example shows how you would perform a rolling window calculation on a streaming data frame:


# Create a projection for columns we need.
df = input_stream.df[["gForceX", "gForceY", "gForceZ"]]

# Create a new feature by combining three columns into one new column.
df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()

# Calculate a rolling average of the previous column over the last 10 minutes.
df["gForceTotal_avg10m"] = df["gForceTotal"].rolling("10min").mean()

# Loop through the stream row by row as data flows through the service.
# The async iterator waits if there is no new data incoming from the input stream.
async for row in df:
    print(row)
    await output_stream.write(row)

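The state a rolling window needs can be pictured with a small incremental helper. This is a standard-library sketch of a time-based rolling mean keyed on data timestamps, not the planned streaming data frames API; `RollingMean` and its parameters are hypothetical. Like a Pandas time-based window, it covers the half-open interval (newest timestamp minus window, newest timestamp] and keeps only the rows inside it plus a running sum.

```python
from collections import deque


class RollingMean:
    """Mean of values whose timestamps fall within the last `window_ns` of data time."""

    def __init__(self, window_ns):
        self.window_ns = window_ns
        self.rows = deque()  # (timestamp_ns, value), oldest first
        self.total = 0.0

    def update(self, timestamp_ns, value):
        self.rows.append((timestamp_ns, value))
        self.total += value
        # Evict rows that have fallen out of the window relative to the newest timestamp.
        while self.rows and self.rows[0][0] <= timestamp_ns - self.window_ns:
            _, old_value = self.rows.popleft()
            self.total -= old_value
        return self.total / len(self.rows)


MINUTE_NS = 60 * 1_000_000_000
g_force_total_avg10m = RollingMean(window_ns=10 * MINUTE_NS)

# (minute, gForceTotal) pairs; the reading at minute 12 evicts minutes 0 and 2.
readings = [(0, 1.0), (2, 3.0), (5, 2.0), (12, 6.0)]
averages = [g_force_total_avg10m.update(minute * MINUTE_NS, value) for minute, value in readings]
print(averages)
```

Each update costs amortized constant time because every row is appended and evicted once; a long-running service might periodically recompute the sum to limit floating-point drift.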

Note that the column operations above are written exactly as you would write the same calculation on static data in a Jupyter notebook, so the API will be easy to learn for those of you who are used to batch processing.

There's also no need to grapple with the complexity of stateful processing on streaming data—this will all be managed by the library. Moreover, although it will still feel like Pandas, it will use binary tables under the hood—which adds a significant performance boost compared to traditional Pandas DataFrames.

To find out when the next version is ready, make sure you watch the Quix Streams GitHub repo.

We also want our roadmap to be shaped by feedback and contributions from the wider data community.


Words by Tomáš Neubauer