Topics API
quixstreams.models.topics
quixstreams.models.topics.admin
TopicAdmin
For performing "admin"-level operations on a Kafka cluster, mostly around topics.
Primarily used to create and inspect topic configurations.
TopicAdmin.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
             logger: logging.Logger = logger,
             extra_config: Optional[Mapping] = None)
Arguments:
broker_address
: Connection settings for Kafka. Accepts a string with the Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
logger
: a Logger instance to attach librdkafka logging to
extra_config
: optional configs (generally accepts producer configs)
TopicAdmin.list_topics
Get a list of topics and their metadata from a Kafka cluster
Arguments:
timeout
: response timeout (seconds); Default infinite (-1)
Returns:
a dict of topic names and their metadata objects
TopicAdmin.inspect_topics
A simplified way of getting the topic configurations of the provided topics
from the cluster (if they exist).
Arguments:
topic_names
: a list of topic names
timeout
: response timeout (seconds). NOTE: timeout must be >0 here (expects non-neg, and 0 != inf).
Returns:
a dict with topic names and their respective TopicConfig
TopicAdmin.create_topics
Create the given list of topics and confirm they are ready.
Raises an exception with a detailed printout if the creation fails; a topic that already exists is not treated as a failure.
Arguments:
topics
: a list of Topic
timeout
: creation acknowledge timeout (seconds)
finalize_timeout
: topic finalization timeout (seconds). NOTE: timeout must be >0 here (expects non-neg, and 0 != inf).
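The behaviour described above (a positive timeout, "already exists" treated as success, remaining failures reported together) can be sketched without a broker. This is a minimal illustration rather than the library's implementation: TopicExistsError, CreateTopicsFailed, and confirm_created are hypothetical names, and the results mapping stands in for whatever per-topic outcomes the admin client reports.

```python
from typing import Dict, Optional


class TopicExistsError(Exception):
    """Hypothetical stand-in for a broker's 'topic already exists' error."""


class CreateTopicsFailed(Exception):
    """Hypothetical error carrying a per-topic failure printout."""


def confirm_created(results: Dict[str, Optional[Exception]], timeout: float) -> None:
    # A timeout of 0 does not mean "wait forever" here, so reject it early.
    if timeout <= 0:
        raise ValueError("timeout must be > 0")

    failures = {}
    for name, error in results.items():
        # None means the topic was created; an existing topic is not a failure.
        if error is None or isinstance(error, TopicExistsError):
            continue
        failures[name] = error

    if failures:
        details = "\n".join(
            f"{name}: {error!r}" for name, error in sorted(failures.items())
        )
        raise CreateTopicsFailed(f"failed to create topics:\n{details}")
```

Collecting every failure before raising means one run reports all problem topics instead of stopping at the first.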
quixstreams.models.topics.topic
TopicConfig
Represents all kafka-level configuration for a kafka topic.
Generally used by Topic and any topic creation procedures.
Topic
A definition of a Kafka topic.
Typically created with an app = quixstreams.app.Application() instance via app.topic(), and used by a quixstreams.dataframe.StreamingDataFrame instance.
Topic.__init__
def __init__(
    name: str,
    config: Optional[TopicConfig] = None,
    value_deserializer: Optional[DeserializerType] = None,
    key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
    value_serializer: Optional[SerializerType] = None,
    key_serializer: Optional[SerializerType] = BytesSerializer(),
    timestamp_extractor: Optional[TimestampExtractor] = None)
Arguments:
name
: topic name
config
: topic configs via TopicConfig (creation/validation)
value_deserializer
: a deserializer type for values
key_deserializer
: a deserializer type for keys
value_serializer
: a serializer type for values
key_serializer
: a serializer type for keys
timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message.
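A timestamp extractor can be sketched as a plain function. The four-argument signature used here (value, headers, timestamp, timestamp_type) is an assumption about the TimestampExtractor type and should be checked against the installed version; the event_time_ms field name is hypothetical.

```python
from typing import Any, List, Optional, Tuple


def extract_event_time(
    value: Any,
    headers: Optional[List[Tuple[str, bytes]]],
    timestamp: float,
    timestamp_type: Any,
) -> int:
    """Return the event time in milliseconds for a deserialized message."""
    # Prefer a timestamp embedded in the payload (hypothetical field name),
    # falling back to the timestamp Kafka attached to the message.
    if isinstance(value, dict) and "event_time_ms" in value:
        return int(value["event_time_ms"])
    return int(timestamp)
```

A function like this would be passed as timestamp_extractor when the topic is defined.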
Topic.as_newtopic
Converts Topics to NewTopics as required for Confluent's AdminClient.create_topics().
Returns:
confluent_kafka NewTopics
Topic.row_serialize
Serialize Row to a Kafka message structure
Arguments:
row
: Row to serialize
key
: message key to serialize
Returns:
KafkaMessage object with serialized values
Topic.row_deserialize
Deserialize incoming Kafka message to a Row.
Arguments:
message
: an object with the interface of confluent_kafka.Message
Returns:
Row, list of Rows or None if the message is ignored.
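Because the return value can be a single Row, a list of Rows, or None, callers may find it convenient to normalize it first. The helper below is a hypothetical sketch, not part of the library, and treats a Row as any non-list object.

```python
from typing import Any, List


def as_row_list(result: Any) -> List[Any]:
    """Normalize a 'Row, list of Rows or None' result into a list."""
    if result is None:
        # The message was ignored, so there is nothing to process.
        return []
    if isinstance(result, list):
        return result
    # A single Row becomes a one-element list.
    return [result]
```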
quixstreams.models.topics.manager
affirm_ready_for_create
Validate that a list of topics is ready for a creation attempt.
Arguments:
topics
: list of Topics
TopicManager
The source of all topic management for a Quix Streams Application.
Intended only for internal use by Application.
To create a Topic, use Application.topic() or generate them directly.
TopicManager.__init__
def __init__(topic_admin: TopicAdmin,
             consumer_group: str,
             timeout: float = 30,
             create_timeout: float = 60,
             auto_create_topics: bool = True)
Arguments:
topic_admin
: an Admin instance (required for some functionality)
consumer_group
: the consumer group (of the Application)
timeout
: response timeout (seconds)
create_timeout
: timeout for topic creation
TopicManager.changelog_topics
Note: Topics are the changelogs.
returns: the changelog topic dict, {topic_name: {suffix: Topic}}
TopicManager.changelog_topics_list
Returns a list of changelog topics
returns: a list of the changelog Topic objects
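The nested changelog mapping, {topic_name: {suffix: Topic}}, can be flattened into a plain list of topics. The sketch below shows one way to do that; the Topic dataclass is a minimal stand-in for the real class, and flatten_changelogs and the topic names in the usage are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Topic:
    """Minimal stand-in for quixstreams' Topic; only the name is modeled."""
    name: str


def flatten_changelogs(changelogs: Dict[str, Dict[str, Topic]]) -> List[Topic]:
    # Walk each source topic, then each store suffix, keeping insertion order.
    return [
        topic
        for by_suffix in changelogs.values()
        for topic in by_suffix.values()
    ]
```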
TopicManager.non_changelog_topics
Returns a dict with normal and repartition topics
TopicManager.all_topics
Every registered topic name mapped to its respective Topic.
returns: full topic dict, {topic_name: Topic}
TopicManager.topic_config
def topic_config(num_partitions: Optional[int] = None,
                 replication_factor: Optional[int] = None,
                 extra_config: Optional[dict] = None) -> TopicConfig
Convenience method for generating a TopicConfig with default settings.
Arguments:
num_partitions
: the number of topic partitions
replication_factor
: the topic replication factor
extra_config
: other optional configuration settings
Returns:
a TopicConfig object
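The "default settings" behaviour can be pictured as filling in any argument left as None. The sketch below uses a stand-in TopicConfig dataclass and assumed default values of 1 partition and a replication factor of 1; the real class and defaults belong to the library and may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class TopicConfig:
    """Stand-in for quixstreams' TopicConfig with only the fields used here."""
    num_partitions: int
    replication_factor: int
    extra_config: Dict[str, str] = field(default_factory=dict)


# Assumed defaults for illustration only.
DEFAULT_NUM_PARTITIONS = 1
DEFAULT_REPLICATION_FACTOR = 1


def topic_config(
    num_partitions: Optional[int] = None,
    replication_factor: Optional[int] = None,
    extra_config: Optional[dict] = None,
) -> TopicConfig:
    # Any argument left as None falls back to the corresponding default.
    return TopicConfig(
        num_partitions=(
            DEFAULT_NUM_PARTITIONS if num_partitions is None else num_partitions
        ),
        replication_factor=(
            DEFAULT_REPLICATION_FACTOR
            if replication_factor is None
            else replication_factor
        ),
        # Copy so that the caller's dict is never shared with the config.
        extra_config=dict(extra_config or {}),
    )
```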
TopicManager.topic
def topic(name: str,
          value_deserializer: Optional[DeserializerType] = None,
          key_deserializer: Optional[DeserializerType] = "bytes",
          value_serializer: Optional[SerializerType] = None,
          key_serializer: Optional[SerializerType] = "bytes",
          config: Optional[TopicConfig] = None,
          timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
A convenience method for generating a Topic. Will use default config options as dictated by the TopicManager.
Arguments:
name
: topic name
value_deserializer
: a deserializer type for values
key_deserializer
: a deserializer type for keys
value_serializer
: a serializer type for values
key_serializer
: a serializer type for keys
config
: optional topic configurations (for creation/validation)
timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message.
Returns:
Topic object with creation configs
TopicManager.register
Register an already generated quixstreams.models.topics.Topic with the topic manager.
The topic name and config can be updated by the topic manager.
Arguments:
topic
: The topic to register
TopicManager.repartition_topic
def repartition_topic(operation: str,
                      topic_name: str,
                      value_deserializer: Optional[DeserializerType] = "json",
                      key_deserializer: Optional[DeserializerType] = "json",
                      value_serializer: Optional[SerializerType] = "json",
                      key_serializer: Optional[SerializerType] = "json",
                      timeout: Optional[float] = None) -> Topic
Create an internal repartition topic.
Arguments:
operation
: name of the GroupBy operation (column name or user-defined).
topic_name
: name of the topic the GroupBy is sourced from.
value_deserializer
: a deserializer type for values; default - JSON
key_deserializer
: a deserializer type for keys; default - JSON
value_serializer
: a serializer type for values; default - JSON
key_serializer
: a serializer type for keys; default - JSON
timeout
: config lookup timeout (seconds); Default 30
Returns:
Topic object (which is also stored on the TopicManager)
TopicManager.changelog_topic
def changelog_topic(topic_name: Optional[str],
                    store_name: str,
                    config: Optional[TopicConfig] = None,
                    timeout: Optional[float] = None) -> Topic
Performs all the logic necessary to generate a changelog topic based on an optional "source topic" (aka input/consumed topic).
Its main goal is to ensure that the partition count of the to-be-generated changelog matches the source topic, and that the changelog topic is compacted. It also enforces the serialization type. All Topic objects generated with this are stored on the TopicManager.
If the source topic already exists, defers to the existing topic settings; otherwise uses the settings as defined by the Topic (and its defaults) as generated by the TopicManager.
In general, users should NOT need this; an Application knows when/how to generate changelog topics. To turn off changelogs, init an Application with use_changelog_topics=False.
Arguments:
topic_name
: name of the consumed topic (app input topic). NOTE: normally contains any prefixes added by TopicManager.topic()
store_name
: name of the store this changelog belongs to (default, rolling10s, etc.)
config
: the changelog topic configuration. Defaults to the topic_name configuration or the TopicManager default
timeout
: config lookup timeout (seconds); Default 30
Returns:
Topic object (which is also stored on the TopicManager)
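The two guarantees described for changelog topics (partition count matching the source, and compaction) can be sketched as a pure function over configurations. This is an illustration under stated assumptions, not the library's logic: TopicConfig is a stand-in dataclass, changelog_config is a hypothetical helper, and letting an explicit override keep its own replication factor is a choice made for this sketch. The cleanup.policy=compact setting is the standard Kafka topic-level option for log compaction.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class TopicConfig:
    """Stand-in for quixstreams' TopicConfig with only the fields used here."""
    num_partitions: int
    replication_factor: int
    extra_config: Dict[str, str] = field(default_factory=dict)


def changelog_config(
    source: TopicConfig, override: Optional[TopicConfig] = None
) -> TopicConfig:
    base = override or source
    # Copy the settings so the source configuration is left untouched.
    extra = dict(base.extra_config)
    # A changelog only needs the latest value per key, so force compaction.
    extra["cleanup.policy"] = "compact"
    return TopicConfig(
        # The partition count always follows the source topic, so state
        # partitions line up one-to-one with input partitions.
        num_partitions=source.num_partitions,
        replication_factor=base.replication_factor,
        extra_config=extra,
    )
```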
TopicManager.create_topics
def create_topics(topics: List[Topic],
                  timeout: Optional[float] = None,
                  create_timeout: Optional[float] = None)
Creates topics via an explicit list of provided Topics.
Exists as a way to manually specify what topics to create; otherwise, create_all_topics() is generally simpler.
Arguments:
topics
: list of Topics
timeout
: creation acknowledge timeout (seconds); Default 30
create_timeout
: topic finalization timeout (seconds); Default 60
TopicManager.create_all_topics
A convenience method to create all Topic objects stored on this TopicManager.
If auto_create_topics is set to False, no topic will be created.
Arguments:
timeout
: creation acknowledge timeout (seconds); Default 30
create_timeout
: topic finalization timeout (seconds); Default 60
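How the auto_create_topics flag and the optional timeouts interact can be modeled in a few lines. TopicManagerSketch below is a hypothetical, heavily reduced model: topics are plain names, the create callable stands in for the admin client, and the boolean return value exists only to make the sketch easy to check.

```python
from typing import Callable, List, Optional


class TopicManagerSketch:
    """Hypothetical, reduced model of the create_all_topics() flow."""

    def __init__(
        self,
        create: Callable[[List[str], float, float], None],
        timeout: float = 30,
        create_timeout: float = 60,
        auto_create_topics: bool = True,
    ):
        self._create = create
        self._timeout = timeout
        self._create_timeout = create_timeout
        self._auto_create_topics = auto_create_topics
        self.topics: List[str] = []

    def create_all_topics(
        self,
        timeout: Optional[float] = None,
        create_timeout: Optional[float] = None,
    ) -> bool:
        # With auto-creation disabled, nothing is sent to the cluster.
        if not self._auto_create_topics:
            return False
        # Per-call timeouts win; otherwise use the manager-level defaults.
        self._create(
            list(self.topics),
            self._timeout if timeout is None else timeout,
            self._create_timeout if create_timeout is None else create_timeout,
        )
        return True
```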
TopicManager.validate_all_topics
Validates that all topics exist and that changelogs have the correct topic and replication factor.
Issues are pooled and raised as an Exception once inspections are complete.
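Pooling issues means every topic is inspected before anything is raised, so a single error lists all problems. The sketch below shows that pattern with plain dictionaries mapping topic names to replication factors; TopicValidationError and validate_topics are hypothetical names, and the specific checks are assumptions chosen for illustration.

```python
from typing import Dict, List


class TopicValidationError(Exception):
    """Hypothetical error raised once all topics have been inspected."""


def validate_topics(expected: Dict[str, int], actual: Dict[str, int]) -> None:
    """Compare expected and actual replication factors, reporting all issues."""
    issues: List[str] = []
    for name, factor in expected.items():
        if name not in actual:
            issues.append(f"{name}: topic does not exist")
        elif actual[name] != factor:
            issues.append(
                f"{name}: expected replication factor {factor}, "
                f"found {actual[name]}"
            )
    # Raise only after every topic has been checked.
    if issues:
        raise TopicValidationError("; ".join(issues))
```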