Quix Streams Release 3.21.0
Quix Streams 3.21.0 introduces dynamic topic routing with StreamingDataFrame.to_topic(), enabling content-based message distribution to streamline R&D data workflows and reduce downstream processing overhead.
Content-based data routing for automated R&D workflows
Dynamic topic selection: Route messages based on real-time data values
R&D environments generate diverse data streams that require different processing paths. Battery test data needs immediate routing to safety monitoring systems when voltage exceeds thresholds, while normal readings flow to historical analysis pipelines. The enhanced to_topic() method now accepts a callable that receives each message's value, key, timestamp, and headers and returns the destination topic.
from quixstreams import Application

app = Application(broker_address="localhost:9092")

# Define specialized topics for different data types
sensor_input = app.topic('battery-telemetry', value_deserializer='json')
normal_data = app.topic('historical-analysis', value_serializer='json')
critical_alerts = app.topic('safety-monitoring', value_serializer='json')
performance_data = app.topic('optimization-analysis', value_serializer='json')

sdf = app.dataframe(sensor_input)

def route_battery_data(value, key, timestamp, headers):
    """Route battery test data based on operational parameters"""
    voltage = value.get('voltage', 0)
    temperature = value.get('temperature', 0)
    # Critical safety conditions
    if voltage > 4.2 or temperature > 60:
        return critical_alerts
    # Performance optimization data
    elif value.get('test_phase') == 'performance_eval':
        return performance_data
    # Standard historical data
    else:
        return normal_data

# Pass the callable instead of a fixed topic; it runs once per message
sdf.to_topic(topic=route_battery_data)

if __name__ == "__main__":
    # Start consuming from battery-telemetry and producing to the routed topics
    app.run()
This eliminates the need for downstream consumers to filter irrelevant messages, reducing processing overhead and improving system responsiveness for time-sensitive R&D operations.
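Because the routing rule is an ordinary Python function, it can be checked on its own before it is attached to a stream. The following is a minimal sketch, not part of the release: it repeats the decision logic from the example above but, as an assumption made for illustration, swaps the Topic objects for plain strings so that it runs without a Kafka broker. The sample readings are hypothetical.

```python
from typing import Any, Optional

# Stand-ins for the Topic objects returned by app.topic(); plain strings are
# used here only so the rule can run without a broker (illustrative assumption).
critical_alerts = "safety-monitoring"
performance_data = "optimization-analysis"
normal_data = "historical-analysis"


def route_battery_data(value: dict, key: Any, timestamp: int, headers: Optional[Any]) -> str:
    """Same decision logic as the battery routing function above."""
    voltage = value.get("voltage", 0)
    temperature = value.get("temperature", 0)
    # Safety conditions are checked first, so they win over the test phase
    if voltage > 4.2 or temperature > 60:
        return critical_alerts
    elif value.get("test_phase") == "performance_eval":
        return performance_data
    else:
        return normal_data


# Hypothetical sample readings, one per branch
samples = [
    {"voltage": 4.35, "temperature": 25},
    {"voltage": 3.7, "temperature": 30, "test_phase": "performance_eval"},
    {"voltage": 3.7, "temperature": 30},
]
destinations = [
    route_battery_data(sample, key=b"cell-1", timestamp=0, headers=None)
    for sample in samples
]
print(destinations)
```

Note that a reading taken during a performance evaluation still goes to safety monitoring when it crosses a threshold, because the safety branch is evaluated first.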
Intelligent data segregation: Separate experimental data streams automatically
Multi-phase R&D projects generate different data types during various test stages. Drone flight testing produces navigation data during flight phases, diagnostic data during ground checks, and performance metrics during stress tests. Dynamic routing automatically segregates these data streams without manual intervention or complex filtering logic.
Teams can focus on relevant data subsets without building complex filtering infrastructure, accelerating analysis cycles and reducing the cognitive load on R&D engineers.
Conditional processing workflows: Trigger specialized analysis based on data characteristics
R&D often requires different analysis approaches based on test conditions or results. HVAC system testing might route steady-state data to efficiency analysis while transient data goes to control system evaluation. The routing function has access to the complete message context including headers and timestamps, enabling sophisticated decision logic.
# Assumes flight_performance_topic, diagnostic_topic, stress_analysis_topic
# and general_telemetry_topic were declared earlier with app.topic()
def route_drone_telemetry(value, key, timestamp, headers):
    """Separate drone test data by operational phase"""
    phase = value.get('flight_phase', 'unknown')
    altitude = value.get('altitude', 0)
    if phase == 'flight' and altitude > 100:
        return flight_performance_topic
    elif phase == 'ground_test':
        return diagnostic_topic
    elif value.get('stress_test_active', False):
        return stress_analysis_topic
    else:
        return general_telemetry_topic
This approach eliminates the need to process all data through every analysis pipeline, reducing computational overhead and enabling more targeted R&D insights.
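The HVAC case can be sketched in the same style, this time using the headers and timestamp arguments. This is an illustration under several assumptions that do not come from the release: headers are taken to arrive as a list of (name, bytes) pairs or None, the producer is assumed to set a "test_mode" header, the payload is assumed to carry a "setpoint_changed_at_ms" field, the 60-second settling window is arbitrary, and the topic names are hypothetical. Strings again stand in for Topic objects so the logic can run on its own.

```python
from typing import Any, List, Optional, Tuple

# Hypothetical topic names; in an application these would be Topic objects
# created with app.topic() before to_topic() is called.
efficiency_topic = "hvac-efficiency-analysis"
control_eval_topic = "hvac-control-evaluation"

# Assumed header shape: a list of (name, bytes) pairs, or None when absent
Headers = Optional[List[Tuple[str, bytes]]]

SETTLING_WINDOW_MS = 60_000  # arbitrary illustrative value


def header_value(headers: Headers, name: str) -> Optional[str]:
    """Return the decoded value of the first header called `name`, if any."""
    for header_name, raw in headers or []:
        if header_name == name and raw is not None:
            return raw.decode("utf-8")
    return None


def route_hvac_data(value: dict, key: Any, timestamp: int, headers: Headers) -> str:
    """Send transient readings to control evaluation, the rest to efficiency analysis."""
    # Assumption: the producer tags each message with a "test_mode" header
    mode = header_value(headers, "test_mode")
    # Assumption: the payload records the last setpoint change in epoch milliseconds
    changed_at = value.get("setpoint_changed_at_ms")
    recently_changed = (
        changed_at is not None and 0 <= timestamp - changed_at < SETTLING_WINDOW_MS
    )
    if mode == "transient" or recently_changed:
        return control_eval_topic
    return efficiency_topic


now_ms = 1_700_000_000_000
print(route_hvac_data({"supply_temp": 18.5}, "unit-7", now_ms, [("test_mode", b"transient")]))
print(route_hvac_data({"supply_temp": 18.5, "setpoint_changed_at_ms": now_ms - 10_000}, "unit-7", now_ms, None))
print(route_hvac_data({"supply_temp": 18.5}, "unit-7", now_ms, None))
```

Keeping header parsing in a small helper keeps the routing function readable and makes it easier to adapt if headers are delivered in a different shape in your setup.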
Simplified data architecture for R&D teams
Reduced downstream complexity: Eliminate filtering logic in consumer applications
Traditional approaches require each consumer application to implement its own filtering logic, leading to duplicated code and maintenance overhead. With dynamic routing, the filtering logic is centralized in the streaming application, and consumers receive only relevant data. This simplifies the architecture for R&D teams with limited software engineering resources.
Improved processing efficiency: Reduce network traffic and storage overhead
By routing data to appropriate topics at the source, teams avoid transmitting irrelevant data across the network and storing unnecessary information in specialized databases. A rocket engine test generating 10 GB of telemetry per hour can automatically separate thrust vector data (100 MB) from general monitoring data (9.9 GB), so the high-frequency analysis system stores roughly 1% of the total volume.
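The arithmetic behind the rocket engine example can be checked in a few lines. This sketch uses only the figures quoted above and assumes decimal units (1 GB = 1000 MB).

```python
# Figures from the rocket engine example (decimal units: 1 GB = 1000 MB)
total_mb_per_hour = 10_000
thrust_vector_mb_per_hour = 100

# Everything that is not thrust vector data goes to general monitoring
general_mb_per_hour = total_mb_per_hour - thrust_vector_mb_per_hour
# Fraction of the stream the high-frequency analysis system has to store
thrust_share = thrust_vector_mb_per_hour / total_mb_per_hour
# How many times smaller the routed stream is than the full stream
reduction_factor = total_mb_per_hour / thrust_vector_mb_per_hour

print(f"general monitoring: {general_mb_per_hour / 1000} GB per hour")
print(f"thrust vector share: {thrust_share:.0%}")
print(f"high-frequency store ingests {reduction_factor:.0f}x less data")
```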
Upgrading to Quix Streams 3.21.0
Update your environment using pip:
pip install --upgrade quixstreams
The dynamic routing feature is fully backward compatible. Existing to_topic() calls with static topics continue to work unchanged, while new deployments can adopt callable-based routing incrementally.
Next steps
Dynamic topic routing helps R&D teams build more efficient data processing architectures by automatically distributing data based on content characteristics. This reduces the infrastructure complexity typically associated with multi-stream R&D data processing and enables teams to focus on analysis rather than data management.
Explore the splitting data documentation for implementation examples, or review the complete release notes for additional technical details.
Check out the repo
Our Python client library is open source, and brings DataFrames and the Python ecosystem to stream processing.

Interested in Quix Cloud?
Take a look around and explore the features of our platform.
