Inspecting Data and Debugging
Sometimes you need more direct insight into your data processing with a StreamingDataFrame, especially around specific operations.
Here are some helpful tools and strategies for inspecting your Application and data, whether for debugging or confirming things are working as expected!
Printing Data
This is generally the simplest approach for inspecting your Application.
You can print the current record's value at desired points in a StreamingDataFrame.
Single Record Printing
The most log-friendly approach (especially if you do not have a live terminal session, such as when running in Kubernetes) is StreamingDataFrame.print(), which prints the current record value wherever it's called.
It prints multiple lines per record by default, but it can print on a single line with the kwarg pretty=False.
It can additionally include the record metadata with the kwarg metadata=True:
sdf = app.dataframe(...)
# some SDF transformations happening here ...
# Print the current record's value, key, timestamp and headers
sdf.print(metadata=True)
# It will print the record's data wrapped into a dict for readability:
# { 'value': {'number': 12183},
#   'key': b'key',
#   'timestamp': 1721129697951,
#   'headers': [('header_name', b'header-value')]
# }
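If compact, single-line output is preferred (for example, when tailing logs), the same call can use the pretty=False kwarg mentioned above:
# Print each record's value on a single line instead of the multi-line format
sdf.print(pretty=False)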
Table Printing
A more user-friendly way to monitor your data stream is StreamingDataFrame.print_table().
It creates a live-updating table that shows the most recent records:
sdf = app.dataframe(...)
# some SDF transformations happening here ...
# Show last 5 records with metadata columns
sdf.print_table(size=5, title="My Stream")
# For wide datasets, limit columns to improve readability
sdf.print_table(
    size=5,
    title="My Stream",
    columns=["id", "name", "value"],
    column_widths={"name": 20}
)
This will produce a live table like:
                              My Stream
┏━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ _key       ┃ _timestamp ┃ id     ┃ name                 ┃ value   ┃
┡━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ b'53fe8e4' │ 1738685136 │ 876    │ Charlie              │ 42.5    │
│ b'91bde51' │ 1738685137 │ 11     │ Alice                │ 18.3    │
│ b'6617dfe' │ 1738685138 │ 133    │ Bob                  │ 73.1    │
│ b'f47ac93' │ 1738685139 │ 244    │ David                │ 55.7    │
│ b'038e524' │ 1738685140 │ 567    │ Eve                  │ 31.9    │
└────────────┴────────────┴────────┴──────────────────────┴─────────┘
Note that the "name" column is resized to the requested width of 20 characters.
You can monitor multiple points in your pipeline by adding multiple print_table calls:
sdf = app.dataframe(topic)
sdf.print_table(title="Raw Input")
sdf = sdf.filter(lambda value: ...)
sdf.print_table(title="Filtered Values")
sdf = sdf.apply(lambda value: ...)
sdf.print_table(title="Final Output")
Interacting with Data
If you'd like to store or manipulate data from Application processing directly, you can run an "interactive" Application by executing Application.run() with stop conditions within an IPython session (terminal/PyCharm/VS Code, etc.) or a Jupyter Notebook cell.
Basically:
- The Application runs for the specified period in the interactive session.
- Once the Application stops, the run method can return the accumulated outputs of the application for all registered dataframes.
The details of this pattern are further explained below.
Stopping the application early and collecting the outputs
In a production setting, Application.run() should be called only once with no arguments, which means it will run indefinitely (until it encounters an error or is manually stopped by the user).
However, for debugging, the following kwargs can be passed to Application.run() to stop it when the applicable condition is met:
- timeout: the maximum time (in seconds) to wait for a new message to arrive (default 0.0 == infinite)
- count: the number of outputs to process across all dataframes and input topics (default 0 == infinite)
If timeout and count are passed together (which is the recommended pattern for debugging), either condition will trigger the stop.
Also, you can collect the outputs of your application and examine them after the Application.run() call:
- collect: if True (default), collect the outputs and return them as a list of dictionaries from the Application.run() call. This setting is effective only when timeout or count is passed.
- metadata: if True, the collected outputs will include values, keys, timestamps, topics, partitions, and offsets. Otherwise, only values are included (the default).
Example:
from quixstreams import Application
app = Application(broker_address="localhost:9092")
topic = app.topic("some-topic")
# Assume the topic has one partition and three JSON messages:
# {"temperature": 30}
# {"temperature": 40}
# {"temperature": 50}
sdf = app.dataframe(topic=topic)
# Process one output and collect the value (stops if no messages for 10s)
result_values_only = app.run(count=1, timeout=10, collect=True)
# >>> result_values_only = [
# {"temperature": 30}
# ]
# Process one output and collect the value with metadata (stops if no messages for 10s)
result_values_and_metadata = app.run(count=1, timeout=10, collect=True, metadata=True)
# >>> result_values_and_metadata = [
# {"temperature": 40, "_key": "<message_key>", "_timestamp": 123, "_offset": 1, "_topic": "some-topic", "_partition": 1, "_headers": None},
# ]
# Process one output without collecting (stops if no messages for 10s)
result_empty = app.run(count=1, timeout=10, collect=False)
# >>> result_empty = []
Count Behavior
There are a few things to be aware of with count:
- It counts outputs processed by all StreamingDataFrames. Under the hood, every message may generate from 0 to N outputs as it passes through the topology generated by the StreamingDataFrames:
    - Operations like filtering (e.g., StreamingDataFrame.filter() or dataframe[dataframe["<field>"] == "<value>"]) and "final" windowed aggregations reduce the number of outputs.
    - Operations like StreamingDataFrame.apply(..., expand=True) and branching may increase the total number of outputs (see the sketch below).
- The total number of outputs may be higher than the passed count parameter because every message is processed fully before stopping the app.
- After the count is reached, the Application stops and returns the accumulated outputs according to the collect and metadata settings.
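For example, an expanding operation can make a single message produce several outputs, so the final tally can overshoot count. A minimal sketch; the topic name and the "readings" message shape are assumptions for illustration:
from quixstreams import Application

app = Application(broker_address="localhost:9092")
sdf = app.dataframe(topic=app.topic("sensor-batches"))

# Assume each incoming message looks like {"readings": [10, 20, 30]}.
# expand=True emits one output per list element, so one message -> three outputs.
sdf = sdf.apply(lambda value: value["readings"], expand=True)

# The first message is processed fully before stopping, so even with
# count=2 this can return three collected outputs.
outputs = app.run(count=2, timeout=10)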
Timeout Behavior
A couple of things to note about timeout:
- Though it can be used standalone, it's recommended to pair it with a count (see the sketch below for standalone use).
- Tracking starts once the first partition assignment (or recovery, if needed) finishes.
- There is a 60s wait buffer for the first assignment to trigger.
- Using only timeout when collecting data from high-volume topics may cause out-of-memory errors when collect=True (the default).
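For example, timeout on its own can be used to drain whatever is currently in a low-volume topic. A minimal sketch; the 30-second idle window is an arbitrary choice:
# Run until no new messages have arrived for 30 seconds,
# then return everything collected so far
outputs = app.run(timeout=30)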
Multiple Application.run() calls
It is safe to make subsequent Application.run() calls with different arguments; it will simply pick up where it left off.
There is no need to do any manual cleanup when finished; each run cleans up after itself.
NOTE: You do not need to re-execute the entire Application setup, just .run().
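For example, a debugging session might make several consecutive calls (a minimal sketch, reusing the broker address and topic name from the earlier examples):
from quixstreams import Application

app = Application(broker_address="localhost:9092")
sdf = app.dataframe(topic=app.topic("some-topic"))

# First run: stop after 3 outputs (or after 10s with no new messages)
first_batch = app.run(count=3, timeout=10)

# A later run simply continues from where the previous one stopped,
# with no manual cleanup needed in between
second_batch = app.run(count=3, timeout=10)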
ListSink
ListSink primarily exists to be used alongside Application.run() stop conditions.
You may use it to collect and examine data after specific operations in the DataFrame.
It can be interacted with like a list once the Application stops.
Using ListSink
To use a ListSink, simply store one in a separate variable and pass it like any other Sink:
from quixstreams import Application
from quixstreams.sinks.core.list import ListSink
app = Application(broker_address="localhost:9092")
topic = app.topic("some-topic")
list_sink = ListSink() # sink will be a list-like object
sdf = app.dataframe(topic=topic).sink(list_sink)
app.run(count=50, timeout=10) # get up to 50 records (stops if no messages for 10s)
You can then interact with it once the Application stops:
>>> print(list_sink)
[{"thing": "x"}, {"thing": "y"}, {"thing": "z"}]
>>> list_sink[0]
{"thing": "x"}
NOTE: Though the result is actually a ListSink, it behaves like a list; you can even pass it to pandas.DataFrame() and it will work as expected!
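For example (a minimal sketch, assuming pandas is installed and list_sink was populated as in the example above):
import pandas as pd

# ListSink behaves like a list of dicts, so it can be loaded directly
df = pd.DataFrame(list_sink)
print(df.head())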
Including Message Metadata
To include Kafka message data beyond the fields contained in the value, use ListSink(metadata=True), which will include all the Kafka message parameters like key, timestamp, etc. as additional fields named _{param}, like so:
>>> list_sink[0]
{"thing": "x", "_key": "id-123", "_timestamp": 1234567890, "_topic": "some-topic", ... }
ListSink Limitations
- You can use any number of ListSink in an Application; each one must have its own variable.
- ListSink does not limit its own size, so be sure to use it with Application.run() stopping conditions.
- ListSink does not "refresh" itself per .run(); it collects data indefinitely.
- You can remove the currently stored data by doing list_sink.clear() (see the sketch below).
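For instance, when reusing the same ListSink across several debugging runs, clear it between runs (a minimal sketch continuing the example above):
# First debugging run: collect up to 10 records
app.run(count=10, timeout=10)
print(len(list_sink))

# The sink keeps accumulating across runs, so reset it before the next run
list_sink.clear()
app.run(count=10, timeout=10)
print(len(list_sink))  # only records from the second run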
Interactive limitations
Currently, Quix Streams Source functionality is limited due to multiprocessing's if __name__ == '__main__': restrictions; specifically:
- You cannot run any source-based Application "interactively" (i.e. using ListSink to inspect values).
- There is potentially a way around this for Jupyter Notebooks.
- You can only run a plain source (no SDF) with the timeout argument.
Using Breakpoints
For detailed examination, you can set breakpoints using StreamingDataFrame.update()
:
import pdb
sdf = app.dataframe(...)
# some SDF transformations happening here ...
# Set a breakpoint
sdf.update(lambda value: pdb.set_trace())
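To drop into the debugger only for records of interest, the callback can be made conditional (a minimal sketch; the "temperature" field and threshold are assumptions for illustration):
# Break only when a record matches a specific condition
sdf.update(lambda value: pdb.set_trace() if value["temperature"] > 100 else None)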
Application loglevel to DEBUG
Though likely not helpful for the average user, it is possible to change the Application loglevel to DEBUG for more under-the-hood insight.
Just do Application(loglevel="DEBUG").
NOTE: This does NOT grant any insights into the message data directly.