Full Reference
quixstreams
quixstreams.core
quixstreams.core.stream
quixstreams.core.stream.stream
Stream
Stream.__init__
A base class for all streaming operations.
Stream is an abstraction of a function pipeline.
Each Stream has a function and a parent (None by default).
When adding a new function to the stream, it creates a new Stream object and
sets "parent" to the previous Stream to maintain the order of execution.
Streams support 3 types of functions:
- "Apply" - generate new values based on the previous one.
The result of an Apply function is passed downstream to the next functions.
If "expand=True" is passed and the function returns an Iterable,
each item of it will be treated as a separate value downstream.
- "Update" - update values in-place.
The result of an Update function is always ignored, and its input is passed
downstream.
- "Filter" - filter values from the Stream.
The result of a Filter function is interpreted as boolean.
If it's True, the input will be passed downstream.
If it's False, the Filtered exception will be raised to signal that the
value is filtered out.
To execute the functions on the Stream, call the .compose() method;
it will return a closure executing all the functions accumulated in the Stream
and its parents.
Arguments:
func: a function to be called on the stream. It is expected to be wrapped into one of "Apply", "Filter" or "Update" from the quixstreams.core.stream.functions package. Default - "Apply(lambda v: v)".
parent: a parent Stream
Stream.add_filter
Add a function to filter values from the Stream.
The return value of the function will be interpreted as bool.
If the function returns a False-like result, the Stream will raise the Filtered
exception during execution.
Arguments:
func
: a function to filter values from the stream
Returns:
a new Stream
derived from the current one
Stream.add_apply
Add an "apply" function to the Stream.
The function is supposed to return a new value, which will be passed further during execution.
Arguments:
func: a function to generate a new value
expand: if True, expand the returned iterable into individual values downstream. If the returned value is not iterable, TypeError will be raised. Default - False.
Returns:
a new Stream
derived from the current one
Stream.add_update
Add an "update" function to the Stream, that will mutate the input value.
The return of this function will be ignored and its input will be passed downstream.
Arguments:
func
: a function to mutate the value
Returns:
a new Stream derived from the current one
Stream.diff
Takes the difference between Streams self
and other
based on their last
common parent, and returns a new Stream
that includes only this difference.
It's impossible to calculate a diff when:
- Streams don't have a common parent.
- The self Stream already includes all the nodes from
the other Stream, and the resulting diff is empty.
Arguments:
other: a Stream to take a diff from.
Raises:
ValueError
: if Streams don't have a common parent or if the diff is empty.
Returns:
new Stream
instance including all the Streams from the diff
Stream.tree
Return a list of all parent Streams including the node itself.
The tree is ordered from child to parent (current node comes first).
Returns:
a list of Stream
objects
Stream.compose
def compose(allow_filters: bool = True,
allow_updates: bool = True,
allow_expands: bool = True) -> Callable[[T], R]
Compose a list of functions from this Stream
and its parents into one
big closure using a "composer" function.
Closures are more performant than calling all the functions in the
Stream.tree()
one-by-one.
Arguments:
allow_filters: If False, this function will fail with ValueError if the stream has filter functions in the tree. Default - True.
allow_updates: If False, this function will fail with ValueError if the stream has update functions in the tree. Default - True.
allow_expands: If False, this function will fail with ValueError if the stream has functions with "expand=True" in the tree. Default - True.
Raises:
ValueError
: if disallowed functions are present in the stream tree.
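For illustration, here is a minimal sketch of building a Stream and executing the composed closure; the import path is assumed from the module name above:
from quixstreams.core.stream.stream import Stream

stream = Stream()                               # root node with the default Apply(lambda v: v)
stream = stream.add_apply(lambda v: v * 2)      # "Apply": transform the value
stream = stream.add_update(lambda v: print(v))  # "Update": side effect, result is ignored
stream = stream.add_filter(lambda v: v > 10)    # "Filter": raises Filtered for False-like results

executor = stream.compose()                     # one closure for the whole tree
result = executor(10)                           # 10 -> 20, printed, passes the filter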
quixstreams.core.stream.functions
StreamFunction
A base class for all the streaming operations in Quix Streams.
It provides two methods that return closures to be called on the input values:
- get_executor - a wrapper to execute on a single value
- get_executor_expanded - a wrapper to execute on an expanded value.
Expanded value is a list, where each item should be treated as a separate value.
StreamFunction.func
The original function
StreamFunction.get_executor
Returns a wrapper to be called on a single value.
StreamFunction.get_executor_expanded
Returns a wrapper to be called on a list of expanded values.
ApplyFunction
Wrap a function into "Apply" function.
The provided function is expected to return a new value based on input, and its result will always be passed downstream.
ApplyExpandFunction
Wrap a function into "Apply" function and expand the returned iterable into separate values downstream.
The provided function is expected to return an Iterable.
If the returned value is not Iterable, TypeError will be raised.
FilterFunction
Wraps a function into a "Filter" function.
The result of a Filter function is interpreted as boolean.
If it's True, the input will be passed downstream.
If it's False, the Filtered exception will be raised to signal that the
value is filtered out.
UpdateFunction
Wrap a function into an "Update" function.
The provided function is expected to mutate the value or to perform some side effect. Its result will always be ignored, and its input is passed downstream.
compose
def compose(functions: List[StreamFunction],
allow_filters: bool = True,
allow_updates: bool = True,
allow_expands: bool = True) -> StreamCallable
Composes a list of functions into a single big closure.
Closures are more performant than calling all functions one by one in a loop.
Arguments:
functions: a list of StreamFunction objects to compose
allow_filters: If False, will fail with ValueError if the list has FilterFunction. Default - True.
allow_updates: If False, will fail with ValueError if the list has UpdateFunction. Default - True.
allow_expands: If False, will fail with ValueError if the list has ApplyFunction with "expand=True". Default - True.
Raises:
ValueError
: if disallowed functions are present in the list of functions.
composer
A function that wraps two other functions into a closure.
It passes the result of the inner function as an input to the outer function.
Returns:
a function with one argument (value)
quixstreams.dataframe.utils
ensure_milliseconds
Convert timedelta to milliseconds.
This function will also round the value to the closest millisecond in case of
higher precision.
Arguments:
delta: timedelta object
Returns:
timedelta value in milliseconds as int
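A small usage sketch (the import path below is assumed from the module name above):
from datetime import timedelta

from quixstreams.dataframe.utils import ensure_milliseconds

duration_ms = ensure_milliseconds(timedelta(seconds=60))  # -> 60000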
quixstreams.dataframe.windows
quixstreams.dataframe.windows.base
get_window_ranges
def get_window_ranges(timestamp_ms: int,
duration_ms: int,
step_ms: Optional[int] = None) -> List[Tuple[int, int]]
Get a list of window ranges for the given timestamp.
Arguments:
timestamp_ms: timestamp in milliseconds
duration_ms: window duration in milliseconds
step_ms: window step in milliseconds for hopping windows, optional.
Returns:
a list of (start, end) tuples
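An illustrative sketch (import path assumed from the module name above); the exact ranges returned depend on the window alignment:
from quixstreams.dataframe.windows.base import get_window_ranges

# Which hopping windows does this timestamp fall into?
ranges = get_window_ranges(
    timestamp_ms=1_700_000_123_000,
    duration_ms=60_000,   # 60-second windows
    step_ms=30_000,       # hop every 30 seconds; omit step_ms for a single tumbling-style range
)
for start, end in ranges:
    print(start, end)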
quixstreams.dataframe.windows.time_based
FixedTimeWindow
FixedTimeWindow.final
Apply the window aggregation and return results only when the windows are
closed.
The format of returned windows:
{
    "start": <window start time in milliseconds>,
    "end": <window end time in milliseconds>,
    "value": <aggregated window value>,
}
The individual window is closed when the event time (the maximum observed timestamp across the partition) passes its end timestamp + grace period. The closed windows cannot receive updates anymore and are considered final.
NOTE: Windows can be closed only within the same message key. If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until a message with the same key is received.
Arguments:
expand: if True, each window result will be sent downstream as an individual item. Otherwise, the list of window results will be sent. Default - True
FixedTimeWindow.current
Apply the window transformation to the StreamingDataFrame to return results
for each updated window.
The format of returned windows:
{
    "start": <window start time in milliseconds>,
    "end": <window end time in milliseconds>,
    "value": <aggregated window value>,
}
This method processes streaming data and returns results as they come, regardless of whether the window is closed or not.
Arguments:
expand: if True, each window result will be sent downstream as an individual item. Otherwise, the list of window results will be sent. Default - True
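For context, a short sketch of how current() and final() are typically reached from a StreamingDataFrame (this assumes an Application and topic set up elsewhere; see tumbling_window below for the full example):
from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)

# Emit a result on every update of each 1-second tumbling window
sdf = sdf.tumbling_window(duration_ms=1000).sum().current()

# Or emit a single result per window only once it is closed:
# sdf = sdf.tumbling_window(duration_ms=1000).sum().final()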
quixstreams.dataframe.windows.definitions
FixedTimeWindowDefinition
FixedTimeWindowDefinition.sum
Configure the window to aggregate data by summing up values within
each window period.
Returns:
an instance of FixedTimeWindow
configured to perform sum aggregation.
FixedTimeWindowDefinition.count
Configure the window to aggregate data by counting the number of values
within each window period.
Returns:
an instance of FixedTimeWindow
configured to perform record count.
FixedTimeWindowDefinition.mean
Configure the window to aggregate data by calculating the mean of the values
within each window period.
Returns:
an instance of FixedTimeWindow
configured to calculate the mean
of the values.
FixedTimeWindowDefinition.reduce
def reduce(reducer: Callable[[Any, Any], Any],
initializer: Callable[[Any], Any]) -> "FixedTimeWindow"
Configure the window to perform a custom aggregation using reducer
and initializer
functions.
Example Snippet:
sdf = StreamingDataFrame(...)
# Using "reduce()" to calculate multiple aggregates at once
def reducer(agg: dict, current: int):
    aggregated = {
        'min': min(agg['min'], current),
        'max': max(agg['max'], current),
        'count': agg['count'] + 1
    }
    return aggregated

def initializer(current) -> dict:
    return {'min': current, 'max': current, 'count': 1}

window = (
    sdf.tumbling_window(duration_ms=1000)
    .reduce(reducer=reducer, initializer=initializer)
    .final()
)
Arguments:
reducer: A function that takes two arguments (the accumulated value and a new value) and returns a single value. The returned value will be saved to the state store and sent downstream.
initializer: A function to call for every first element of the window. This function is used to initialize the aggregation within a window.
Returns:
A window configured to perform custom reduce aggregation on the data.
FixedTimeWindowDefinition.max
Configure a window to aggregate the maximum value within each window period.
Returns:
an instance of FixedTimeWindow
configured to calculate the maximum
value within each window period.
FixedTimeWindowDefinition.min
Configure a window to aggregate the minimum value within each window period.
Returns:
an instance of FixedTimeWindow configured to calculate the minimum
value within each window period.
quixstreams.dataframe
quixstreams.dataframe.series
StreamingSeries
StreamingSeries
are typically generated by StreamingDataframes
when getting
elements from, or performing certain operations on, a StreamingDataframe
,
thus acting as a representation of "column" value.
They share some operations with the StreamingDataframe
, but also provide some
additional functionality.
Most column value operations are handled by this class, and StreamingSeries
can
generate other StreamingSeries
as a result of said operations.
What it Does:
- Allows ways to do simple operations with dataframe "column"/dictionary values:
- Basic ops like add, subtract, modulo, etc.
- Enables comparisons/inequalities:
- Greater than, equals, etc.
- and/or, is/not operations
- Can check for existence of columns in
StreamingDataFrames
- Enables chaining of various operations together
How to Use:
For the most part, you may not even notice this class exists!
They will naturally be created as a result of typical StreamingDataFrame
use.
Auto-complete should help you with valid methods and type-checking should alert
you to invalid operations between StreamingSeries
.
In general, any typical Pandas dataframe operation between columns should be valid
with StreamingSeries
, and you shouldn't have to think about them explicitly.
Example Snippet:
# Random methods for example purposes. More detailed explanations found under
# various methods or in the docs folder.
sdf = StreamingDataframe()
sdf = sdf["column_a"].apply(a_func).apply(diff_func, stateful=True)
sdf["my_new_bool_field"] = sdf["column_b"].contains("this_string")
sdf["new_sum_field"] = sdf["column_c"] + sdf["column_d"] + 2
sdf = sdf[sdf["column_a"] & (sdf["new_sum_field"] >= 10)]
StreamingSeries.from_func
Create a StreamingSeries from a function.
The provided function will be wrapped into Apply
Arguments:
func
: a function to apply
Returns:
instance of StreamingSeries
StreamingSeries.apply
Add a callable to the execution list for this series.
The provided callable should accept a single argument, which will be its input. The provided callable should similarly return one output, or None
They can be chained together or included with other operations.
Example Snippet:
# The `StreamingSeries` are generated when `sdf["COLUMN_NAME"]` is called.
# This stores a string in state and capitalizes the column value; the result is
# assigned to a new column.
# Another apply converts a str column to an int, assigning it to a new column.
def func(value: str, state: State):
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    return value.upper()

sdf = StreamingDataframe()
sdf["new_col"] = sdf["a_column"]["nested_dict_key"].apply(func, stateful=True)
sdf["new_col_2"] = sdf["str_col"].apply(lambda v: int(v)) + sdf["str_col2"] + 2
Arguments:
func
: a callable with one argument and one output
Returns:
a new StreamingSeries
with the new callable added
StreamingSeries.compose
Compose all functions of this StreamingSeries into one big closure.
Closures are more performant than calling all the functions in the
StreamingDataFrame
one-by-one.
Generally not required by users; the quixstreams.app.Application
class will
do this automatically.
Example Snippet:
from quixstreams import Application
app = Application(...)
sdf = app.dataframe()
sdf = sdf["column_a"].apply(apply_func)
sdf = sdf["column_b"].contains(filter_func)
sdf = sdf.compose()
result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})
Arguments:
allow_filters: If False, this function will fail with ValueError if the stream has filter functions in the tree. Default - True.
allow_updates: If False, this function will fail with ValueError if the stream has update functions in the tree. Default - True.
Raises:
ValueError
: if disallowed functions are present in the tree of underlyingStream
.
Returns:
a function that accepts "value"
and returns a result of StreamingSeries
StreamingSeries.test
A shorthand to test StreamingSeries
with provided value
and MessageContext
.
Arguments:
value: value to pass through StreamingSeries
ctx: instance of MessageContext, optional. Provide it if the StreamingSeries instance has functions calling get_current_key(). Default - None.
Returns:
result of StreamingSeries
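A minimal sketch, assuming the series was built from a column of a dict-valued message and needs no MessageContext:
from quixstreams import Application

app = Application(...)
sdf = app.dataframe()
series = sdf["column_a"].apply(lambda v: v * 2)

# Execute the series against a single value locally, without Kafka
result = series.test({"column_a": 5})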
StreamingSeries.isin
Check if series value is in "other".
Same as "StreamingSeries in other".
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "str_column" is contained in a column with a list of strings and
# assign the resulting `bool` to a new column: "has_my_str".
sdf = app.dataframe()
sdf["has_my_str"] = sdf["str_column"].isin(sdf["column_with_list_of_strs"])
Arguments:
other
: a container to check
Returns:
new StreamingSeries
StreamingSeries.contains
Check if series value contains "other"
Same as "other in StreamingSeries".
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" contains "my_substring" and assign the resulting
# `bool` to a new column: "has_my_substr"
sdf = app.dataframe()
sdf["has_my_substr"] = sdf["column_a"].contains("my_substring")
Arguments:
other
: object to check
Returns:
new StreamingSeries
StreamingSeries.is_
Check if series value refers to the same object as other
Runtime result will be a bool
.
Example Snippet:
# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
# to a new column: "is_same"
from quixstreams import Application
sdf = app.dataframe()
sdf["is_same"] = sdf["column_a"].is_(sdf["column_b"])
Arguments:
other
: object to check for "is"
Returns:
new StreamingSeries
StreamingSeries.isnot
Check if series value does not refer to the same object as other
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
# to a new column: "is_not_same"
sdf = app.dataframe()
sdf["is_not_same"] = sdf["column_a"].isnot(sdf["column_b"])
Arguments:
other
: object to check for "is_not"
Returns:
new StreamingSeries
StreamingSeries.isnull
Check if series value is None.
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" is null and assign the resulting `bool` to a new column:
# "is_null"
sdf = app.dataframe()
sdf["is_null"] = sdf["column_a"].isnull()
Returns:
new StreamingSeries
StreamingSeries.notnull
Check if series value is not None.
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" is not null and assign the resulting `bool` to a new column:
# "is_not_null"
sdf = app.dataframe()
sdf["is_not_null"] = sdf["column_a"].notnull()
Returns:
new StreamingSeries
StreamingSeries.abs
Get absolute value of the series value.
Example Snippet:
from quixstreams import Application
# Get absolute value of "int_col" and add it to "other_int_col".
# Finally, assign the result to a new column: "abs_col_sum".
sdf = app.dataframe()
sdf["abs_col_sum"] = sdf["int_col"].abs() + sdf["other_int_col"]
Returns:
new StreamingSeries
quixstreams.dataframe.base
quixstreams.dataframe.exceptions
quixstreams.dataframe.dataframe
StreamingDataFrame
StreamingDataFrame
is the main object you will use for ETL work.
Typically created with an app = quixstreams.app.Application()
instance,
via sdf = app.dataframe()
.
What it Does:
- Builds a data processing pipeline, declaratively (not executed immediately)
- Executes this pipeline on inputs at runtime (Kafka message values)
- Provides functions/interface similar to Pandas Dataframes/Series
- Enables stateful processing (and manages everything related to it)
How to Use:
Define various operations while continuously reassigning to itself (or new fields).
These operations will generally transform your data, access/update state, or produce to kafka topics.
We recommend your data structure to be "columnar" (aka a dict/JSON) in nature so
that it works with the entire interface, but simple types like ints
, str
, etc.
are also supported.
See the various methods and classes for more specifics, or for a deep dive into
usage, see streamingdataframe.md
under the docs/
folder.
NOTE: column referencing like sdf["a_column"] and various methods often create other object types (typically quixstreams.dataframe.StreamingSeries), which is expected; type hinting should alert you to any issues should you attempt invalid operations with said objects (however, we cannot infer whether an operation is valid with respect to your data!).
Example Snippet:
sdf = StreamingDataframe()
sdf = sdf.apply(a_func)
sdf = sdf.filter(another_func)
sdf = sdf.to_topic(topic_obj)
StreamingDataFrame.apply
def apply(func: Union[DataFrameFunc, DataFrameStatefulFunc],
stateful: bool = False,
expand: bool = False) -> Self
Apply a function to transform the value and return a new value.
The result will be passed downstream as an input value.
Example Snippet:
# This stores a string in state and capitalizes every column with a string value.
# A second apply then keeps only the string value columns (shows non-stateful).
def func(d: dict, state: State):
    value = d["store_field"]
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    return {k: v.upper() if isinstance(v, str) else v for k, v in d.items()}

sdf = StreamingDataframe()
sdf = sdf.apply(func, stateful=True)
sdf = sdf.apply(lambda d: {k: v for k, v in d.items() if isinstance(v, str)})
Arguments:
func: a function to apply
stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
expand: if True, expand the returned iterable into individual values downstream. If the returned value is not iterable, TypeError will be raised. Default - False.
StreamingDataFrame.update
Apply a function to mutate value in-place or to perform a side effect
that doesn't update the value (e.g. print a value to the console).
The result of the function will be ignored, and the original value will be passed downstream.
Example Snippet:
# Stores a value and mutates a list by appending a new item to it.
# Also prints to console.
def func(values: list, state: State):
    value = values[0]
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    values.append("new_item")

sdf = StreamingDataframe()
sdf = sdf.update(func, stateful=True)
sdf = sdf.update(lambda value: print("Received value: ", value))
Arguments:
func: function to update value
stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
StreamingDataFrame.filter
Filter value using provided function.
If the function returns True-like value, the original value will be
passed downstream.
Otherwise, the Filtered
exception will be raised (further processing for that
message will be skipped).
Example Snippet:
# Stores a value and allows further processing only if the value is greater than
# what was previously stored.
def func(d: dict, state: State):
    value = d["my_value"]
    if value > state.get("my_store_key"):
        state.set("my_store_key", value)
        return True
    return False

sdf = StreamingDataframe()
sdf = sdf.filter(func, stateful=True)
Arguments:
func: function to filter value
stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
StreamingDataFrame.contains
Check if the key is present in the Row value.
Example Snippet:
# Add new column 'has_column' which contains a boolean indicating
# the presence of 'column_x'
sdf = StreamingDataframe()
sdf['has_column'] = sdf.contains('column_x')
Arguments:
key
: a column name to check.
Returns:
a Column object that evaluates to True if the key is present or False otherwise.
StreamingDataFrame.to_topic
Produce current value to a topic. You can optionally specify a new key.
NOTE: A RowProducer instance must be assigned to StreamingDataFrame.producer if not using quixstreams.app.Application to facilitate the execution of StreamingDataFrame.
Example Snippet:
from quixstreams import Application
# Produce to two different topics, changing the key for one of them.
app = Application()
input_topic = app.topic("input_x")
output_topic_0 = app.topic("output_a")
output_topic_1 = app.topic("output_b")
sdf = app.dataframe(input_topic)
sdf = sdf.to_topic(output_topic_0)
sdf = sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])
Arguments:
topic: instance of Topic
key: a callable to generate a new message key, optional. If passed, the return type of this callable must be serializable by the key_serializer defined for this Topic object. By default, the current message key will be used.
StreamingDataFrame.compose
Compose all functions of this StreamingDataFrame into one big closure.
Closures are more performant than calling all the functions in the
StreamingDataFrame
one-by-one.
Generally not required by users; the quixstreams.app.Application
class will
do this automatically.
Example Snippet:
from quixstreams import Application
app = Application(...)
sdf = app.dataframe()
sdf = sdf.apply(apply_func)
sdf = sdf.filter(filter_func)
sdf = sdf.compose()
result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})
Returns:
a function that accepts "value" and returns a result of StreamingDataFrame
StreamingDataFrame.test
A shorthand to test StreamingDataFrame
with provided value
and MessageContext
.
Arguments:
value: value to pass through StreamingDataFrame
ctx: instance of MessageContext, optional. Provide it if the StreamingDataFrame instance calls to_topic(), has stateful functions or functions calling get_current_key(). Default - None.
Returns:
result of StreamingDataFrame
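A minimal sketch, assuming a stateless pipeline so no MessageContext is needed:
from quixstreams import Application

app = Application(...)
sdf = app.dataframe()
sdf = sdf.apply(lambda v: {**v, "doubled": v["x"] * 2})

# Run the composed pipeline against a single value locally
result = sdf.test({"x": 2})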
StreamingDataFrame.tumbling_window
def tumbling_window(duration_ms: Union[int, timedelta],
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None) -> TumblingWindowDefinition
Create a tumbling window transformation on this StreamingDataFrame.
Tumbling windows divide time into fixed-sized, non-overlapping windows.
They allow you to perform stateful aggregations like sum, reduce, etc.
on top of the data and emit results downstream.
Notes:
- Every window is grouped by the current Kafka message key.
- Messages with None key will be ignored.
- The time windows always use the current event time.
Example Snippet:
app = Application()
sdf = app.dataframe(...)
sdf = (
    # Define a tumbling window of 60s and grace period of 10s
    sdf.tumbling_window(
        duration_ms=timedelta(seconds=60), grace_ms=timedelta(seconds=10.0)
    )
    # Specify the aggregation function
    .sum()
    # Specify how the results should be emitted downstream.
    # "all()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .all()
)
Arguments:
duration_ms: The length of each window. Can be specified as either an int representing milliseconds or a timedelta object. NOTE: timedelta objects will be rounded to the closest millisecond value.
grace_ms: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either an int representing milliseconds or as a timedelta object. NOTE: timedelta objects will be rounded to the closest millisecond value.
name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.
Returns:
TumblingWindowDefinition
instance representing the tumbling window
configuration.
This object can be further configured with aggregation functions
like sum
, count
, etc. applied to the StreamingDataFrame.
StreamingDataFrame.hopping_window
def hopping_window(duration_ms: Union[int, timedelta],
step_ms: Union[int, timedelta],
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None) -> HoppingWindowDefinition
Create a hopping window transformation on this StreamingDataFrame.
Hopping windows divide the data stream into overlapping windows based on time.
The overlap is controlled by the step_ms
parameter.
They allow you to perform stateful aggregations like sum, reduce, etc.
on top of the data and emit results downstream.
Notes:
- Every window is grouped by the current Kafka message key.
- Messages with None key will be ignored.
- The time windows always use the current event time.
Example Snippet:
app = Application()
sdf = app.dataframe(...)
sdf = (
    # Define a hopping window of 60s with step 30s and grace period of 10s
    sdf.hopping_window(
        duration_ms=timedelta(seconds=60),
        step_ms=timedelta(seconds=30),
        grace_ms=timedelta(seconds=10)
    )
    # Specify the aggregation function
    .sum()
    # Specify how the results should be emitted downstream.
    # "all()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .all()
)
Arguments:
duration_ms: The length of each window. It defines the time span for which each window aggregates data. Can be specified as either an int representing milliseconds or a timedelta object. NOTE: timedelta objects will be rounded to the closest millisecond value.
step_ms: The step size for the window. It determines how much each successive window moves forward in time. Can be specified as either an int representing milliseconds or a timedelta object. NOTE: timedelta objects will be rounded to the closest millisecond value.
grace_ms: The grace period for data arrival. It allows late-arriving data to be included in the window, even if it arrives after the window has theoretically moved forward. Can be specified as either an int representing milliseconds or a timedelta object. NOTE: timedelta objects will be rounded to the closest millisecond value.
name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.
Returns:
HoppingWindowDefinition
instance representing the hopping
window configuration.
This object can be further configured with aggregation functions
like sum
, count
, etc. and applied to the StreamingDataFrame.
quixstreams.error_callbacks
quixstreams.exceptions
quixstreams.exceptions.base
quixstreams.exceptions.assignment
PartitionAssignmentError
Error happened during partition rebalancing.
Raised from on_assign
, on_revoke
and on_lost
callbacks
quixstreams.kafka
quixstreams.kafka.consumer
Consumer
Consumer.__init__
def __init__(broker_address: str,
consumer_group: Optional[str],
auto_offset_reset: AutoOffsetReset,
auto_commit_enable: bool = True,
assignment_strategy: AssignmentStrategy = "range",
on_commit: Optional[Callable[
[Optional[KafkaError], List[TopicPartition]], None]] = None,
extra_config: Optional[dict] = None)
A wrapper around confluent_kafka.Consumer
.
It initializes confluent_kafka.Consumer
on demand
avoiding network calls during __init__
, provides typing info for methods
and some reasonable defaults.
Arguments:
broker_address: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Consumer.
consumer_group: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer
auto_offset_reset: Consumer auto.offset.reset setting. Available values:
- "earliest" - automatically reset the offset to the smallest offset
- "latest" - automatically reset the offset to the largest offset
- "error" - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages (used for testing)
auto_commit_enable: If true, periodically commit offset of the last message handed to the application. Default - True.
assignment_strategy: The name of a partition assignment strategy. Available values: "range", "roundrobin", "cooperative-sticky".
on_commit: Offset commit result propagation callback. Passed as "offset_commit_cb" to confluent_kafka.Consumer.
extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Note: values passed as arguments override values in extra_config.
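A minimal consume-loop sketch; the import path and topic name below are assumptions for illustration:
from quixstreams.kafka.consumer import Consumer

consumer = Consumer(
    broker_address="localhost:9092",
    consumer_group="my-group",
    auto_offset_reset="earliest",
)
consumer.subscribe(["input-topic"])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue            # no message within the timeout
        if msg.error() is not None:
            raise RuntimeError(msg.error())
        print(msg.key(), msg.value())
finally:
    consumer.close()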
Consumer.poll
Consumes a single message, calls callbacks and returns events.
The application must check the returned Message object's Message.error() method to distinguish between proper messages (error() returns None) and events or errors.
Note: Callbacks may be called from this method, such as
on_assign
, on_revoke
, et al.
Arguments:
timeout (float): Maximum time in seconds to block waiting for message, event or callback. None or -1 is infinite. Default: None.
Raises:
RuntimeError: if called on a closed consumer
Returns:
A Message object or None on timeout
Consumer.subscribe
def subscribe(topics: List[str],
on_assign: Optional[RebalancingCallback] = None,
on_revoke: Optional[RebalancingCallback] = None,
on_lost: Optional[RebalancingCallback] = None)
Set subscription to supplied list of topics
This replaces a previous subscription.
Arguments:
topics (list(str)): List of topics (strings) to subscribe to.
on_assign (callable): callback to provide handling of customized offsets on completion of a successful partition re-assignment.
on_revoke (callable): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
on_lost (callable): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
The rebalancing callbacks have the signatures on_assign(consumer, partitions), on_revoke(consumer, partitions) and on_lost(consumer, partitions), where consumer is the Consumer instance and partitions is the absolute list of partitions being assigned or revoked.
Consumer.unsubscribe
Remove current subscription.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Consumer.store_offsets
def store_offsets(message: Optional[Message] = None,
offsets: Optional[List[TopicPartition]] = None)
Store offsets for a message or a list of offsets.
message
and offsets
are mutually exclusive. The stored offsets
will be committed according to 'auto.commit.interval.ms' or manual
offset-less commit
.
Note that 'enable.auto.offset.store' must be set to False when using this API.
Arguments:
message (confluent_kafka.Message): Store message's offset+1.
offsets (list(TopicPartition)): List of topic+partitions+offsets to store.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Consumer.commit
def commit(message: Optional[Message] = None,
offsets: Optional[List[TopicPartition]] = None,
asynchronous: bool = True) -> Optional[List[TopicPartition]]
Commit a message or a list of offsets.
The message
and offsets
parameters are mutually exclusive.
If neither is set, the current partition assignment's offsets are used instead.
Use this method to commit offsets if you have 'enable.auto.commit' set to False.
Arguments:
message (confluent_kafka.Message): Commit the message's offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.
offsets (list(TopicPartition)): List of topic+partitions+offsets to commit.
asynchronous (bool): If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
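Continuing the consumer sketch above, a short sketch of the manual-commit flow described here (assumes 'enable.auto.commit' was disabled via extra_config):
msg = consumer.poll(timeout=1.0)
if msg is not None and msg.error() is None:
    # ... process the message ...
    # Commit synchronously and inspect the committed offsets
    committed = consumer.commit(message=msg, asynchronous=False)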
Consumer.committed
def committed(partitions: List[TopicPartition],
timeout: Optional[float] = None) -> List[TopicPartition]
Retrieve committed offsets for the specified partitions.
Arguments:
partitions (list(TopicPartition)): List of topic+partitions to query for stored offsets.
timeout (float): Request timeout (seconds). None or -1 is infinite. Default: None
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Returns:
list(TopicPartition): List of topic+partitions with offset and possibly error set.
Consumer.get_watermark_offsets
def get_watermark_offsets(partition: TopicPartition,
timeout: Optional[float] = None,
cached: bool = False) -> Tuple[int, int]
Retrieve low and high offsets for the specified partition.
Arguments:
partition (TopicPartition): Topic+partition to return offsets for.
timeout (float): Request timeout (seconds). None or -1 is infinite. Ignored if cached=True. Default: None
cached (bool): Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Returns:
tuple(int, int): Tuple of (low, high) on success or None on timeout.
The high offset is the offset of the last message + 1.
Consumer.list_topics
Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Arguments:
topic (str): If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
KafkaException
Consumer.memberid
Return this client's broker-assigned group member id.
The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.
Returns:
Member id string or None
Raises:
RuntimeError: if called on a closed consumer
Consumer.offsets_for_times
def offsets_for_times(partitions: List[TopicPartition],
timeout: Optional[float] = None) -> List[TopicPartition]
Look up offsets by timestamp for the specified partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
Arguments:
partitions (list(TopicPartition)): topic+partitions with timestamps in the TopicPartition.offset field.
timeout (float): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Returns:
list(TopicPartition): List of topic+partitions with offset field set and possibly error set
Consumer.pause
Pause consumption for the provided list of partitions.
Paused partitions must be tracked manually.
Does NOT affect the result of Consumer.assignment().
Arguments:
partitions (list(TopicPartition)): List of topic+partitions to pause.
Raises:
KafkaException
Consumer.resume
Resume consumption for the provided list of partitions.
Arguments:
partitions (list(TopicPartition)): List of topic+partitions to resume.
Raises:
KafkaException
Consumer.position
Retrieve current positions (offsets) for the specified partitions.
Arguments:
partitions (list(TopicPartition)): List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Returns:
list(TopicPartition): List of topic+partitions with offset and possibly error set.
Consumer.seek
Set consume position for partition to offset.
The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()); to set the starting offset of a partition that is not being consumed, pass the offset in an assign() call instead.
Arguments:
partition (TopicPartition): Topic+partition+offset to seek to.
Raises:
KafkaException
Consumer.assignment
Returns the current partition assignment.
Raises:
KafkaException
RuntimeError: if called on a closed consumer
Returns:
list(TopicPartition): List of assigned topic+partitions.
Consumer.set_sasl_credentials
Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
Consumer.incremental_assign
Assign new partitions.
Can be called outside the Consumer
on_assign
callback (multiple times).
Partitions immediately show on Consumer.assignment()
.
Any additional partitions besides the ones passed during the Consumer
on_assign
callback will NOT be associated with the consumer group.
Consumer.incremental_unassign
Revoke partitions.
Can be called outside an on_revoke callback.
Consumer.close
Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming.
- Commits offsets, unless the consumer property 'enable.auto.commit' is set to False.
- Leaves the consumer group.
Registered callbacks may be called from this method,
see poll()
for more info.
quixstreams.kafka.producer
Producer
Producer.__init__
def __init__(broker_address: str,
partitioner: Partitioner = "murmur2",
extra_config: Optional[dict] = None)
A wrapper around confluent_kafka.Producer
.
It initializes confluent_kafka.Producer
on demand
avoiding network calls during __init__
, provides typing info for methods
and some reasonable defaults.
Arguments:
broker_address: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Producer.
partitioner: A function to be used to determine the outgoing message partition. Available values: "random", "consistent_random", "murmur2", "murmur2_random", "fnv1a", "fnv1a_random". Default - "murmur2".
extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
Producer.produce
def produce(topic: str,
value: Optional[Union[str, bytes]] = None,
key: Optional[Union[str, bytes]] = None,
headers: Optional[Headers] = None,
partition: Optional[int] = None,
timestamp: Optional[int] = None,
poll_timeout: float = 5.0,
buffer_error_max_tries: int = 3)
Produce message to topic.
It also polls Kafka for callbacks before producing in order to minimize
the probability of BufferError
.
If BufferError
still happens, the method will poll Kafka with timeout
to free up the buffer and try again.
Arguments:
topic: topic name
value: message value
key: message key
headers: message headers
partition: topic partition
timestamp: message timestamp
poll_timeout: timeout for the poll() call in case of BufferError
buffer_error_max_tries: max retries for BufferError. Pass 0 to not retry after BufferError.
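A minimal producing sketch; the import path and topic name below are assumptions for illustration:
from quixstreams.kafka.producer import Producer

producer = Producer(broker_address="localhost:9092")
producer.produce(
    topic="output-topic",
    key="sensor-1",
    value=b'{"temperature": 21.5}',
)
producer.flush()  # wait until queued messages are delivered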
Producer.poll
Polls the producer for events and calls on_delivery
callbacks.
Arguments:
timeout: poll timeout seconds; Default: 0 (unlike others). NOTE: -1 will hang indefinitely if there are no messages to acknowledge.
Producer.flush
Wait for all messages in the Producer queue to be delivered.
Arguments:
timeout (float): time to attempt flushing (seconds). None or -1 is infinite. Default: None
Returns:
number of messages remaining to flush
quixstreams.models
quixstreams.models.serializers
quixstreams.models.serializers.json
JSONSerializer
JSONSerializer.__init__
Serializer that returns data in json format.
Arguments:
dumps: a function to serialize objects to json. Default - quixstreams.utils.json.dumps
JSONDeserializer
JSONDeserializer.__init__
def __init__(column_name: Optional[str] = None,
loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)
Deserializer that parses data from JSON
Arguments:
column_name: if provided, the deserialized value will be wrapped into a dictionary with column_name as a key.
loads: function to parse json from bytes. Default - quixstreams.utils.json.loads.
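A sketch of wiring these serializers into topics; the Application usage and parameter names here are assumptions based on the Topic arguments described later in this reference:
from quixstreams import Application
from quixstreams.models.serializers.json import JSONDeserializer, JSONSerializer

app = Application(broker_address="localhost:9092", consumer_group="my-group")
input_topic = app.topic("input", value_deserializer=JSONDeserializer())
output_topic = app.topic("output", value_serializer=JSONSerializer())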
quixstreams.models.serializers.simple_types
BytesDeserializer
A deserializer to bypass bytes without any changes
BytesSerializer
A serializer to bypass bytes without any changes
StringDeserializer
StringDeserializer.__init__
Deserializes bytes to strings using the specified encoding.
Arguments:
codec: string encoding
A wrapper around confluent_kafka.serialization.StringDeserializer.
IntegerDeserializer
Deserializes bytes to integers.
A wrapper around confluent_kafka.serialization.IntegerDeserializer
.
DoubleDeserializer
Deserializes float to IEEE 754 binary64.
A wrapper around confluent_kafka.serialization.DoubleDeserializer
.
StringSerializer
StringSerializer.__init__
Serializes strings to bytes using the specified encoding.
Arguments:
codec
: string encoding
IntegerSerializer
Serializes integers to bytes
DoubleSerializer
Serializes floats to bytes
quixstreams.models.serializers.quix
QuixDeserializer
Handles Deserialization for any Quix-formatted topic.
Parses JSON data from either TimeseriesData or EventData (ignores the rest).
QuixDeserializer.__init__
def __init__(column_name: Optional[str] = None,
loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)
Arguments:
column_name: if provided, the deserialized value will be wrapped into a dictionary with column_name as a key.
loads: function to parse json from bytes. Default - quixstreams.utils.json.loads.
QuixDeserializer.split_values
Each Quix message might contain data for multiple Rows. This property informs the downstream processors about that, so they can expect an Iterable instead of Mapping.
QuixDeserializer.deserialize
Deserialization function for particular data types (Timeseries or EventData).
Arguments:
model_key: value of the "__Q_ModelKey" message header
value: deserialized JSON value of the message, list or dict
Returns:
Iterable of dicts
QuixSerializer
QuixSerializer.__init__
Serializer that returns data in json format.
Arguments:
as_legacy: parse as the legacy format; Default = True
dumps: a function to serialize objects to json. Default - quixstreams.utils.json.dumps
QuixTimeseriesSerializer
Serialize data to JSON formatted according to Quix Timeseries format.
The serializable object must be a dictionary, and each item must be of str, int,
float, bytes or bytearray type.
Otherwise, the SerializationError will be raised.
Input:
Output:
{
    "Timestamps": [123123123],
    "NumericValues": {"a": [1], "b": [1.1]},
    "StringValues": {"c": ["string"]},
    "BinaryValues": {"d": ["Ynl0ZXM="]},
    "TagValues": {"tag1": ["tag"]}
}
QuixEventsSerializer
Serialize data to JSON formatted according to Quix EventData format.
The input value is expected to be a dictionary with the following keys:
- "Id" (type str
, default - "")
- "Value" (type str
, default - ""),
- "Tags" (type dict
, default - {})
NOTE: All the other fields will be ignored.
Input:
Output:
{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"},
    "Timestamp": 1692703362840389000
}
quixstreams.models.serializers.base
SerializationContext
Provides additional context for message serialization/deserialization.
Every Serializer
and Deserializer
receives an instance of SerializationContext
SerializationContext.to_confluent_ctx
Convert SerializationContext
to confluent_kafka.SerializationContext
in order to re-use serialization already provided by confluent_kafka
library.
Arguments:
field: instance of confluent_kafka.serialization.MessageField
Returns:
instance of confluent_kafka.serialization.SerializationContext
Deserializer
Deserializer.__init__
A base class for all Deserializers
Arguments:
column_name: if provided, the deserialized value will be wrapped into a dictionary with column_name as a key.
Deserializer.split_values
Return True if the deserialized message should be considered as Iterable and each item in it should be processed as a separate message.
Serializer
A base class for all Serializers
Serializer.extra_headers
Informs the producer to set additional headers
for the message it will be serializing.
Must return a dictionary with headers. Keys must be strings, and values must be strings, bytes or None.
Returns:
dict with headers
quixstreams.models.serializers.exceptions
IgnoreMessage
Raise this exception from Deserializer.__call__ in order to ignore the processing of the particular message.
quixstreams.models.topics
quixstreams.models.topics.manager
affirm_ready_for_create
Validate a list of topics is ready for creation attempt
Arguments:
topics: a list of Topics
TopicManager
The source of all topic management with quixstreams.
Generally initialized and managed automatically by an Application
,
but allows a user to work with it directly when needed, such as using it alongside
a plain Producer
to create its topics.
See methods for details.
TopicManager.__init__
Arguments:
topic_admin: an Admin instance (required for some functionality)
create_timeout: timeout for topic creation
TopicManager.changelog_topics
Note: Topics are the changelogs.
Returns:
the changelog topic dict, {topic_name: {suffix: Topic}}
TopicManager.topic_config
def topic_config(num_partitions: Optional[int] = None,
replication_factor: Optional[int] = None,
extra_config: Optional[dict] = None) -> TopicConfig
Convenience method for generating a TopicConfig
with default settings
Arguments:
num_partitions: the number of topic partitions
replication_factor: the topic replication factor
extra_config: other optional configuration settings
Returns:
a TopicConfig object
TopicManager.topic
def topic(name: str,
value_deserializer: Optional[DeserializerType] = None,
key_deserializer: Optional[DeserializerType] = "bytes",
value_serializer: Optional[SerializerType] = None,
key_serializer: Optional[SerializerType] = "bytes",
config: Optional[TopicConfig] = None,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
A convenience method for generating a Topic
. Will use default config options
as dictated by the TopicManager.
Arguments:
name: topic name
value_deserializer: a deserializer type for values
key_deserializer: a deserializer type for keys
value_serializer: a serializer type for values
key_serializer: a serializer type for keys
config: optional topic configurations (for creation/validation)
timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message.
Returns:
Topic object with creation configs
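A sketch assuming an already-initialized TopicManager; the string shorthands ("json") for serializer types are an assumption, so verify them against your version:
config = topic_manager.topic_config(num_partitions=3, replication_factor=1)
topic = topic_manager.topic(
    name="sensor-readings",
    value_deserializer="json",
    value_serializer="json",
    config=config,
)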
TopicManager.changelog_topic
Performs all the logic necessary to generate a changelog topic based on a
"source topic" (aka input/consumed topic).
Its main goal is to ensure partition counts of the to-be generated changelog
match the source topic, and ensure the changelog topic is compacted. Also
enforces the serialization type. All Topic
objects generated with this are
stored on the TopicManager.
If source topic already exists, defers to the existing topic settings, else
uses the settings as defined by the Topic
(and its defaults) as generated
by the TopicManager
.
In general, users should NOT need this; an Application knows when/how to
generate changelog topics. To turn off changelogs, init an Application with
"use_changelog_topics"=False
.
Arguments:
consumer_group: name of consumer group (for this app)
topic_name: name of consumed topic (app input topic). NOTE: normally contains any prefixes added by TopicManager.topic()
store_name: name of the store this changelog belongs to (default, rolling10s, etc.)
Returns:
Topic
object (which is also stored on the TopicManager)
TopicManager.create_topics
Creates topics via an explicit list of provided Topics
.
Exists as a way to manually specify what topics to create; otherwise,
create_all_topics()
is generally simpler.
Arguments:
topics: a list of Topics
TopicManager.create_all_topics
A convenience method to create all Topic objects stored on this TopicManager.
TopicManager.validate_all_topics
Validates all topics exist and changelogs have correct topic and rep factor.
Issues are pooled and raised as an Exception once inspections are complete.
quixstreams.models.topics.admin
convert_topic_list
Converts Topics to ConfluentTopics as required for Confluent's
AdminClient.create_topics().
Arguments:
topics: a list of Topics
Returns:
a list of confluent_kafka ConfluentTopics
TopicAdmin
For performing "admin"-level operations on a Kafka cluster, mostly around topics.
Primarily used to create and inspect topic configurations.
TopicAdmin.__init__
Arguments:
broker_address: the address for the broker
extra_config: optional configs (generally accepts producer configs)
TopicAdmin.list_topics
Get a list of topics and their metadata from a Kafka cluster
Returns:
a dict of topic names and their metadata objects
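A short usage sketch; the import path is assumed from the module name above:
from quixstreams.models.topics.admin import TopicAdmin

admin = TopicAdmin(broker_address="localhost:9092")
metadata = admin.list_topics()
print(sorted(metadata.keys()))  # topic names present on the cluster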
TopicAdmin.inspect_topics
A simplified way of getting the topic configurations of the provided topics
from the cluster (if they exist).
Arguments:
topic_names
: a list of topic names
Returns:
a dict with topic names and their respective TopicConfig
TopicAdmin.create_topics
Create the given list of topics and confirm they are ready.
Also raises an exception with detailed printout should the creation fail (it ignores issues for a topic already existing).
Arguments:
topics: a list of Topics
timeout: timeout of the creation broker request
finalize_timeout: the timeout of the topic finalizing ("ready")
quixstreams.models.topics.exceptions
quixstreams.models.topics.topic
TopicConfig
Represents all kafka-level configuration for a kafka topic.
Generally used by Topic and any topic creation procedures.
Topic
A definition of a Kafka topic.
Typically created with an app = quixstreams.app.Application()
instance via
app.topic()
, and used by quixstreams.dataframe.StreamingDataFrame
instance.
Topic.__init__
def __init__(
name: str,
config: TopicConfig,
value_deserializer: Optional[DeserializerType] = None,
key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
value_serializer: Optional[SerializerType] = None,
key_serializer: Optional[SerializerType] = BytesSerializer(),
timestamp_extractor: Optional[TimestampExtractor] = None)
Arguments:
name: topic name
value_deserializer: a deserializer type for values
key_deserializer: a deserializer type for keys
value_serializer: a serializer type for values
key_serializer: a serializer type for keys
config: optional topic configs via TopicConfig (creation/validation)
timestamp_extractor: a callable that returns a timestamp in milliseconds from a deserialized message.
Topic.name
Topic name
Topic.row_serialize
Serialize Row to a Kafka message structure
Arguments:
row: Row to serialize
key: message key to serialize, optional. Default - current Row key.
Returns:
KafkaMessage object with serialized values
Topic.row_deserialize
Deserialize incoming Kafka message to a Row.
Arguments:
message: an object with the interface of confluent_kafka.Message
Returns:
Row, list of Rows or None if the message is ignored.
quixstreams.models.messagecontext
MessageContext
An object with Kafka message properties.
It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.
quixstreams.models.messages
quixstreams.models.rows
Row
Row is a dict-like interface on top of the message data + some Kafka props
Row.keys
Also allows unpacking row.value via **row
Row.clone
Manually clone the Row; doing it this way is much faster than doing a deepcopy on the entire Row object.
quixstreams.models.timestamps
TimestampType
TIMESTAMP_NOT_AVAILABLE
timestamps not supported by broker
TIMESTAMP_CREATE_TIME
message creation time (or source / producer time)
TIMESTAMP_LOG_APPEND_TIME
broker receive time
MessageTimestamp
Represents a timestamp of incoming Kafka message.
It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.
MessageTimestamp.create
Create a Timestamp object based on data
from confluent_kafka.Message.timestamp()
.
If timestamp type is "TIMESTAMP_NOT_AVAILABLE", the milliseconds are set to None
Arguments:
timestamp_type: a timestamp type represented as a number. Can be one of:
- "0" - TIMESTAMP_NOT_AVAILABLE, timestamps not supported by broker.
- "1" - TIMESTAMP_CREATE_TIME, message creation time (or source / producer time).
- "2" - TIMESTAMP_LOG_APPEND_TIME, broker receive time.
milliseconds: the number of milliseconds since the epoch (UTC).
Returns:
Timestamp object
quixstreams.models.types
ConfluentKafkaMessageProto
An interface of confluent_kafka.Message
.
Use it to not depend on exact implementation and simplify testing.
Instances of confluent_kafka.Message
cannot be directly created from Python,
see https://github.com/confluentinc/confluent-kafka-python/issues/1535.
quixstreams.platforms
quixstreams.platforms.quix.checks
check_state_management_enabled
Check if the State Management feature is enabled for the current deployment on the Quix platform. If it's disabled, an exception will be raised.
check_state_dir
Check if Application "state_dir" matches the state dir on Quix platform.
If it doesn't match, the warning will be logged.
Arguments:
state_dir
: application state_dir path
quixstreams.platforms.quix.config
TopicCreationConfigs
name
Required when not created by a Quix App.
strip_workspace_id_prefix
Remove the workspace ID from a given string if it starts with it,
typically a topic or consumer group id
Arguments:
workspace_id: the workspace id
s: the string to remove the prefix from
Returns:
the string with workspace_id prefix removed
prepend_workspace_id
Add the workspace ID as a prefix to a given string if it does not have it,
typically a topic or consumer group id
Arguments:
workspace_id
: the workspace id
s
: the string to prepend the prefix to
Returns:
the string with workspace_id prepended
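For illustration, assuming the "-" separator used for the <workspace_id>-<topic_name> naming convention (the workspace id and topic name are hypothetical):
prepend_workspace_id("ws123abc", "input-topic")                  # -> "ws123abc-input-topic"
strip_workspace_id_prefix("ws123abc", "ws123abc-input-topic")    # -> "input-topic"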
QuixKafkaConfigsBuilder
Retrieves all the necessary information from the Quix API and builds all the objects required to connect a confluent-kafka client to the Quix Platform.
If not executed within the Quix platform directly, you must provide a Quix "streaming" (aka "sdk") token, or Personal Access Token.
Ideally you also know your workspace name or id. If not, you can search for it using a known topic name, but note the search space is limited to the access level of your token.
It also currently handles the app_auto_create_topics setting for Application.Quix.
QuixKafkaConfigsBuilder.__init__
def __init__(quix_sdk_token: Optional[str] = None,
workspace_id: Optional[str] = None,
workspace_cert_path: Optional[str] = None,
quix_portal_api_service: Optional[QuixPortalApiService] = None)
Arguments:
quix_portal_api_service
: A QuixPortalApiService instance (else generated)
workspace_id
: A valid Quix Workspace ID (else searched for)
workspace_cert_path
: path to an existing workspace cert (else retrieved)
QuixKafkaConfigsBuilder.strip_workspace_id_prefix
Remove the workspace ID from a given string if it starts with it,
typically a topic or consumer group id
Arguments:
s
: the string to remove the prefix from
Returns:
the string with workspace_id prefix removed
QuixKafkaConfigsBuilder.prepend_workspace_id
Add the workspace ID as a prefix to a given string if it does not have it,
typically a topic or consumer group id
Arguments:
s
: the string to prepend the prefix to
Returns:
the string with workspace_id prepended
QuixKafkaConfigsBuilder.search_for_workspace
Search for a workspace given an expected workspace name or id.
Arguments:
workspace_name_or_id
: the expected name or id of a workspace
Returns:
the workspace data dict if search success, else None
QuixKafkaConfigsBuilder.get_workspace_info
Queries for workspace data from the Quix API, regardless of instance cache,
and updates instance attributes from query result.
Arguments:
known_workspace_topic
: a topic you know to exist in some workspace
QuixKafkaConfigsBuilder.search_workspace_for_topic
Search through all the topics in the given workspace id to see if there is a
match with the provided topic.
Arguments:
workspace_id
: the workspace to search in
topic
: the topic to search for
Returns:
the workspace_id if success, else None
QuixKafkaConfigsBuilder.search_for_topic_workspace
Find what workspace a topic belongs to.
If there is only one workspace altogether, it is assumed to be the workspace. More than one means each workspace will be searched until the first hit.
Arguments:
topic
: the topic to search for
Returns:
workspace data dict if topic search success, else None
QuixKafkaConfigsBuilder.get_workspace_ssl_cert
Gets and extracts zipped certificate from the API to provided folder if the
SSL certificate is specified in broker configuration.
If no path was provided, will dump to /tmp. Expects cert named 'ca.cert'.
Arguments:
extract_to_folder
: path to folder to dump zipped cert file to
Returns:
full cert filepath as string or None
if certificate is not specified
QuixKafkaConfigsBuilder.create_topics
Create topics in a Quix cluster.
Arguments:
topics
: a list of Topic objects
finalize_timeout_seconds
: How long to wait for the topics to be marked as "Ready" (and thus ready to produce to/consume from).
QuixKafkaConfigsBuilder.confirm_topics_exist
Confirm whether the desired set of topics exists in the Quix workspace.
Arguments:
topics
: a list of Topic objects or topic names
QuixKafkaConfigsBuilder.get_confluent_broker_config
Get the full client config dictionary required to authenticate a confluent-kafka
client to a Quix platform broker/workspace.
The returned config can be used directly by any confluent-kafka-python consumer/ producer (add your producer/consumer-specific configs afterward).
Arguments:
known_topic
: a topic known to exist in some workspace
Returns:
a dict of confluent-kafka-python client settings (see librdkafka config for more details)
QuixKafkaConfigsBuilder.get_confluent_client_configs
def get_confluent_client_configs(
topics: list,
consumer_group_id: Optional[str] = None
) -> Tuple[dict, List[str], Optional[str]]
Get all the values you need in order to use a confluent_kafka-based client
with a topic on a Quix platform broker/workspace.
The returned config can be used directly by any confluent-kafka-python consumer/ producer (add your producer/consumer-specific configs afterward).
The topics and consumer group are appended with any necessary values.
Arguments:
topics
: list of topics
consumer_group_id
: consumer group id, if needed
Returns:
a tuple with configs and altered versions of the topics and consumer group name
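Example Snippet (a rough usage sketch; the token, topic and group names are placeholders):
builder = QuixKafkaConfigsBuilder(quix_sdk_token="<streaming-token>")
configs, topic_names, group = builder.get_confluent_client_configs(
    topics=["input-topic"], consumer_group_id="my-group"
)
# "configs" can be passed to a confluent-kafka Consumer/Producer as is;
# "topic_names" and "group" are the workspace-prefixed versions.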
quixstreams.platforms.quix.env
QuixEnvironment
Class to access various Quix platform environment settings
QuixEnvironment.state_management_enabled
Check whether "State management" is enabled for the current deployment
Returns:
True if state management is enabled, otherwise False
QuixEnvironment.deployment_id
Return current Quix deployment id.
This variable is meant to be set only by Quix Platform and only when the application is deployed.
Returns:
deployment id or None
QuixEnvironment.workspace_id
Return Quix workspace id if set
Returns:
workspace id or None
QuixEnvironment.portal_api
Return Quix Portal API url if set
Returns:
portal API URL or None
QuixEnvironment.state_dir
Return application state directory on Quix.
Returns:
path to state dir
quixstreams.platforms.quix.topic_manager
QuixTopicManager
The source of all topic management with quixstreams.
This is specifically for Applications using the Quix platform.
Generally initialized and managed automatically by an Application.Quix
,
but allows a user to work with it directly when needed, such as using it alongside
a plain Producer
to create its topics.
See methods for details.
QuixTopicManager.__init__
def __init__(topic_admin: TopicAdmin,
quix_config_builder: QuixKafkaConfigsBuilder,
create_timeout: int = 60)
Arguments:
topic_admin
: an Admin instance
create_timeout
: timeout for topic creation
quix_config_builder
: A QuixKafkaConfigsBuilder instance, else one is generated for you.
quixstreams.platforms.quix
quixstreams.platforms.quix.api
QuixPortalApiService
A light wrapper around the Quix Portal API. If used in the Quix Platform, it will use that workspace's auth token and portal endpoint, else you must provide them.
Function names closely reflect the respective API endpoint, each starting with the method [GET, POST, etc.] followed by the endpoint path.
Results will be returned in the form of requests' Response.json(), unless something else is required. Non-200's will raise exceptions.
See the swagger documentation for more info about the endpoints.
QuixPortalApiService.get_workspace_certificate
Get a workspace TLS certificate if available.
Returns None
if certificate is not specified.
Arguments:
workspace_id
: workspace id, optional
Returns:
certificate as bytes if present, or None
quixstreams.platforms.quix.exceptions
quixstreams.state.rocksdb.serialization
quixstreams.state.rocksdb.windowed
quixstreams.state.rocksdb.windowed.serialization
parse_window_key
Parse the window key from Rocksdb into (message_key, start, end) structure.
Expected window key format:
Arguments:
key
: a key from Rocksdb
Returns:
a tuple with message key, start timestamp, end timestamp
encode_window_key
Encode window start and end timestamps into bytes of the following format:
<start>|<end>
Encoding window keys this way makes them sortable in RocksDB within the same prefix.
Arguments:
start_ms
: window start in milliseconds
end_ms
: window end in milliseconds
Returns:
window timestamps as bytes
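The reason this layout sorts correctly: fixed-width big-endian integers compare lexicographically in the same order as numerically. A minimal standalone sketch of the idea (not necessarily the library's exact byte layout):
import struct

def encode_window_key_sketch(start_ms: int, end_ms: int) -> bytes:
    # "<start>|<end>" with fixed-width big-endian 64-bit integers
    return struct.pack(">q", start_ms) + b"|" + struct.pack(">q", end_ms)

# keys within one prefix iterate in window-start order
assert encode_window_key_sketch(1_000, 0) < encode_window_key_sketch(2_000, 0)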
encode_window_prefix
Encode window prefix and start time to iterate over keys in RocksDB
Format:
<prefix>|<start>
Arguments:
prefix
: transaction prefix
start_ms
: window start time in milliseconds
Returns:
bytes
quixstreams.state.rocksdb.windowed.state
WindowedTransactionState
WindowedTransactionState.__init__
A windowed state to be provided into StreamingDataFrame
window functions.
Arguments:
transaction
: instance of WindowedRocksDBPartitionTransaction
WindowedTransactionState.get_window
Get the value of the window defined by start and end timestamps
if the window is present in the state, else default.
Arguments:
start_ms
: start of the window in milliseconds
end_ms
: end of the window in milliseconds
default
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
WindowedTransactionState.update_window
Set a value for the window.
This method will also update the latest observed timestamp in state partition
using the provided timestamp
.
Arguments:
start_ms
: start of the window in milliseconds
end_ms
: end of the window in milliseconds
value
: value of the window
timestamp_ms
: current message timestamp in milliseconds
WindowedTransactionState.get_latest_timestamp
Get the latest observed timestamp for the current state partition.
Use this timestamp to determine if the arriving event is late and should be discarded from the processing.
Returns:
latest observed event timestamp in milliseconds
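For example, a window implementation might use it roughly like this (a sketch; the function name and grace_ms parameter are illustrative):
def is_too_late(message_timestamp_ms: int, state, grace_ms: int) -> bool:
    # an event is late if it falls behind the latest observed timestamp
    # by more than the grace period
    return message_timestamp_ms < state.get_latest_timestamp() - grace_ms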
WindowedTransactionState.expire_windows
Get a list of expired windows from RocksDB considering the current latest timestamp, window duration and grace period.
It also marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".
quixstreams.state.rocksdb.windowed.metadata
quixstreams.state.rocksdb.windowed.partition
WindowedRocksDBStorePartition
A base class to access windowed state in RocksDB.
It represents a single RocksDB database.
Besides the data, it keeps track of the latest observed timestamp and stores the expiration index to delete expired windows.
Arguments:
path
: an absolute path to the RocksDB folder
options
: RocksDB options. If None, the default options will be used.
quixstreams.state.rocksdb.windowed.store
WindowedRocksDBStore
RocksDB-based windowed state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
WindowedRocksDBStore.__init__
def __init__(
name: str,
topic: str,
base_dir: str,
changelog_producer_factory: Optional[ChangelogProducerFactory] = None,
options: Optional[RocksDBOptionsType] = None)
Arguments:
name
: a unique store name
topic
: a topic name for this store
base_dir
: path to a directory with the state
changelog_producer_factory
: a ChangelogProducerFactory instance if using changelogs
options
: RocksDB options. If None, the default options will be used.
quixstreams.state.rocksdb.windowed.transaction
WindowedRocksDBPartitionTransaction
WindowedRocksDBPartitionTransaction.expire_windows
Get a list of expired windows from RocksDB considering latest timestamp,
window size and grace period. It marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".
How it works:
- First, it looks for the start time of the last expired window for the current prefix using the expiration cache. If found, it is used to reduce the search space and to avoid returning already expired windows.
- Then it goes over window segments and fetches the windows that should be expired.
- Finally, it updates the expiration cache with the start time of the latest found windows.
Returns:
sorted list of tuples in format ((start, end), value)
quixstreams.state.rocksdb
quixstreams.state.rocksdb.metadata
quixstreams.state.rocksdb.transaction
RocksDBPartitionTransaction
A transaction class to perform simple key-value operations like "get", "set", "delete" and "exists" on a single RocksDB partition.
Serialization
RocksDBTransaction
automatically serializes keys and values to bytes.
Prefixing
RocksDBTransaction
allows setting prefixes for the keys in a given code block
using the with_prefix() context manager.
Normally, StreamingDataFrame
class will use message keys as prefixes
in order to namespace the stored keys across different messages.
Transactional properties
RocksDBTransaction
uses a combination of in-memory update cache
and RocksDB's WriteBatch in order to accumulate all the state mutations
in a single batch, flush them atomically, and allow the updates to be visible
within the transaction before it's flushed (the "read-your-own-writes" problem).
If any mutation fails during the transaction
(e.g. we failed to write the updates to the RocksDB), the whole transaction
will be marked as failed and cannot be used anymore.
In this case, a new RocksDBTransaction
should be created.
RocksDBTransaction
can be used only once.
RocksDBPartitionTransaction.__init__
Arguments:
partition
: instance of RocksDBStatePartition to be used for accessing the underlying RocksDB
dumps
: a function to serialize data to bytes.
loads
: a function to deserialize data from bytes.
RocksDBPartitionTransaction.with_prefix
A context manager that sets the prefix for all keys in the scope.
Normally, it's called by Streaming DataFrames engine to ensure that every message key is stored separately.
The with_prefix
calls should not be nested.
Only one prefix can be set at a time.
Arguments:
prefix
: a prefix string to be used. Should be either bytes or an object serializable to bytes by the dumps function. The prefix doesn't need to contain the separator; it will be added automatically between the key and the prefix if the prefix is not empty.
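Example Snippet (a rough sketch; "transaction" and "message_key" are assumed to come from the surrounding processing code):
# scope all keys under the current message key, then read/write as usual
with transaction.with_prefix(message_key):
    count = transaction.get("count", default=0)
    transaction.set("count", count + 1)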
RocksDBPartitionTransaction.get
@_validate_transaction_state
def get(key: Any,
default: Any = None,
cf_name: str = "default") -> Optional[Any]
Get a key from the store.
It first looks up the key in the update cache in case it has been updated but not flushed yet.
It returns None
if the key is not found and default
is not provided.
Arguments:
key
: a key to get from DB
default
: value to return if the key is not present in the state. It can be of any type.
cf_name
: rocksdb column family name. Default - "default"
Returns:
value or default
RocksDBPartitionTransaction.set
Set a key to the store.
It first updates the key in the update cache.
Arguments:
key
: key to store in DB
value
: value to store in DB
cf_name
: rocksdb column family name. Default - "default"
RocksDBPartitionTransaction.delete
Delete a key from the store.
It first deletes the key from the update cache.
Arguments:
key
: key to delete from DB
cf_name
: rocksdb column family name. Default - "default"
RocksDBPartitionTransaction.exists
Check if a key exists in the store.
It first looks up the key in the update cache.
Arguments:
key
: a key to check in DB
cf_name
: rocksdb column family name. Default - "default"
Returns:
True
if the key exists, False
otherwise.
RocksDBPartitionTransaction.completed
Check if the transaction is completed.
It doesn't indicate whether the transaction was successful or not.
Use RocksDBTransaction.failed
for that.
The completed transaction should not be re-used.
Returns:
True
if transaction is completed, False
otherwise.
RocksDBPartitionTransaction.failed
Check if the transaction has failed.
The failed transaction should not be re-used.
Returns:
True
if transaction is failed, False
otherwise.
RocksDBPartitionTransaction.maybe_flush
Flush the recent updates to the database and empty the update cache.
It writes the WriteBatch to RocksDB and marks itself as finished.
If writing fails, the transaction will be also marked as "failed" and cannot be used anymore.
NOTE: If no keys have been modified during the transaction (i.e. no "set" or "delete" have been called at least once), it will not flush ANY data to the database including the offset in order to optimize I/O.
Arguments:
offset
: offset of the last processed message, optional.
quixstreams.state.rocksdb.partition
RocksDBStorePartition
A base class to access state in RocksDB.
It represents a single RocksDB database.
Responsibilities:
1. Managing access to the RocksDB instance
2. Creating transactions to interact with data
3. Flushing WriteBatches to RocksDB
4. Producing state-related changelog messages
It opens the RocksDB on __init__
. If the db is locked by another process,
it will retry according to open_max_retries
and open_retry_backoff
options.
Arguments:
path
: an absolute path to the RocksDB folder
options
: RocksDB options. If None, the default options will be used.
RocksDBStorePartition.begin
Create a new RocksDBTransaction
object.
Using RocksDBTransaction
is the recommended way to access the data.
Returns:
an instance of RocksDBTransaction
RocksDBStorePartition.recover_from_changelog_message
Updates state from a given changelog message.
Arguments:
changelog_message
: A raw Confluent message read from a changelog topic.
RocksDBStorePartition.set_changelog_offset
Set the changelog offset based on a message (usually an "offset-only" message).
Used during recovery.
Arguments:
changelog_offset
: A changelog offset
RocksDBStorePartition.produce_to_changelog
def produce_to_changelog(key: bytes,
value: Optional[bytes] = None,
headers: Optional[MessageHeadersMapping] = None)
Produce a message to the StorePartition's respective changelog.
RocksDBStorePartition.write
Write WriteBatch
to RocksDB
Arguments:
batch
: an instance ofrocksdict.WriteBatch
RocksDBStorePartition.get
Get a key from RocksDB.
Arguments:
key
: a key encoded to bytes
default
: a default value to return if the key is not found.
cf_name
: rocksdb column family name. Default - "default"
Returns:
a value if the key is present in the DB. Otherwise, default
RocksDBStorePartition.exists
Check if a key is present in the DB.
Arguments:
key
: a key encoded to bytes.
cf_name
: rocksdb column family name. Default - "default"
Returns:
True
if the key is present, False
otherwise.
RocksDBStorePartition.get_processed_offset
Get last processed offset for the given partition
Returns:
offset or None
if there's no processed offset yet
RocksDBStorePartition.get_changelog_offset
Get offset that the changelog is up-to-date with.
Returns:
offset or None
if there's no processed offset yet
RocksDBStorePartition.close
Close the underlying RocksDB
RocksDBStorePartition.path
Absolute path to RocksDB database folder
Returns:
file path
RocksDBStorePartition.destroy
Delete underlying RocksDB database
The database must be closed first.
Arguments:
path
: an absolute path to the RocksDB folder
RocksDBStorePartition.get_column_family_handle
Get a column family handle to pass to a WriteBatch.
This method will cache the CF handle instance to avoid creating them repeatedly.
Arguments:
cf_name
: column family name
Returns:
instance of rocksdict.ColumnFamily
RocksDBStorePartition.get_column_family
Get a column family instance.
This method will cache the CF instance to avoid creating them repeatedly.
Arguments:
cf_name
: column family name
Returns:
instance of rocksdict.Rdict
for the given column family
quixstreams.state.rocksdb.store
RocksDBStore
RocksDB-based state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
RocksDBStore.__init__
def __init__(
name: str,
topic: str,
base_dir: str,
changelog_producer_factory: Optional[ChangelogProducerFactory] = None,
options: Optional[options_type] = None)
Arguments:
name
: a unique store name
topic
: a topic name for this store
base_dir
: path to a directory with the state
changelog_producer_factory
: a ChangelogProducerFactory instance if using changelogs
options
: RocksDB options. If None, the default options will be used.
RocksDBStore.topic
Store topic name
RocksDBStore.name
Store name
RocksDBStore.partitions
Mapping of assigned store partitions
RocksDBStore.assign_partition
Open and assign store partition.
If the partition is already assigned, it will not re-open it and return the existing partition instead.
Arguments:
partition
: partition number
Returns:
instance of RocksDBStorePartition
RocksDBStore.revoke_partition
Revoke and close the assigned store partition.
If the partition is not assigned, it will log the message and return.
Arguments:
partition
: partition number
RocksDBStore.start_partition_transaction
Start a new partition transaction.
RocksDBPartitionTransaction
is the primary interface for working with data in
the underlying RocksDB.
Arguments:
partition
: partition number
Returns:
instance of RocksDBPartitionTransaction
RocksDBStore.close
Close the store and revoke all assigned partitions
quixstreams.state.rocksdb.exceptions
quixstreams.state.rocksdb.types
quixstreams.state.rocksdb.options
RocksDBOptions
RocksDB database options.
Arguments:
dumps
: function to dump data to JSON
loads
: function to load data from JSON
open_max_retries
: number of times to retry opening the database if it's locked by another process. To disable retrying, pass 0
open_retry_backoff
: number of seconds to wait between each retry.
Please see rocksdict.Options for a complete description of other options.
RocksDBOptions.to_options
Convert parameters to rocksdict.Options
Returns:
instance of rocksdict.Options
quixstreams.state.state
TransactionState
TransactionState.__init__
Simple key-value state to be provided into StreamingDataFrame
functions
Arguments:
transaction
: instance of PartitionTransaction
TransactionState.get
Get the value for key if key is present in the state, else default
Arguments:
key
: key
default
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
TransactionState.set
Set value for the key.
Arguments:
key
: key
value
: value
TransactionState.delete
Delete value for the key.
This function always returns None
, even if value is not found.
Arguments:
key
: key
TransactionState.exists
Check if the key exists in state.
Arguments:
key
: key
Returns:
True if key exists, False otherwise
quixstreams.state
quixstreams.state.manager
StateStoreManager
Class for managing state stores and partitions.
StateStoreManager is responsible for:
- reacting to rebalance callbacks
- managing the individual state stores
- providing access to store transactions
StateStoreManager.stores
Map of registered state stores
Returns:
dict in format {topic: {store_name: store}}
StateStoreManager.recovery_required
Whether recovery needs to be done.
StateStoreManager.using_changelogs
Whether the StateStoreManager is using changelog topics
Returns:
using changelogs, as bool
StateStoreManager.do_recovery
Perform a state recovery, if necessary.
StateStoreManager.stop_recovery
Stop recovery (called during app shutdown).
StateStoreManager.get_store
Get a store for given name and topic
Arguments:
topic
: topic name
store_name
: store name
Returns:
instance of Store
StateStoreManager.register_store
Register a state store to be managed by StateStoreManager.
During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores.
Each store can be registered only once for each topic.
Arguments:
topic_name
: topic name
store_name
: store name
StateStoreManager.register_windowed_store
Register a windowed state store to be managed by StateStoreManager.
During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores.
Each window store can be registered only once for each topic.
Arguments:
topic_name
: topic name
store_name
: store name
StateStoreManager.clear_stores
Delete all state stores managed by StateStoreManager.
StateStoreManager.on_partition_assign
Assign store partitions for each registered store for the given TopicPartition
and return a list of assigned StorePartition
objects.
Arguments:
tp
: TopicPartition from Kafka consumer
Returns:
list of assigned StorePartition
StateStoreManager.on_partition_revoke
Revoke store partitions for each registered store for the given TopicPartition
Arguments:
tp
: TopicPartition from Kafka consumer
StateStoreManager.on_partition_lost
Revoke and close store partitions for each registered store for the given
TopicPartition
Arguments:
tp
: TopicPartition from Kafka consumer
StateStoreManager.init
Initialize StateStoreManager
and create a store directory
StateStoreManager.close
Close all registered stores
StateStoreManager.get_store_transaction
Get active PartitionTransaction
for the store
Arguments:
store_name
: name of the store
StateStoreManager.start_store_transaction
@contextlib.contextmanager
def start_store_transaction(topic: str, partition: int,
offset: int) -> Iterator["_MultiStoreTransaction"]
Start a multi-store transaction for the Kafka message.
This transaction will keep track of all used stores and flush them in the end. If any exception is caught during this transaction, none of them will be flushed as a best effort to keep stores consistent in "at-least-once" setting.
There can be only one active transaction at a time. Starting a new transaction before the end of the current one will fail.
Arguments:
topic
: message topic
partition
: message partition
offset
: message offset
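Example Snippet (a rough sketch; the store name "default", the coordinates and the key are illustrative):
with state_manager.start_store_transaction(topic="input", partition=0, offset=42):
    tx = state_manager.get_store_transaction("default")
    tx.set("key", "value")
# on a clean exit the used stores are flushed; if an exception is raised,
# nothing is flushed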
quixstreams.state.recovery
RecoveryPartition
A changelog topic partition mapped to a respective StorePartition
with helper
methods to determine its current recovery status.
Since StorePartitions do recovery directly, it also handles recovery transactions.
RecoveryPartition.offset
Get the changelog offset from the underlying StorePartition
.
Returns:
changelog offset (int)
RecoveryPartition.needs_recovery
Determine whether recovery is necessary for underlying StorePartition
.
RecoveryPartition.needs_offset_update
Determine if an offset update is required.
Usually checked during assign if recovery was not required.
RecoveryPartition.update_offset
Update only the changelog offset of a StorePartition.
RecoveryPartition.recover_from_changelog_message
Recover the StorePartition using a message read from its respective changelog.
Arguments:
changelog_message
: A confluent kafka message (everything as bytes)
RecoveryPartition.set_watermarks
Set the changelog watermarks as gathered from Consumer.get_watermark_offsets()
Arguments:
lowwater
: topic partition lowwater
highwater
: topic partition highwater
ChangelogProducerFactory
Generates ChangelogProducers, which produce changelog messages to a StorePartition.
ChangelogProducerFactory.__init__
Arguments:
changelog_name
: changelog topic name
producer
: a RowProducer (not shared with the Application instance)
Returns:
a ChangelogWriter instance
ChangelogProducerFactory.get_partition_producer
Generate a ChangelogProducer for producing to a specific partition number
(and thus StorePartition).
Arguments:
partition_num
: source topic partition number
ChangelogProducer
Generated for a StorePartition
to produce state changes to its respective
kafka changelog partition.
ChangelogProducer.__init__
Arguments:
changelog_name
: a changelog topic name
partition_num
: source topic partition number
producer
: a RowProducer (not shared with the Application instance)
ChangelogProducer.produce
def produce(key: bytes,
value: Optional[bytes] = None,
headers: Optional[MessageHeadersMapping] = None)
Produce a message to a changelog topic partition.
Arguments:
key
: message key (same as state key, including prefixes)
value
: message value (same as state value)
headers
: message headers (includes column family info)
RecoveryManager
Manages all consumer-related aspects of recovery, including:
- assigning/revoking, pausing/resuming topic partitions (especially changelogs)
- consuming changelog messages until state is updated fully.
Also tracks/manages RecoveryPartitions
, which are assigned/tracked only if
recovery for that changelog partition is required.
Recovery is attempted from the Application
after any new partition assignment.
RecoveryManager.has_assignments
Whether the Application has assigned RecoveryPartitions
Returns:
has assignments, as bool
RecoveryManager.recovering
Whether the Application is currently recovering
Returns:
is recovering, as bool
RecoveryManager.register_changelog
Register a changelog Topic with the TopicManager.
Arguments:
topic_name
: source topic name
store_name
: name of the store
consumer_group
: name of the consumer group
RecoveryManager.do_recovery
If there are any active RecoveryPartitions, do a recovery procedure.
After, will resume normal Application
processing.
RecoveryManager.assign_partition
def assign_partition(topic_name: str, partition_num: int,
store_partitions: Dict[str, StorePartition])
Assigns StorePartitions (as RecoveryPartitions) ONLY IF recovery is required.
Pauses active consumer partitions as needed.
RecoveryManager.revoke_partition
Revoke ALL StorePartitions (across all Stores) for a given partition number.
Arguments:
partition_num
: partition number of source topic
quixstreams.state.exceptions
quixstreams.state.types
Store
Abstract state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
Store.topic
Topic name
Store.name
Store name
Store.partitions
Mapping of assigned store partitions
Returns:
dict of "{partition:
Store.assign_partition
Assign new store partition
Arguments:
partition
: partition number
Returns:
instance of StorePartition
Store.revoke_partition
Revoke assigned store partition
Arguments:
partition
: partition number
Store.start_partition_transaction
Start a new partition transaction.
PartitionTransaction
is the primary interface for working with data in Stores.
Arguments:
partition
: partition number
Returns:
instance of PartitionTransaction
Store.close
Close store and revoke all store partitions
StorePartition
A base class to access state in the underlying storage. It represents a single instance of some storage (e.g. a single database for the persistent storage).
StorePartition.path
Absolute path to RocksDB database folder
StorePartition.begin
Start a new PartitionTransaction
StorePartition.recover_from_changelog_message
Updates state from a given changelog message.
Arguments:
changelog_message
: A raw Confluent message read from a changelog topic.
StorePartition.produce_to_changelog
def produce_to_changelog(key: bytes,
value: Optional[bytes] = None,
headers: Optional[MessageHeadersMapping] = None)
Produce a message to the StorePartition's respective changelog.
StorePartition.get_processed_offset
Get last processed offset for the given partition
Returns:
offset or None
if there's no processed offset yet
StorePartition.get_changelog_offset
Get offset that the changelog is up-to-date with.
Returns:
offset or None
if there's no processed offset yet
StorePartition.set_changelog_offset
Set the changelog offset based on a message (usually an "offset-only" message).
Used during recovery.
Arguments:
changelog_offset
: A changelog offset
State
Primary interface for working with key-value state data from StreamingDataFrame
State.get
Get the value for key if key is present in the state, else default
Arguments:
key
: key
default
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
State.set
Set value for the key.
Arguments:
key
: key
value
: value
State.delete
Delete value for the key.
This function always returns None
, even if value is not found.
Arguments:
key
: key
State.exists
Check if the key exists in state.
Arguments:
key
: key
Returns:
True if key exists, False otherwise
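A typical place this interface appears is in stateful StreamingDataFrame callbacks, roughly like this (a sketch; the stateful=True flag and the field names are assumptions based on the dataframe API, not documented in this section):
def count_messages(value: dict, state: State) -> dict:
    total = state.get("total", default=0) + 1
    state.set("total", total)
    return {**value, "total": total}

sdf = sdf.apply(count_messages, stateful=True)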
PartitionTransaction
A transaction class to perform simple key-value operations like "get", "set", "delete" and "exists" on a single storage partition.
PartitionTransaction.state
An instance of State to be provided to StreamingDataFrame
functions
PartitionTransaction.failed
Return True
if transaction failed to update data at some point.
Failed transactions cannot be re-used.
Returns:
bool
PartitionTransaction.completed
Return True
if transaction is completed.
Completed transactions cannot be re-used.
Returns:
bool
PartitionTransaction.with_prefix
A context manager that sets the prefix for all keys in the scope.
Normally, it's called by StreamingDataFrame
internals to ensure that every
message key is stored separately.
Arguments:
prefix
: key prefix
Returns:
context manager
PartitionTransaction.maybe_flush
Flush the recent updates and last processed offset to the storage.
Arguments:
offset
: offset of the last processed message, optional.
WindowedState
A windowed state to be provided into StreamingDataFrame
window functions.
WindowedState.get_window
Get the value of the window defined by start and end timestamps
if the window is present in the state, else default.
Arguments:
start_ms
: start of the window in milliseconds
end_ms
: end of the window in milliseconds
default
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
WindowedState.update_window
Set a value for the window.
This method will also update the latest observed timestamp in state partition
using the provided timestamp
.
Arguments:
start_ms
: start of the window in milliseconds
end_ms
: end of the window in milliseconds
value
: value of the window
timestamp_ms
: current message timestamp in milliseconds
WindowedState.get_latest_timestamp
Get the latest observed timestamp for the current state partition.
Use this timestamp to determine if the arriving event is late and should be discarded from the processing.
Returns:
latest observed event timestamp in milliseconds
WindowedState.expire_windows
Get a list of expired windows from RocksDB considering the current
latest timestamp, window duration and grace period.
It also marks the latest found window as expired in the expiration index, so calling this method multiple times will yield different results for the same "latest timestamp".
Arguments:
duration_ms
: duration of the windows in milliseconds
grace_ms
: grace period in milliseconds. Default - "0"
WindowedPartitionTransaction
WindowedPartitionTransaction.failed
Return True
if transaction failed to update data at some point.
Failed transactions cannot be re-used.
Returns:
bool
WindowedPartitionTransaction.completed
Return True
if transaction is completed.
Completed transactions cannot be re-used.
Returns:
bool
WindowedPartitionTransaction.with_prefix
A context manager that sets the prefix for all keys in the scope.
Normally, it's called by StreamingDataFrame
internals to ensure that every
message key is stored separately.
Arguments:
prefix
: key prefix
Returns:
context manager
WindowedPartitionTransaction.maybe_flush
Flush the recent updates and last processed offset to the storage.
Arguments:
offset
: offset of the last processed message, optional.
PartitionRecoveryTransaction
A class for managing recovery for a StorePartition from a changelog message
PartitionRecoveryTransaction.flush
Flush the recovery update and last processed offset to the storage.
quixstreams.utils
quixstreams.utils.json
dumps
Serialize to JSON using orjson
package.
Arguments:
value
: value to serialize to JSON
Returns:
bytes
loads
Deserialize from JSON using orjson
package.
Main differences of orjson from the built-in json module:
- dumps returns bytes
- non-str keys in dictionaries are not allowed
Arguments:
value
: value to deserialize from
Returns:
object
quixstreams.utils.dicts
dict_values
Recursively unpacks a set of nested dicts to get a flattened list of leaves,
where "leaves" are the first non-dict item.
i.e {"a": {"b": {"c": 1}, "d": 2}, "e": 3} becomes [1, 2, 3]
Arguments:
d
: initially, a dict (with potentially nested dicts)
Returns:
a list with all the leaves of the various contained dicts
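For instance:
from quixstreams.utils.dicts import dict_values

dict_values({"a": {"b": {"c": 1}, "d": 2}, "e": 3})
# -> [1, 2, 3]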
quixstreams.types
quixstreams.logging
configure_logging
Configure "quixstreams" logger.
NOTE: If "quixstreams" logger already has pre-defined handlers (e.g. logging has already been configured via
logging
, or the function is called twice), it will skip configuration and returnFalse
.
Arguments:
loglevel
: a valid log level as a string or None. If None passed, this function is no-op and no logging will be configured.
Returns:
True if logging config has been updated, otherwise False.
quixstreams.context
set_message_context
Set a MessageContext for the current message in the given contextvars.Context
NOTE: This is for advanced usage only. If you need to change the message key,
StreamingDataFrame.to_topic()
has an argument for it.
Example Snippet:
from quixstreams import Application, set_message_context, message_context
# Alter the message headers based on the current value.
def alter_context(value):
    context = message_context()
    if value > 1:
        context.headers = context.headers + ((b"cool_new_header", str(value).encode()),)
        set_message_context(context)
app = Application()
sdf = app.dataframe()
sdf = sdf.update(lambda value: alter_context(value))
Arguments:
context
: instance of MessageContext
message_context
Get a MessageContext for the current message, which houses most of the message
metadata, like:
- key
- timestamp
- partition
- offset
Example Snippet:
from quixstreams import Application, message_context
# Changes the current sdf value based on what the message partition is.
app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)
Returns:
instance of MessageContext
message_key
Get the current message's key.
Example Snippet:
from quixstreams import Application, message_key
# Changes the current sdf value based on what the message key is.
app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_key() == b'1' else 0)
Returns:
a deserialized message key
quixstreams.rowconsumer
RowConsumer
RowConsumer.__init__
def __init__(broker_address: str,
consumer_group: str,
auto_offset_reset: AutoOffsetReset,
auto_commit_enable: bool = True,
assignment_strategy: AssignmentStrategy = "range",
on_commit: Callable[[Optional[KafkaError], List[TopicPartition]],
None] = None,
extra_config: Optional[dict] = None,
on_error: Optional[ConsumerErrorCallback] = None)
A consumer class that is capable of deserializing Kafka messages to Rows
according to the Topics deserialization settings.
It overrides .subscribe()
method of Consumer class to accept Topic
objects instead of strings.
Arguments:
broker_address
: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Consumer.
consumer_group
: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer
auto_offset_reset
: Consumer auto.offset.reset setting. Available values:
- "earliest" - automatically reset the offset to the smallest offset
- "latest" - automatically reset the offset to the largest offset
auto_commit_enable
: If true, periodically commit offset of the last message handed to the application. Default - True.
assignment_strategy
: The name of a partition assignment strategy. Available values: "range", "roundrobin", "cooperative-sticky".
on_commit
: Offset commit result propagation callback. Passed as "offset_commit_cb" to confluent_kafka.Consumer.
extra_config
: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Note: values passed as arguments override values in extra_config.
on_error
: a callback triggered when RowConsumer.poll_row fails. If the consumer fails and the callback returns True, the exception will be logged but not propagated. The default callback logs an exception and returns False.
RowConsumer.subscribe
def subscribe(topics: List[Topic],
on_assign: Optional[RebalancingCallback] = None,
on_revoke: Optional[RebalancingCallback] = None,
on_lost: Optional[RebalancingCallback] = None)
Set subscription to supplied list of topics.
This replaces a previous subscription.
This method also updates the internal mapping with topics that is used to deserialize messages to Rows.
Arguments:
topics
: list of Topic instances to subscribe to.
on_assign
(callable): callback to provide handling of customized offsets on completion of a successful partition re-assignment.
on_revoke
(callable): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
on_lost
(callable): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
RowConsumer.poll_row
Consumes a single message and deserializes it to a Row or a list of Rows.
The message is deserialized according to the corresponding Topic.
If deserializer raises IgnoreValue
exception, this method will return None.
If Kafka returns an error, it will be raised as exception.
Arguments:
timeout
: poll timeout seconds
Returns:
single Row, list of Rows or None
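Example Snippet (a rough consumption loop; the broker address, group and the "topic" Topic instance are illustrative):
consumer = RowConsumer(broker_address="localhost:9092",
                       consumer_group="my-group",
                       auto_offset_reset="earliest")
consumer.subscribe([topic])
while True:
    row = consumer.poll_row(timeout=1.0)
    if row is None:
        continue
    print(row.value)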
quixstreams.rowproducer
RowProducer
A producer class that is capable of serializing Rows to bytes and send them to Kafka.
The serialization is performed according to the Topic serialization settings.
It overrides `.subscribe()` method of Consumer class to accept `Topic`
objects instead of strings.
Arguments:
broker_address
: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Producer.
partitioner
: A function to be used to determine the outgoing message partition. Available values: "random", "consistent_random", "murmur2", "murmur2_random", "fnv1a", "fnv1a_random". Default - "murmur2".
extra_config
: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
on_error
: a callback triggered when RowProducer.produce_row() or RowProducer.poll() fail. If the producer fails and the callback returns True, the exception will be logged but not propagated. The default callback logs an exception and returns False.
RowProducer.produce_row
def produce_row(row: Row,
topic: Topic,
key: Optional[Any] = None,
partition: Optional[int] = None,
timestamp: Optional[int] = None)
Serialize Row to bytes according to the Topic serialization settings
and produce it to Kafka
If this method fails, it will trigger the provided "on_error" callback.
Arguments:
row
: Row object
topic
: Topic object
key
: message key, optional
partition
: partition number, optional
timestamp
: timestamp in milliseconds, optional
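Example Snippet (a rough sketch; "row" and "output_topic" are assumed to come from the surrounding application, e.g. a RowConsumer and Application.topic()):
producer = RowProducer(broker_address="localhost:9092")
producer.produce_row(row=row, topic=output_topic)
producer.poll(timeout=0)  # serve pending delivery callbacks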
RowProducer.poll
Polls the producer for events and calls on_delivery
callbacks.
If poll fails, it will trigger the provided "on_error" callback
Arguments:
timeout
: timeout in seconds
quixstreams.app
Application
The main Application class.
Typically, the primary object needed to get a kafka application up and running.
Most functionality is explained in the various methods, except for "column assignment".
What it Does:
- On init:
  - Provides defaults or helper methods for commonly needed objects
  - If quix_sdk_token is passed, configures the app to use the Quix Cloud.
- When executed via .run() (after setup):
  - Initializes Topics and StreamingDataFrames
  - Facilitates processing of Kafka messages with a StreamingDataFrame
  - Handles all Kafka client consumer/producer responsibilities.
Example Snippet:
from quixstreams import Application
# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.
app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value: print('New message', value))
app.run(dataframe=df)
Application.__init__
def __init__(broker_address: Optional[str] = None,
quix_sdk_token: Optional[str] = None,
consumer_group: Optional[str] = None,
auto_offset_reset: AutoOffsetReset = "latest",
auto_commit_enable: bool = True,
partitioner: Partitioner = "murmur2",
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
state_dir: str = "state",
rocksdb_options: Optional[RocksDBOptionsType] = None,
on_consumer_error: Optional[ConsumerErrorCallback] = None,
on_processing_error: Optional[ProcessingErrorCallback] = None,
on_producer_error: Optional[ProducerErrorCallback] = None,
on_message_processed: Optional[MessageProcessedCallback] = None,
consumer_poll_timeout: float = 1.0,
producer_poll_timeout: float = 0.0,
loglevel: Optional[LogLevel] = "INFO",
auto_create_topics: bool = True,
use_changelog_topics: bool = True,
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
topic_manager: Optional[TopicManager] = None)
Arguments:
broker_address
: Kafka broker host and port in format <host>:<port>. Passed as bootstrap.servers to confluent_kafka.Consumer. Either this OR quix_sdk_token must be set to use Application (not both). Linked Environment Variable: Quix__Broker__Address. Default: None
quix_sdk_token
: If using the Quix Cloud, the SDK token to connect with. Either this OR broker_address must be set to use Application (not both). Linked Environment Variable: Quix__Sdk__Token. Default: None (if not run on Quix Cloud)
NOTE: the environment variable is set for you in the Quix Cloud
consumer_group
: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer. Linked Environment Variable: Quix__Consumer__Group. Default - "quixstreams-default" (set during init)
NOTE: Quix Applications will prefix it with the Quix workspace id.
auto_offset_reset
: Consumer auto.offset.reset setting
auto_commit_enable
: If true, periodically commit offset of the last message handed to the application. Default - True.
partitioner
: A function to be used to determine the outgoing message partition.
consumer_extra_config
: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is.
producer_extra_config
: A dictionary with additional options that will be passed to confluent_kafka.Producer as is.
state_dir
: path to the application state directory. Default - ".state".
rocksdb_options
: RocksDB options. If None, the default options will be used.
consumer_poll_timeout
: timeout for RowConsumer.poll(). Default - 1.0s
producer_poll_timeout
: timeout for RowProducer.poll(). Default - 0s.
on_message_processed
: a callback triggered when message is successfully processed.
loglevel
: a log level for the "quixstreams" logger. Should be a string or None. If None is passed, no logging will be configured. You may pass None and configure the "quixstreams" logger externally using the logging library. Default - "INFO".
auto_create_topics
: Create all Topics made via Application.topic(). Default - True
use_changelog_topics
: Use changelog topics to back stateful operations. Default - True
topic_manager
: A TopicManager instance
Error Handlers
To handle errors, Application accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returns True, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.
on_consumer_error
: triggered when internal RowConsumer fails to poll Kafka or cannot deserialize a message.
on_processing_error
: triggered when exception is raised within StreamingDataFrame.process().
on_producer_error
: triggered when RowProducer fails to serialize or to produce a message to Kafka.
Quix Cloud Parameters
quix_config_builder
: instance of QuixKafkaConfigsBuilder to be used instead of the default one.
NOTE: It is recommended to just use quix_sdk_token instead.
Application.Quix
@classmethod
def Quix(cls,
consumer_group: Optional[str] = None,
auto_offset_reset: AutoOffsetReset = "latest",
auto_commit_enable: bool = True,
partitioner: Partitioner = "murmur2",
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
state_dir: str = "state",
rocksdb_options: Optional[RocksDBOptionsType] = None,
on_consumer_error: Optional[ConsumerErrorCallback] = None,
on_processing_error: Optional[ProcessingErrorCallback] = None,
on_producer_error: Optional[ProducerErrorCallback] = None,
on_message_processed: Optional[MessageProcessedCallback] = None,
consumer_poll_timeout: float = 1.0,
producer_poll_timeout: float = 0.0,
loglevel: Optional[LogLevel] = "INFO",
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
auto_create_topics: bool = True,
use_changelog_topics: bool = True,
topic_manager: Optional[QuixTopicManager] = None) -> Self
NOTE: DEPRECATED: use Application with the quix_sdk_token argument instead.
Initialize an Application to work with Quix Cloud, assuming environment is properly configured (by default in Quix Cloud).
It takes the credentials from the environment and configures consumer and producer to properly connect to the Quix Cloud.
NOTE: Quix Cloud requires consumer_group and topic names to be prefixed with the workspace id. If the application is created via Application.Quix(), the real consumer group will be <workspace_id>-<consumer_group>, and the real topic names will be <workspace_id>-<topic_name>.
Example Snippet:
from quixstreams import Application
# Set up an `app = Application.Quix` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything. Also shows off how to
# use the quix-specific serializers and deserializers.
app = Application.Quix()
input_topic = app.topic("topic-in", value_deserializer="quix")
output_topic = app.topic("topic-out", value_serializer="quix_timeseries")
df = app.dataframe(input_topic)
df = df.to_topic(output_topic)
app.run(dataframe=df)
Arguments:
consumer_group
: Kafka consumer group. Passed as group.id to confluent_kafka.Consumer. Linked Environment Variable: Quix__Consumer__Group. Default - "quixstreams-default" (set during init).
NOTE: Quix Applications will prefix it with the Quix workspace id.
auto_offset_reset
: Consumer auto.offset.reset setting
auto_commit_enable
: If true, periodically commit offset of the last message handed to the application. Default - True.
partitioner
: A function to be used to determine the outgoing message partition.
consumer_extra_config
: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is.
producer_extra_config
: A dictionary with additional options that will be passed to confluent_kafka.Producer as is.
state_dir
: path to the application state directory. Default - ".state".
rocksdb_options
: RocksDB options. If None, the default options will be used.
consumer_poll_timeout
: timeout for RowConsumer.poll(). Default - 1.0s
producer_poll_timeout
: timeout for RowProducer.poll(). Default - 0s.
on_message_processed
: a callback triggered when message is successfully processed.
loglevel
: a log level for the "quixstreams" logger. Should be a string or None. If None is passed, no logging will be configured. You may pass None and configure the "quixstreams" logger externally using the logging library. Default - "INFO".
auto_create_topics
: Create all Topics made via Application.topic(). Default - True
use_changelog_topics
: Use changelog topics to back stateful operations. Default - True
topic_manager
: A QuixTopicManager instance
Error Handlers
To handle errors, Application accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returns True, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.
on_consumer_error
: triggered when internal RowConsumer fails to poll Kafka or cannot deserialize a message.
on_processing_error
: triggered when exception is raised within StreamingDataFrame.process().
on_producer_error
: triggered when RowProducer fails to serialize or to produce a message to Kafka.
Quix Cloud Parameters
quix_config_builder
: instance of QuixKafkaConfigsBuilder to be used instead of the default one.
Returns:
Application
object
Application.topic
def topic(name: str,
value_deserializer: DeserializerType = "json",
key_deserializer: DeserializerType = "bytes",
value_serializer: SerializerType = "json",
key_serializer: SerializerType = "bytes",
config: Optional[TopicConfig] = None,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
Create a topic definition.
Allows you to specify serialization that should be used when consuming/producing to the topic in the form of a string name (i.e. "json" for JSON) or a serialization class instance directly, like JSONSerializer().
Example Snippet:
from quixstreams import Application
# Specify an input and output topic for a `StreamingDataFrame` instance,
# where the output topic requires adjusting the key serializer.
app = Application()
input_topic = app.topic("input-topic", value_deserializer="json")
output_topic = app.topic(
"output-topic", key_serializer="str", value_serializer=JSONSerializer()
)
sdf = app.dataframe(input_topic)
sdf.to_topic(output_topic)
Arguments:
name
: topic name
NOTE: If the application is created via Application.Quix(), the topic name will be prefixed by the Quix workspace id, and it will be <workspace_id>-<name>
value_deserializer
: a deserializer type for values; default="json"
key_deserializer
: a deserializer type for keys; default="bytes"
value_serializer
: a serializer type for values; default="json"
key_serializer
: a serializer type for keys; default="bytes"
config
: optional topic configurations (for creation/validation)
NOTE: topics will not be created unless the Application's auto_create_topics is set to True (it is True by default)
timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message. Default - None.
Example Snippet:
app = Application(...)

def custom_ts_extractor(
    value: Any,
    headers: Optional[List[Tuple[str, bytes]]],
    timestamp: float,
    timestamp_type: TimestampType,
) -> int:
    return value["timestamp"]

topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
Returns:
Topic
object
Application.dataframe
A simple helper method that generates a StreamingDataFrame
, which is used
to define your message processing pipeline.
See quixstreams.dataframe.StreamingDataFrame for more details.
Example Snippet:
from quixstreams import Application
# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.
app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value: print('New message', value))
app.run(dataframe=df)
Arguments:
topic
: aquixstreams.models.Topic
instance to be used as an input topic.
Returns:
StreamingDataFrame
object
Application.stop
Stop the internal poll loop and the message processing.
Only necessary when manually managing the lifecycle of the Application (likely through some sort of threading).
To otherwise stop an application, either send a SIGTERM to the process (like Kubernetes does) or perform a typical KeyboardInterrupt (Ctrl+C).
Application.get_producer
Create and return a pre-configured Producer instance. The Producer is initialized with params passed to Application.
It's useful for producing data to Kafka outside the standard Application processing flow, (e.g. to produce test data into a topic). Using this within the StreamingDataFrame functions is not recommended, as it creates a new Producer instance each time, which is not optimized for repeated use in a streaming pipeline.
Example Snippet:
from quixstreams import Application
app = Application.Quix(...)
topic = app.topic("input")
with app.get_producer() as producer:
    for i in range(100):
        producer.produce(topic=topic.name, key=b"key", value=b"value")
Application.get_consumer
Create and return a pre-configured Consumer instance. The Consumer is initialized with params passed to Application.
It's useful for consuming data from Kafka outside the standard Application processing flow. (e.g. to consume test data from a topic). Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance each time, which is not optimized for repeated use in a streaming pipeline.
Note: By default this consumer does not autocommit consumed offsets to allow exactly-once processing.
To store the offset call store_offsets() after processing a message.
If autocommit is necessary set enable.auto.offset.store
to True in the consumer config when creating the app.
Example Snippet:
from quixstreams import Application
app = Application.Quix(...)
topic = app.topic("input")
with app.get_consumer() as consumer:
    consumer.subscribe([topic.name])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is not None:
            # Process the message
            print(msg.key(), msg.value())
            # Optionally commit the offset
            # consumer.store_offsets(msg)
Application.clear_state
Clear the state of the application.
Application.run
Start processing data from Kafka using provided StreamingDataFrame
Once started, it can be safely terminated with a SIGTERM
signal
(like Kubernetes does) or a typical KeyboardInterrupt
(Ctrl+C
).
Example Snippet:
from quixstreams import Application
# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.
app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value: print('New message', value))
app.run(dataframe=df)
Arguments:
dataframe
: instance ofStreamingDataFrame