Application

quixstreams.app

Application

class Application()

The main Application class.

Typically, the primary object needed to get a Kafka application up and running.

Most functionality is explained in the various methods, except for "column assignment".


What it Does:

  • During user setup:
    • Provides defaults or helper methods for commonly needed objects
    • For Quix Platform Users: Configures the app for the platform (see Application.Quix())
  • When executed via .run() (after setup):
    • Initializes Topics and StreamingDataFrames
    • Facilitates processing of Kafka messages with a StreamingDataFrame
    • Handles all Kafka client consumer/producer responsibilities.


Example Snippet:

from quixstreams import Application

# Set up an `app = Application` and a `StreamingDataFrame` (`df` below);
# add some operations to `df` and then run everything.

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df = df.apply(lambda value: print('New message', value))

app.run(dataframe=df)



Application.__init__

def __init__(broker_address: str,
             consumer_group: str,
             auto_offset_reset: AutoOffsetReset = "latest",
             auto_commit_enable: bool = True,
             partitioner: Partitioner = "murmur2",
             consumer_extra_config: Optional[dict] = None,
             producer_extra_config: Optional[dict] = None,
             state_dir: str = "state",
             rocksdb_options: Optional[RocksDBOptionsType] = None,
             on_consumer_error: Optional[ConsumerErrorCallback] = None,
             on_processing_error: Optional[ProcessingErrorCallback] = None,
             on_producer_error: Optional[ProducerErrorCallback] = None,
             on_message_processed: Optional[MessageProcessedCallback] = None,
             consumer_poll_timeout: float = 1.0,
             producer_poll_timeout: float = 0.0,
             loglevel: Optional[LogLevel] = "INFO",
             auto_create_topics: bool = True,
             use_changelog_topics: bool = True,
             topic_manager: Optional[TopicManager] = None)


Arguments:

  • broker_address: Kafka broker host and port in the format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Consumer.
  • consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer.
  • auto_offset_reset: Consumer auto.offset.reset setting.
  • auto_commit_enable: If True, periodically commit the offset of the last message handed to the application. Default - True.
  • partitioner: A function to be used to determine the outgoing message partition.
  • consumer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is.
  • producer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is.
  • state_dir: Path to the application state directory. Default - "state".
  • rocksdb_options: RocksDB options. If None, the default options will be used.
  • consumer_poll_timeout: Timeout for RowConsumer.poll(). Default - 1.0s.
  • producer_poll_timeout: Timeout for RowProducer.poll(). Default - 0s.
  • on_message_processed: A callback triggered when a message is successfully processed.
  • loglevel: A log level for the "quixstreams" logger. Should be a string or None. If None is passed, no logging will be configured. You may pass None and configure the "quixstreams" logger externally using the logging library. Default - "INFO".
  • auto_create_topics: Create all Topics made via Application.topic(). Default - True.
  • use_changelog_topics: Use changelog topics to back stateful operations. Default - True.
  • topic_manager: A TopicManager instance


Error Handlers

To handle errors, Application accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returns True, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.

  • on_consumer_error: triggered when the internal RowConsumer fails to poll Kafka or cannot deserialize a message.
  • on_processing_error: triggered when an exception is raised within StreamingDataFrame.process().
  • on_producer_error: triggered when RowProducer fails to serialize or to produce a message to Kafka.
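
Example Snippet:

A minimal sketch of suppressing a specific processing error. It assumes the callback receives the raised exception as its first positional argument; any remaining arguments are absorbed with *args here, since the exact callback signature is not shown in this section.

from quixstreams import Application

# Hypothetical handler: returning True tells the Application to ignore the
# exception and keep processing; returning False propagates it.
def suppress_value_errors(exc, *args):
    return isinstance(exc, ValueError)

app = Application(
    broker_address="localhost:9092",
    consumer_group="group",
    on_processing_error=suppress_value_errors,
)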



Application.Quix

@classmethod
def Quix(cls,
         consumer_group: str,
         auto_offset_reset: AutoOffsetReset = "latest",
         auto_commit_enable: bool = True,
         partitioner: Partitioner = "murmur2",
         consumer_extra_config: Optional[dict] = None,
         producer_extra_config: Optional[dict] = None,
         state_dir: str = "state",
         rocksdb_options: Optional[RocksDBOptionsType] = None,
         on_consumer_error: Optional[ConsumerErrorCallback] = None,
         on_processing_error: Optional[ProcessingErrorCallback] = None,
         on_producer_error: Optional[ProducerErrorCallback] = None,
         on_message_processed: Optional[MessageProcessedCallback] = None,
         consumer_poll_timeout: float = 1.0,
         producer_poll_timeout: float = 0.0,
         loglevel: Optional[LogLevel] = "INFO",
         quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
         auto_create_topics: bool = True,
         use_changelog_topics: bool = True,
         topic_manager: Optional[QuixTopicManager] = None) -> Self

Initialize an Application to work with the Quix platform, assuming the environment is properly configured (it is by default when running on the platform).

It takes the credentials from the environment and configures the consumer and producer to connect to the Quix platform.

NOTE: The Quix platform requires consumer_group and topic names to be prefixed with the workspace id. If the application is created via Application.Quix(), the real consumer group will be <workspace_id>-<consumer_group>, and the real topic names will be <workspace_id>-<topic_name>.


Example Snippet:

from quixstreams import Application

# Set up an `app = Application.Quix()` and a `StreamingDataFrame` (`df` below);
# add some operations to `df` and then run everything. Also shows how to
# use the Quix-specific serializers and deserializers.

app = Application.Quix(consumer_group="group")
input_topic = app.topic("topic-in", value_deserializer="quix")
output_topic = app.topic("topic-out", value_serializer="quix_timeseries")
df = app.dataframe(input_topic)
df = df.to_topic(output_topic)

app.run(dataframe=df)


Arguments:

  • consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer.

    NOTE: The consumer group will be prefixed by the Quix workspace id.

  • auto_offset_reset: Consumer auto.offset.reset setting.
  • auto_commit_enable: If True, periodically commit the offset of the last message handed to the application. Default - True.
  • partitioner: A function to be used to determine the outgoing message partition.
  • consumer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is.
  • producer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is.
  • state_dir: Path to the application state directory. Default - "state".
  • rocksdb_options: RocksDB options. If None, the default options will be used.
  • consumer_poll_timeout: Timeout for RowConsumer.poll(). Default - 1.0s.
  • producer_poll_timeout: Timeout for RowProducer.poll(). Default - 0s.
  • on_message_processed: A callback triggered when a message is successfully processed.
  • loglevel: A log level for the "quixstreams" logger. Should be a string or None. If None is passed, no logging will be configured. You may pass None and configure the "quixstreams" logger externally using the logging library. Default - "INFO".
  • auto_create_topics: Create all Topics made via Application.topic(). Default - True.
  • use_changelog_topics: Use changelog topics to back stateful operations. Default - True.
  • topic_manager: A QuixTopicManager instance


Error Handlers

To handle errors, Application accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returns True, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.

  • on_consumer_error: triggered when the internal RowConsumer fails to poll Kafka or cannot deserialize a message.
  • on_processing_error: triggered when an exception is raised within StreamingDataFrame.process().
  • on_producer_error: triggered when RowProducer fails to serialize or to produce a message to Kafka.


Quix-specific Parameters

  • quix_config_builder: instance of QuixKafkaConfigsBuilder to be used instead of the default one.


Returns:

Application object



Application.topic

def topic(name: str,
          value_deserializer: DeserializerType = "json",
          key_deserializer: DeserializerType = "bytes",
          value_serializer: SerializerType = "json",
          key_serializer: SerializerType = "bytes",
          config: Optional[TopicConfig] = None,
          timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic

Create a topic definition.

Allows you to specify the serialization that should be used when consuming/producing to the topic, in the form of a string name (e.g. "json" for JSON) or a serialization class instance directly, like JSONSerializer().


Example Snippet:

from quixstreams import Application
from quixstreams.models.serializers import JSONSerializer

# Specify an input and output topic for a `StreamingDataFrame` instance,
# where the output topic requires adjusting the key serializer.

app = Application(broker_address="localhost:9092", consumer_group="group")
input_topic = app.topic("input-topic", value_deserializer="json")
output_topic = app.topic(
    "output-topic", key_serializer="str", value_serializer=JSONSerializer()
)
sdf = app.dataframe(input_topic)
sdf.to_topic(output_topic)


Arguments:

  • name: topic name

    NOTE: If the application is created via Application.Quix(), the topic name will be prefixed by the Quix workspace id, and it will be <workspace_id>-<name>

  • value_deserializer: a deserializer type for values; default="json"
  • key_deserializer: a deserializer type for keys; default="bytes"
  • value_serializer: a serializer type for values; default="json"
  • key_serializer: a serializer type for keys; default="bytes"
  • config: optional topic configurations (for creation/validation)

    NOTE: The topic will not be created unless the Application's auto_create_topics is set to True (it is True by default)

  • timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message. Default - None.


Example Snippet:

from typing import Any, List, Optional, Tuple

from quixstreams import Application
from quixstreams.models import TimestampType  # import path may vary by version

app = Application(...)


def custom_ts_extractor(
    value: Any,
    headers: Optional[List[Tuple[str, bytes]]],
    timestamp: float,
    timestamp_type: TimestampType,
) -> int:
    # Use a timestamp from the message payload instead of the Kafka timestamp
    return value["timestamp"]

topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)


Returns:

Topic object



Application.dataframe

def dataframe(topic: Topic) -> StreamingDataFrame

A simple helper method that generates a StreamingDataFrame, which is used to define your message processing pipeline.

See quixstreams.dataframe.StreamingDataFrame for more details.


Example Snippet:

from quixstreams import Application

# Set up an `app = Application` and a `StreamingDataFrame` (`df` below);
# add some operations to `df` and then run everything.

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df = df.apply(lambda value: print('New message', value))

app.run(dataframe=df)


Arguments:

  • topic: a quixstreams.models.Topic instance to be used as an input topic.


Returns:

StreamingDataFrame object



Application.stop

def stop()

Stop the internal poll loop and the message processing.

Only necessary when manually managing the lifecycle of the Application (likely through some sort of threading).

To otherwise stop an application, either send a SIGTERM to the process (like Kubernetes does) or perform a typical KeyboardInterrupt (Ctrl+C).
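
Example Snippet:

A minimal sketch of such a manual lifecycle, assuming Application.run() is executed in a background thread (the broker address and topic name are placeholders):

import threading

from quixstreams import Application

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)

# `run()` blocks, so execute it in a background thread
runner = threading.Thread(target=app.run, kwargs={"dataframe": df})
runner.start()

# ... later, from the main thread: stop the poll loop and wait for the thread
app.stop()
runner.join()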



Application.get_producer

def get_producer() -> Producer

Create and return a pre-configured Producer instance. The Producer is initialized with params passed to Application.

It's useful for producing data to Kafka outside the standard Application processing flow (e.g. to produce test data into a topic). Using this within the StreamingDataFrame functions is not recommended, as it creates a new Producer instance each time, which is not optimized for repeated use in a streaming pipeline.


Example Snippet:

from quixstreams import Application

app = Application.Quix(...)
topic = app.topic("input")

with app.get_producer() as producer:
    for i in range(100):
        producer.produce(topic=topic.name, key=b"key", value=b"value")



Application.get_consumer

def get_consumer() -> Consumer

Create and return a pre-configured Consumer instance. The Consumer is initialized with params passed to Application.

It's useful for consuming data from Kafka outside the standard Application processing flow (e.g. to consume test data from a topic). Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance each time, which is not optimized for repeated use in a streaming pipeline.

Note: By default, this consumer does not autocommit the consumed offsets, which allows for exactly-once processing. To store the offset, call store_offsets() after processing a message. If autocommit is necessary, set enable.auto.offset.store to True in the consumer config when creating the app.


Example Snippet:

from quixstreams import Application

app = Application.Quix(...)
topic = app.topic("input")

with app.get_consumer() as consumer:
    consumer.subscribe([topic.name])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is not None:
            # Process the message here, then optionally store its offset
            # so it can be committed:
            consumer.store_offsets(msg)



Application.clear_state

def clear_state()

Clear the state of the application.
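
Example Snippet:

For illustration, a sketch of wiping the local state before a fresh run (the setup values are placeholders; this is presumably meant to be called while the application is not running):

from quixstreams import Application

app = Application(broker_address='localhost:9092', consumer_group='group')

# Drop all locally stored state so processing starts from a clean slate
app.clear_state()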



Application.run

def run(dataframe: StreamingDataFrame)

Start processing data from Kafka using the provided StreamingDataFrame.

Once started, it can be safely terminated with a SIGTERM signal (like Kubernetes does) or a typical KeyboardInterrupt (Ctrl+C).


Example Snippet:

from quixstreams import Application

# Set up an `app = Application` and a `StreamingDataFrame` (`df` below);
# add some operations to `df` and then run everything.

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df = df.apply(lambda value: print('New message', value))

app.run(dataframe=df)


Arguments:

  • dataframe: instance of StreamingDataFrame

quixstreams.state.types

State

class State(Protocol)

Primary interface for working with key-value state data from StreamingDataFrame
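
Example Snippet:

For orientation, a minimal sketch of how a State instance is typically used from a stateful StreamingDataFrame operation (the topic name and the counting logic are illustrative):

from quixstreams import Application, State

app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)


def count_messages(value: dict, state: State):
    # Read the previous total (default 0), increment it, and persist it
    total = state.get("total", default=0) + 1
    state.set("total", total)
    value["total"] = total
    return value


# With stateful=True, a State instance is passed to the callback
df = df.apply(count_messages, stateful=True)

app.run(dataframe=df)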



State.get

def get(key: Any, default: Any = None) -> Optional[Any]

Get the value for key if key is present in the state, else default


Arguments:

  • key: key
  • default: default value to return if the key is not found


Returns:

value or None if the key is not found and default is not provided



State.set

def set(key: Any, value: Any)

Set value for the key.


Arguments:

  • key: key
  • value: value



State.delete

def delete(key: Any)

Delete value for the key.

This function always returns None, even if the value is not found.


Arguments:

  • key: key



State.exists

def exists(key: Any) -> bool

Check if the key exists in state.


Arguments:

  • key: key


Returns:

True if key exists, False otherwise