Kafka Producer & Consumer API
quixstreams.kafka.producer
Producer
Producer.__init__
def __init__(broker_address: str,
             partitioner: Partitioner = "murmur2",
             extra_config: Optional[dict] = None)
A wrapper around confluent_kafka.Producer.
It initializes confluent_kafka.Producer on demand, avoiding network calls during __init__, provides typing info for methods, and sets some reasonable defaults.
Arguments:
- broker_address: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Producer.
- partitioner: Name of the partitioner used to determine the outgoing message partition. Available values: "random", "consistent_random", "murmur2", "murmur2_random", "fnv1a", "fnv1a_random". Default - "murmur2".
- extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
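The override rule above amounts to a dictionary merge in which the explicit arguments are applied last. A minimal sketch, assuming a hypothetical helper `build_producer_config` (not part of quixstreams) and the librdkafka keys `bootstrap.servers` and `partitioner`:

```python
from typing import Optional


def build_producer_config(broker_address: str,
                          partitioner: str = "murmur2",
                          extra_config: Optional[dict] = None) -> dict:
    """Hypothetical helper: merge extra_config with explicit arguments.

    Explicit arguments are written last, so they override any
    conflicting keys that also appear in extra_config.
    """
    config = dict(extra_config or {})  # copy; never mutate the caller's dict
    config.update({
        "bootstrap.servers": broker_address,
        "partitioner": partitioner,
    })
    return config


cfg = build_producer_config(
    "localhost:9092",
    extra_config={"bootstrap.servers": "other:9092", "linger.ms": 10},
)
print(cfg)
# {'bootstrap.servers': 'localhost:9092', 'linger.ms': 10, 'partitioner': 'murmur2'}
```

Options that have no dedicated argument (such as `linger.ms` here) pass through untouched.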
Producer.produce
def produce(topic: str,
            value: Optional[Union[str, bytes]] = None,
            key: Optional[Union[str, bytes]] = None,
            headers: Optional[Headers] = None,
            partition: Optional[int] = None,
            timestamp: Optional[int] = None,
            poll_timeout: float = 5.0,
            buffer_error_max_tries: int = 3)
Produce a message to a topic.
It also polls Kafka for callbacks before producing, to minimize the probability of BufferError.
If BufferError still happens, the method polls Kafka with poll_timeout to free up the buffer and tries again.
Arguments:
- topic: topic name
- value: message value
- key: message key
- headers: message headers
- partition: topic partition
- timestamp: message timestamp (milliseconds since epoch)
- poll_timeout: timeout in seconds for the poll() call in case of BufferError
- buffer_error_max_tries: max retries for BufferError. Pass 0 to not retry after BufferError.
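The poll-then-retry behaviour described above can be sketched with a stand-in producer so that it runs without a broker. `FakeProducer` and `produce_with_retry` are hypothetical names; this illustrates the documented semantics, not quixstreams' actual source:

```python
class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer with a tiny queue."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.queue = []      # messages waiting for broker acknowledgement
        self.delivered = []  # messages "acknowledged" so far

    def produce(self, topic, value=None, key=None):
        if len(self.queue) >= self.capacity:
            raise BufferError("Local: Queue full")
        self.queue.append((topic, key, value))

    def poll(self, timeout: float = 0.0) -> int:
        # Model acknowledgements as arriving only while poll() blocks.
        if timeout == 0:
            return 0
        served = len(self.queue)
        self.delivered.extend(self.queue)
        self.queue.clear()
        return served


def produce_with_retry(producer, topic, value=None, key=None,
                       poll_timeout: float = 5.0,
                       buffer_error_max_tries: int = 3) -> None:
    producer.poll(0)  # serve callbacks that are already pending
    retries = 0
    while True:
        try:
            producer.produce(topic, value=value, key=key)
            return
        except BufferError:
            if retries >= buffer_error_max_tries:
                raise  # out of retries; 0 means "never retry"
            retries += 1
            producer.poll(poll_timeout)  # wait for deliveries to free space


producer = FakeProducer(capacity=1)
produce_with_retry(producer, "events", value=b"first")
produce_with_retry(producer, "events", value=b"second")  # retried once
print(producer.delivered)  # [('events', None, b'first')]
print(producer.queue)      # [('events', None, b'second')]
```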
Producer.poll
Polls the producer for events and calls on_delivery callbacks.
Arguments:
- timeout: poll timeout in seconds. Default: 0 (unlike other methods).
NOTE: -1 will hang indefinitely if there are no messages to acknowledge.
Producer.flush
Wait for all messages in the Producer queue to be delivered.
Arguments:
- timeout (float): time to attempt flushing, in seconds. None or -1 is infinite. Default: None
Returns:
number of messages remaining to flush
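Because flush() returns the number of messages still queued, a caller can treat a non-zero result as undelivered data at shutdown. A small sketch, with a hypothetical `StubProducer` standing in for a real producer and a hypothetical `flush_or_raise` helper:

```python
class StubProducer:
    """Hypothetical stand-in whose flush() reports a fixed backlog."""

    def __init__(self, remaining: int):
        self.remaining = remaining

    def flush(self, timeout=None) -> int:
        return self.remaining


def flush_or_raise(producer, timeout: float = 10.0) -> None:
    # flush() returns how many messages are still queued after `timeout`.
    remaining = producer.flush(timeout)
    if remaining > 0:
        raise RuntimeError(
            f"{remaining} message(s) not delivered within {timeout}s")


flush_or_raise(StubProducer(remaining=0))  # everything delivered: no error
```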
quixstreams.kafka.consumer
Consumer
Consumer.__init__
def __init__(broker_address: str,
             consumer_group: Optional[str],
             auto_offset_reset: AutoOffsetReset,
             auto_commit_enable: bool = True,
             assignment_strategy: AssignmentStrategy = "range",
             on_commit: Optional[Callable[
                 [Optional[KafkaError], List[TopicPartition]], None]] = None,
             extra_config: Optional[dict] = None)
A wrapper around confluent_kafka.Consumer.
It initializes confluent_kafka.Consumer on demand, avoiding network calls during __init__, provides typing info for methods, and sets some reasonable defaults.
Arguments:
- broker_address: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Consumer.
- consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer.
- auto_offset_reset: Consumer auto.offset.reset setting, applied when there is no committed offset for a partition or the committed offset is out of range. Available values:
  - "earliest" - automatically reset the offset to the smallest offset
  - "latest" - automatically reset the offset to the largest offset
  - "error" - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages (used for testing)
- auto_commit_enable: If true, periodically commit the offset of the last message handed to the application. Default - True.
- assignment_strategy: The name of a partition assignment strategy. Available values: "range", "roundrobin", "cooperative-sticky".
- on_commit: Offset commit result propagation callback. Passed as "offset_commit_cb" to confluent_kafka.Consumer.
- extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Note: values passed as arguments override values in extra_config.
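The auto_offset_reset policy only matters when the group has no usable committed offset for a partition. A simplified model of that decision, using a hypothetical `resolve_start_offset` helper and the watermark offsets (low, high) described under get_watermark_offsets():

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int], low: int, high: int,
                         auto_offset_reset: str) -> int:
    """Hypothetical model of where a partition's consumption starts.

    committed: the group's committed offset, or None if nothing is committed.
    low/high: watermark offsets; high is the offset of the last message + 1.
    """
    if committed is not None and low <= committed <= high:
        return committed  # a valid committed offset always wins
    if auto_offset_reset == "earliest":
        return low
    if auto_offset_reset == "latest":
        return high
    if auto_offset_reset == "error":
        raise LookupError("no valid committed offset and auto.offset.reset=error")
    raise ValueError(f"unknown auto_offset_reset: {auto_offset_reset!r}")


print(resolve_start_offset(None, low=10, high=50, auto_offset_reset="earliest"))  # 10
print(resolve_start_offset(None, low=10, high=50, auto_offset_reset="latest"))    # 50
print(resolve_start_offset(42, low=10, high=50, auto_offset_reset="latest"))      # 42
```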
Consumer.poll
Consumes a single message, calls callbacks and returns events.
The application must check the returned Message object's Message.error() method to distinguish between proper messages (error() returns None) and events or errors.
Note: callbacks such as on_assign, on_revoke, et al. may be called from this method.
Arguments:
- timeout (float): Maximum time in seconds to block waiting for a message, event or callback. None or -1 is infinite. Default: None.
Raises:
- RuntimeError: if called on a closed consumer
Returns:
A Message object or None on timeout
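A typical loop around poll() handles its three possible results: None on timeout, a message whose error() is set, and a proper message. The sketch below uses a hypothetical `StubMessage` and `drain` helper so it runs without a broker; with a real consumer, `consumer.poll` could be passed as the `poll` argument:

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class StubMessage:
    """Hypothetical stand-in exposing only value() and error()."""
    payload: Optional[bytes] = None
    err: Optional[str] = None

    def value(self) -> Optional[bytes]:
        return self.payload

    def error(self) -> Optional[str]:
        return self.err


def drain(poll: Callable[..., Optional[StubMessage]],
          max_polls: int) -> Tuple[List[bytes], List[str]]:
    """Call poll() repeatedly, separating records from events/errors."""
    values, errors = [], []
    for _ in range(max_polls):
        msg = poll(timeout=1.0)
        if msg is None:
            continue  # timed out, nothing to process
        if msg.error() is not None:
            errors.append(msg.error())  # an event or error, not a record
            continue
        values.append(msg.value())
    return values, errors


pending = [StubMessage(b"a"), None, StubMessage(err="partition EOF"), StubMessage(b"b")]
values, errors = drain(lambda timeout: pending.pop(0), max_polls=4)
print(values, errors)  # [b'a', b'b'] ['partition EOF']
```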
Consumer.subscribe
def subscribe(topics: List[str],
              on_assign: Optional[RebalancingCallback] = None,
              on_revoke: Optional[RebalancingCallback] = None,
              on_lost: Optional[RebalancingCallback] = None)
Set subscription to the supplied list of topics.
This replaces a previous subscription.
Arguments:
- topics (list(str)): List of topics (strings) to subscribe to.
- on_assign (callable): callback to provide handling of customized offsets on completion of a successful partition re-assignment.
- on_revoke (callable): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
- on_lost (callable): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Callbacks:
on_assign(consumer, partitions)
on_revoke(consumer, partitions)
on_lost(consumer, partitions)
- consumer (Consumer): Consumer instance.
- partitions (list(TopicPartition)): Absolute list of partitions being assigned or revoked.
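The rebalance callbacks receive the consumer and a list of TopicPartition objects. The sketch below keeps a local set of assigned partitions; `AssignmentTracker` and the `TP` stand-in are hypothetical, and with a real consumer the bound methods would be passed as `subscribe(topics, on_assign=tracker.on_assign, on_revoke=tracker.on_revoke, on_lost=tracker.on_lost)`:

```python
from typing import NamedTuple, Set, Tuple


class TP(NamedTuple):
    """Hypothetical stand-in for confluent_kafka.TopicPartition."""
    topic: str
    partition: int


class AssignmentTracker:
    """Keeps a local view of assigned partitions via rebalance callbacks."""

    def __init__(self) -> None:
        self.assigned: Set[Tuple[str, int]] = set()

    def on_assign(self, consumer, partitions) -> None:
        self.assigned.update((tp.topic, tp.partition) for tp in partitions)

    def on_revoke(self, consumer, partitions) -> None:
        self.assigned.difference_update((tp.topic, tp.partition) for tp in partitions)

    def on_lost(self, consumer, partitions) -> None:
        # Lost partitions may already belong to another member, so only
        # forget them locally; committing offsets for them may fail.
        self.on_revoke(consumer, partitions)


tracker = AssignmentTracker()
# A rebalance is simulated here by calling the callbacks directly.
tracker.on_assign(None, [TP("orders", 0), TP("orders", 1)])
tracker.on_revoke(None, [TP("orders", 1)])
print(sorted(tracker.assigned))  # [('orders', 0)]
```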
Consumer.unsubscribe
Remove current subscription.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Consumer.store_offsets
def store_offsets(message: Optional[Message] = None,
                  offsets: Optional[List[TopicPartition]] = None)
Store offsets for a message or a list of offsets.
message and offsets are mutually exclusive. The stored offsets will be committed according to 'auto.commit.interval.ms' or a manual offset-less commit().
Note that 'enable.auto.offset.store' must be set to False when using this API.
Arguments:
- message (confluent_kafka.Message): Store the message's offset + 1.
- offsets (list(TopicPartition)): List of topic+partitions+offsets to store.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
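The "offset + 1" rule matters when several messages per partition are processed before offsets are stored. A minimal sketch, assuming a hypothetical helper `offsets_to_store` that works on plain tuples; each resulting entry could then be wrapped as `TopicPartition(topic, partition, offset)` and passed via `offsets=`:

```python
from typing import Dict, Iterable, Tuple


def offsets_to_store(processed: Iterable[Tuple[str, int, int]]) -> Dict[Tuple[str, int], int]:
    """Map (topic, partition) to the offset that should be stored.

    `processed` holds (topic, partition, offset) of messages the application
    has finished with. By Kafka convention the stored offset is the *next*
    offset to consume: the highest processed offset + 1.
    """
    result: Dict[Tuple[str, int], int] = {}
    for topic, partition, offset in processed:
        key = (topic, partition)
        result[key] = max(result.get(key, 0), offset + 1)
    return result


done = [("orders", 0, 5), ("orders", 0, 7), ("orders", 1, 2), ("orders", 0, 6)]
print(offsets_to_store(done))  # {('orders', 0): 8, ('orders', 1): 3}
```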
Consumer.commit
def commit(message: Optional[Message] = None,
           offsets: Optional[List[TopicPartition]] = None,
           asynchronous: bool = True) -> Optional[List[TopicPartition]]
Commit a message or a list of offsets.
The message and offsets parameters are mutually exclusive.
If neither is set, the current partition assignment's offsets are used instead.
Use this method to commit offsets if you have 'enable.auto.commit' set to False.
Arguments:
- message (confluent_kafka.Message): Commit the message's offset + 1. Note: by convention, committed offsets reflect the next message to be consumed, not the last message consumed.
- offsets (list(TopicPartition)): List of topic+partitions+offsets to commit.
- asynchronous (bool): If True, commit asynchronously, returning None immediately. If False, the commit() call blocks until the commit succeeds or fails, and the committed offsets are returned (on success). Note that specific partitions may have failed, so the .error attribute of each returned partition should be checked for success.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
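Since a synchronous commit can succeed for some partitions and fail for others, the returned list should be checked entry by entry. A sketch with a hypothetical `TP` stand-in mirroring the `topic`, `partition`, `offset` and `error` attributes of confluent_kafka.TopicPartition, and a hypothetical `check_commit_result` helper; with a real consumer its input would be the result of `consumer.commit(asynchronous=False)`:

```python
from typing import List, NamedTuple, Optional


class TP(NamedTuple):
    """Hypothetical stand-in for a TopicPartition returned by commit()."""
    topic: str
    partition: int
    offset: int
    error: Optional[str] = None


def check_commit_result(results: List[TP]) -> List[TP]:
    """Raise if any partition in a synchronous commit() result failed."""
    failed = [tp for tp in results if tp.error is not None]
    if failed:
        details = ", ".join(f"{tp.topic}[{tp.partition}]: {tp.error}" for tp in failed)
        raise RuntimeError(f"commit failed for {details}")
    return results


ok = check_commit_result([TP("orders", 0, 8), TP("orders", 1, 3)])
print(len(ok))  # 2
```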
Consumer.committed
def committed(partitions: List[TopicPartition],
              timeout: Optional[float] = None) -> List[TopicPartition]
Retrieve committed offsets for the specified partitions.
Arguments:
- partitions (list(TopicPartition)): List of topic+partitions to query for stored offsets.
- timeout (float): Request timeout (seconds). None or -1 is infinite. Default: None
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Returns:
- list(TopicPartition): List of topic+partitions with offset and possibly error set.
Consumer.get_watermark_offsets
def get_watermark_offsets(partition: TopicPartition,
                          timeout: Optional[float] = None,
                          cached: bool = False) -> Tuple[int, int]
Retrieve low and high offsets for the specified partition.
Arguments:
- partition (TopicPartition): Topic+partition to return offsets for.
- timeout (float): Request timeout (seconds). None or -1 is infinite. Ignored if cached=True. Default: None
- cached (bool): Instead of querying the broker, use cached information. Cached values: the low offset is updated periodically (if statistics.interval.ms is set), while the high offset is updated on each message fetched from the broker for this partition.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Returns:
- tuple(int,int): Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.
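Combining the watermarks with a committed offset gives a common consumer-lag estimate. A sketch, assuming a hypothetical `partition_lag` helper; with a real consumer, `low, high` would come from get_watermark_offsets() and `committed` from the offset of the matching entry returned by committed():

```python
def partition_lag(low: int, high: int, committed: int) -> int:
    """Number of messages the group still has to consume in one partition.

    high is the offset of the last message + 1 and committed is the next
    offset the group will read, so high - committed counts what is left.
    """
    if committed < 0:
        # Negative sentinel (no committed offset): assume reading would start
        # at the low watermark, i.e. auto_offset_reset="earliest".
        start = low
    else:
        # Offsets below the low watermark were already deleted by retention.
        start = max(committed, low)
    return max(high - start, 0)


print(partition_lag(low=0, high=100, committed=90))      # 10
print(partition_lag(low=20, high=100, committed=-1001))  # 80
```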
Consumer.list_topics
list_topics([topic=None], [timeout=-1])
Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Arguments:
- topic (str): If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
- timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
- KafkaException
Consumer.memberid
Return this client's broker-assigned group member id.
The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.
Returns:
- string: Member id string or None.
Raises:
- RuntimeError: if called on a closed consumer
Consumer.offsets_for_times
def offsets_for_times(partitions: List[TopicPartition],
                      timeout: Optional[float] = None) -> List[TopicPartition]
Look up offsets by timestamp for the specified partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
Arguments:
- partitions (list(TopicPartition)): topic+partitions with timestamps in the TopicPartition.offset field.
- timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Returns:
- list(TopicPartition): List of topic+partitions with offset field set and possibly error set.
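The lookup rule can be modelled over an in-memory list of (offset, timestamp) pairs for a single partition. This hypothetical `offset_for_time` helper illustrates the documented semantics only, not how the broker performs the lookup; with the real API the timestamp (milliseconds since epoch) goes into each TopicPartition's offset field:

```python
from typing import List, Tuple


def offset_for_time(records: List[Tuple[int, int]], timestamp_ms: int) -> int:
    """Earliest offset whose timestamp is >= timestamp_ms, else -1.

    records holds (offset, timestamp_ms) pairs in offset order. Timestamps
    need not be sorted (producer-set CreateTime can go backwards), so this
    scans linearly instead of bisecting.
    """
    for offset, ts in records:
        if ts >= timestamp_ms:
            return offset
    return -1  # timestamp is later than the last message in the partition


log = [(0, 1_000), (1, 2_000), (2, 2_000), (3, 5_000)]
print(offset_for_time(log, 1_500))  # 1
print(offset_for_time(log, 6_000))  # -1
```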
Consumer.pause
Pause consumption for the provided list of partitions.
Paused partitions must be tracked manually.
Does NOT affect the result of Consumer.assignment().
Arguments:
- partitions (list(TopicPartition)): List of topic+partitions to pause.
Raises:
- KafkaException
Consumer.resume
resume(partitions)
Resume consumption for the provided list of partitions.
Arguments:
- partitions (list(TopicPartition)): List of topic+partitions to resume.
Raises:
- KafkaException
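Since paused partitions are not reflected in Consumer.assignment(), the application needs its own bookkeeping. A minimal sketch with hypothetical `PauseTracker`, `RecordingConsumer` and `TP` classes; a real Consumer could be passed in place of the recording stand-in:

```python
from typing import List, NamedTuple, Set, Tuple


class TP(NamedTuple):
    """Hypothetical stand-in for confluent_kafka.TopicPartition."""
    topic: str
    partition: int


class RecordingConsumer:
    """Hypothetical stand-in that only records pause/resume calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, List[TP]]] = []

    def pause(self, partitions: List[TP]) -> None:
        self.calls.append(("pause", partitions))

    def resume(self, partitions: List[TP]) -> None:
        self.calls.append(("resume", partitions))


class PauseTracker:
    """Keeps the set of paused partitions, which assignment() does not show."""

    def __init__(self, consumer) -> None:
        self.consumer = consumer
        self.paused: Set[Tuple[str, int]] = set()

    def pause(self, partitions: List[TP]) -> None:
        self.consumer.pause(partitions)  # local state changes only on success
        self.paused.update((tp.topic, tp.partition) for tp in partitions)

    def resume(self, partitions: List[TP]) -> None:
        self.consumer.resume(partitions)
        self.paused.difference_update((tp.topic, tp.partition) for tp in partitions)

    def is_paused(self, topic: str, partition: int) -> bool:
        return (topic, partition) in self.paused


tracker = PauseTracker(RecordingConsumer())
tracker.pause([TP("orders", 0), TP("orders", 1)])
tracker.resume([TP("orders", 1)])
print(tracker.is_paused("orders", 0), tracker.is_paused("orders", 1))  # True False
```

Calling the consumer first means the local set is only updated if pause() or resume() did not raise KafkaException.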
Consumer.position
Retrieve current positions (offsets) for the specified partitions.
Arguments:
- partitions (list(TopicPartition)): List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Returns:
- list(TopicPartition): List of topic+partitions with offset and possibly error set.
Consumer.seek
Set consume position for partition to offset.
The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()); to set the starting offset of a partition not being consumed, pass the offset in an assign() call instead.
Arguments:
- partition (TopicPartition): Topic+partition+offset to seek to.
Raises:
- KafkaException
Consumer.assignment
Returns the current partition assignment.
Raises:
- KafkaException
- RuntimeError: if called on a closed consumer
Returns:
- list(TopicPartition): List of assigned topic+partitions.
Consumer.set_sasl_credentials
Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
Consumer.incremental_assign
Assign new partitions.
Can be called outside the Consumer on_assign callback (multiple times).
Partitions immediately show on Consumer.assignment().
Any additional partitions besides the ones passed during the Consumer on_assign callback will NOT be associated with the consumer group.
Consumer.incremental_unassign
Revoke partitions.
Can be called outside an on_revoke callback.
Consumer.close
Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming.
- Commits offsets, unless the consumer property 'enable.auto.commit' is set to False.
- Leaves the consumer group.
Registered callbacks may be called from this method, see poll() for more info.