Stream Timeout Tracker
StreamTimeoutTracker is a sink-agnostic, per-key silence detector. It fires a user callback when a tracked "stream" (typically a Kafka message key) has been silent past a configurable threshold. The module is stdlib-only and has zero imports from any specific sink or from Quix-platform-specific types, so any quixstreams sink (core, community, or third-party) can compose it.
It lives at quixstreams.sinks.core.stream_timeout_tracker.
When to use it
Use the tracker when your sink needs any of the following:
- Detect when individual message keys stop arriving (sensor drop-out, stream finished, session idle).
- Fire a callback exactly once per silence period per key, with fire-and-evict + re-arm semantics.
- Keep the detection logic identical across sinks — no need to reimplement timer loops, TTL sweeps, or lock discipline.
Use it as a drop-in component; do not subclass it.
How it works
One "stream" equals one key. Callers invoke touch(stream_key) every time a record arrives; the tracker records the last-seen wall-clock time and fires the configured callback exactly once per silence period per key.
Raw-key pass-through
Keys are stored and surfaced as-is. Any hashable value is accepted: str, bytes, int, etc. The callback receives the exact object that was passed to touch. None keys are silently skipped.
This differs from earlier versions that decoded bytes → str before firing. Consumers that care about the string form of a bytes key should decode inside the callback.
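A minimal sketch of decoding inside the callback. The helper name decoding_callback and the UTF-8 default are assumptions made for illustration; they are not part of the tracker.

```python
from typing import Any, Callable, List


def decoding_callback(
    inner: Callable[[Any], None], encoding: str = "utf-8"
) -> Callable[[Any], None]:
    """Wrap `inner` so that bytes keys are decoded to str before it runs.

    Hypothetical helper: the tracker itself hands over the raw key unchanged.
    """

    def wrapper(key: Any) -> None:
        if isinstance(key, (bytes, bytearray)):
            # errors="replace" keeps the callback from raising on bad input.
            key = bytes(key).decode(encoding, errors="replace")
        inner(key)

    return wrapper


seen: List[Any] = []
on_stream_timeout = decoding_callback(seen.append)
on_stream_timeout(b"sensor-1")  # bytes key is decoded
on_stream_timeout(42)           # non-bytes keys pass through untouched
print(seen)  # ['sensor-1', 42]
```

The wrapped function can then be passed as on_stream_timeout when constructing the tracker.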
Fire-and-evict + re-arm
When a key crosses the silence threshold:
- The entry is evicted from the in-memory dict.
- The callback is invoked with the raw key.
- An INFO log line is emitted.
A later touch() for the same key resurrects it as a fresh stream, eligible to fire again.
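The semantics above can be modelled with a toy, single-threaded class. This is a sketch, not the real implementation: MiniTracker, the explicit now_ms argument (used instead of the wall clock so the example is deterministic), and the strict "greater than" comparison are all assumptions, and logging is omitted.

```python
from typing import Any, Callable, Dict, List


class MiniTracker:
    """Toy model of fire-and-evict + re-arm (hypothetical, not the library class)."""

    def __init__(self, timeout_ms: int, on_timeout: Callable[[Any], None]) -> None:
        self.timeout_ms = timeout_ms
        self.on_timeout = on_timeout
        self.last_seen: Dict[Any, float] = {}

    def touch(self, key: Any, now_ms: float) -> None:
        if key is None:  # None keys are skipped
            return
        self.last_seen[key] = now_ms

    def check(self, now_ms: float) -> None:
        expired = [
            key
            for key, seen in self.last_seen.items()
            if now_ms - seen > self.timeout_ms
        ]
        for key in expired:
            del self.last_seen[key]  # evict first ...
            self.on_timeout(key)     # ... then fire with the raw key


fired: List[Any] = []
tracker = MiniTracker(timeout_ms=1_000, on_timeout=fired.append)

tracker.touch("a", now_ms=0)
tracker.check(now_ms=500)     # still within the threshold: nothing fires
tracker.check(now_ms=1_500)   # silent past the threshold: fires once
tracker.check(now_ms=5_000)   # already evicted: does not fire again
tracker.touch("a", now_ms=6_000)  # re-arms the key as a fresh stream
tracker.check(now_ms=7_500)   # fires for the new silence period
print(fired)  # ['a', 'a']
```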
TTL safety sweep
Any entry older than 3 × stream_timeout_ms is dropped without invoking the callback; a WARNING is logged instead. This bounds the dict size in degenerate cases.
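One plausible reading of how the two thresholds interact is sketched below. The function classify_entry, its return labels, and the order of the comparisons are assumptions for illustration; the real check may be structured differently.

```python
def classify_entry(age_ms: int, stream_timeout_ms: int) -> str:
    """Decide what a check would do with an entry of the given age.

    Hypothetical helper; labels are illustrative only.
    """
    ttl_ms = 3 * stream_timeout_ms
    if age_ms > ttl_ms:
        return "drop"  # TTL sweep: WARNING logged, no callback
    if age_ms > stream_timeout_ms:
        return "fire"  # normal timeout: evict and invoke the callback
    return "keep"      # still within the silence threshold


# With a 60-second timeout the TTL is 180 seconds.
for age_ms in (30_000, 90_000, 200_000):
    print(age_ms, classify_entry(age_ms, stream_timeout_ms=60_000))
# 30000 keep
# 90000 fire
# 200000 drop
```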
Check cadence
Two code paths drive checks:
- Synchronous: the host sink calls check_now() at the end of each flush.
- Background daemon thread: started by start(), runs check_now() every max(100, min(1000, stream_timeout_ms // 5)) ms.
The background thread self-terminates after idle_exit_cycles (default 3) consecutive empty cycles and is respawned by the next touch() that records a stamp. This keeps the sink at zero overhead while idle.
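To make the cadence formula concrete, the snippet below evaluates it for a few timeouts. The function name default_check_interval_ms is an assumption; only the expression itself comes from the text above.

```python
def default_check_interval_ms(stream_timeout_ms: int) -> int:
    """One fifth of the timeout, clamped to the range 100..1000 ms."""
    return max(100, min(1000, stream_timeout_ms // 5))


for timeout_ms in (200, 2_000, 60_000):
    print(timeout_ms, default_check_interval_ms(timeout_ms))
# 200 100      (40 ms is raised to the 100 ms floor)
# 2000 400     (one fifth of the timeout)
# 60000 1000   (12 s is capped at the 1 s ceiling)
```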
Concurrency
touch() and check_now() can run on different threads. A single threading.Lock guards the per-key dict and timer-thread reference. Critical sections are tiny; user callbacks are invoked outside the lock, so a blocking callback cannot stall touch().
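The locking pattern described here can be sketched as follows. LockedTracker and its explicit now_ms arguments are hypothetical; the point is only that expired keys are collected and evicted under the lock, while callbacks run after it is released. The callback in the demo calls touch() itself, which would deadlock on a non-reentrant threading.Lock if the lock were still held.

```python
import logging
import threading
from typing import Any, Callable, Dict, List

log = logging.getLogger(__name__)


class LockedTracker:
    """Toy model: the lock guards the dict only, never the callback."""

    def __init__(self, timeout_ms: int, on_timeout: Callable[[Any], None]) -> None:
        self._timeout_ms = timeout_ms
        self._on_timeout = on_timeout
        self._lock = threading.Lock()  # non-reentrant
        self._last_seen: Dict[Any, float] = {}

    def touch(self, key: Any, now_ms: float) -> None:
        with self._lock:
            self._last_seen[key] = now_ms

    def check(self, now_ms: float) -> None:
        # Critical section: find and evict expired keys, nothing else.
        with self._lock:
            expired = [
                key
                for key, seen in self._last_seen.items()
                if now_ms - seen > self._timeout_ms
            ]
            for key in expired:
                del self._last_seen[key]
        # Lock released: a slow or re-entrant callback cannot stall touch().
        for key in expired:
            try:
                self._on_timeout(key)
            except Exception:
                log.exception("timeout callback failed for key %r", key)

    def tracked(self) -> List[Any]:
        with self._lock:
            return list(self._last_seen)


fired: List[Any] = []


def callback(key: Any) -> None:
    fired.append(key)
    tracker.touch(key, now_ms=2_000)  # safe only because the lock is free


tracker = LockedTracker(timeout_ms=1_000, on_timeout=callback)
tracker.touch("sensor-1", now_ms=0)
tracker.check(now_ms=1_500)
print(fired, tracker.tracked())  # ['sensor-1'] ['sensor-1']
```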
Public API
from quixstreams.sinks.core.stream_timeout_tracker import StreamTimeoutTracker

tracker = StreamTimeoutTracker(
    stream_timeout_ms=60_000,            # positive int to enable, None to disable
    on_stream_timeout=lambda key: ...,   # Callable[[Any], None]
    check_interval_ms=None,              # override periodic cadence
    idle_exit_cycles=3,                  # empty cycles before self-terminate
    thread_name="StreamTimeoutTracker-check",
    logger=None,                         # defaults to module logger
)

tracker.enabled      # True iff both params are valid
tracker.touch(key)   # record a key (call from the host sink's `add()`)
tracker.check_now()  # synchronous check (call from the host sink's `flush()`)
tracker.start()      # start the background daemon thread (call from `setup()`)
tracker.stop()       # signal the thread to exit (call from `cleanup()`)
Both stream_timeout_ms and on_stream_timeout must be provided together. Mismatched pairs raise ValueError. Passing None for both silently disables the tracker — every public method becomes a no-op and the per-key dict is never allocated.
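The pairing rule can be illustrated with a small stand-alone function. resolve_enabled is a hypothetical name and the error message is invented; the sketch models only the "both or neither" rule and does not attempt the positive-integer check on stream_timeout_ms.

```python
from typing import Any, Callable, Optional


def resolve_enabled(
    stream_timeout_ms: Optional[int],
    on_stream_timeout: Optional[Callable[[Any], None]],
) -> bool:
    """Return True if tracking is enabled, False if both arguments are None."""
    if (stream_timeout_ms is None) != (on_stream_timeout is None):
        # Exactly one of the two was supplied: a mismatched pair.
        raise ValueError(
            "stream_timeout_ms and on_stream_timeout must be provided together"
        )
    return stream_timeout_ms is not None


print(resolve_enabled(None, None))     # False: tracker disabled
print(resolve_enabled(60_000, print))  # True: tracker enabled
try:
    resolve_enabled(60_000, None)
except ValueError as exc:
    print("rejected:", exc)
```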
Composing it into a custom sink
The recommended lifecycle wiring mirrors what QuixTSDataLakeSink does:
from quixstreams.sinks.base import BatchingSink
from quixstreams.sinks.core.stream_timeout_tracker import StreamTimeoutTracker


class MyCustomSink(BatchingSink):
    def __init__(self, *, stream_timeout_ms=None, on_stream_timeout=None, **kwargs):
        super().__init__(**kwargs)
        self._timeout = StreamTimeoutTracker(
            stream_timeout_ms=stream_timeout_ms,
            on_stream_timeout=on_stream_timeout,
            thread_name="MyCustomSink-timeout-check",
        )

    # write(self, batch) is omitted here; implement your sink's output logic there.

    def add(self, value, key, timestamp, headers, topic, partition, offset):
        super().add(value, key, timestamp, headers, topic, partition, offset)
        # Record the key on every incoming record.
        self._timeout.touch(key, topic=topic, partition=partition, offset=offset)

    def flush(self):
        super().flush()
        # Synchronous check at the end of each flush.
        self._timeout.check_now()

    def setup(self):
        super().setup()
        self._timeout.start()

    def cleanup(self):
        # Stop the background thread before the sink releases its resources.
        self._timeout.stop()
        super().cleanup()
The tracker accepts topic, partition, and offset as opaque keyword arguments and currently ignores them; they are reserved for future log enrichment. You can pass any per-record context this way.
Callback contract
- Signature: Callable[[Any], None]. The argument is the raw key passed to touch().
- Runs on: the flush thread (for check_now() triggers) or the background thread (for periodic triggers). Both are the same in practice from a throughput standpoint: the consumer is not polling while either runs.
- Must not block. A slow or blocking callback stalls the consumer heartbeat and can trigger a rebalance. Use fire-and-forget patterns (e.g. producer.produce() without a subsequent flush()).
- May raise. Exceptions are logged and swallowed; the entry is not re-inserted, so the key won't fire again on retry. A later touch() re-arms it.
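One way to satisfy the "must not block" rule is to have the callback hand the key to a worker thread through a queue. This is a sketch of a general pattern, not something the tracker provides: the names pending, handled, slow_handler, and drain are hypothetical, and a real handler would do the slow work (an HTTP call, a database write) in place of the list append.

```python
import logging
import queue
import threading
from typing import Any, List

log = logging.getLogger(__name__)

_STOP = object()  # sentinel that tells the worker to exit
pending: "queue.Queue[Any]" = queue.Queue()  # unbounded, so put_nowait never blocks
handled: List[Any] = []


def on_stream_timeout(key: Any) -> None:
    """Callback for the tracker: enqueue and return immediately."""
    pending.put_nowait(key)


def slow_handler(key: Any) -> None:
    """Stand-in for slow work that must not run on the flush thread."""
    handled.append(key)


def drain() -> None:
    """Worker loop: process keys until the stop sentinel arrives."""
    while True:
        item = pending.get()
        if item is _STOP:
            return
        try:
            slow_handler(item)
        except Exception:
            log.exception("handler failed for key %r", item)


worker = threading.Thread(target=drain, name="timeout-handler", daemon=True)
worker.start()

on_stream_timeout("sensor-1")
on_stream_timeout(b"sensor-2")

pending.put(_STOP)  # shut the worker down once the queue is drained
worker.join(timeout=5)
print(handled)  # ['sensor-1', b'sensor-2']
```

An unbounded queue trades memory for latency; if timeouts can arrive faster than the handler drains them, a bounded queue with an explicit drop policy may be the safer choice.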
Fire latency
Worst-case fire latency is stream_timeout_ms + check_interval_ms. With a 60-second threshold and the default 1-second check interval, expect a fire 60–61 seconds after the last message for a given key.
With a flush-only cadence, fire latency would be unbounded on a fully silent topic, because BatchingSink stops calling flush() when there are no batches to process. The background thread closes that gap.
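The latency bound is simple arithmetic, shown here for the 60-second example. The helper fire_latency_bounds_ms is a hypothetical name; it assumes the background thread is running at the given check interval.

```python
from typing import Tuple


def fire_latency_bounds_ms(
    stream_timeout_ms: int, check_interval_ms: int
) -> Tuple[int, int]:
    """Return (best case, worst case) delay between last message and fire."""
    best = stream_timeout_ms                       # a check lands right at the threshold
    worst = stream_timeout_ms + check_interval_ms  # a check was just missed
    return best, worst


print(fire_latency_bounds_ms(60_000, 1_000))  # (60000, 61000)
```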
Restart / rebalance behaviour
Tracking state is in-memory only. On process restart or Kafka partition rebalance, the entries for the affected keys are lost. Keys that stay dormant afterwards will not fire for their current silence period; they resume normal tracking the next time they publish.
If you need fire-once-across-restarts semantics, persist "already fired" state outside the tracker (e.g. in the application's state store or a downstream dedupe layer).
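A minimal sketch of such a dedupe layer, under stated assumptions: fire_once is a hypothetical wrapper, and the plain set stands in for whatever durable store the application uses. The key is marked before the inner callback runs, which gives at-most-once behaviour; marking afterwards would give at-least-once instead.

```python
from typing import Any, Callable, List, MutableSet


def fire_once(
    store: MutableSet[Any], inner: Callable[[Any], None]
) -> Callable[[Any], None]:
    """Wrap `inner` so each key fires only while it is absent from `store`."""

    def wrapper(key: Any) -> None:
        if key in store:
            return  # already fired for this silence period
        store.add(key)  # mark first: at-most-once delivery
        inner(key)

    return wrapper


durable: MutableSet[Any] = set()  # stand-in for persistent storage
fired: List[Any] = []

callback = fire_once(durable, fired.append)
callback("a")  # first fire is delivered

# Simulated restart: a new wrapper is built over the same stored state.
callback_after_restart = fire_once(durable, fired.append)
callback_after_restart("a")  # suppressed, already recorded

durable.discard("a")         # the key published again, so clear its mark
callback_after_restart("a")  # the next silence period fires normally
print(fired)  # ['a', 'a']
```

Clearing the mark when a key publishes again is the application's responsibility in this sketch; without it, a key would never fire a second time.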