Skip to content

StreamingDataFrame API

quixstreams.dataframe.dataframe

StreamingDataFrame

class StreamingDataFrame(BaseStreaming)

[VIEW SOURCE]

StreamingDataFrame is the main object you will use for ETL work.

Typically created with an app = quixstreams.app.Application() instance, via sdf = app.dataframe().


What it Does:

  • Builds a data processing pipeline, declaratively (not executed immediately)
    • Executes this pipeline on inputs at runtime (Kafka message values)
  • Provides functions/interface similar to Pandas Dataframes/Series
  • Enables stateful processing (and manages everything related to it)


How to Use:

Define various operations while continuously reassigning to itself (or new fields).

These operations will generally transform your data, access/update state, or produce to kafka topics.

We recommend your data structure to be "columnar" (aka a dict/JSON) in nature so that it works with the entire interface, but simple types like ints, str, etc. are also supported.

See the various methods and classes for more specifics, or for a deep dive into usage, see streamingdataframe.md under the docs/ folder.

NOTE: column referencing like sdf["a_column"] and various methods often create other object types (typically quixstreams.dataframe.StreamingSeries), which is expected; type hinting should alert you to any issues should you attempt invalid operations with said objects (however, we cannot infer whether an operation is valid with respect to your data!).


Example Snippet:

sdf = StreamingDataframe()
sdf = sdf.apply(a_func)
sdf = sdf.filter(another_func)
sdf = sdf.to_topic(topic_obj)



StreamingDataFrame.apply

def apply(func: Union[
    ApplyCallback,
    ApplyCallbackStateful,
    ApplyWithMetadataCallback,
    ApplyWithMetadataCallbackStateful,
],
          *,
          stateful: bool = False,
          expand: bool = False,
          metadata: bool = False) -> Self

[VIEW SOURCE]

Apply a function to transform the value and return a new value.

The result will be passed downstream as an input value.


Example Snippet:

# This stores a string in state and capitalizes every column with a string value.
# A second apply then keeps only the string value columns (shows non-stateful).
def func(d: dict, state: State):
    value = d["store_field"]
    if value != state.get("my_store_key"):
        state.set("my_store_key") = value
    return {k: v.upper() if isinstance(v, str) else v for k, v in d.items()}

sdf = StreamingDataframe()
sdf = sdf.apply(func, stateful=True)
sdf = sdf.apply(lambda d: {k: v for k,v in d.items() if isinstance(v, str)})


Arguments:

  • func: a function to apply
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • expand: if True, expand the returned iterable into individual values downstream. If returned value is not iterable, TypeError will be raised. Default - False.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.



StreamingDataFrame.update

def update(func: Union[
    UpdateCallback,
    UpdateCallbackStateful,
    UpdateWithMetadataCallback,
    UpdateWithMetadataCallbackStateful,
],
           *,
           stateful: bool = False,
           metadata: bool = False) -> Self

[VIEW SOURCE]

Apply a function to mutate value in-place or to perform a side effect

(e.g., printing a value to the console).

The result of the function will be ignored, and the original value will be passed downstream.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).


Example Snippet:

# Stores a value and mutates a list by appending a new item to it.
# Also prints to console.

def func(values: list, state: State):
    value = values[0]
    if value != state.get("my_store_key"):
        state.set("my_store_key") = value
    values.append("new_item")

sdf = StreamingDataframe()
sdf = sdf.update(func, stateful=True)
# does not require reassigning
sdf.update(lambda v: v.append(1))


Arguments:

  • func: function to update value
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.


Returns:

the updated StreamingDataFrame instance (reassignment NOT required).



StreamingDataFrame.filter

def filter(func: Union[
    FilterCallback,
    FilterCallbackStateful,
    FilterWithMetadataCallback,
    FilterWithMetadataCallbackStateful,
],
           *,
           stateful: bool = False,
           metadata: bool = False) -> Self

[VIEW SOURCE]

Filter value using provided function.

If the function returns True-like value, the original value will be passed downstream.


Example Snippet:

# Stores a value and allows further processing only if the value is greater than
# what was previously stored.

def func(d: dict, state: State):
    value = d["my_value"]
    if value > state.get("my_store_key"):
        state.set("my_store_key") = value
        return True
    return False

sdf = StreamingDataframe()
sdf = sdf.filter(func, stateful=True)


Arguments:

  • func: function to filter value
  • stateful: if True, the function will be provided with second argument of type State to perform stateful operations.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.



StreamingDataFrame.group_by

def group_by(key: Union[str, Callable[[Any], Any]],
             name: Optional[str] = None,
             value_deserializer: Optional[DeserializerType] = "json",
             key_deserializer: Optional[DeserializerType] = "json",
             value_serializer: Optional[SerializerType] = "json",
             key_serializer: Optional[SerializerType] = "json") -> Self

[VIEW SOURCE]

"Groups" messages by re-keying them via the provided group_by operation

on their message values.

This enables things like aggregations on messages with non-matching keys.

You can provide a column name (uses the column's value) or a custom function to generate this new key.

.groupby() can only be performed once per StreamingDataFrame instance.

NOTE: group_by generates a topic that copies the original topic's settings.


Example Snippet:

# We have customer purchase events where the message key is the "store_id",
# but we want to calculate sales per customer (by "customer_account_id").

def func(d: dict, state: State):
    current_total = state.get("customer_sum", 0)
    new_total = current_total + d["customer_spent"]
    state.set("customer_sum", new_total)
    d["customer_total"] = new_total
    return d

sdf = StreamingDataframe()
sdf = sdf.group_by("customer_account_id")
sdf = sdf.apply(func, stateful=True)


Arguments:

  • key: how the new key should be generated from the message value; requires a column name (string) or a callable that takes the message value.
  • name: a name for the op (must be unique per group-by), required if key is a custom callable.
  • value_deserializer: a deserializer type for values; default - JSON
  • key_deserializer: a deserializer type for keys; default - JSON
  • value_serializer: a serializer type for values; default - JSON
  • key_serializer: a serializer type for keys; default - JSON


Returns:

a clone with this operation added (assign to keep its effect).



StreamingDataFrame.contains

@staticmethod
def contains(key: str) -> StreamingSeries

[VIEW SOURCE]

Check if the key is present in the Row value.


Example Snippet:

# Add new column 'has_column' which contains a boolean indicating
# the presence of 'column_x'

sdf = StreamingDataframe()
sdf['has_column'] = sdf.contains('column_x')


Arguments:

  • key: a column name to check.


Returns:

a Column object that evaluates to True if the key is present or False otherwise.



StreamingDataFrame.to_topic

def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None) -> Self

[VIEW SOURCE]

Produce current value to a topic. You can optionally specify a new key.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).


Example Snippet:

from quixstreams import Application

# Produce to two different topics, changing the key for one of them.

app = Application()
input_topic = app.topic("input_x")
output_topic_0 = app.topic("output_a")
output_topic_1 = app.topic("output_b")

sdf = app.dataframe(input_topic)
sdf = sdf.to_topic(output_topic_0)
# does not require reassigning
sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])


Arguments:

  • topic: instance of Topic
  • key: a callable to generate a new message key, optional. If passed, the return type of this callable must be serializable by key_serializer defined for this Topic object. By default, the current message key will be used.


Returns:

the updated StreamingDataFrame instance (reassignment NOT required).



StreamingDataFrame.set_timestamp

def set_timestamp(func: Callable[[Any, Any, int, Any], int]) -> Self

[VIEW SOURCE]

Set a new timestamp based on the current message value and its metadata.

The new timestamp will be used in windowed aggregations and when producing messages to the output topics.

The new timestamp must be in milliseconds to conform Kafka requirements.


Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's timestamp based on the value
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: value['new_timestamp'])


Arguments:

  • func: callable accepting the current value, key, timestamp, and headers. It's expected to return a new timestamp as integer in milliseconds.


Returns:

a new StreamingDataFrame instance



StreamingDataFrame.set_headers

def set_headers(
    func: Callable[
        [Any, Any, int, List[Tuple[str, HeaderValue]]],
        Collection[Tuple[str, HeaderValue]],
    ]
) -> Self

[VIEW SOURCE]

Set new message headers based on the current message value and metadata.

The new headers will be used when producing messages to the output topics.

The provided callback must accept value, key, timestamp, and headers, and return a new collection of (header, value) tuples.


Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's headers based on the value and metadata
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [('id', value['id'])])


Arguments:

  • func: callable accepting the current value, key, timestamp, and headers. It's expected to return a new set of headers as a collection of (header, value) tuples.


Returns:

a new StreamingDataFrame instance



StreamingDataFrame.print

def print(pretty: bool = True, metadata: bool = False) -> Self

[VIEW SOURCE]

Print out the current message value (and optionally, the message metadata) to

stdout (console) (like the built-in print function).

Can also output a more dict-friendly format with pretty=True.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

NOTE: prints the current (edited) values, not the original values.


Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
sdf["edited_col"] = sdf["orig_col"] + "edited"
# print the updated message value with the newly added column
sdf.print()


Arguments:

  • pretty: Whether to use "pprint" formatting, which uses new-lines and indents for easier console reading (but might be worse for log parsing).
  • metadata: Whether to additionally print the key, timestamp, and headers


Returns:

the updated StreamingDataFrame instance (reassignment NOT required).



StreamingDataFrame.compose

def compose(
    sink: Optional[Callable[[Any, Any, int, Any], None]] = None
) -> Dict[str, VoidExecutor]

[VIEW SOURCE]

Compose all functions of this StreamingDataFrame into one big closure.

Closures are more performant than calling all the functions in the StreamingDataFrame one-by-one.

Generally not required by users; the quixstreams.app.Application class will do this automatically.


Example Snippet:

from quixstreams import Application
sdf = app.dataframe()
sdf = sdf.apply(apply_func)
sdf = sdf.filter(filter_func)
sdf = sdf.compose()

result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})


Arguments:

  • sink: callable to accumulate the results of the execution, optional.


Returns:

a function that accepts "value" and returns a result of StreamingDataFrame



StreamingDataFrame.test

def test(value: Any,
         key: Any,
         timestamp: int,
         headers: Optional[Any] = None,
         ctx: Optional[MessageContext] = None,
         topic: Optional[Topic] = None) -> List[Any]

[VIEW SOURCE]

A shorthand to test StreamingDataFrame with provided value

and MessageContext.


Arguments:

  • value: value to pass through StreamingDataFrame
  • key: key to pass through StreamingDataFrame
  • timestamp: timestamp to pass through StreamingDataFrame
  • ctx: instance of MessageContext, optional. Provide it if the StreamingDataFrame instance calls to_topic(), has stateful functions or windows. Default - None.
  • topic: optionally, a topic branch to test with


Returns:

result of StreamingDataFrame



StreamingDataFrame.tumbling_window

def tumbling_window(duration_ms: Union[int, timedelta],
                    grace_ms: Union[int, timedelta] = 0,
                    name: Optional[str] = None) -> TumblingWindowDefinition

[VIEW SOURCE]

Create a tumbling window transformation on this StreamingDataFrame.

Tumbling windows divide time into fixed-sized, non-overlapping windows.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.


Example Snippet:

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a tumbling window of 60s and grace period of 10s
    sdf.tumbling_window(
        duration_ms=timedelta(seconds=60), grace_ms=timedelta(seconds=10.0)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "all()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .all()
)


Arguments:

  • duration_ms: The length of each window. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either an int representing milliseconds or as a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.


Returns:

TumblingWindowDefinition instance representing the tumbling window configuration. This object can be further configured with aggregation functions like sum, count, etc. applied to the StreamingDataFrame.



StreamingDataFrame.hopping_window

def hopping_window(duration_ms: Union[int, timedelta],
                   step_ms: Union[int, timedelta],
                   grace_ms: Union[int, timedelta] = 0,
                   name: Optional[str] = None) -> HoppingWindowDefinition

[VIEW SOURCE]

Create a hopping window transformation on this StreamingDataFrame.

Hopping windows divide the data stream into overlapping windows based on time. The overlap is controlled by the step_ms parameter.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.


Example Snippet:

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a hopping window of 60s with step 30s and grace period of 10s
    sdf.hopping_window(
        duration_ms=timedelta(seconds=60),
        step_ms=timedelta(seconds=30),
        grace_ms=timedelta(seconds=10)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "all()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .all()
)


Arguments:

  • duration_ms: The length of each window. It defines the time span for which each window aggregates data. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • step_ms: The step size for the window. It determines how much each successive window moves forward in time. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data to be included in the window, even if it arrives after the window has theoretically moved forward. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.


Returns:

HoppingWindowDefinition instance representing the hopping window configuration. This object can be further configured with aggregation functions like sum, count, etc. and applied to the StreamingDataFrame.



StreamingDataFrame.drop

def drop(columns: Union[str, List[str]]) -> Self

[VIEW SOURCE]

Drop column(s) from the message value (value must support del, like a dict).

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).


Example Snippet:

# Remove columns "x" and "y" from the value.
# This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3}

sdf = StreamingDataframe()
sdf.drop(["x", "y"])


Arguments:

  • columns: a single column name or a list of names, where names are str


Returns:

a new StreamingDataFrame instance

quixstreams.dataframe.series

StreamingSeries

class StreamingSeries(BaseStreaming)

[VIEW SOURCE]

StreamingSeries are typically generated by StreamingDataframes when getting elements from, or performing certain operations on, a StreamingDataframe, thus acting as a representation of "column" value.

They share some operations with the StreamingDataframe, but also provide some additional functionality.

Most column value operations are handled by this class, and StreamingSeries can generate other StreamingSeries as a result of said operations.


What it Does:

  • Allows ways to do simple operations with dataframe "column"/dictionary values:
    • Basic ops like add, subtract, modulo, etc.
  • Enables comparisons/inequalities:
    • Greater than, equals, etc.
    • and/or, is/not operations
  • Can check for existence of columns in StreamingDataFrames
  • Enables chaining of various operations together


How to Use:

For the most part, you may not even notice this class exists! They will naturally be created as a result of typical StreamingDataFrame use.

Auto-complete should help you with valid methods and type-checking should alert you to invalid operations between StreamingSeries.

In general, any typical Pands dataframe operation between columns should be valid with StreamingSeries, and you shouldn't have to think about them explicitly.


Example Snippet:

# Random methods for example purposes. More detailed explanations found under
# various methods or in the docs folder.

sdf = StreamingDataframe()
sdf = sdf["column_a"].apply(a_func).apply(diff_func, stateful=True)
sdf["my_new_bool_field"] = sdf["column_b"].contains("this_string")
sdf["new_sum_field"] = sdf["column_c"] + sdf["column_d"] + 2
sdf = sdf[["column_a"] & (sdf["new_sum_field"] >= 10)]



StreamingSeries.from_apply_callback

@classmethod
def from_apply_callback(cls, func: ApplyWithMetadataCallback) -> Self

[VIEW SOURCE]

Create a StreamingSeries from a function.

The provided function will be wrapped into Apply


Arguments:

  • func: a function to apply


Returns:

instance of StreamingSeries



StreamingSeries.apply

def apply(func: ApplyCallback) -> Self

[VIEW SOURCE]

Add a callable to the execution list for this series.

The provided callable should accept a single argument, which will be its input. The provided callable should similarly return one output, or None

They can be chained together or included with other operations.


Example Snippet:

# The `StreamingSeries` are generated when `sdf["COLUMN_NAME"]` is called.
# This stores a string in state and capitalizes the column value; the result is
# assigned to a new column.
#  Another apply converts a str column to an int, assigning it to a new column.

def func(value: str, state: State):
    if value != state.get("my_store_key"):
        state.set("my_store_key") = value
    return v.upper()

sdf = StreamingDataframe()
sdf["new_col"] = sdf["a_column"]["nested_dict_key"].apply(func, stateful=True)
sdf["new_col_2"] = sdf["str_col"].apply(lambda v: int(v)) + sdf["str_col2"] + 2


Arguments:

  • func: a callable with one argument and one output


Returns:

a new StreamingSeries with the new callable added



StreamingSeries.compose_returning

def compose_returning() -> ReturningExecutor

[VIEW SOURCE]

Compose a list of functions from this StreamingSeries and its parents into one

big closure that always returns the transformed record.

This closure is to be used to execute the functions in the stream and to get the result of the transformations.

Stream may only contain simple "apply" functions to be able to compose itself into a returning function.


Returns:

a callable accepting value, key and timestamp and returning a tuple "(value, key, timestamp)



StreamingSeries.compose

def compose(
    sink: Optional[Callable[[Any, Any, int, Any],
                            None]] = None) -> VoidExecutor

[VIEW SOURCE]

Compose all functions of this StreamingSeries into one big closure.

Generally not required by users; the quixstreams.app.Application class will do this automatically.


Example Snippet:

from quixstreams import Application

app = Application(...)

sdf = app.dataframe()
sdf = sdf["column_a"].apply(apply_func)
sdf = sdf["column_b"].contains(filter_func)
sdf = sdf.compose()

result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})


Arguments:

  • sink: callable to accumulate the results of the execution.

Raises:

  • ValueError: if disallowed functions are present in the tree of underlying Stream.


Returns:

a callable accepting value, key and timestamp and returning None



StreamingSeries.test

def test(value: Any,
         key: Any,
         timestamp: int,
         headers: Optional[Any] = None,
         ctx: Optional[MessageContext] = None) -> Any

[VIEW SOURCE]

A shorthand to test StreamingSeries with provided value

and MessageContext.


Arguments:

  • value: value to pass through StreamingSeries
  • ctx: instance of MessageContext, optional. Provide it if the StreamingSeries instance has functions calling get_current_key(). Default - None.


Returns:

result of StreamingSeries



StreamingSeries.isin

def isin(other: Container) -> Self

[VIEW SOURCE]

Check if series value is in "other".

Same as "StreamingSeries in other".

Runtime result will be a bool.


Example Snippet:

from quixstreams import Application

# Check if "str_column" is contained in a column with a list of strings and
# assign the resulting `bool` to a new column: "has_my_str".

sdf = app.dataframe()
sdf["has_my_str"] = sdf["str_column"].isin(sdf["column_with_list_of_strs"])


Arguments:

  • other: a container to check


Returns:

new StreamingSeries



StreamingSeries.contains

def contains(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value contains "other"

Same as "other in StreamingSeries".

Runtime result will be a bool.


Example Snippet:

from quixstreams import Application

# Check if "column_a" contains "my_substring" and assign the resulting
# `bool` to a new column: "has_my_substr"

sdf = app.dataframe()
sdf["has_my_substr"] = sdf["column_a"].contains("my_substring")


Arguments:

  • other: object to check


Returns:

new StreamingSeries



StreamingSeries.is_

def is_(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value refers to the same object as other

Runtime result will be a bool.


Example Snippet:

# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
#  to a new column: "is_same"

from quixstreams import Application
sdf = app.dataframe()
sdf["is_same"] = sdf["column_a"].is_(sdf["column_b"])


Arguments:

  • other: object to check for "is"


Returns:

new StreamingSeries



StreamingSeries.isnot

def isnot(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value does not refer to the same object as other

Runtime result will be a bool.


Example Snippet:

from quixstreams import Application

# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
# to a new column: "is_not_same"

sdf = app.dataframe()
sdf["is_not_same"] = sdf["column_a"].isnot(sdf["column_b"])


Arguments:

  • other: object to check for "is_not"


Returns:

new StreamingSeries



StreamingSeries.isnull

def isnull() -> Self

[VIEW SOURCE]

Check if series value is None.

Runtime result will be a bool.


Example Snippet:

from quixstreams import Application

# Check if "column_a" is null and assign the resulting `bool` to a new column:
# "is_null"

sdf = app.dataframe()
sdf["is_null"] = sdf["column_a"].isnull()


Returns:

new StreamingSeries



StreamingSeries.notnull

def notnull() -> Self

[VIEW SOURCE]

Check if series value is not None.

Runtime result will be a bool.


Example Snippet:

from quixstreams import Application

# Check if "column_a" is not null and assign the resulting `bool` to a new column:
# "is_not_null"

sdf = app.dataframe()
sdf["is_not_null"] = sdf["column_a"].notnull()


Returns:

new StreamingSeries



StreamingSeries.abs

def abs() -> Self

[VIEW SOURCE]

Get absolute value of the series value.


Example Snippet:

from quixstreams import Application

# Get absolute value of "int_col" and add it to "other_int_col".
# Finally, assign the result to a new column: "abs_col_sum".

sdf = app.dataframe()
sdf["abs_col_sum"] = sdf["int_col"].abs() + sdf["other_int_col"]


Returns:

new StreamingSeries