Skip to content

Sources (beta)

The first step of any stream processing pipeline is to get data. Quix streams provide a source API to connect your data source to Kafka and a StreamingDataframe easily.

For example, using a CSV file:

from quixstreams import Application
from quixstreams.sources import CSVSource

def main():
    app = Application()
    source = CSVSource(path="input.csv")

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()

if __name__ == "__main__":
    main()

Supported sources

Quix Streams provides the following sources out of the box:

  • CSVSource: A source that reads data from a single CSV file.
  • KafkaReplicatorSource: A source that replicates a topic from a Kafka broker to your application broker.
  • QuixEnvironmentSource: A source that replicates a topic from a Quix Cloud environment to your application broker.

To create a custom source, read Creating a Custom Source.

Multiprocessing

For good performance, each source runs in a subprocess. Quix Streams automatically manages the subprocess's setting up, monitoring, and tearing down.

For multiplatform support, Quix Streams starts the source process using the spawn approach. As a side effect, each Source instance must be pickleable. If a source needs to handle unpickleable objects, it's best to initialize those in the source subprocess (in the BaseSource.start or Source.run methods).

Customize Topic Configuration

Sources work by sending data to intermediate Kafka topics, which StreamingDataFrames then consume and process.

By default, each Source provides a default topic based on its configuration.
To customize the topic config, pass a new Topic object to the app.dataframe() method together with the Source instance.

Example:

Provide a custom topic with four partitions to the source.

from quixstreams import Application
from quixstreams.sources import CSVSource
from quixstreams.models.topics import TopicConfig

def main():
    app = Application()
    # Create a CSVSource
    source = CSVSource(path="input.csv")

    # Define a topic for the CSVSource with a custom config
    topic = app.topic("my_csv_source", config=TopicConfig(num_partitions=4, replication_factor=1))

    # Pass the topic together with the CSVSource to a dataframe
    # When the CSVSource starts, it will produce data to this topic
    sdf = app.dataframe(topic=topic, source=source)
    sdf.print(metadata=True)

    app.run()

if __name__ == "__main__":
    main()

Standalone sources

So far we have covered how to run Sources and process data within the same application.

When you scale your processing applications to more instances, you may need to run only a single instance of the Source.
For example, when the source is reading data from some Websocket API, and you want to process it with multiple apps.

To achieve that, Sources can be run in a standalone mode.

Example

Running an imaginary Websocket source in a standalone mode to read data only once.

from quixstreams import Application

def main():
    app = Application()

    # Create an instance of SomeWebsocketSource
    source = SomeWebsocketSource(url="wss://example.com")

    # Register the source in the app
    app.add_source(source)

    # Start the application
    # The app will start SomeWebsocketSource, and it will produce data to the default intermediate topic
    app.run()

if __name__ == "__main__":
    main()

To customize the topic the Source will use, create a new Topic and pass it to the app.add_source() method:

from quixstreams import Application
from quixstreams.sources import CSVSource
from quixstreams.models.topics import TopicConfig

def main():
    app = Application()
    # Create an instance of SomeWebsocketSource
    source = SomeWebsocketSource(url="wss://example.com")

    # Define a topic for the CSVSource with a custom config
    topic = app.topic("some-websocket-source", config=TopicConfig(num_partitions=4, replication_factor=1))

    # Register the source and topic in the application
    app.add_source(source=source, topic=topic)

    # Start the application
    app.run()

if __name__ == "__main__":
    main()