Kafka Replicator Source

A source that reads data from a Kafka topic and produces it to another Kafka topic. The two topics can be located on different Kafka clusters.

This source supports exactly-once guarantees.

How To Use

To use a Kafka Replicator source, you need to create an instance of KafkaReplicatorSource and pass it to the app.dataframe() method.

from quixstreams import Application
from quixstreams.sources.core.kafka import KafkaReplicatorSource

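def main():
    app = Application()

    # Read from "source-topic" on the source cluster; the name also
    # determines the replicator's consumer group.
    source = KafkaReplicatorSource(
        name="my-source",
        app_config=app.config,
        topic="source-topic",
        broker_address="source-broker-address",
    )

    # Build a dataframe on the replicated data and print each
    # message together with its metadata.
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()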

if __name__ == "__main__":
    main()

Topic

The Kafka Replicator source only deals with bytes: it reads the remote keys and values as bytes and produces them unchanged. You can configure the key and value deserializers used by the Streaming DataFrame with the key_deserializer and value_deserializer parameters.
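
To make the byte passthrough concrete, the sketch below imitates the two stages in plain Python, without a Kafka connection: a record is copied unchanged as bytes, then decoded only when it is handed to the processing layer. The replicate and deserialize helpers and the sample payload are hypothetical illustrations, not part of the Quix Streams API; the UTF-8 and JSON decoders stand in for whatever key_deserializer and value_deserializer you configure.

```python
import json


def replicate(key: bytes, value: bytes) -> tuple[bytes, bytes]:
    """Hypothetical stand-in for replication: bytes in, the same bytes out."""
    return key, value


def deserialize(key: bytes, value: bytes) -> tuple[str, dict]:
    """Hypothetical stand-in for the deserializers applied after replication."""
    return key.decode("utf-8"), json.loads(value)


# Sample record as it might sit on the source topic (made-up payload).
raw_key, raw_value = b"sensor-1", b'{"temperature": 21.5}'

# Replication never inspects or changes the payload.
copied_key, copied_value = replicate(raw_key, raw_value)

# Decoding happens only at the processing layer.
key, value = deserialize(copied_key, copied_value)
print(key, value)
```

Because the copy step is byte-for-byte, a payload the chosen deserializer cannot parse would only fail at the decoding step in this sketch, not during the copy.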

Consumer group

The Kafka Replicator consumer group is the source name prefixed with source-. Changing the name resets the source state, and the source re-replicates the data according to the configured auto_offset_reset. The consumer group is independent of the application consumer group, so changing the application consumer group does not reset the source.
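
The naming rule above can be sketched in a few lines. replicator_consumer_group is a hypothetical helper written for illustration only; the library derives the group internally, and the exact format is assumed here to be the literal source- prefix followed by the source name.

```python
def replicator_consumer_group(source_name: str) -> str:
    """Hypothetical helper: mirror the documented 'source-' prefix rule."""
    return f"source-{source_name}"


# The group depends only on the source name.
group = replicator_consumer_group("my-source")
print(group)

# Renaming the source yields a different group, so replication starts
# again from the position chosen by auto_offset_reset. The application
# consumer group is not an input to the name at all.
renamed_group = replicator_consumer_group("my-source-v2")
print(renamed_group != group)
```

Under this assumption, keeping the source name stable across deployments is what preserves the replication position.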

For more information about consumer groups, see the glossary.