CSV Sink
A basic sink to write processed data to a single CSV file.
It's meant to be used mostly for local debugging.
How To Use CSV Sink
To use a CSV sink, you need to create an instance of CSVSink
and pass
it to the StreamingDataFrame.sink()
method:
from quixstreams import Application
from quixstreams.sinks.core.csv import CSVSink
app = Application(broker_address="localhost:9092")
topic = app.topic("input-topic")
# Initialize a CSV sink with a file path
csv_sink = CSVSink(path="file.csv")
sdf = app.dataframe(topic)
# Do some processing here ...
# Sink data to a CSV file
sdf.sink(csv_sink)
if __name__ == '__main__':
app.run()
How the CSV Sink Works
CSVSink
is a batching sink.
It batches processed records in memory per topic partition, and writes them to the file when a checkpoint is committed.
The output file format is the following:
key,value,timestamp,topic,partition,offset
b'afd7e8ab-4af5-4322-8417-dbfc7a0d7694',"{""number"": 0}",1722945524540,numbers-10k-keys,0,0
b'557bae7f-14b6-46c4-abc3-12f232b54c8e',"{""number"": 1}",1722945524546,numbers-10k-keys,0,1
Serialization Formats
By default, CSVSink
serializes record keys by calling str()
on them, and message values with json.dumps()
.
To use your own serializer, pass key_serializer
and value_serializer
to CSVSink
:
import json
from quixstreams.sinks.core.csv import CSVSink
# Initialize a CSVSink with a file path
csv_sink = CSVSink(
path="file.csv",
# Define custom serializers for keys and values here.
# The callables must accept one argument for key/value, and return a string
key_serializer=lambda key: json.dumps(key),
value_serializer=lambda value: str(value),
)
Delivery Guarantees
The CSVSink
provides at-least-once guarantees, and the resulting CSV file may contain duplicated rows of data if there were errors during processing.