Inspecting Data and Debugging
Sometimes you need more direct insight into how a StreamingDataFrame is processing your data, especially around specific operations.
Here are some helpful tools and strategies for inspecting your Application and its data, whether for debugging or for confirming that things are working as expected.
Printing Data
This is generally the simplest approach for inspecting your Application.
You can print the current record's value at desired points in a StreamingDataFrame.
Single Record Printing
The most log-friendly approach (especially if you do not have a live terminal session, such as when running in Kubernetes) is StreamingDataFrame.print(), which prints the current record value wherever it's called.
It's a multi-line print by default, but it can be single line with kwarg pretty=False.
It can additionally include the record metadata with kwarg metadata=True:
sdf = app.dataframe(...)
# some SDF transformations happening here ...
# Print the current record's value, key, timestamp and headers
sdf.print(metadata=True)
# It will print the record's data wrapped into a dict for readability:
# { 'value': {'number': 12183},
# 'key': b'key',
# 'timestamp': 1721129697951,
# 'headers': [('header_name', b'header-value')]
# }
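When output ends up in a log collector rather than a terminal, it can also help to route the same information through Python's logging module. The following is a minimal sketch of that idea, assuming a callback passed to StreamingDataFrame.update() with metadata=True receives (value, key, timestamp, headers). The names format_record, log_record and the logger name are made up for illustration and are not part of Quix Streams.

```python
import json
import logging

logger = logging.getLogger("sdf_inspect")  # hypothetical logger name


def format_record(value, key, timestamp, headers):
    """Render one record as a single JSON line for log collectors."""
    record = {
        "value": value,
        "key": key,
        "timestamp": timestamp,
        "headers": headers,
    }
    # default=repr keeps bytes keys/headers readable instead of raising TypeError
    return json.dumps(record, default=repr)


def log_record(value, key, timestamp, headers):
    """Callback that logs the record and returns None (record is unchanged)."""
    logger.info(format_record(value, key, timestamp, headers))


# Assumed wiring (requires a real StreamingDataFrame):
# sdf.update(log_record, metadata=True)

# Dry run on the sample record from the example above
line = format_record(
    {"number": 12183}, b"key", 1721129697951, [("header_name", b"header-value")]
)
print(line)
```

Keeping the formatting in a separate function makes it easy to try on sample records before attaching it to a running Application.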
Table Printing
A more user-friendly way to monitor your data stream is using StreamingDataFrame.print_table().
It creates a live-updating table that shows the most recent records:
sdf = app.dataframe(...)
# some SDF transformations happening here ...
# Show last 5 records with metadata columns
sdf.print_table(size=5, title="My Stream")
# For wide datasets, limit columns to improve readability
sdf.print_table(
    size=5,
    title="My Stream",
    columns=["id", "name", "value"],
    column_widths={"name": 20}
)
This will produce a live table like:
My Stream
┏━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ _key       ┃ _timestamp ┃ id     ┃ name                 ┃ value   ┃
┡━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ b'53fe8e4' │ 1738685136 │ 876    │ Charlie              │ 42.5    │
│ b'91bde51' │ 1738685137 │ 11     │ Alice                │ 18.3    │
│ b'6617dfe' │ 1738685138 │ 133    │ Bob                  │ 73.1    │
│ b'f47ac93' │ 1738685139 │ 244    │ David                │ 55.7    │
│ b'038e524' │ 1738685140 │ 567    │ Eve                  │ 31.9    │
└────────────┴────────────┴────────┴──────────────────────┴─────────┘
Note that the "name" column is resized to the requested width of 20 characters.
You can monitor multiple points in your pipeline by adding multiple print_table calls:
sdf = app.dataframe(topic)
sdf.print_table(title="Raw Input")
sdf = sdf.filter(lambda value: ...)
sdf.print_table(title="Filtered Values")
sdf = sdf.apply(lambda value: ...)
sdf.print_table(title="Final Output")
Interacting with Data
If you'd like to store or manipulate data from Application processing directly, you can run an "interactive" Application by executing Application.run() within an IPython session (terminal, PyCharm, VS Code, etc.) or a Jupyter Notebook cell.
To do so, you'll need to both:
- Provide stop conditions to Application.run()
- Use the Quix Streams ListSink with StreamingDataFrame.sink() to store data.
Basically:
- The Application runs for the specified period in the interactive session.
- While running, data is stored in the specified ListSink variable(s).
- Once the Application stops, those variable(s) are accessible as normal.
The details of this pattern are further explained below.
Application.run() stop conditions
In a production setting, Application.run() should be called only once with no arguments, which means it will run indefinitely (until it encounters an error or is manually stopped by the user).
However, for debugging, the following kwargs can be passed to stop the Application when the applicable condition is met:
- timeout: maximum time to wait for a new message (default 0.0 == infinite)
- count: number of messages to process from main SDF input topics (default 0 == infinite)
If used together (which is the recommended pattern for debugging), either condition will trigger the stop.
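To make the "either condition will trigger the stop" behavior concrete, here is a small pure-Python simulation. It is only an illustration of the semantics described above and not how Quix Streams implements them; run_with_stop_conditions and poll are hypothetical names.

```python
import time


def run_with_stop_conditions(poll, count=0, timeout=0.0):
    """Consume via poll() until `count` messages are processed or no
    message arrives for `timeout` seconds. A value of 0 disables a condition."""
    processed = []
    last_message_at = time.monotonic()
    while True:
        if count and len(processed) >= count:
            return processed, "count"
        if timeout and time.monotonic() - last_message_at >= timeout:
            return processed, "timeout"
        message = poll()
        if message is not None:
            processed.append(message)
            last_message_at = time.monotonic()  # idle timer restarts on each message
        else:
            time.sleep(0.01)  # avoid a busy loop while idle


# A finite backlog stands in for an input topic
backlog = list(range(5))


def poll():
    return backlog.pop(0) if backlog else None


# Plenty of messages available: the count condition wins
first_batch, first_reason = run_with_stop_conditions(poll, count=3, timeout=0.2)

# Only two messages left: the stream goes idle and the timeout wins
second_batch, second_reason = run_with_stop_conditions(poll, count=50, timeout=0.2)

print(first_batch, first_reason)
print(second_batch, second_reason)
```

The second call also mirrors the idea that a later run simply continues from where the previous one stopped.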
Count Behavior
There are a few things to be aware of with count:
- It only counts messages from (input) topics passed by the user to a StreamingDataFrame.
  - This means things like repartition topics (group by) are NOT counted.
- It's a total message count across all input topics, NOT for each input topic.
  - ex: for count=20, topic_a could get 5 messages, and topic_b 15 messages.
- Things like SDF.apply(expand=True) and branching do not affect counts.
- AFTER the count is reached, the Application flushes any respective repartition topics so all downstream processing is included.
  - Repartition high watermarks are recorded when the condition is met, and the repartition topics are consumed up to those watermarks.
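The "total across all input topics" point can be illustrated with a tiny simulation. The arrival pattern below is invented purely for demonstration (topic_b delivering three messages for every one from topic_a); real interleaving depends on the broker and consumer.

```python
from collections import Counter
from itertools import cycle, islice

# Hypothetical arrival order across two input topics
arrival_order = cycle(["topic_a", "topic_b", "topic_b", "topic_b"])

count = 20  # one shared budget, not 20 per topic
processed = list(islice(arrival_order, count))

per_topic = Counter(processed)
print(per_topic["topic_a"], per_topic["topic_b"])
```

With this pattern the shared budget of 20 is split unevenly, matching the 5 and 15 split used in the example above.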
Timeout Behavior
A couple of things to note about timeout:
- Though it can be used standalone, it's recommended to pair it with a count.
- Tracking starts once the first partition assignment (or recovery, if needed) finishes.
- There is a 60s wait buffer for the first assignment to trigger.
- Using only timeout when collecting data from a high-volume topic with a ListSink could cause out-of-memory errors.
Multiple Application.run() calls
It is safe to make subsequent Application.run() calls (even with new arguments!); the Application will simply pick up where it left off.
There is no need to do any manual cleanup when finished; each run cleans up after itself.
NOTE: You do not need to re-execute the entire Application setup, just .run().
ListSink
ListSink primarily exists to use alongside Application.run() stop conditions.
It collects data where it's used, which can then be interacted with like a list once the Application stops.
Using ListSink
To use a ListSink, simply store one in a separate variable and pass it like any other Sink:
from quixstreams import Application
from quixstreams.sinks.core.list import ListSink
app = Application(broker_address="localhost:9092")
topic = app.topic("some-topic")
list_sink = ListSink() # sink will be a list-like object
sdf = app.dataframe(topic=topic).sink(list_sink)
app.run(count=50, timeout=10) # get up to 50 records (stops if no messages for 10s)
You can then interact with it once the Application stops:
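As a rough sketch of what that interaction can look like, the snippet below uses a plain list of dicts as a stand-in for a populated ListSink, so it runs without a broker. The record fields ("id", "name", "value") are assumptions borrowed from the table example earlier on this page.

```python
from statistics import mean

# Stand-in for a populated ListSink: a plain list of record values
collected = [
    {"id": 876, "name": "Charlie", "value": 42.5},
    {"id": 11, "name": "Alice", "value": 18.3},
    {"id": 133, "name": "Bob", "value": 73.1},
]

# Ordinary list operations: length, indexing, iteration
total = len(collected)
first = collected[0]
high_names = [record["name"] for record in collected if record["value"] > 40]
average = mean(record["value"] for record in collected)

print(total, first["name"], high_names, round(average, 2))
```

With a real ListSink, the same expressions would be applied to the sink variable itself after Application.run() returns.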
NOTE: Though the result is actually a ListSink, it behaves like a list...you can even pass it to pandas.DataFrame() and it will work as expected!
ListSink Limitations
- You can use any number of ListSink in an Application.
  - Each one must have its own variable.
- ListSink does not limit its own size.
  - Be sure to use it with Application.run() stopping conditions.
- ListSink does not "refresh" itself per .run(); it collects data indefinitely.
  - You can remove the current data stored by doing list_sink.clear().
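The accumulation behavior is easy to overlook when calling .run() several times in one session. The sketch below mimics it with a plain list standing in for the ListSink; fake_run is a hypothetical stand-in for one Application.run() call and only appends a batch of values.

```python
collected = []  # stand-in for a ListSink


def fake_run(sink, batch):
    """Hypothetical stand-in for one Application.run() call."""
    sink.extend(batch)


# Two runs without clearing: results from both runs pile up
fake_run(collected, [1, 2, 3])
fake_run(collected, [4, 5])
size_without_clear = len(collected)

# Clearing first gives a sink that only holds the latest run
collected.clear()
fake_run(collected, [6, 7])
size_after_clear = len(collected)

print(size_without_clear, size_after_clear)
```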
Interactive limitations
Currently, Quix Streams Source functionality is limited due to multiprocessing "if __name__ == '__main__':" limitations; specifically:
- You cannot run any source-based Application "interactively" (i.e. using ListSink to inspect values).
  - There is potentially a way around this for Jupyter Notebooks.
- You can only run a plain source (no SDF) with the timeout argument.
Using Breakpoints
For detailed examination, you can set breakpoints using StreamingDataFrame.update():
import pdb
sdf = app.dataframe(...)
# some SDF transformations happening here ...
# Set a breakpoint
sdf.update(lambda value: pdb.set_trace())
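On a busy topic, stopping on every record gets tedious quickly. One possible refinement, sketched below, is a small helper that only enters the debugger for records matching a predicate. The helper name break_when and its debugger parameter are made up for this example; the parameter exists only so the logic can be dry-run without blocking on pdb.

```python
import pdb


def break_when(predicate, debugger=pdb.set_trace):
    """Build an update() callback that enters the debugger only for
    records where predicate(value) is true."""

    def callback(value):
        if predicate(value):
            debugger()  # `value` is available as a local in this frame

    return callback


# Assumed wiring (requires a real StreamingDataFrame):
# sdf.update(break_when(lambda value: value["number"] > 10000))

# Dry run with a fake debugger hook so nothing blocks
hits = []
check = break_when(
    lambda value: value["number"] > 10000,
    debugger=lambda: hits.append("break"),
)
check({"number": 5})      # below threshold: no break
check({"number": 12183})  # above threshold: break
print(hits)
```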
Application loglevel to DEBUG
Though likely not helpful for the average user, it is possible to change the Application loglevel to DEBUG for more under-the-hood insight.
Just do Application(loglevel="DEBUG").
NOTE: This does NOT grant any insights into the message data directly.