Sources API
quixstreams.sources.base.source
BaseSource
This is the base class for all sources.
Sources are executed in a sub-process of the main application.
To create your own source you need to implement:
start
stop
default_topic
BaseSource is the most basic interface, and the framework expects every source to implement it.
Use Source to benefit from a base implementation.
You can connect a source to a StreamingDataframe using the Application.
Example snippet:
import random

from quixstreams import Application
from quixstreams.models.topics import Topic
from quixstreams.sources.base.source import BaseSource


class RandomNumbersSource(BaseSource):
    def __init__(self):
        super().__init__()
        self._running = False

    def start(self):
        # Runs in the source subprocess until stop() clears the flag.
        self._running = True
        while self._running:
            number = random.randint(0, 100)
            serialized = self._producer_topic.serialize(value=number)
            self._producer.produce(
                topic=self._producer_topic.name,
                key=serialized.key,
                value=serialized.value,
            )

    def stop(self):
        self._running = False

    def default_topic(self) -> Topic:
        # Used only when no topic is provided to the source.
        return Topic(
            name="topic-name",
            value_deserializer="json",
            value_serializer="json",
        )


def main():
    app = Application(broker_address="localhost:9092")
    source = RandomNumbersSource()
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()
BaseSource.configure
This method is triggered before the source is started.
It configures the source's Kafka producer, the topic it will produce to and optional dependencies.
BaseSource.start
This method is triggered in the subprocess when the source is started.
The subprocess will run as long as the start method executes. Use it to fetch data and produce it to Kafka.
BaseSource.stop
This method is triggered when the application is shutting down.
The source must ensure that the run method completes soon.
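The start/stop contract described above can be sketched without Kafka. This is a minimal illustration only: ToySource is a hypothetical stand-in (not part of quixstreams), a thread stands in for the source subprocess, and a threading.Event replaces the boolean flag used in the example snippet.

```python
import threading
import time


class ToySource:
    """Hypothetical stand-in for a BaseSource subclass; nothing is sent to Kafka."""

    def __init__(self):
        self._stop_requested = threading.Event()
        self.produced = []

    def start(self):
        # Blocks for as long as the source should run, like BaseSource.start.
        while not self._stop_requested.is_set():
            self.produced.append(len(self.produced))
            time.sleep(0.01)

    def stop(self):
        # Only signals the loop; start() returns on its next iteration.
        self._stop_requested.set()


source = ToySource()
worker = threading.Thread(target=source.start)  # a thread stands in for the subprocess
worker.start()
time.sleep(0.05)
source.stop()
worker.join(timeout=1.0)
print("stopped cleanly:", not worker.is_alive())
```

The point of the sketch is that stop only requests termination; the loop inside start is what actually has to notice the request and return promptly.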
BaseSource.default_topic
This method is triggered when the topic is not provided to the source.
The source must return a default topic configuration.
Note: if the default topic is used, the Application will prefix its name with "source__".
Source
A base class for custom Sources that provides a basic implementation of the BaseSource interface.
It is the recommended interface for creating custom sources.
Subclass it and implement the run method to fetch data and produce it to Kafka.
Example:
import random
import time

from quixstreams import Application
from quixstreams.sources import Source


class RandomNumbersSource(Source):
    def run(self):
        while self.running:
            number = random.randint(0, 100)
            serialized = self._producer_topic.serialize(value=number)
            self.produce(key=str(number), value=serialized.value)
            time.sleep(0.5)


def main():
    app = Application(broker_address="localhost:9092")
    source = RandomNumbersSource(name="random-source")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()
Helper methods and properties:
serialize()
produce()
flush()
running
Source.__init__
Arguments:
name: The unique name of the source. It is used to generate the topic configuration.
shutdown_timeout: Time in seconds the application waits for the source to shut down gracefully.
Source.running
Property indicating if the source is running.
The stop method will set it to False. Use it to stop the source gracefully.
Source.cleanup
This method is triggered once the run method completes.
Use it to clean up the resources and shut down the source gracefully.
It flushes the producer when _run completes successfully.
Source.stop
This method is triggered when the application is shutting down.
It sets the running property to False.
Source.start
This method is triggered in the subprocess when the source is started.
It marks the source as running, executes its run method, and ensures cleanup happens.
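The flow described for Source.start (mark as running, execute run, always clean up) can be sketched as follows. ToyBase and its subclasses are hypothetical illustrations, and the cleanup(failed) argument is an assumption made for this sketch, not a documented signature.

```python
class ToyBase:
    """Hypothetical mirror of the start/run/cleanup flow; not the quixstreams class."""

    def __init__(self):
        self.running = False
        self.events = []

    def start(self):
        self.running = True  # mark the source as running
        failed = False
        try:
            self.run()
        except Exception:
            failed = True
            raise
        finally:
            # Cleanup happens whether run() returned or raised.
            self.cleanup(failed)

    def stop(self):
        self.running = False

    def run(self):
        raise NotImplementedError

    def cleanup(self, failed):
        self.events.append("cleanup-after-error" if failed else "flush-and-cleanup")


class CountToThree(ToyBase):
    def run(self):
        n = 0
        while self.running:
            n += 1
            self.events.append(n)
            if n == 3:
                self.stop()  # the loop exits once running is False


class Broken(ToyBase):
    def run(self):
        raise RuntimeError("boom")


ok = CountToThree()
ok.start()

broken = Broken()
try:
    broken.start()
except RuntimeError:
    pass
```

In this sketch the flush is tied to the successful path only, matching the statement that the producer is flushed when the run completes successfully.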
Source.run
This method is triggered in the subprocess when the source is started.
The subprocess will run as long as the run method executes. Use it to fetch data and produce it to Kafka.
Source.serialize
def serialize(key: Optional[object] = None,
              value: Optional[object] = None,
              headers: Optional[Headers] = None,
              timestamp_ms: Optional[int] = None) -> KafkaMessage
Serialize data to bytes using the producer topic serializers and return a quixstreams.models.messages.KafkaMessage.
Returns:
quixstreams.models.messages.KafkaMessage
Source.produce
def produce(value: Optional[Union[str, bytes]] = None,
            key: Optional[Union[str, bytes]] = None,
            headers: Optional[Headers] = None,
            partition: Optional[int] = None,
            timestamp: Optional[int] = None,
            poll_timeout: float = 5.0,
            buffer_error_max_tries: int = 3) -> None
Produce a message to the configured source topic in Kafka.
Source.flush
This method flushes the producer.
It ensures all messages are successfully delivered to Kafka.
Arguments:
timeout (float): time to attempt flushing, in seconds. None uses the producer default; -1 waits indefinitely. Default: None
Raises:
CheckpointProducerTimeout: if any message fails to be produced before the timeout
Source.default_topic
Return a default topic matching the source name.
The default topic will not be used if the topic has already been provided to the source.
Note: if the default topic is used, the Application will prefix its name with "source__".
Returns:
quixstreams.models.topics.Topic
StatefulSource
A Source class for custom Sources that need a state.
Subclasses are responsible for flushing, by calling flush, at reasonable intervals.
Example:
import random
import time

from quixstreams import Application
from quixstreams.sources import StatefulSource


class RandomNumbersSource(StatefulSource):
    def run(self):
        i = 0
        while self.running:
            previous = self.state.get("number", 0)
            current = random.randint(0, 100)
            self.state.set("number", current)

            serialized = self._producer_topic.serialize(value=current + previous)
            self.produce(key=str(current), value=serialized.value)
            time.sleep(0.5)

            # flush the state every 10 messages
            i += 1
            if i % 10 == 0:
                self.flush()


def main():
    app = Application(broker_address="localhost:9092")
    source = RandomNumbersSource(name="random-source")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()
StatefulSource.__init__
Arguments:
name: The unique name of the source. It is used to generate the topic configuration.
shutdown_timeout: Time in seconds the application waits for the source to shut down gracefully.
StatefulSource.configure
def configure(topic: Topic,
              producer: RowProducer,
              *,
              store_partition: Optional[StorePartition] = None,
              **kwargs) -> None
This method is triggered before the source is started.
It configures the source's Kafka producer, the topic it will produce to and the store partition.
StatefulSource.store_partitions_count
Count of store partitions.
Used to configure the number of partitions in the changelog topic.
StatefulSource.assigned_store_partition
The store partition assigned to this instance
StatefulSource.store_name
The source store name
StatefulSource.state
Access the State of the source.
The State lifecycle is tied to the store transaction. A transaction is only valid until the next .flush() call. If no valid transaction exists, a new transaction is created.
Important: after each .flush() call, a previously returned instance is invalidated and cannot be used. The property must be called again.
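The invalidation rule above is easy to trip over when a State instance is cached in a local variable. The following toy model sketches the idea; ToyTransaction and ToyStatefulSource are hypothetical, and raising RuntimeError on stale access is an assumption made for illustration (the text only says the instance cannot be used).

```python
class ToyTransaction:
    """Hypothetical transaction-bound state; valid until committed."""

    def __init__(self, store):
        self._store = store
        self._pending = {}
        self.valid = True

    def _check(self):
        if not self.valid:
            raise RuntimeError("state used after flush()")

    def get(self, key, default=None):
        self._check()
        if key in self._pending:
            return self._pending[key]
        return self._store.get(key, default)

    def set(self, key, value):
        self._check()
        self._pending[key] = value

    def commit(self):
        self._store.update(self._pending)
        self.valid = False  # the instance is invalidated by the commit


class ToyStatefulSource:
    def __init__(self):
        self._store = {}
        self._tx = None

    @property
    def state(self):
        # Create a new transaction when none exists or the last one was flushed.
        if self._tx is None or not self._tx.valid:
            self._tx = ToyTransaction(self._store)
        return self._tx

    def flush(self):
        if self._tx is not None and self._tx.valid:
            self._tx.commit()


source = ToyStatefulSource()
stale = source.state
stale.set("number", 7)
source.flush()

try:
    stale.get("number")
    stale_error = None
except RuntimeError as exc:
    stale_error = str(exc)

fresh_value = source.state.get("number")  # re-read the property after flush()
```

The practical takeaway is to read self.state again after every flush rather than holding on to an earlier instance.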
StatefulSource.flush
This method commits the state and flushes the producer.
It ensures the state is published to the changelog topic and all messages are successfully delivered to Kafka.
Arguments:
timeout (float): time to attempt flushing, in seconds. None uses the producer default; -1 waits indefinitely. Default: None
Raises:
CheckpointProducerTimeout: if any message fails to be produced before the timeout
quixstreams.sources.core.csv
CSVSource
CSVSource.__init__
def __init__(path: Union[str, Path],
             name: str,
             key_extractor: Optional[Callable[[dict], Union[str, bytes]]] = None,
             timestamp_extractor: Optional[Callable[[dict], int]] = None,
             delay: float = 0,
             shutdown_timeout: float = 10,
             dialect: str = "excel") -> None
A base CSV source that reads data from a CSV file and produces rows
to the Kafka topic in JSON format.
Arguments:
path: a path to the CSV file.
name: a unique name for the Source. It is used as a part of the default topic name.
key_extractor: an optional callable to extract the message key from the row. It must return either str or bytes. If empty, the Kafka messages will be produced without keys. Default - None.
timestamp_extractor: an optional callable to extract the message timestamp from the row. It must return time in milliseconds as int. If empty, the current epoch will be used. Default - None.
delay: an optional delay after producing each row for stream simulation. Default - 0.
shutdown_timeout: Time in seconds the application waits for the source to shut down gracefully.
dialect: a CSV dialect to use. It affects quoting and delimiters. See the "csv" module docs for more info. Default - "excel".
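As an illustration of the two extractor callables, here is a minimal sketch that exercises them against rows parsed with the standard csv module. The column names (device_id, recorded_at, temperature) and the assumption that timestamps are ISO 8601 strings in UTC are hypothetical; adapt them to the actual file.

```python
import csv
import io
from datetime import datetime, timezone


def key_extractor(row: dict) -> str:
    # Must return str or bytes; here the (hypothetical) device_id column.
    return row["device_id"]


def timestamp_extractor(row: dict) -> int:
    # Must return milliseconds as int; assumes naive ISO timestamps are UTC.
    dt = datetime.fromisoformat(row["recorded_at"]).replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


sample = io.StringIO(
    "device_id,recorded_at,temperature\n"
    "sensor-1,2024-01-01T00:00:00,21.5\n"
    "sensor-2,2024-01-01T00:00:01,19.0\n"
)
rows = list(csv.DictReader(sample, dialect="excel"))
keys = [key_extractor(row) for row in rows]
timestamps = [timestamp_extractor(row) for row in rows]
```

Functions like these would then be passed as the key_extractor and timestamp_extractor arguments of CSVSource.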
quixstreams.sources.core.kafka.kafka
KafkaReplicatorSource
Source implementation that replicates a topic from a Kafka broker to your application broker.
Running multiple instances of this source is supported.
Example Snippet:
from quixstreams import Application
from quixstreams.sources.kafka import KafkaReplicatorSource

app = Application(
    consumer_group="group",
)

source = KafkaReplicatorSource(
    name="source-second-kafka",
    app_config=app.config,
    topic="second-kafka-topic",
    broker_address="localhost:9092",
)

sdf = app.dataframe(source=source)
sdf = sdf.print()
app.run()
KafkaReplicatorSource.__init__
def __init__(
        name: str,
        app_config: "ApplicationConfig",
        topic: str,
        broker_address: Union[str, ConnectionConfig],
        auto_offset_reset: Optional[AutoOffsetReset] = "latest",
        consumer_extra_config: Optional[dict] = None,
        consumer_poll_timeout: Optional[float] = None,
        shutdown_timeout: float = 10,
        on_consumer_error: ConsumerErrorCallback = default_on_consumer_error,
        value_deserializer: DeserializerType = "json",
        key_deserializer: DeserializerType = "bytes") -> None
Arguments:
name: The unique name of the source. It is used to generate the default topic name and the consumer group name on the source broker. Running multiple instances of KafkaReplicatorSource with the same name connected to the same broker will make them share the same consumer group.
app_config: The configuration of the application. Used by the source to connect to the application Kafka broker.
topic: The topic to replicate.
broker_address: The connection settings for the source Kafka.
auto_offset_reset: Consumer auto.offset.reset setting. Default - Use the Application auto_offset_reset setting.
consumer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Default - None
consumer_poll_timeout: timeout for RowConsumer.poll(). Default - Use the Application consumer_poll_timeout setting.
shutdown_timeout: Time in seconds the application waits for the source to shut down gracefully.
on_consumer_error: Triggered when the source Consumer fails to poll Kafka.
value_deserializer: The default topic value deserializer, used by StreamingDataframe connected to the source. Default - "json"
key_deserializer: The default topic key deserializer, used by StreamingDataframe connected to the source. Default - "bytes"
quixstreams.sources.core.kafka.quix
QuixEnvironmentSource
Source implementation that replicates a topic from a Quix Cloud environment to your application broker. It can copy messages for development and testing without risking producing them back or affecting the consumer groups.
Running multiple instances of this source is supported.
Example Snippet:
from quixstreams import Application
from quixstreams.sources.kafka import QuixEnvironmentSource

app = Application(
    consumer_group="group",
)

source = QuixEnvironmentSource(
    name="source-quix",
    app_config=app.config,
    quix_workspace_id="WORKSPACE_ID",
    quix_sdk_token="WORKSPACE_SDK_TOKEN",
    topic="quix-source-topic",
)

sdf = app.dataframe(source=source)
sdf = sdf.print()
app.run()
QuixEnvironmentSource.__init__
def __init__(
        name: str,
        app_config: "ApplicationConfig",
        topic: str,
        quix_sdk_token: str,
        quix_workspace_id: str,
        quix_portal_api: Optional[str] = None,
        auto_offset_reset: Optional[AutoOffsetReset] = None,
        consumer_extra_config: Optional[dict] = None,
        consumer_poll_timeout: Optional[float] = None,
        shutdown_timeout: float = 10,
        on_consumer_error: ConsumerErrorCallback = default_on_consumer_error,
        value_deserializer: DeserializerType = "json",
        key_deserializer: DeserializerType = "bytes") -> None
Arguments:
quix_workspace_id: The Quix workspace ID of the source environment.
quix_sdk_token: Quix Cloud SDK token used to connect to the source environment.
quix_portal_api: The Quix portal API URL of the source environment. Default - Quix__Portal__Api environment variable or the Quix Cloud production URL.
For other parameters, see quixstreams.sources.kafka.KafkaReplicatorSource
quixstreams.sources.community.file.file
FileFetcher
Serves individual files while downloading another in the background.
FileSource
Ingest a set of files from a desired origin into Kafka by iterating through the provided folder and processing all nested files within it.
Origins include a local filestore, AWS S3, or Microsoft Azure.
FileSource defaults to a local filestore (LocalOrigin) + JSON format.
Expects folder and file structures as generated by the related FileSink connector:
my_topics/
├── topic_a/
│   ├── 0/
│   │   ├── 0000.ext
│   │   └── 0011.ext
│   └── 1/
│       ├── 0003.ext
│       └── 0016.ext
└── topic_b/
    └── etc...
Intended to be used with a single topic (ex: topic_a), but will recursively read from whatever entrypoint is passed to it.
The expected structure inside each file depends on the file format.
See the .formats and .compressions modules to see what is supported.
Example Usage:
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.origins import S3Origin

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")

origin = S3Origin(
    bucket="<YOUR BUCKET>",
    aws_access_key_id="<YOUR KEY ID>",
    aws_secret_access_key="<YOUR SECRET KEY>",
    # S3Origin.__init__ names this parameter region_name, not aws_region.
    region_name="<YOUR REGION>",
)

source = FileSource(
    directory="path/to/your/topic_folder/",
    origin=origin,
    format="json",
    compression="gzip",
)

sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
    app.run()
FileSource.__init__
def __init__(directory: Union[str, Path],
             format: Union[Format, FormatName] = "json",
             origin: Origin = LocalOrigin(),
             compression: Optional[CompressionName] = None,
             replay_speed: float = 1.0,
             name: Optional[str] = None,
             shutdown_timeout: float = 30)
Arguments:
directory: a directory to recursively read through; it is recommended to provide the path to a given topic folder (ex: /path/to/topic_a).
format: what format the message files are in (ex: json, parquet). Optionally, a Format instance can be provided if more than the compression needs to be defined (compression will then be ignored).
origin: an Origin type (defaults to reading local files).
compression: what compression is used on the given files, if any.
replay_speed: Produce the messages with this speed multiplier, which roughly reflects the time "delay" between the original messages being produced. Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall.
name: The name of the Source application (Default: last folder name).
shutdown_timeout: Time in seconds the application waits for the source to shut down gracefully.
FileSource.default_topic
Uses the file structure to generate the desired partition count for the internal topic.
Returns:
the original default topic, with updated partition count
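One way to picture deriving a partition count from the folder layout shown earlier is to count the partition sub-folders of a topic directory. This is only a sketch on a local temporary directory: count_partition_folders is a hypothetical helper, and the actual FileSource logic (including how remote origins are handled) may differ.

```python
import tempfile
from pathlib import Path


def count_partition_folders(topic_dir: Path) -> int:
    # Hypothetical helper: each immediate sub-folder is treated as one partition.
    return sum(1 for child in topic_dir.iterdir() if child.is_dir())


with tempfile.TemporaryDirectory() as tmp:
    topic_a = Path(tmp) / "my_topics" / "topic_a"
    layout = [("0", "0000.ext"), ("0", "0011.ext"), ("1", "0003.ext"), ("1", "0016.ext")]
    for partition, filename in layout:
        folder = topic_a / partition
        folder.mkdir(parents=True, exist_ok=True)
        (folder / filename).touch()

    partition_count = count_partition_folders(topic_a)
```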
quixstreams.sources.community.file.compressions.gzip
quixstreams.sources.community.file.origins.azure
AzureFileOrigin
AzureFileOrigin.__init__
Arguments:
connection_string: Azure client authentication string.
container: Azure container name.
AzureFileOrigin.get_folder_count
This is a simplified version of the recommended way to retrieve folder names based on the azure SDK docs examples.
quixstreams.sources.community.file.origins.local
quixstreams.sources.community.file.origins.s3
S3Origin
S3Origin.__init__
def __init__(
        bucket: str,
        region_name: Optional[str] = getenv("AWS_REGION"),
        aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
        endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3"))
Configure FileSource to read files from an AWS S3 bucket.
Arguments:
bucket: The S3 bucket name only (ex: 'your-bucket').
region_name: The AWS region. NOTE: can alternatively set the AWS_REGION environment variable.
aws_access_key_id: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable.
aws_secret_access_key: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable.
endpoint_url: the endpoint URL to use; only required for connecting to a locally hosted S3. NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable.
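The signature above shows getenv(...) calls as default values. Assuming the rendered signature mirrors the real code, Python evaluates such defaults once, when the function is defined, so environment variables set afterwards are not picked up by the defaults. The sketch below demonstrates this general Python behavior with a hypothetical make_origin function and a made-up DEMO_REGION variable.

```python
import os
from os import getenv

os.environ["DEMO_REGION"] = "eu-west-1"


def make_origin(region_name=getenv("DEMO_REGION")):
    # Hypothetical function; the default was captured when `def` executed.
    return region_name


# Changing the variable later does not affect the already-evaluated default.
os.environ["DEMO_REGION"] = "us-east-1"

default_region = make_origin()
explicit_region = make_origin(region_name=getenv("DEMO_REGION"))
```

If that assumption holds, the environment variables should be set before the module is imported, or the values should be passed explicitly.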
quixstreams.sources.community.file.formats.json
JSONFormat
JSONFormat.__init__
Read a JSON-formatted file (along with decompressing it).
Arguments:
compression: the compression type used on the file.
loads: A custom function to deserialize objects to the expected dict with {_key: str, _value: dict, _timestamp: int}.
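A custom loads function could look like the sketch below. The input field names (id, payload, ts_ms) are hypothetical, and whether the function receives bytes or str, and one record per call, is an assumption here; json.loads accepts both input types.

```python
import json


def loads(raw):
    # Map a (hypothetical) record layout onto the expected _key/_value/_timestamp dict.
    record = json.loads(raw)
    return {
        "_key": str(record["id"]),
        "_value": record["payload"],
        "_timestamp": int(record["ts_ms"]),
    }


line = b'{"id": 42, "payload": {"temp": 21.5}, "ts_ms": 1704067200000}'
message = loads(line)
```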
quixstreams.sources.community.file.formats.parquet
quixstreams.sources.community.kinesis.kinesis
KinesisSource
NOTE: Requires pip install quixstreams[kinesis] to work.
This source reads data from an Amazon Kinesis stream, dumping it to a Kafka topic using desired StreamingDataFrame-based transformations.
Provides "at-least-once" guarantees.
The incoming message value will be in bytes, so transform in your SDF accordingly.
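Since values arrive as bytes, a small decoding function is usually the first transformation. The sketch below assumes the payloads are UTF-8 encoded JSON objects, which is an assumption about your data, not something the source guarantees.

```python
import json


def decode_json_bytes(value: bytes) -> dict:
    # Assumes UTF-8 encoded JSON; adjust for other encodings or formats.
    return json.loads(value.decode("utf-8"))


decoded = decode_json_bytes(b'{"sensor": "a1", "reading": 3}')
```

Such a function can be passed to StreamingDataFrame.apply as the first step of the pipeline.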
Example Usage:
from quixstreams import Application
from quixstreams.sources.community.kinesis import KinesisSource

kinesis = KinesisSource(
    stream_name="<YOUR STREAM>",
    aws_access_key_id="<YOUR KEY ID>",
    aws_secret_access_key="<YOUR SECRET KEY>",
    aws_region="<YOUR REGION>",
    auto_offset_reset="earliest",  # start from the beginning of the stream (vs end)
)

app = Application(
    broker_address="<YOUR BROKER INFO>",
    consumer_group="<YOUR GROUP>",
)

sdf = app.dataframe(source=kinesis).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
    app.run()
KinesisSource.__init__
def __init__(
        stream_name: str,
        aws_region: Optional[str] = getenv("AWS_REGION"),
        aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
        aws_endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_KINESIS"),
        shutdown_timeout: float = 10,
        auto_offset_reset: AutoOffsetResetType = "latest",
        max_records_per_shard: int = 1000,
        commit_interval: float = 5.0,
        retry_backoff_secs: float = 5.0)
Arguments:
stream_name: name of the desired Kinesis stream to consume.
aws_region: The AWS region. NOTE: can alternatively set the AWS_REGION environment variable.
aws_access_key_id: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable.
aws_secret_access_key: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable.
aws_endpoint_url: the endpoint URL to use; only required for connecting to a locally hosted Kinesis. NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable.
shutdown_timeout: Time in seconds the application waits for the source to shut down gracefully.
auto_offset_reset: When no previous offset has been recorded, whether to start from the beginning ("earliest") or end ("latest") of the stream.
max_records_per_shard: During round-robin consumption, how many records to consume per shard (partition) per consume (NOT per-commit).
commit_interval: the time between commits.
retry_backoff_secs: how long to back off from doing HTTP calls for a shard when the Kinesis consumer encounters handled/expected errors.
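To make the max_records_per_shard description more concrete, here is a toy model of round-robin consumption with a per-shard cap. It is a sketch of the general idea only; the round_robin function and the in-memory shard contents are hypothetical and do not reflect how KinesisSource is implemented.

```python
from collections import deque


def round_robin(shards: dict, max_records_per_shard: int):
    # Visit shards in turn, taking at most max_records_per_shard records per visit.
    queues = {name: deque(records) for name, records in shards.items()}
    while any(queues.values()):
        for name, queue in queues.items():
            for _ in range(min(max_records_per_shard, len(queue))):
                yield name, queue.popleft()


order = list(
    round_robin(
        {"shard-0": [1, 2, 3], "shard-1": [10, 20]},
        max_records_per_shard=2,
    )
)
```

A lower cap interleaves shards more finely, while a higher cap drains more of one shard before moving to the next.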
quixstreams.sources.community.pubsub.pubsub
PubSubSource
This source enables reading from a Google Cloud Pub/Sub topic, dumping it to a kafka topic using desired SDF-based transformations.
Provides "at-least-once" guarantees.
Currently, forwarding message keys ("ordered messages" in Pub/Sub) is unsupported.
The incoming message value will be in bytes, so transform in your SDF accordingly.
Example Usage:
from quixstreams import Application
from quixstreams.sources.community.pubsub import PubSubSource
from os import environ

source = PubSubSource(
    # Suggested: pass JSON-formatted credentials from an environment variable.
    service_account_json=environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
    project_id="<project ID>",
    topic_id="<topic ID>",  # NOTE: NOT the full /x/y/z path!
    subscription_id="<subscription ID>",  # NOTE: NOT the full /x/y/z path!
    create_subscription=True,
)

app = Application(
    broker_address="localhost:9092",
    auto_offset_reset="earliest",
    consumer_group="gcp",
    loglevel="INFO",
)

sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
    app.run()
PubSubSource.__init__
def __init__(project_id: str,
             topic_id: str,
             subscription_id: str,
             service_account_json: Optional[str] = None,
             commit_every: int = 100,
             commit_interval: float = 5.0,
             create_subscription: bool = False,
             enable_message_ordering: bool = False,
             shutdown_timeout: float = 10.0)
Arguments:
project_id: a Google Cloud project ID.
topic_id: a Pub/Sub topic ID (NOT the full path).
subscription_id: a Pub/Sub subscription ID (NOT the full path).
service_account_json: a Google Cloud Credentials JSON as a string. Can instead use environment variables (which have different behavior):
- "GOOGLE_APPLICATION_CREDENTIALS" set to a JSON filepath, i.e. /x/y/z.json
- "PUBSUB_EMULATOR_HOST" set to a URL if using an emulated Pub/Sub
commit_every: max records allowed to be processed before committing.
commit_interval: max allowed elapsed time between commits.
create_subscription: whether to attempt to create a subscription at startup; if it already exists, it instead logs its details (DEBUG level).
enable_message_ordering: When creating a Pub/Sub subscription, whether to allow message ordering. NOTE: does NOT affect existing subscriptions!
shutdown_timeout: How long to wait for a graceful shutdown of the source.
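The commit_every and commit_interval arguments read as two independent thresholds, where reaching either one triggers a commit. The sketch below models that interpretation with a hypothetical CommitPolicy class and an injected clock value; it is an assumption about the behavior, not the PubSubSource implementation.

```python
class CommitPolicy:
    """Hypothetical model: commit when either threshold is reached."""

    def __init__(self, commit_every: int = 100, commit_interval: float = 5.0):
        self.commit_every = commit_every
        self.commit_interval = commit_interval
        self._count = 0
        self._last_commit = 0.0

    def record(self, now: float) -> bool:
        # Register one processed message; return True if a commit is due.
        self._count += 1
        due = (
            self._count >= self.commit_every
            or now - self._last_commit >= self.commit_interval
        )
        if due:
            self._count = 0
            self._last_commit = now
        return due


policy = CommitPolicy(commit_every=3, commit_interval=5.0)
decisions = [policy.record(now=t) for t in (0.1, 0.2, 0.3, 1.0, 6.0)]
```

In this run the third message triggers a commit by count, and the fifth triggers one because more than five seconds have passed since the previous commit.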