State API

quixstreams.state.base.state

State

class State(ABC, Generic[K, V])

Primary interface for working with key-value state data from StreamingDataFrame
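To make the interface concrete, here is a minimal sketch of a stateful callback that keeps a running counter. `DictState` is a hypothetical in-memory stand-in exposing only `get` and `set`; it is not part of the library and exists only so the example runs on its own.

```python
from typing import Any, Dict, Optional


class DictState:
    """Hypothetical in-memory stand-in exposing the get/set subset of State."""

    def __init__(self) -> None:
        self._data: Dict[Any, Any] = {}

    def get(self, key: Any, default: Optional[Any] = None) -> Optional[Any]:
        return self._data.get(key, default)

    def set(self, key: Any, value: Any) -> None:
        self._data[key] = value


def count_messages(value: dict, state: DictState) -> dict:
    """Attach a running count to each record, keeping the counter in state."""
    total = state.get("total", default=0) + 1
    state.set("total", total)
    return {**value, "total": total}


state = DictState()
first = count_messages({"temperature": 21}, state)
second = count_messages({"temperature": 22}, state)
```

In an application, a callback with this `(value, state)` shape would typically be registered with `sdf.apply(count_messages, stateful=True)`, and the framework supplies the state object.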



State.get

@abstractmethod
def get(key: K, default: Optional[V] = None) -> Optional[V]

Get the value for key if key is present in the state, else default


Arguments:

  • key: key
  • default: default value to return if the key is not found


Returns:

value or None if the key is not found and default is not provided



State.get_bytes

@abstractmethod
def get_bytes(key: K, default: Optional[bytes] = None) -> Optional[bytes]

Get the bytes value for key if key is present in the state, else default


Arguments:

  • key: key
  • default: default value to return if the key is not found


Returns:

value as bytes or None if the key is not found and default is not provided



State.set

@abstractmethod
def set(key: K, value: V, ttl: Optional[timedelta] = None) -> None

Set value for the key, optionally with a per-write expiry.


Arguments:

  • key: key
  • value: value
  • ttl: optional event-time TTL. When set, the entry expires ttl after the current record's event-time and is filtered from subsequent reads. None (default) writes a sentinel stamp meaning "never expires", overwriting any prior TTL on the same key.
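The TTL rules above can be modelled with a small in-memory sketch. Everything here is hypothetical: `TtlState`, the `NEVER_EXPIRES` sentinel value, and the choice to treat an entry as expired once event-time reaches the expiry stamp are assumptions made for illustration, not the library's storage format.

```python
from datetime import timedelta
from typing import Any, Dict, Optional, Tuple

NEVER_EXPIRES = -1  # hypothetical sentinel; the real stamp format is not documented here


class TtlState:
    """Hypothetical in-memory model of event-time TTL on set() and get()."""

    def __init__(self) -> None:
        self._data: Dict[Any, Tuple[Any, int]] = {}
        self.timestamp_ms = 0  # event-time of the record being processed

    def set(self, key: Any, value: Any, ttl: Optional[timedelta] = None) -> None:
        if ttl is None:
            expires_at = NEVER_EXPIRES  # also replaces any earlier TTL on this key
        else:
            expires_at = self.timestamp_ms + int(ttl.total_seconds() * 1000)
        self._data[key] = (value, expires_at)

    def get(self, key: Any, default: Optional[Any] = None) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return default
        value, expires_at = entry
        # Assumption: an entry counts as expired once event-time reaches its stamp.
        if expires_at != NEVER_EXPIRES and self.timestamp_ms >= expires_at:
            return default
        return value


state = TtlState()
state.timestamp_ms = 1_000
state.set("session", "open", ttl=timedelta(seconds=30))  # stamp: 31_000 ms

state.timestamp_ms = 20_000
before_expiry = state.get("session")

state.timestamp_ms = 40_000
after_expiry = state.get("session", default="expired")

state.set("session", "pinned", ttl=timedelta(seconds=1))
state.set("session", "pinned")  # ttl=None overwrites the one-second TTL
state.timestamp_ms = 10_000_000
pinned = state.get("session")
```

Note that expiry is driven by record event-time rather than wall-clock time, so a key only appears expired once a later record advances the timestamp.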



State.set_bytes

@abstractmethod
def set_bytes(key: K, value: bytes, ttl: Optional[timedelta] = None) -> None

Set bytes value for the key, optionally with a per-write expiry.


Arguments:

  • key: key
  • value: value as bytes
  • ttl: optional event-time TTL. See State.set.
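The sketch below illustrates one plausible relationship between the typed and bytes accessors. It assumes that `set` serializes values (JSON is used here) while `set_bytes` and `get_bytes` skip serialization, and that both pairs share one underlying store. `BytesBackedState` is a hypothetical stand-in, and the real backend may behave differently.

```python
import json
from typing import Any, Dict, Optional


class BytesBackedState:
    """Hypothetical stand-in: every value is kept as bytes in one store."""

    def __init__(self) -> None:
        self._store: Dict[str, bytes] = {}

    def set(self, key: str, value: Any) -> None:
        # Typed write: serialize first (JSON is an assumption of this sketch).
        self._store[key] = json.dumps(value).encode("utf-8")

    def set_bytes(self, key: str, value: bytes) -> None:
        # Bytes write: stored as-is, no serialization step.
        self._store[key] = value

    def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
        raw = self._store.get(key)
        return default if raw is None else json.loads(raw)

    def get_bytes(self, key: str, default: Optional[bytes] = None) -> Optional[bytes]:
        return self._store.get(key, default)


state = BytesBackedState()
state.set("config", {"threshold": 5})
state.set_bytes("blob", b"\x00\x01\x02")

typed_config = state.get("config")
blob = state.get_bytes("blob")
missing = state.get_bytes("absent", default=b"")
```

The bytes variants are a natural fit for payloads that are already encoded, such as compressed or binary blobs, where a JSON round trip would fail or waste work.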



State.delete

@abstractmethod
def delete(key: K)

Delete value for the key.

This function always returns None, even if the key is not found.


Arguments:

  • key: key



State.exists

@abstractmethod
def exists(key: K) -> bool

Check if the key exists in state.


Arguments:

  • key: key


Returns:

True if key exists, False otherwise
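A short sketch of how `delete` and `exists` interact, again using a hypothetical dict-backed stand-in rather than the real class. It shows that deleting a missing key is a harmless no-op, and that in this stand-in `exists` can tell a stored `None` apart from a missing key while `get` cannot.

```python
from typing import Any, Dict, Optional


class DictState:
    """Hypothetical in-memory stand-in for the State interface."""

    def __init__(self) -> None:
        self._data: Dict[Any, Any] = {}

    def get(self, key: Any, default: Optional[Any] = None) -> Optional[Any]:
        return self._data.get(key, default)

    def set(self, key: Any, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: Any) -> None:
        # pop() with a default never raises, so missing keys are ignored.
        self._data.pop(key, None)

    def exists(self, key: Any) -> bool:
        return key in self._data


state = DictState()
state.set("user", "alice")
existed_before = state.exists("user")

first_delete = state.delete("user")
second_delete = state.delete("user")  # key already gone, still no error
exists_after = state.exists("user")

state.set("flag", None)
flag_value = state.get("flag")      # None, same as for a missing key
flag_exists = state.exists("flag")  # True, the key is present
```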

TransactionState

class TransactionState(State)



TransactionState.__init__

def __init__(prefix: bytes,
             transaction: "PartitionTransaction",
             timestamp: Optional[int] = None)

Simple key-value state that is passed to StreamingDataFrame functions


Arguments:

  • prefix: serialized key prefix shared across calls
  • transaction: instance of PartitionTransaction
  • timestamp: optional event-time of the current record (ms). Used by TTL-aware partitions to stamp values on set() with record.timestamp + ttl and to filter expired entries on get(). The framework injects this on every record via the StreamingDataFrame stateful wrapper.
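The role of `prefix` can be pictured as namespacing: several state views share one transaction, and each view's keys are scoped by its prefix. The sketch below is a hypothetical model only. `FakeTransaction`, `PrefixedState`, and the `b"|"` separator are invented for illustration and do not reflect how PartitionTransaction lays out keys internally.

```python
from typing import Dict, Optional


class FakeTransaction:
    """Hypothetical stand-in for PartitionTransaction: one shared byte store."""

    def __init__(self) -> None:
        self.store: Dict[bytes, bytes] = {}

    def set_bytes(self, key: bytes, value: bytes, prefix: bytes) -> None:
        self.store[prefix + b"|" + key] = value  # separator is an assumption

    def get_bytes(
        self, key: bytes, prefix: bytes, default: Optional[bytes] = None
    ) -> Optional[bytes]:
        return self.store.get(prefix + b"|" + key, default)


class PrefixedState:
    """Hypothetical state view binding one prefix to a shared transaction."""

    def __init__(
        self,
        prefix: bytes,
        transaction: FakeTransaction,
        timestamp: Optional[int] = None,
    ) -> None:
        self._prefix = prefix
        self._transaction = transaction
        self._timestamp = timestamp  # record event-time in ms; unused in this sketch

    def set_bytes(self, key: bytes, value: bytes) -> None:
        self._transaction.set_bytes(key, value, prefix=self._prefix)

    def get_bytes(self, key: bytes, default: Optional[bytes] = None) -> Optional[bytes]:
        return self._transaction.get_bytes(key, prefix=self._prefix, default=default)


transaction = FakeTransaction()
state_a = PrefixedState(prefix=b"device-a", transaction=transaction, timestamp=1_000)
state_b = PrefixedState(prefix=b"device-b", transaction=transaction, timestamp=1_000)

state_a.set_bytes(b"count", b"1")
state_b.set_bytes(b"count", b"7")

count_a = state_a.get_bytes(b"count")
count_b = state_b.get_bytes(b"count")
```

Because the prefix is bound once at construction, callbacks can use short keys such as `count` without colliding with other message keys in the same partition.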



TransactionState.get

def get(key: K, default: Optional[V] = None) -> Optional[V]

Get the value for key if key is present in the state, else default


Arguments:

  • key: key
  • default: default value to return if the key is not found


Returns:

value or None if the key is not found and default is not provided



TransactionState.get_bytes

def get_bytes(key: K, default: Optional[bytes] = None) -> Optional[bytes]

Get the bytes value for key if key is present in the state, else default


Arguments:

  • key: key
  • default: default value to return if the key is not found


Returns:

value as bytes or None if the key is not found and default is not provided



TransactionState.set

def set(key: K, value: V, ttl: Optional[timedelta] = None) -> None

Set value for the key, optionally with a per-write expiry.


Arguments:

  • key: key
  • value: value
  • ttl: optional event-time TTL. See State.set.



TransactionState.set_bytes

def set_bytes(key: K, value: bytes, ttl: Optional[timedelta] = None) -> None

Set bytes value for the key, optionally with a per-write expiry.


Arguments:

  • key: key
  • value: value as bytes
  • ttl: optional event-time TTL. See State.set.



TransactionState.delete

def delete(key: K)

Delete value for the key.

This function always returns None, even if the key is not found.


Arguments:

  • key: key



TransactionState.exists

def exists(key: K) -> bool

Check if the key exists in state.


Arguments:

  • key: key


Returns:

True if key exists, False otherwise

quixstreams.state.rocksdb.options

RocksDBOptions

@dataclasses.dataclass(frozen=True)
class RocksDBOptions(RocksDBOptionsType)

RocksDB database options.


Arguments:

  • dumps: function to dump data to JSON
  • loads: function to load data from JSON
  • open_max_retries: number of times to retry opening the database if it's locked by another process. To disable retrying, pass 0
  • open_retry_backoff: number of seconds to wait between each retry.
  • on_corrupted_recreate: when True, a corrupted DB is destroyed, provided use_changelog_topics=True is also set on the Application. If this option is True but use_changelog_topics=False, the DB is not destroyed. Note: risk of data loss! Make sure that the changelog topics are up-to-date before enabling it in production. Default - False.
  • max_evictions_per_flush: cap on TTL-driven evictions performed during a single flush() for stores with TTL enabled. Larger values increase per-flush latency but let the sweep keep up with higher steady-state expiration rates. Only meaningful for TTL-enabled stores; ignored otherwise. Default - 10_000.
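The effect of a per-flush eviction cap can be sketched as a bounded sweep: each flush removes at most a fixed number of expired entries and leaves the rest for later flushes. The function below is a hypothetical model over a plain dict, not the store's actual sweep; `sweep_expired` and the `(value, expires_at_ms)` entry layout are assumptions.

```python
from typing import Dict, Tuple


def sweep_expired(
    entries: Dict[str, Tuple[str, int]],
    now_ms: int,
    max_evictions_per_flush: int,
) -> int:
    """Delete at most max_evictions_per_flush expired entries; return the count."""
    expired = [key for key, (_, expires_at) in entries.items() if expires_at <= now_ms]
    to_evict = expired[:max_evictions_per_flush]
    for key in to_evict:
        del entries[key]
    return len(to_evict)


# Five entries that expired at 100 ms, plus one that is still fresh.
entries = {f"k{i}": ("v", 100) for i in range(5)}
entries["fresh"] = ("v", 10_000)

first_flush = sweep_expired(entries, now_ms=500, max_evictions_per_flush=3)
second_flush = sweep_expired(entries, now_ms=500, max_evictions_per_flush=3)
remaining = sorted(entries)
```

With a cap of 3, clearing five expired entries takes two flushes. This is the trade-off the option describes: a higher cap clears the backlog sooner at the cost of more work per flush.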

Please see rocksdict.Options for a complete description of other options.
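How `open_max_retries` and `open_retry_backoff` interact can be sketched as a simple retry loop. This is an illustrative model under stated assumptions, not the library's code: `open_with_retries` and the `DatabaseLocked` exception are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DatabaseLocked(Exception):
    """Hypothetical error raised while another process holds the DB lock."""


def open_with_retries(
    open_db: Callable[[], T],
    open_max_retries: int,
    open_retry_backoff: float,
) -> T:
    """Call open_db, retrying up to open_max_retries times when it is locked."""
    retries_left = open_max_retries
    while True:
        try:
            return open_db()
        except DatabaseLocked:
            if retries_left <= 0:
                raise  # retries exhausted (or disabled with 0)
            retries_left -= 1
            time.sleep(open_retry_backoff)


calls = {"flaky": 0, "locked": 0}


def flaky_open() -> str:
    """Fails twice with a lock error, then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise DatabaseLocked("locked by another process")
    return "db-handle"


def always_locked() -> str:
    calls["locked"] += 1
    raise DatabaseLocked("locked by another process")


handle = open_with_retries(flaky_open, open_max_retries=5, open_retry_backoff=0.0)

try:
    open_with_retries(always_locked, open_max_retries=0, open_retry_backoff=0.0)
    failed_fast = False
except DatabaseLocked:
    failed_fast = True
```

Passing 0 makes the first lock error fatal, which matches the documented way to disable retrying.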



RocksDBOptions.to_options

def to_options() -> rocksdict.Options

Convert parameters to rocksdict.Options


Returns:

instance of rocksdict.Options