Creating a custom source

Quix Streams also provides a set of classes to help users implement custom sources.

Source

The recommended parent class for new sources. It handles configuring, starting, and stopping the source, and provides a set of helper methods.

To get started, implement the run method and return when self.running is False.

Example subclass:

from quixstreams.sources.base import Source

class MySource(Source):
    def run(self):
        with open("file.txt", "r") as f:
            while self.running:

                line = f.readline()
                if not line:
                    return

                msg = self.serialize(
                    key="file.txt",
                    value=line.strip(),
                )

                self.produce(
                    key=msg.key,
                    value=msg.value,
                )

For more information, see quixstreams.sources.base.Source docstrings.

Stateful Source

The recommended parent class for new sources that need to keep state. It is a subclass of Source.

Fault Tolerance & Recovery

Stateful sources store the state in memory. To prevent data loss when the application restarts, the store is backed by a changelog topic in Kafka. On startup, the application will consume the changelog topic to rebuild the source state.
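The recovery step can be pictured as replaying a log of key/value updates in order. The sketch below is a conceptual model only, not Quix Streams' implementation: the rebuild_state helper and the (key, value) record layout are hypothetical, and a plain list stands in for the changelog topic.

```python
from typing import Any, Iterable


def rebuild_state(changelog: Iterable[tuple[str, Any]]) -> dict[str, Any]:
    """Replay changelog records in order; the latest record per key wins."""
    state: dict[str, Any] = {}
    for key, value in changelog:
        state[key] = value
    return state


# Hypothetical changelog contents written by earlier flushes
changelog = [("current", 10), ("current", 20), ("offset", 3)]
restored = rebuild_state(changelog)
```

After the replay, the in-memory state holds only the most recent value for each key, which is what the source sees when it starts again.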

How to Use State in Sources

There are two main moving parts:

  • The StatefulSource.state property - use it to access the State object which provides an interface to update and retrieve keys from the state.
  • The StatefulSource.flush method - call it to commit the state changes and the progress of the Source.

In stateful sources, the lifecycle of the State object is tied to the store transaction.
When StatefulSource.flush is called, it commits the current store transaction to guarantee that the state changes are saved.

After that, the State previously returned by StatefulSource.state is no longer valid, and you must access StatefulSource.state again to get a fresh State instance.

We recommend always accessing the State through the state property, as it handles the lifecycle for you.

To learn more about the State, see the Stateful Processing page. For more information, see the quixstreams.sources.base.StatefulSource API docs.
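To make the lifecycle rule concrete, here is a toy model written with the standard library only. ToyState and ToyStatefulSource are hypothetical stand-ins, not Quix Streams classes, and raising RuntimeError on a stale State is an assumption made for illustration. The real library may signal an invalid State differently.

```python
class ToyState:
    """Hypothetical stand-in for a State bound to one store transaction."""

    def __init__(self, committed: dict):
        self._committed = committed
        self._pending: dict = {}
        self.valid = True

    def _check(self):
        if not self.valid:
            raise RuntimeError("stale State: its transaction was already flushed")

    def get(self, key, default=None):
        self._check()
        return self._pending.get(key, self._committed.get(key, default))

    def set(self, key, value):
        self._check()
        self._pending[key] = value


class ToyStatefulSource:
    """Hypothetical model of the state/flush lifecycle."""

    def __init__(self):
        self._committed: dict = {}
        self._state = None

    @property
    def state(self) -> ToyState:
        # Open a new "transaction" lazily when there is no live State
        if self._state is None:
            self._state = ToyState(self._committed)
        return self._state

    def flush(self):
        # Commit pending changes, then invalidate the State that held them
        if self._state is not None:
            self._committed.update(self._state._pending)
            self._state.valid = False
            self._state = None


source = ToyStatefulSource()
cached = source.state        # keeping a reference to the State object
cached.set("current", 1)
source.flush()               # commits the change and invalidates `cached`

try:
    cached.get("current")
    stale_detected = False
except RuntimeError:
    stale_detected = True

fresh_value = source.state.get("current")  # the property hands out a fresh State
```

In this model, reading through the property after a flush works, while the cached reference does not. That is why going through the state property on every access is the safer habit.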

Example subclass:

import sys
import time

from quixstreams.sources.base import StatefulSource

class RangeSource(StatefulSource):
    def run(self):
        # Get the key "current" from the state
        start = self.state.get("current", 0) + 1
        for i in range(start, sys.maxsize):
            if not self.running:
                return

            # Update the key in the state
            self.state.set("current", i)
            serialized = self._producer_topic.serialize(value=i)
            self.produce(key="range", value=serialized.value)
            time.sleep(0.1)

            # Flush the state changes every 10 messages
            if i % 10 == 0:
                self.flush()
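Because the example flushes only every 10 messages, a restart resumes from the last flushed value rather than the last produced one. The simulation below is a sketch under the assumption that unflushed state changes are lost when the process stops unexpectedly. simulate_run is a hypothetical helper, and no Kafka or Quix Streams code is involved.

```python
def simulate_run(committed: dict, crash_at: int, flush_every: int = 10) -> list[int]:
    """Mimic the RangeSource loop until a simulated crash; return produced values."""
    produced = []
    pending = dict(committed)          # uncommitted view of the state
    start = pending.get("current", 0) + 1
    for i in range(start, crash_at + 1):
        pending["current"] = i
        produced.append(i)
        if i % flush_every == 0:
            committed.update(pending)  # flush: the state change becomes durable
    return produced


committed_state: dict = {}
first = simulate_run(committed_state, crash_at=17)   # stops after producing 17
second = simulate_run(committed_state, crash_at=25)  # restart resumes from 11
duplicates = sorted(set(first) & set(second))
```

Under these assumptions the values 11 through 17 are produced twice, so downstream consumers should tolerate duplicates. Flushing more often narrows that window at the cost of more frequent commits.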

BaseSource

This is the base class for all sources. It handles configuring the source and requires subclasses to define three methods:

  • start: This method is called, in the subprocess, when the source is started.
  • stop: This method is called, in the subprocess, when the application is shutting down.
  • default_topic: This method is called, in the main process, when a topic is not provided with the source.

For more information, see quixstreams.sources.base.BaseSource docstrings.

Custom Sources and Jupyter Notebook

Due to the multiprocessing nature of sources, writing a custom one in a Jupyter Notebook doesn't work out of the box.

Running the following cell produces output similar to the log shown after it:

from quixstreams import Application
from quixstreams.sources import Source

import random
import time

class MySource(Source):
    def run(self):
        while self.running:
            msg = self.serialize(key="test", value=random.randint(0, 10000))

            self.produce(
                key=msg.key,
                value=msg.value,
            )
            time.sleep(1)

def main():
    app = Application(broker_address="localhost:19092")
    source = MySource(name="mysource")

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()

if __name__ == "__main__":
    main()

Output:

[2024-09-25 10:54:37,852] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'localhost:19092'}" consumer_group="quixstreams-default" auto_offset_reset="latest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-09-25 10:54:37,853] [INFO] [quixstreams] : Topics required for this application: "mysource"
[2024-09-25 10:54:37,855] [INFO] [quixstreams] : Creating a new topic "mysource" with config: "{'num_partitions': 1, 'replication_factor': 1, 'extra_config': {}}"
[2024-09-25 10:54:38,856] [INFO] [quixstreams] : Topic "mysource" has been created
[2024-09-25 10:54:38,857] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-09-25 10:54:38,859] [INFO] [quixstreams] : Kafka topics validation complete
[2024-09-25 10:54:38,860] [INFO] [quixstreams] : Initializing state directory at "<project path>/state/quixstreams-default"
[2024-09-25 10:54:38,860] [INFO] [quixstreams] : Waiting for incoming messages
[2024-09-25 10:54:39,007] [INFO] [quixstreams] : Starting source mysource
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<env path>/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<env path>/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'MySource' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
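
The AttributeError above comes from how pickle handles classes: it stores a class by reference (module name plus class name), and the spawned subprocess must be able to look that name up again. A class defined in a notebook cell lives in the notebook's __main__, which the child process cannot re-import. The stdlib-only sketch below reproduces the same failure mode without Kafka. The notebook_main module and the plain MySource class are stand-ins for illustration, not Quix Streams objects.

```python
import pickle
import sys
import types

# A throwaway module playing the role of the notebook's __main__
notebook_main = types.ModuleType("notebook_main")
sys.modules["notebook_main"] = notebook_main


class MySource:
    def __init__(self, name: str):
        self.name = name


# Pretend the class was defined inside that module
MySource.__module__ = "notebook_main"
MySource.__qualname__ = "MySource"
notebook_main.MySource = MySource

payload = pickle.dumps(MySource(name="mysource"))

# The payload carries the module and class names, not the class body
references_by_name = b"notebook_main" in payload and b"MySource" in payload

# Simulate the child process, where the module does not define the class
del notebook_main.MySource
try:
    pickle.loads(payload)
    lookup_failed = False
except AttributeError:
    lookup_failed = True
```

When the class lives in a regular importable file, the child process can import the module by name and the lookup succeeds.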

To fix that, define your custom source in a separate file so that the subprocess can import it.

%%writefile source.py
# indicate to IPython we want to write this content to a file

from quixstreams.sources import Source

import time
import random

class MySource(Source):
    def run(self):
        while self.running:
            msg = self.serialize(key="test", value=random.randint(0, 10000))

            self.produce(
                key=msg.key,
                value=msg.value,
            )
            time.sleep(1)

Then, in a separate cell, import the source and run the application:

from quixstreams import Application

from source import MySource

def main():
    app = Application(broker_address="localhost:19092")
    source = MySource(name="mysource")

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()

if __name__ == "__main__":
    main()