Context API
quixstreams.context
set_message_context
Set a MessageContext for the current message in the given contextvars.Context
NOTE: This is for advanced usage only. If you need to change the message key,
StreamingDataFrame.to_topic()
has an argument for it.
Example Snippet:
from quixstreams import Application, set_message_context, message_context
# Changes the current sdf value based on what the message partition is.
def alter_context(value):
context = message_context()
if value > 1:
context.headers = context.headers + (b"cool_new_header", value.encode())
set_message_context(context)
app = Application()
sdf = app.dataframe()
sdf = sdf.update(lambda value: alter_context(value))
Arguments:
context
: instance ofMessageContext
message_context
Get a MessageContext for the current message, which houses most of the message
metadata, like: - key - timestamp - partition - offset
Example Snippet:
from quixstreams import Application, message_context
# Changes the current sdf value based on what the message partition is.
app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)
Returns:
instance of MessageContext