
Topics API

quixstreams.models.topics

quixstreams.models.topics.admin



convert_topic_list

def convert_topic_list(topics: List[Topic]) -> List[ConfluentTopic]

[VIEW SOURCE]

Converts Topics to ConfluentTopics as required for Confluent's AdminClient.create_topics().


Arguments:

  • topics: list of Topics


Returns:

list of confluent_kafka ConfluentTopics
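
For illustration, a minimal sketch of the conversion (imports follow the module headings on this page; assuming ConfluentTopic aliases confluent_kafka.admin.NewTopic):

from quixstreams.models.topics.topic import Topic, TopicConfig
from quixstreams.models.topics.admin import convert_topic_list

# Two application topics with explicit creation configs
topics = [
    Topic("input-events", config=TopicConfig(num_partitions=3, replication_factor=1)),
    Topic("output-events", config=TopicConfig(num_partitions=3, replication_factor=1)),
]

# The result can be passed to confluent_kafka's AdminClient.create_topics()
confluent_topics = convert_topic_list(topics)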

TopicAdmin

class TopicAdmin()

[VIEW SOURCE]

For performing "admin"-level operations on a Kafka cluster, mostly around topics.

Primarily used to create and inspect topic configurations.



TopicAdmin.__init__

def __init__(broker_address: Union[str, ConnectionConfig],
             logger: logging.Logger = logger,
             extra_config: Optional[Mapping] = None)

[VIEW SOURCE]


Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • logger: a Logger instance to attach librdkafka logging to
  • extra_config: optional configs (generally accepts producer configs)
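
A minimal construction sketch (the broker address here is an assumption for a local, unauthenticated cluster):

from quixstreams.models.topics.admin import TopicAdmin

# Pass a ConnectionConfig instead of a string if authentication is required
admin = TopicAdmin(broker_address="localhost:9092")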



TopicAdmin.list_topics

def list_topics(timeout: float = -1) -> Dict[str, ConfluentTopicMetadata]

[VIEW SOURCE]

Get a list of topics and their metadata from a Kafka cluster.


Arguments:

  • timeout: response timeout (seconds); Default infinite (-1)


Returns:

a dict of topic names and their metadata objects
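
For example, reusing the admin instance from the sketch above (ConfluentTopicMetadata is assumed to be confluent_kafka's TopicMetadata, which exposes a partitions mapping):

# Fetch metadata for every topic visible on the cluster
metadata = admin.list_topics(timeout=10)
for name, topic_meta in metadata.items():
    print(name, "partitions:", len(topic_meta.partitions))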



TopicAdmin.inspect_topics

def inspect_topics(topic_names: List[str],
                   timeout: float = 30) -> Dict[str, Optional[TopicConfig]]

[VIEW SOURCE]

A simplified way of getting the topic configurations of the provided topics from the cluster (if they exist).


Arguments:

  • topic_names: a list of topic names
  • timeout: response timeout (seconds)

    NOTE: timeout must be > 0 here (a non-negative value is expected, and 0 is not treated as infinite).


Returns:

a dict with topic names and their respective TopicConfig
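
A short sketch, assuming the returned TopicConfig exposes fields mirroring the topic_config() parameters documented further below:

# Topics that do not exist on the cluster map to None
configs = admin.inspect_topics(["input-events", "missing-topic"], timeout=30)
for name, config in configs.items():
    if config is None:
        print(f"{name}: not found")
    else:
        print(f"{name}: {config.num_partitions} partitions, rf={config.replication_factor}")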



TopicAdmin.create_topics

def create_topics(topics: List[Topic],
                  timeout: float = 30,
                  finalize_timeout: float = 60)

[VIEW SOURCE]

Create the given list of topics and confirm they are ready.

Raises an exception with a detailed printout should creation fail (topics that already exist are ignored).


Arguments:

  • topics: a list of Topic
  • timeout: creation acknowledge timeout (seconds)
  • finalize_timeout: topic finalization timeout (seconds)

    NOTE: timeout must be > 0 here (a non-negative value is expected, and 0 is not treated as infinite).
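
For illustration, a hedged sketch reusing the admin instance and the Topic/TopicConfig imports from the sketches above:

# Topics that already exist are skipped rather than raising an error
admin.create_topics(
    [Topic("output-events", config=TopicConfig(num_partitions=3, replication_factor=1))],
    timeout=30,
    finalize_timeout=60,
)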

quixstreams.models.topics.topic

TopicConfig

@dataclasses.dataclass(eq=True)
class TopicConfig()

[VIEW SOURCE]

Represents all Kafka-level configuration for a Kafka topic.

Generally used by Topic and any topic creation procedures.
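
A minimal sketch (the field names are assumed to mirror the TopicManager.topic_config() parameters below; extra_config keys are standard Kafka topic configs):

from quixstreams.models.topics.topic import TopicConfig

config = TopicConfig(
    num_partitions=3,
    replication_factor=1,
    extra_config={"retention.ms": "604800000"},  # keep data for 7 days
)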

Topic

class Topic()

[VIEW SOURCE]

A definition of a Kafka topic.

Typically created via app.topic() on a quixstreams.app.Application() instance, and used by quixstreams.dataframe.StreamingDataFrame instances.



Topic.__init__

def __init__(
        name: str,
        config: Optional[TopicConfig] = None,
        value_deserializer: Optional[DeserializerType] = None,
        key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
        value_serializer: Optional[SerializerType] = None,
        key_serializer: Optional[SerializerType] = BytesSerializer(),
        timestamp_extractor: Optional[TimestampExtractor] = None)

[VIEW SOURCE]


Arguments:

  • name: topic name
  • config: topic configs via TopicConfig (creation/validation)
  • value_deserializer: a deserializer type for values
  • key_deserializer: a deserializer type for keys
  • value_serializer: a serializer type for values
  • key_serializer: a serializer type for keys
  • timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message.
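
A hedged sketch of direct construction (topics are normally obtained via Application.topic(); passing "json" string aliases here is an assumption based on the TopicManager.topic() defaults below):

from quixstreams.models.topics.topic import Topic, TopicConfig

topic = Topic(
    "sensor-readings",
    config=TopicConfig(num_partitions=1, replication_factor=1),
    value_serializer="json",
    value_deserializer="json",
)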



Topic.row_serialize

def row_serialize(row: Row, key: Any) -> KafkaMessage

[VIEW SOURCE]

Serialize a Row to a Kafka message structure.


Arguments:

  • row: Row to serialize
  • key: message key to serialize


Returns:

KafkaMessage object with serialized values



Topic.row_deserialize

def row_deserialize(
        message: ConfluentKafkaMessageProto) -> Union[Row, List[Row], None]

[VIEW SOURCE]

Deserialize an incoming Kafka message to a Row.


Arguments:

  • message: an object with the interface of confluent_kafka.Message


Returns:

a Row, a list of Rows, or None if the message is ignored.
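
For example, a sketch of normalizing the possible return shapes (rows_from_message is a hypothetical helper; obtaining the message from a consumer poll is assumed):

def rows_from_message(topic, message):
    # Normalize row_deserialize() output to a list of Rows
    result = topic.row_deserialize(message=message)
    if result is None:
        return []              # the message was ignored
    if isinstance(result, list):
        return result          # one Kafka message carried a batch of values
    return [result]            # a single Row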

quixstreams.models.topics.manager



affirm_ready_for_create

def affirm_ready_for_create(topics: List[Topic])

[VIEW SOURCE]

Validate that a list of topics is ready for a creation attempt.


Arguments:

  • topics: list of Topics

TopicManager

class TopicManager()

[VIEW SOURCE]

The source of all topic management for a Quix Streams Application.

Intended only for internal use by Application.

To create a Topic, use Application.topic() or generate one directly.



TopicManager.__init__

def __init__(topic_admin: TopicAdmin,
             consumer_group: str,
             timeout: float = 30,
             create_timeout: float = 60,
             auto_create_topics: bool = True)

[VIEW SOURCE]


Arguments:

  • topic_admin: an Admin instance (required for some functionality)
  • consumer_group: the consumer group (of the Application)
  • timeout: response timeout (seconds)
  • create_timeout: timeout for topic creation (seconds)
  • auto_create_topics: if set to False, create_all_topics() will not create any topics



TopicManager.changelog_topics

@property
def changelog_topics() -> Dict[str, Dict[str, Topic]]

[VIEW SOURCE]

Note: the Topic objects here are the changelog topics themselves.

returns: the changelog topic dict, {topic_name: {suffix: Topic}}



TopicManager.all_topics

@property
def all_topics() -> Dict[str, Topic]

[VIEW SOURCE]

Every registered topic name mapped to its respective Topic.

returns: full topic dict, {topic_name: Topic}



TopicManager.topic_config

def topic_config(num_partitions: Optional[int] = None,
                 replication_factor: Optional[int] = None,
                 extra_config: Optional[dict] = None) -> TopicConfig

[VIEW SOURCE]

Convenience method for generating a TopicConfig with default settings


Arguments:

  • num_partitions: the number of topic partitions
  • replication_factor: the topic replication factor
  • extra_config: other optional configuration settings


Returns:

a TopicConfig object



TopicManager.topic

def topic(name: str,
          value_deserializer: Optional[DeserializerType] = None,
          key_deserializer: Optional[DeserializerType] = "bytes",
          value_serializer: Optional[SerializerType] = None,
          key_serializer: Optional[SerializerType] = "bytes",
          config: Optional[TopicConfig] = None,
          timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic

[VIEW SOURCE]

A convenience method for generating a Topic. Will use default config options as dictated by the TopicManager.


Arguments:

  • name: topic name
  • value_deserializer: a deserializer type for values
  • key_deserializer: a deserializer type for keys
  • value_serializer: a serializer type for values
  • key_serializer: a serializer type for keys
  • config: optional topic configurations (for creation/validation)
  • timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message.


Returns:

Topic object with creation configs
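
Although the TopicManager is intended for internal use, a minimal sketch of this flow (the broker address, consumer group, and topic name are assumptions):

from quixstreams.models.topics.admin import TopicAdmin
from quixstreams.models.topics.manager import TopicManager

admin = TopicAdmin(broker_address="localhost:9092")
manager = TopicManager(topic_admin=admin, consumer_group="my-consumer-group")

# Default creation settings, then a Topic tracked by the manager
config = manager.topic_config(num_partitions=3, replication_factor=1)
topic = manager.topic("input-events", value_deserializer="json", config=config)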



TopicManager.register

def register(topic: Topic) -> Topic

[VIEW SOURCE]

Register an already-generated quixstreams.models.topics.Topic with the topic manager.

The topic name and config can be updated by the topic manager.


Arguments:

  • topic: The topic to register
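
For example, continuing the manager sketch above (the topic name is an assumption):

# Track an externally built Topic so the manager can create and validate it
side_topic = manager.register(Topic("side-output", config=manager.topic_config()))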



TopicManager.repartition_topic

def repartition_topic(operation: str,
                      topic_name: str,
                      value_deserializer: Optional[DeserializerType] = "json",
                      key_deserializer: Optional[DeserializerType] = "json",
                      value_serializer: Optional[SerializerType] = "json",
                      key_serializer: Optional[SerializerType] = "json",
                      timeout: Optional[float] = None) -> Topic

[VIEW SOURCE]

Create an internal repartition topic.


Arguments:

  • operation: name of the GroupBy operation (column name or user-defined).
  • topic_name: name of the topic the GroupBy is sourced from.
  • value_deserializer: a deserializer type for values; default - JSON
  • key_deserializer: a deserializer type for keys; default - JSON
  • value_serializer: a serializer type for values; default - JSON
  • key_serializer: a serializer type for keys; default - JSON
  • timeout: config lookup timeout (seconds); Default 30


Returns:

Topic object (which is also stored on the TopicManager)



TopicManager.changelog_topic

def changelog_topic(topic_name: str,
                    store_name: str,
                    timeout: Optional[float] = None) -> Topic

[VIEW SOURCE]

Performs all the logic necessary to generate a changelog topic based on a "source topic" (i.e. the input/consumed topic).

Its main goal is to ensure the partition count of the to-be-generated changelog matches the source topic, and that the changelog topic is compacted. It also enforces the serialization type. All Topic objects generated with this are stored on the TopicManager.

If the source topic already exists, defers to the existing topic settings; otherwise, uses the settings defined by the Topic (and its defaults) as generated by the TopicManager.

In general, users should NOT need this; an Application knows when and how to generate changelog topics. To turn off changelogs, initialize an Application with use_changelog_topics=False.


Arguments:

  • topic_name: name of consumed topic (app input topic)

    NOTE: this will normally contain any prefixes added by TopicManager.topic()

  • store_name: name of the store this changelog belongs to (default, rolling10s, etc.)
  • timeout: config lookup timeout (seconds); Default 30


Returns:

Topic object (which is also stored on the TopicManager)



TopicManager.create_topics

def create_topics(topics: List[Topic],
                  timeout: Optional[float] = None,
                  create_timeout: Optional[float] = None)

[VIEW SOURCE]

Creates topics via an explicit list of provided Topics.

Exists as a way to manually specify what topics to create; otherwise, create_all_topics() is generally simpler.


Arguments:

  • topics: list of Topics
  • timeout: creation acknowledge timeout (seconds); Default 30
  • create_timeout: topic finalization timeout (seconds); Default 60



TopicManager.create_all_topics

def create_all_topics(timeout: Optional[float] = None,
                      create_timeout: Optional[float] = None)

[VIEW SOURCE]

A convenience method to create all Topic objects stored on this TopicManager.

If auto_create_topics is set to False, no topics will be created.


Arguments:

  • timeout: creation acknowledge timeout (seconds); Default 30
  • create_timeout: topic finalization timeout (seconds); Default 60
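
Continuing the manager sketch above:

# Creates every Topic registered on the manager
# (a no-op when auto_create_topics is False)
manager.create_all_topics(timeout=30, create_timeout=60)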



TopicManager.validate_all_topics

def validate_all_topics(timeout: Optional[float] = None)

[VIEW SOURCE]

Validates that all topics exist and that changelog topics have the correct partition count and replication factor.

Issues are pooled and raised as an Exception once inspections are complete.
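
For example, continuing the manager sketch above:

# Raises a single aggregated exception if any topic is missing
# or a changelog topic's configuration does not match expectations
manager.validate_all_topics(timeout=30)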

quixstreams.models.topics.exceptions