Sinks (beta)

In many stream processing use cases, the results need to be written to external destinations so they can be shared with other subsystems.

Quix Streams provides a sink API to achieve that.

An example using the InfluxDB 3 Sink:

from quixstreams import Application
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

app = Application(broker_address="localhost:9092")
topic = app.topic("numbers-topic")

# Initialize InfluxDB3Sink
influx_sink = InfluxDB3Sink(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"]
)

sdf = app.dataframe(topic)
# Do some processing here ...
# Sink data to InfluxDB
sdf.sink(influx_sink)

# Run the application to start consuming, processing, and sinking data
app.run(sdf)

Sinks Are Destinations

When .sink() is called on a StreamingDataFrame instance, it marks the end of the processing pipeline, and the StreamingDataFrame can no longer be modified.

Make sure you call StreamingDataFrame.sink() as the last operation in your pipeline.
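
For example, in a valid pipeline all transformations come before the sink (the apply step below is a hypothetical placeholder for your own processing):

sdf = app.dataframe(topic)
sdf = sdf.apply(lambda value: value)  # transformations come first
sdf.sink(influx_sink)  # terminal operation; sdf can't be modified after this point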

Supported Sinks

Currently, Quix Streams provides these sinks out of the box:

- CSV Sink - a simple CSV sink that writes data to a single CSV file.
- InfluxDB 3 Sink - a sink to write data to InfluxDB 3.

It's also possible to implement your own custom sinks.
Please see the Creating a Custom Sink page on how to do that.
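
As a rough illustration, a custom sink is typically built by subclassing a sink base class and implementing its write() method. The sketch below assumes the BatchingSink and SinkBatch classes from quixstreams.sinks.base (BatchingSink is also mentioned in the next section); the print call is a stand-in for real client code, so details may differ from the Creating a Custom Sink page:

from quixstreams.sinks.base import BatchingSink, SinkBatch


class MyDatabaseSink(BatchingSink):
    # A sketch of a custom sink that flushes each accumulated batch to a destination

    def write(self, batch: SinkBatch):
        # Each item in the batch carries the record's key and value
        rows = [{"key": item.key, "value": item.value} for item in batch]
        # A real sink would hand these rows to a database client here;
        # printing keeps the sketch self-contained
        print(f"writing {len(rows)} rows")


# Attach it like any other sink:
# sdf.sink(MyDatabaseSink())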

Performance Considerations

Since the implementation of BatchingSink accumulates data in memory, it increases memory usage.

If the batches become large enough, it can also put additional load on the destination and decrease the overall throughput.

To adjust the number of messages that are batched and written in one go, you may provide the commit_every parameter to the Application.
It limits the amount of data processed and written to sinks during a single checkpoint.
Note that it limits only the number of incoming messages, not the number of records being written to sinks.
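
This distinction matters when a single message expands into multiple records. For instance, StreamingDataFrame.apply() with expand=True turns one incoming message into several output records, all of which are written to the sink within the same checkpoint (the message shape below is hypothetical):

# One incoming message like {"readings": [{"number": 1}, {"number": 2}]}
# becomes two separate records, and both are written to the sink,
# even though commit_every counts it as a single message.
sdf = sdf.apply(lambda message: message["readings"], expand=True)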

Example:

from quixstreams import Application
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

# Commit the checkpoints after processing 1000 messages or after a 5 second interval has elapsed (whichever is sooner).
app = Application(
    broker_address="localhost:9092",
    commit_interval=5.0,
    commit_every=1000,
)
topic = app.topic("numbers-topic")
sdf = app.dataframe(topic)

# Create an InfluxDB sink that batches data between checkpoints.
influx_sink = InfluxDB3Sink(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"]
)

# The sink will write to InfluxDB across all assigned partitions.
sdf.sink(influx_sink)

# Run the application to start consuming, processing, and sinking data
app.run(sdf)