State API

quixstreams.state.base.state

State

class State(ABC)

The primary interface for reading and writing key-value state data from StreamingDataFrame functions.



State.get

@abstractmethod
def get(key: Any, default: Any = None) -> Optional[Any]

Return the value for key if the key is present in the state; otherwise return default.


Arguments:

  • key: the key to look up
  • default: default value to return if the key is not found


Returns:

The stored value, or default if the key is not found (None when no default is provided)



State.set

@abstractmethod
def set(key: Any, value: Any)

Set the value for the key.


Arguments:

  • key: the key to store the value under
  • value: the value to store



State.delete

@abstractmethod
def delete(key: Any)

Delete the value for the key.

This method always returns None, even if the key is not found.


Arguments:

  • key: the key to delete



State.exists

@abstractmethod
def exists(key: Any) -> bool

Check whether the key exists in the state.


Arguments:

  • key: the key to check


Returns:

True if key exists, False otherwise
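
The four methods above behave much like their dict counterparts. The sketch below illustrates the documented semantics with a hypothetical in-memory class, DictState. It is not part of quixstreams and deliberately does not subclass State, so that it runs without the library installed.

```python
from typing import Any, Dict, Optional


class DictState:
    """Hypothetical in-memory stand-in mirroring the State method signatures."""

    def __init__(self) -> None:
        self._data: Dict[Any, Any] = {}

    def get(self, key: Any, default: Any = None) -> Optional[Any]:
        # Return the stored value, or `default` when the key is absent.
        return self._data.get(key, default)

    def set(self, key: Any, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: Any) -> None:
        # Deleting a missing key is a no-op and still returns None.
        self._data.pop(key, None)

    def exists(self, key: Any) -> bool:
        return key in self._data


state = DictState()
missing = state.get("count")              # no key and no default given
fallback = state.get("count", default=0)  # explicit default is returned
state.set("count", 1)
stored = state.get("count", default=0)    # stored value wins over the default
present = state.exists("count")
deleted = state.delete("count")
deleted_again = state.delete("count")     # key is already gone, still no error
present_after = state.exists("count")
```

Because get returns None both for a missing key and (in this stand-in) for a stored None, exists is the unambiguous way to test for presence.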

TransactionState

class TransactionState(State)



TransactionState.__init__

def __init__(prefix: bytes, transaction: "PartitionTransaction")

A simple key-value state that is passed to StreamingDataFrame functions.


Arguments:

  • prefix: bytes prefix for the keys accessed through this state
  • transaction: instance of PartitionTransaction
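
The constructor signature suggests that a TransactionState scopes its keys with a bytes prefix and delegates storage to the transaction. The following is a sketch of that idea only, not the library's implementation: FakeTransaction, PrefixedState, the SEPARATOR byte, and the JSON key encoding are all assumptions made for illustration.

```python
import json
from typing import Any, Dict

SEPARATOR = b"|"  # assumption: a byte that does not occur in the prefixes used here


class FakeTransaction:
    """Hypothetical stand-in for PartitionTransaction: a flat bytes-keyed store."""

    def __init__(self) -> None:
        self.store: Dict[bytes, bytes] = {}


class PrefixedState:
    """Hypothetical sketch: each key is stored as prefix + SEPARATOR + encoded key."""

    def __init__(self, prefix: bytes, transaction: FakeTransaction) -> None:
        self._prefix = prefix
        self._transaction = transaction

    def _full_key(self, key: Any) -> bytes:
        return self._prefix + SEPARATOR + json.dumps(key).encode("utf-8")

    def get(self, key: Any, default: Any = None) -> Any:
        raw = self._transaction.store.get(self._full_key(key))
        return default if raw is None else json.loads(raw)

    def set(self, key: Any, value: Any) -> None:
        self._transaction.store[self._full_key(key)] = json.dumps(value).encode("utf-8")


# Two state objects share one transaction but cannot see each other's keys.
tx = FakeTransaction()
user_a = PrefixedState(b"user-a", tx)
user_b = PrefixedState(b"user-b", tx)
user_a.set("total", 10)
user_b.set("total", 99)
```

In this sketch the same logical key, "total", maps to two distinct physical keys, so the two prefixes never collide.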



TransactionState.get

def get(key: Any, default: Any = None) -> Optional[Any]

Return the value for key if the key is present in the state; otherwise return default.


Arguments:

  • key: the key to look up
  • default: default value to return if the key is not found


Returns:

The stored value, or default if the key is not found (None when no default is provided)



TransactionState.set

def set(key: Any, value: Any)

Set the value for the key.


Arguments:

  • key: the key to store the value under
  • value: the value to store



TransactionState.delete

def delete(key: Any)

Delete the value for the key.

This method always returns None, even if the key is not found.


Arguments:

  • key: the key to delete



TransactionState.exists

def exists(key: Any) -> bool

Check whether the key exists in the state.


Arguments:

  • key: the key to check


Returns:

True if key exists, False otherwise
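
Since a state object is handed to StreamingDataFrame functions, a typical callback reads a value, updates it, and writes it back. The sketch below shows that pattern with a running counter. The callback name count_messages, its (value, state) parameter order, and the MemoryState stand-in are assumptions for illustration; in an application the library would supply the state object.

```python
from typing import Any, Dict


class MemoryState:
    """Hypothetical stand-in exposing the get/set subset of the state interface."""

    def __init__(self) -> None:
        self._data: Dict[Any, Any] = {}

    def get(self, key: Any, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: Any, value: Any) -> None:
        self._data[key] = value


def count_messages(value: dict, state: Any) -> dict:
    """Attach a running total to each message, persisting it in the state."""
    total = state.get("total", default=0) + 1  # first call starts from the default
    state.set("total", total)
    return {**value, "total": total}


# Simulate three messages flowing through the callback with one shared state.
state = MemoryState()
results = [count_messages({"n": i}, state) for i in range(3)]
```

Passing default=0 to get avoids a separate exists check on the first message.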

quixstreams.state.rocksdb.options

RocksDBOptions

@dataclasses.dataclass(frozen=True)
class RocksDBOptions(RocksDBOptionsType)

RocksDB database options.


Arguments:

  • dumps: function to dump data to JSON
  • loads: function to load data from JSON
  • open_max_retries: number of times to retry opening the database if it's locked by another process. To disable retrying, pass 0
  • open_retry_backoff: number of seconds to wait between each retry

See rocksdict.Options for a complete description of the other options.
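
As an illustration of the dumps and loads arguments, here is a minimal sketch of a matching serializer pair built on the standard json module. That dumps returns bytes and loads accepts bytes is an assumption about the expected callable signatures, and the retry values in build_options are arbitrary; build_options is a hypothetical helper that requires quixstreams to be installed.

```python
import json
from typing import Any


def dumps(value: Any) -> bytes:
    """Serialize a value to compact, key-sorted JSON bytes."""
    return json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")


def loads(raw: bytes) -> Any:
    """Deserialize JSON bytes back into a Python value."""
    return json.loads(raw)


def build_options():
    """Hypothetical helper: build RocksDBOptions with the custom serializers."""
    # Imported lazily so the serializers above can be used without quixstreams.
    from quixstreams.state.rocksdb.options import RocksDBOptions

    return RocksDBOptions(
        dumps=dumps,
        loads=loads,
        open_max_retries=5,      # arbitrary example value; 0 disables retrying
        open_retry_backoff=1.0,  # seconds to wait between retries
    )


encoded = dumps({"b": 1, "a": [1, 2]})
decoded = loads(encoded)
```

Whatever pair is chosen, loads must be able to read everything dumps has written, including values persisted by earlier runs.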



RocksDBOptions.to_options

def to_options() -> rocksdict.Options

Convert parameters to rocksdict.Options


Returns:

instance of rocksdict.Options