Sources (beta)
The first step of any stream processing pipeline is to get data. Quix streams provide a source API to connect your data source to Kafka and a StreamingDataframe easily.
For example, using a CSV file:
from quixstreams import Application
from quixstreams.sources import CSVSource
def main():
app = Application()
source = CSVSource(path="input.csv")
sdf = app.dataframe(source=source)
sdf.print(metadata=True)
app.run(sdf)
if __name__ == "__main__":
main()
Supported sources
Quix streams provide a source out of the box.
- CSVSource: A source that reads data from a single CSV file.
- KafkaReplicatorSource: A source that replicates a topic from a Kafka broker to your application broker.
- QuixEnvironmentSource: A source that replicates a topic from a Quix Cloud environment to your application broker.
You can also implement your own, have a look at Creating a Custom Source for documentation on how to do that.
Multiprocessing
For good performance, each source runs in a subprocess. Quix Streams automatically manages the subprocess's setting up, monitoring, and tearing down.
For multiplatform support, Quix Streams starts the source process using the spawn approach. As a side effect, each Source instance must be pickleable. If a source needs to handle unpickleable objects, it's best to initialize those in the source subprocess (in the BaseSource.start
or Source.run
methods).
Topics
Sources work by sending data to Kafka topics. Then StreamingDataFrames consume these topics.
Each source provides a default topic based on its configuration. You can override the default topic by specifying a topic using the app.dataframe()
method.
Example
Provide a custom topic with four partitions to the source.
from quixstreams import Application
from quixstreams.sources import CSVSource
from quixstreams.models.topics import TopicConfig
def main():
app = Application()
source = CSVSource(path="input.csv")
topic = app.topic("my_csv_source", config=TopicConfig(num_partitions=4, replication_factor=1))
sdf = app.dataframe(topic=topic, source=source)
sdf.print(metadata=True)
app.run(sdf)
if __name__ == "__main__":
main()