Kafka Producer & Consumer API
quixstreams.kafka.producer
Producer
Producer.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
             logger: logging.Logger = logger,
             error_callback: Callable[[KafkaError], None] = _default_error_cb,
             extra_config: Optional[dict] = None,
             flush_timeout: Optional[float] = None)
A wrapper around confluent_kafka.Producer.
It initializes confluent_kafka.Producer on demand, avoiding network calls during __init__, and provides typing info for methods and some reasonable defaults.
Arguments:
- broker_address: Connection settings for Kafka. Accepts a string with the Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
- logger: a Logger instance to attach librdkafka logging to
- error_callback: callback used for producer errors
- extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
- flush_timeout: The time the producer waits for all messages to be delivered.
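The on-demand initialization described above can be pictured with a small stand-alone sketch. LazyClient and client_factory are hypothetical names used only for illustration; they are not part of quixstreams, and the sketch shows the general pattern of deferring client construction until first use rather than the library's actual code.

```python
from typing import Any, Callable, List, Optional


class LazyClient:
    """Defers building the underlying client until it is first needed."""

    def __init__(self, client_factory: Callable[[], Any]):
        # Only the factory is stored; nothing is constructed or connected here.
        self._client_factory = client_factory
        self._client: Optional[Any] = None

    @property
    def client(self) -> Any:
        # Build the client on first access and reuse it afterwards.
        if self._client is None:
            self._client = self._client_factory()
        return self._client


created: List[str] = []


def client_factory() -> dict:
    # Hypothetical stand-in for constructing a real Kafka client.
    created.append("client")
    return {"connected": True}


lazy = LazyClient(client_factory)
built_before_use = len(created)  # still 0: __init__ did not call the factory
first = lazy.client              # the factory runs here
second = lazy.client             # the cached instance is reused
```

With this pattern, constructing the wrapper is cheap and cannot fail on network errors; any connection problem surfaces on the first real operation instead.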
Producer.produce
def produce(topic: str,
            value: Optional[Union[str, bytes]] = None,
            key: Optional[Union[str, bytes]] = None,
            headers: Optional[Headers] = None,
            partition: Optional[int] = None,
            timestamp: Optional[int] = None,
            poll_timeout: float = 5.0,
            buffer_error_max_tries: int = 3,
            on_delivery: Optional[DeliveryCallback] = None)
Produce a message to a topic.
It also polls Kafka for callbacks before producing to minimize the probability of BufferError.
If BufferError still happens, the method will poll Kafka with a timeout to free up the buffer and try again.
Arguments:
- topic: topic name
- value: message value
- key: message key
- headers: message headers
- partition: topic partition
- timestamp: message timestamp
- poll_timeout: timeout for the poll() call in case of BufferError
- buffer_error_max_tries: max retries for BufferError. Pass 0 to not retry after BufferError.
- on_delivery: the delivery callback to be triggered on poll() for the produced message.
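The poll-then-retry behaviour described above can be sketched without a broker. FakeProducer and produce_with_retry are hypothetical stand-ins written for this example; the loop is one plausible reading of the description, not the library's implementation, and the way the fake treats poll(0) is an assumption made so that the retry path is exercised.

```python
from typing import List, Tuple


class FakeProducer:
    """Stand-in with a bounded local queue, for illustration only."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.queue: List[Tuple[str, bytes]] = []
        self.delivered: List[Tuple[str, bytes]] = []

    def produce(self, topic: str, value: bytes) -> None:
        if len(self.queue) >= self.capacity:
            raise BufferError("local queue is full")
        self.queue.append((topic, value))

    def poll(self, timeout: float) -> int:
        # In this fake, a non-blocking poll (timeout 0) finds nothing to
        # acknowledge, while a blocking poll delivers everything queued.
        if timeout <= 0:
            return 0
        served = len(self.queue)
        self.delivered.extend(self.queue)
        self.queue.clear()
        return served


def produce_with_retry(producer, topic, value, poll_timeout=5.0, max_retries=3) -> int:
    """Produce once, retrying on BufferError; returns the number of retries used."""
    producer.poll(0)  # serve pending callbacks before producing
    retries = 0
    while True:
        try:
            producer.produce(topic, value)
            return retries
        except BufferError:
            if retries >= max_retries:
                raise  # retries exhausted (or disabled with max_retries=0)
            retries += 1
            producer.poll(poll_timeout)  # block so the buffer can drain


producer = FakeProducer(capacity=1)
first = produce_with_retry(producer, "events", b"a")   # fits in the queue
second = produce_with_retry(producer, "events", b"b")  # needs one retry
```

Passing max_retries=0 in this sketch re-raises the first BufferError, which mirrors the "pass 0 to not retry" note in the argument list.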
Producer.poll
Polls the producer for events and calls on_delivery callbacks.
Arguments:
- timeout: poll timeout in seconds. Default: 0 (unlike other methods). Note: -1 will hang indefinitely if there are no messages to acknowledge.
Producer.flush
Wait for all messages in the Producer queue to be delivered.
Arguments:
- timeout (float): time to attempt flushing (seconds). None uses the producer default; -1 is infinite. Default: None
Returns:
number of messages remaining to flush
TransactionalProducer
A separate producer class used only internally for transactions (transactions are only needed when using a consumer).
quixstreams.kafka.consumer
BaseConsumer
BaseConsumer.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
             consumer_group: Optional[str],
             auto_offset_reset: AutoOffsetReset,
             auto_commit_enable: bool = True,
             logger: logging.Logger = logger,
             error_callback: Callable[[KafkaError], None] = _default_error_cb,
             on_commit: Optional[Callable[
                 [Optional[KafkaError], List[TopicPartition]], None]] = None,
             extra_config: Optional[dict] = None)
A wrapper around confluent_kafka.Consumer.
It initializes confluent_kafka.Consumer on demand, avoiding network calls during __init__, and provides typing info for methods and some reasonable defaults.
Arguments:
- broker_address: Connection settings for Kafka. Accepts a string with the Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
- consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer
- auto_offset_reset: Consumer auto.offset.reset setting. Available values:
  "earliest" - automatically reset the offset to the smallest offset
  "latest" - automatically reset the offset to the largest offset
  "error" - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages (used for testing)
- auto_commit_enable: If true, periodically commit the offset of the last message handed to the application. Default: True.
- logger: a Logger instance to attach librdkafka logging to
- error_callback: callback used for consumer errors
- on_commit: Offset commit result propagation callback. Passed as "offset_commit_cb" to confluent_kafka.Consumer.
- extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Note: values passed as arguments override values in extra_config.
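The override rule for extra_config can be illustrated with a plain dictionary merge. build_consumer_config is a hypothetical helper written for this example, and the mapping of broker_address to bootstrap.servers and of auto_commit_enable to enable.auto.commit is an assumption based on the standard librdkafka property names; only the group.id mapping is stated in the argument list above.

```python
from typing import Optional


def build_consumer_config(
    broker_address: str,
    consumer_group: Optional[str],
    auto_offset_reset: str,
    auto_commit_enable: bool = True,
    extra_config: Optional[dict] = None,
) -> dict:
    """Merge explicit arguments over extra_config (hypothetical helper)."""
    explicit = {
        "bootstrap.servers": broker_address,       # assumed mapping
        "group.id": consumer_group,
        "auto.offset.reset": auto_offset_reset,
        "enable.auto.commit": auto_commit_enable,  # assumed mapping
    }
    # Later keys win, so explicit arguments override extra_config entries.
    return {**(extra_config or {}), **explicit}


config = build_consumer_config(
    broker_address="localhost:9092",
    consumer_group="example-group",
    auto_offset_reset="earliest",
    extra_config={"auto.offset.reset": "latest", "session.timeout.ms": 10000},
)
```

Here the "latest" value supplied through extra_config is replaced by the explicit "earliest" argument, while the unrelated session.timeout.ms entry passes through unchanged.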
BaseConsumer.poll
Consumes a single message, calls callbacks and returns events.
The application must check the returned Message object's Message.error() method to distinguish between proper messages (error() returns None) and events or errors.
Note: a RebalancingCallback may be called from this method (on_assign, on_revoke, or on_lost).
Arguments:
- timeout (float): Maximum time in seconds to block waiting for a message, event or callback. None or -1 is infinite. Default: None.
Raises:
- RuntimeError: if called on a closed consumer
Returns:
- Optional[Message]: A Message object or None on timeout
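The three outcomes of poll() (a timeout, an event or error, a proper message) can be handled as in the sketch below. FakeMessage and classify are hypothetical names for this example; FakeMessage only imitates the error() and value() accessors so that the branching can run without a broker.

```python
from typing import Iterable, List, Optional, Tuple


class FakeMessage:
    """Minimal stand-in exposing error() and value() accessors."""

    def __init__(self, value: Optional[bytes] = None, error: Optional[str] = None):
        self._value = value
        self._error = error

    def error(self) -> Optional[str]:
        return self._error

    def value(self) -> Optional[bytes]:
        return self._value


def classify(polled: Iterable[Optional[FakeMessage]]) -> Tuple[List[bytes], List[str], int]:
    """Split poll() results into proper values, errors and timeouts."""
    values: List[bytes] = []
    errors: List[str] = []
    timeouts = 0
    for msg in polled:
        if msg is None:
            timeouts += 1  # poll() timed out without a message
        elif msg.error() is not None:
            errors.append(msg.error())  # an event or error, not a proper message
        else:
            values.append(msg.value())  # error() returned None: a proper message
    return values, errors, timeouts


results = classify(
    [FakeMessage(b"a"), None, FakeMessage(error="partition EOF"), FakeMessage(b"b")]
)
```

In a real consume loop the iterable would be replaced by repeated poll() calls, with the same None check first and the error() check second.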
BaseConsumer.unsubscribe
Remove current subscription.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
BaseConsumer.store_offsets
def store_offsets(message: Optional[Message] = None,
                  offsets: Optional[List[TopicPartition]] = None)
Store offsets for a message or a list of offsets.
message and offsets are mutually exclusive. The stored offsets will be committed according to 'auto.commit.interval.ms' or a manual offset-less commit().
Note that 'enable.auto.offset.store' must be set to False when using this API.
Arguments:
- message (confluent_kafka.Message): Store message's offset+1.
- offsets (List[TopicPartition]): List of topic+partitions+offsets to store.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
BaseConsumer.commit
def commit(message: Optional[Message] = None,
           offsets: Optional[List[TopicPartition]] = None,
           asynchronous: bool = True) -> Optional[List[TopicPartition]]
Commit a message or a list of offsets.
The message and offsets parameters are mutually exclusive.
If neither is set, the current partition assignment's offsets are used instead.
Use this method to commit offsets if you have 'enable.auto.commit' set to False.
Arguments:
- message (Message): Commit the message's offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.
- offsets (List[TopicPartition]): List of topic+partitions+offsets to commit.
- asynchronous (bool): If True, commit asynchronously and return None immediately. If False, the commit() call will block until the commit succeeds or fails, and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
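The offset+1 convention is easy to get wrong when building an explicit offsets list, so a small sketch may help. offsets_to_commit is a hypothetical helper that works on plain tuples instead of TopicPartition objects; it only shows how the value to commit is derived from the last processed offset.

```python
from typing import Dict, Iterable, Tuple


def offsets_to_commit(
    processed: Iterable[Tuple[str, int, int]]
) -> Dict[Tuple[str, int], int]:
    """Map (topic, partition) to the next offset to consume.

    `processed` yields (topic, partition, offset) for each handled message.
    """
    next_offsets: Dict[Tuple[str, int], int] = {}
    for topic, partition, offset in processed:
        key = (topic, partition)
        # Committed offset = highest processed offset + 1.
        next_offsets[key] = max(next_offsets.get(key, 0), offset + 1)
    return next_offsets


commits = offsets_to_commit(
    [("events", 0, 41), ("events", 0, 42), ("events", 1, 7)]
)
```

After processing offsets 41 and 42 on partition 0, the value to commit is 43, so a restarted consumer resumes with the first unprocessed message.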
BaseConsumer.committed
def committed(partitions: List[TopicPartition],
              timeout: Optional[float] = None) -> List[TopicPartition]
Retrieve committed offsets for the specified partitions.
Arguments:
- partitions (List[TopicPartition]): List of topic+partitions to query for stored offsets.
- timeout (float): Request timeout (seconds). None or -1 is infinite. Default: None
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
Returns:
- List[TopicPartition]: List of topic+partitions with offset and possibly error set.
BaseConsumer.get_watermark_offsets
def get_watermark_offsets(partition: TopicPartition,
                          timeout: Optional[float] = None,
                          cached: bool = False) -> Tuple[int, int]
Retrieve low and high offsets for the specified partition.
Arguments:
- partition (TopicPartition): Topic+partition to return offsets for.
- timeout (float): Request timeout (seconds). None or -1 is infinite. Ignored if cached=True. Default: None
- cached (bool): Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
Returns:
- Tuple[int, int]: Tuple of (low, high) on success or None on timeout. The high offset is the offset of the last message + 1.
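A common use of the watermark pair is estimating how far a consumer is behind. The sketch below is a hypothetical helper, partition_stats, that takes the (low, high) tuple and a current position as plain integers; it assumes the position follows the "last consumed offset + 1" convention and clamps it to the low watermark in case older messages have already been removed.

```python
from typing import Tuple


def partition_stats(watermarks: Tuple[int, int], position: int) -> Tuple[int, int]:
    """Return (offset_range, lag) for one partition."""
    low, high = watermarks
    # Size of the offset range currently available in the partition.
    offset_range = high - low
    # Offsets between the effective position and the high watermark.
    lag = max(high - max(position, low), 0)
    return offset_range, lag


caught_up_mostly = partition_stats((100, 250), position=240)
behind_retention = partition_stats((100, 250), position=50)
```

Because the high offset is the last message's offset plus one, a consumer whose position equals the high watermark has a lag of zero.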
BaseConsumer.list_topics
Request metadata from the cluster.
This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Arguments:
- topic (str): If specified, only request information about this topic, else return results for all topics in the cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
- timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
- KafkaException: if a Kafka-based error occurs
BaseConsumer.memberid
Return this client's broker-assigned group member id.
The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.
Raises:
- RuntimeError: if called on a closed consumer
Returns:
- Optional[str]: Member id string or None
BaseConsumer.offsets_for_times
def offsets_for_times(partitions: List[TopicPartition],
                      timeout: Optional[float] = None) -> List[TopicPartition]
Look up offsets by timestamp for the specified partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
Arguments:
- partitions (List[TopicPartition]): topic+partitions with timestamps in the TopicPartition.offset field.
- timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
Returns:
- List[TopicPartition]: List of topic+partition with offset field set and possibly error set
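The lookup rule (earliest offset whose timestamp is greater than or equal to the target, or -1 past the end) can be modelled on an in-memory list. offset_for_time is a hypothetical function for this example; it assumes timestamps are sorted in ascending order and that the list index equals the message offset, which a real partition does not guarantee.

```python
from bisect import bisect_left
from typing import List


def offset_for_time(timestamps: List[int], target_ms: int) -> int:
    """Return the earliest offset whose timestamp is >= target_ms, or -1.

    `timestamps[i]` is the timestamp of the message at offset i; the list is
    assumed to be sorted in ascending order.
    """
    index = bisect_left(timestamps, target_ms)
    return index if index < len(timestamps) else -1


log = [1000, 1000, 1500, 2000]
exact = offset_for_time(log, 1000)       # first of the two equal timestamps
between = offset_for_time(log, 1200)     # next message at or after the target
past_end = offset_for_time(log, 2001)    # target is newer than every message
```

The -1 result for a target beyond the last message matches the behaviour described above and should be checked before passing the offset on to seek() or assign().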
BaseConsumer.pause
Pause consumption for the provided list of partitions.
Paused partitions must be tracked manually.
Does NOT affect the result of Consumer.assignment().
Arguments:
- partitions (List[TopicPartition]): List of topic+partitions to pause.
Raises:
- KafkaException: if a Kafka-based error occurs
BaseConsumer.resume
Resume consumption for the provided list of partitions.
Arguments:
- partitions (List[TopicPartition]): List of topic+partitions to resume.
Raises:
- KafkaException: if a Kafka-based error occurs
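Since pause() notes that paused partitions must be tracked manually, an application typically keeps its own record next to the pause() and resume() calls. PauseTracker below is a hypothetical helper for this purpose; it uses (topic, partition) tuples in place of TopicPartition objects and does not call a consumer itself.

```python
from typing import List, Set, Tuple

Partition = Tuple[str, int]  # (topic, partition number)


class PauseTracker:
    """Remembers which partitions have been paused."""

    def __init__(self) -> None:
        self._paused: Set[Partition] = set()

    def pause(self, partitions: List[Partition]) -> None:
        # In real code, call the consumer's pause() alongside this.
        self._paused.update(partitions)

    def resume(self, partitions: List[Partition]) -> None:
        # In real code, call the consumer's resume() alongside this.
        self._paused.difference_update(partitions)

    def paused(self) -> List[Partition]:
        return sorted(self._paused)


tracker = PauseTracker()
tracker.pause([("events", 0), ("events", 1)])
tracker.resume([("events", 0)])
still_paused = tracker.paused()
```

Keeping the record in a set makes repeated pause or resume calls for the same partition harmless.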
BaseConsumer.position
Retrieve current positions (offsets) for the specified partitions.
Arguments:
- partitions (List[TopicPartition]): List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
Returns:
- List[TopicPartition]: List of topic+partitions with offset and possibly error set.
BaseConsumer.seek
Set consume position for partition to offset.
The offset may be an absolute (>=0) or a logical offset like OFFSET_BEGINNING.
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after Consumer.assign()). To set the starting offset of a partition not being consumed, pass the offset in an assign() call instead.
Arguments:
- partition (TopicPartition): Topic+partition+offset to seek to.
Raises:
- KafkaException: if a Kafka-based error occurs
BaseConsumer.assignment
Returns the current partition assignment.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
Returns:
- List[TopicPartition]: List of assigned topic+partitions.
BaseConsumer.set_sasl_credentials
Sets the SASL credentials used for this client.
These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
Arguments:
- username (str): your username
- password (str): your password
BaseConsumer.incremental_assign
Assign new partitions.
Can be called outside the Consumer on_assign callback (multiple times).
Partitions immediately show on Consumer.assignment().
Any additional partitions besides the ones passed during the Consumer on_assign callback will NOT be associated with the consumer group.
Arguments:
- partitions (List[TopicPartition]): a list of topic partitions
BaseConsumer.assign
Set the consumer partition assignment to the provided list of TopicPartition and start consuming.
Arguments:
- partitions (List[TopicPartition]): List of topic+partitions and optionally initial offsets to start consuming from.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
BaseConsumer.incremental_unassign
Revoke partitions.
Can be called outside an on_revoke callback.
Arguments:
- partitions (List[TopicPartition]): a list of topic partitions
BaseConsumer.unassign
Removes the current partition assignment and stops consuming.
Raises:
- KafkaException: if a Kafka-based error occurs
- RuntimeError: if called on a closed consumer
BaseConsumer.close
Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming.
- Commits offsets, unless the consumer property 'enable.auto.commit' is set to False.
- Leaves the consumer group.
Registered callbacks may be called from this method; see poll() for more info.
BaseConsumer.consumer_group_metadata
Used by the producer during consumer offset sending for an EOS transaction.