Topics API
quixstreams.models.topics
quixstreams.models.topics.admin
TopicAdmin
For performing "admin"-level operations on a Kafka cluster, mostly around topics.
Primarily used to create and inspect topic configurations.
TopicAdmin.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
             logger: logging.Logger = logger,
             extra_config: Optional[Mapping] = None)
Arguments:
broker_address
: Connection settings for Kafka. Accepts a string with the Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
logger
: a Logger instance to attach librdkafka logging to
extra_config
: optional configs (generally accepts producer configs)
TopicAdmin.list_topics
Get a list of topics and their metadata from a Kafka cluster
Arguments:
timeout
: response timeout (seconds); Default infinite (-1)
Returns:
a dict of topic names and their metadata objects
TopicAdmin.inspect_topics
A simplified way of getting the topic configurations of the provided topics
from the cluster (if they exist).
Arguments:
topic_names
: a list of topic names
timeout
: response timeout (seconds)
NOTE: timeout must be > 0 here (a non-negative value is expected, and 0 does not mean infinite).
Returns:
a dict with topic names and their respective TopicConfig
TopicAdmin.create_topics
Create the given list of topics and confirm they are ready.
Raises an exception with a detailed printout if creation fails; a topic that already exists is not treated as a failure.
Arguments:
topics
: a list of Topic objects
timeout
: creation acknowledgement timeout (seconds)
finalize_timeout
: topic finalization timeout (seconds)
NOTE: timeout must be > 0 here (a non-negative value is expected, and 0 does not mean infinite).
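The "ignore topics that already exist" behavior of create_topics can be illustrated with a minimal sketch. It assumes creation results arrive as a mapping of topic name to concurrent.futures.Future (the shape confluent_kafka's AdminClient.create_topics returns); TopicAlreadyExists and finalize_creation are hypothetical stand-ins, not Quix Streams or confluent_kafka names, and the finalization step is not modeled.

```python
from concurrent.futures import Future
from typing import Dict, List


class TopicAlreadyExists(Exception):
    """Hypothetical stand-in for the broker's "topic already exists" error."""


def finalize_creation(futures: Dict[str, Future], timeout: float) -> List[str]:
    """Wait on each creation future; skip existing topics, pool other failures.

    Returns the names of the topics that were newly created.
    """
    if timeout <= 0:
        # Mirrors the create_topics note: a positive timeout is required.
        raise ValueError("timeout must be > 0")
    created: List[str] = []
    failures: Dict[str, Exception] = {}
    for name, future in futures.items():
        try:
            future.result(timeout=timeout)
            created.append(name)
        except TopicAlreadyExists:
            continue  # an existing topic is not treated as a failure
        except Exception as exc:
            failures[name] = exc
    if failures:
        # Report every failed topic in one exception.
        details = "; ".join(f"{n}: {e!r}" for n, e in failures.items())
        raise RuntimeError(f"Topic creation failed: {details}")
    return created
```

Collecting failures before raising means a single error lists every topic that could not be created, instead of stopping at the first one.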
quixstreams.models.topics.topic
TopicConfig
Represents all Kafka-level configuration for a Kafka topic.
Generally used by Topic and any topic creation procedures.
Topic
A definition of a Kafka topic.
Typically created with an app = quixstreams.app.Application() instance via app.topic(), and used by a quixstreams.dataframe.StreamingDataFrame instance.
Topic.__init__
def __init__(
        name: str,
        create_config: Optional[TopicConfig] = None,
        value_deserializer: Optional[DeserializerType] = None,
        key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
        value_serializer: Optional[SerializerType] = None,
        key_serializer: Optional[SerializerType] = BytesSerializer(),
        timestamp_extractor: Optional[TimestampExtractor] = None)
Arguments:
name
: topic name
create_config
: a TopicConfig to create a new topic if it does not exist
value_deserializer
: a deserializer type for values
key_deserializer
: a deserializer type for keys
value_serializer
: a serializer type for values
key_serializer
: a serializer type for keys
timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message.
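A timestamp_extractor only has to return epoch milliseconds. The sketch below assumes the deserialized value is a dict with an ISO 8601 "event_time" field (a hypothetical field name). It accepts and ignores any further positional arguments, because the exact arguments Quix Streams passes to the extractor (for example the Kafka message's own timestamp) may differ between versions.

```python
from datetime import datetime, timedelta, timezone
from typing import Any

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def extract_event_time_ms(value: Any, *_ignored: Any) -> int:
    """Return the hypothetical "event_time" field as epoch milliseconds."""
    raw = value["event_time"]
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    dt = datetime.fromisoformat(raw)
    if dt.tzinfo is None:
        # Assumption: naive timestamps are UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding errors.
    return (dt - _EPOCH) // timedelta(milliseconds=1)
```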
Topic.create_config
A config to create the topic
Topic.broker_config
A topic config obtained from the Kafka broker
Topic.row_serialize
Serialize Row to a Kafka message structure
Arguments:
row
: Row to serialize
key
: message key to serialize
Returns:
KafkaMessage object with serialized values
Topic.row_deserialize
Deserialize incoming Kafka message to a Row.
Arguments:
message
: an object with the interface of confluent_kafka.Message
Returns:
a Row, a list of Rows, or None if the message is ignored.
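Because row_deserialize can return a single Row, a list of Rows, or None, calling code usually normalizes the result before processing it. A minimal sketch; as_row_list is a hypothetical helper, and plain values stand in for Row objects.

```python
from typing import List, Optional, TypeVar, Union

T = TypeVar("T")


def as_row_list(result: Optional[Union[T, List[T]]]) -> List[T]:
    """Normalize a deserialization result into a (possibly empty) list."""
    if result is None:
        return []  # the message was ignored
    if isinstance(result, list):
        return result  # one message produced several rows
    return [result]  # a single row
```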
quixstreams.models.topics.manager
TopicManager
The source of all topic management for a Quix Streams Application.
Intended only for internal use by Application.
To create a Topic, use Application.topic() or instantiate Topic directly.
TopicManager.__init__
def __init__(topic_admin: TopicAdmin,
             consumer_group: str,
             timeout: float = 30,
             create_timeout: float = 60,
             auto_create_topics: bool = True)
Arguments:
topic_admin
: a TopicAdmin instance (required for some functionality)
consumer_group
: the consumer group (of the Application)
timeout
: response timeout (seconds)
create_timeout
: timeout for topic creation
TopicManager.changelog_topics
Note: the Topic objects are the changelogs.
returns: the changelog topic dict, {topic_name: {suffix: Topic}}
TopicManager.changelog_topics_list
Returns a list of changelog topics
returns: a list of the changelog Topic objects
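The nested {topic_name: {suffix: Topic}} structure from changelog_topics can be flattened into a plain list. A minimal sketch: strings stand in for Topic objects, and the topic and suffix names are hypothetical placeholders rather than Quix Streams' actual naming scheme.

```python
from typing import Dict, List, TypeVar

T = TypeVar("T")


def flatten_changelogs(changelogs: Dict[str, Dict[str, T]]) -> List[T]:
    """Flatten {topic_name: {suffix: topic}} into a list of topics."""
    return [topic for by_suffix in changelogs.values() for topic in by_suffix.values()]


changelogs = {
    "orders": {"default": "orders/default", "rolling10s": "orders/rolling10s"},
    "payments": {"default": "payments/default"},
}
print(flatten_changelogs(changelogs))
# ['orders/default', 'orders/rolling10s', 'payments/default']
```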
TopicManager.non_changelog_topics
Returns a dict with normal and repartition topics
TopicManager.all_topics
Every registered topic name mapped to its respective Topic.
returns: full topic dict, {topic_name: Topic}
TopicManager.topic_config
def topic_config(num_partitions: Optional[int] = None,
                 replication_factor: Optional[int] = None,
                 extra_config: Optional[dict] = None) -> TopicConfig
Convenience method for generating a TopicConfig with default settings.
Arguments:
num_partitions
: the number of topic partitions
replication_factor
: the topic replication factor
extra_config
: other optional configuration settings
Returns:
a TopicConfig object
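A sketch of how a convenience method like topic_config might fill unset arguments from manager-level defaults. TopicConfigSketch and the default values are assumptions for illustration, not the actual TopicConfig definition or TopicManager defaults.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TopicConfigSketch:
    """Hypothetical stand-in for TopicConfig."""
    num_partitions: int
    replication_factor: int
    extra_config: Dict[str, str] = field(default_factory=dict)


# Hypothetical manager-level defaults.
DEFAULT_PARTITIONS = 1
DEFAULT_REPLICATION = 1


def topic_config(num_partitions: Optional[int] = None,
                 replication_factor: Optional[int] = None,
                 extra_config: Optional[dict] = None) -> TopicConfigSketch:
    """Build a config, substituting defaults for any argument left as None."""
    partitions = DEFAULT_PARTITIONS if num_partitions is None else num_partitions
    replication = (DEFAULT_REPLICATION if replication_factor is None
                   else replication_factor)
    # Copy extra_config so the caller's dict is not shared with the config.
    return TopicConfigSketch(partitions, replication, dict(extra_config or {}))
```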
TopicManager.topic
def topic(name: str,
          value_deserializer: Optional[DeserializerType] = None,
          key_deserializer: Optional[DeserializerType] = "bytes",
          value_serializer: Optional[SerializerType] = None,
          key_serializer: Optional[SerializerType] = "bytes",
          create_config: Optional[TopicConfig] = None,
          timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
A convenience method for generating a Topic. Will use default config options as dictated by the TopicManager.
Arguments:
name
: topic name
value_deserializer
: a deserializer type for values
key_deserializer
: a deserializer type for keys
value_serializer
: a serializer type for values
key_serializer
: a serializer type for keys
create_config
: optional topic configurations (for creation/validation)
timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message.
Returns:
Topic object with creation configs
TopicManager.register
Register an already generated quixstreams.models.topics.Topic to the topic manager.
The topic name and config can be updated by the topic manager.
Arguments:
topic
: The topic to register
TopicManager.repartition_topic
def repartition_topic(operation: str,
                      topic_name: str,
                      value_deserializer: Optional[DeserializerType] = "json",
                      key_deserializer: Optional[DeserializerType] = "json",
                      value_serializer: Optional[SerializerType] = "json",
                      key_serializer: Optional[SerializerType] = "json",
                      timeout: Optional[float] = None) -> Topic
Create an internal repartition topic.
Arguments:
operation
: name of the GroupBy operation (column name or user-defined).
topic_name
: name of the topic the GroupBy is sourced from.
value_deserializer
: a deserializer type for values; default - JSON
key_deserializer
: a deserializer type for keys; default - JSON
value_serializer
: a serializer type for values; default - JSON
key_serializer
: a serializer type for keys; default - JSON
timeout
: config lookup timeout (seconds); Default 30
Returns:
Topic object (which is also stored on the TopicManager)
TopicManager.changelog_topic
def changelog_topic(topic_name: Optional[str],
                    store_name: str,
                    config: Optional[TopicConfig] = None,
                    timeout: Optional[float] = None) -> Topic
Performs all the logic necessary to generate a changelog topic based on an optional "source topic" (aka input/consumed topic).
Its main goal is to ensure that the partition count of the to-be-generated changelog matches the source topic, and that the changelog topic is compacted. It also enforces the serialization type. All Topic objects generated with this are stored on the TopicManager.
If the source topic already exists, it defers to the existing topic settings; otherwise it uses the settings defined by the Topic (and its defaults) as generated by the TopicManager.
In general, users should NOT need this; an Application knows when and how to generate changelog topics. To turn off changelogs, initialize an Application with use_changelog_topics=False.
Arguments:
topic_name
: name of the consumed topic (app input topic). NOTE: this normally contains any prefixes added by TopicManager.topic()
store_name
: name of the store this changelog belongs to (default, rolling10s, etc.)
config
: the changelog topic configuration. Defaults to the topic_name configuration or the TopicManager default
timeout
: config lookup timeout (seconds); Default 30
Returns:
Topic object (which is also stored on the TopicManager)
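The core rule of changelog_topic (defer to the source topic's partition count and replication factor, and force log compaction) can be sketched with plain dicts. The dict-based config layout and derive_changelog_config are hypothetical; only the Kafka setting cleanup.policy=compact is a real broker-level name.

```python
from typing import Dict, Optional


def derive_changelog_config(source: Optional[Dict], default: Dict) -> Dict:
    """Build a changelog config from an optional source topic config.

    Configs are hypothetical dicts with "num_partitions" and
    "replication_factor" keys; extra settings go under "extra_config".
    """
    # Defer to the existing source topic if there is one, else use defaults.
    base = source if source is not None else default
    return {
        # Same partition count as the source, so input partition N
        # lines up with changelog partition N.
        "num_partitions": base["num_partitions"],
        "replication_factor": base["replication_factor"],
        # Compaction keeps only the latest value per key.
        "extra_config": {"cleanup.policy": "compact"},
    }
```

Matching partition counts matters because a state store's data for input partition N is expected to be recoverable from the same partition number of its changelog.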
TopicManager.validate_all_topics
Validates that all topics have ".broker_config" set and changelog topics have correct numbers of partitions and replication factors.
Issues are pooled and raised as an Exception once inspections are complete.
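The "pool issues, raise once" pattern of validate_all_topics can be sketched as follows. The dict-based topic records, their field names, validate_all, and TopicValidationError are hypothetical; the two checks mirror the ones described (broker config present, and changelog partition count and replication factor matching the expected config).

```python
from typing import Dict, List, Optional


class TopicValidationError(Exception):
    """Hypothetical error carrying every issue found."""


def validate_all(topics: Dict[str, Dict], changelogs: List[str]) -> None:
    """Check every topic, collect all issues, then raise once.

    Each topic is a hypothetical dict with "broker_config" (None if missing)
    and, for changelogs, "create_config"; both configs hold
    "num_partitions" and "replication_factor".
    """
    issues: List[str] = []
    for name, topic in topics.items():
        broker: Optional[Dict] = topic.get("broker_config")
        if broker is None:
            issues.append(f"{name}: broker_config is not set")
            continue
        if name in changelogs:
            expected = topic["create_config"]
            for key in ("num_partitions", "replication_factor"):
                if broker[key] != expected[key]:
                    issues.append(
                        f"{name}: {key} is {broker[key]}, expected {expected[key]}"
                    )
    if issues:
        # One exception listing everything, raised after all inspections.
        raise TopicValidationError("\n".join(issues))
```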