Amazon Kinesis Source
Info
This is a Community connector. Test it before using in production.
To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.
This source reads data from an Amazon Kinesis stream and produces it to a
Kafka topic, after applying any desired StreamingDataFrame-based transformations.
How To Install
To use the Kinesis source, you need to install the required dependencies:
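# the "kinesis" extra pulls in the boto3-based dependencies for this connector
pip install quixstreams[kinesis]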
How It Works
KinesisSource reads from a Kinesis stream and produces its messages to a Kafka topic.
Records are read in a streaming fashion and committed intermittently, offering at-least-once guarantees.
Each shard in the Kinesis stream is consumed in a round-robin fashion to ensure reads are equally distributed.
You can learn more about the expected Kafka message format in the Message Data Format/Schema section below.
How To Use
To use the Kinesis Source, pass a KinesisSource instance to app.dataframe().
For more details on the various settings, see Configuration below.
from quixstreams import Application
from quixstreams.sources.community.kinesis import KinesisSource

kinesis = KinesisSource(
    stream_name="<YOUR STREAM>",
    aws_access_key_id="<YOUR KEY ID>",
    aws_secret_access_key="<YOUR SECRET KEY>",
    aws_region="<YOUR REGION>",
    auto_offset_reset="earliest",  # start from the beginning of the stream (vs end)
)

app = Application(
    broker_address="<YOUR BROKER INFO>",
    consumer_group="<YOUR GROUP>",
)

sdf = app.dataframe(source=kinesis).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
    app.run()
Configuration
Here are some important configurations to be aware of (see Kinesis Source API for all parameters).
Required:
- stream_name: the name of the desired stream to consume.
- aws_region: AWS region (ex: us-east-1). Note: can alternatively set the AWS_REGION environment variable.
- aws_access_key_id: AWS User key ID. Note: can alternatively set the AWS_ACCESS_KEY_ID environment variable.
- aws_secret_access_key: AWS secret key. Note: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable.
Optional:
- aws_endpoint_url: only fill this in when testing against a locally-hosted Kinesis instance. Note: can leave the other aws settings blank when doing so. Note: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable.
- commit_interval: how often to commit stream reads. Default: 5.0s
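As an illustration of the settings above, here is a minimal sketch that supplies the AWS credentials via environment variables instead of constructor arguments and overrides the default commit interval (the placeholder values and the 10.0s interval are just examples):

import os

from quixstreams.sources.community.kinesis import KinesisSource

# Credentials and region come from the environment instead of constructor arguments
os.environ["AWS_ACCESS_KEY_ID"] = "<YOUR KEY ID>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<YOUR SECRET KEY>"
os.environ["AWS_REGION"] = "<YOUR REGION>"

kinesis = KinesisSource(
    stream_name="<YOUR STREAM>",
    commit_interval=10.0,  # commit stream reads every 10s instead of the 5.0s default
)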
Message Data Format/Schema
This is the default format of messages handled by Application:
- Message key will be the Kinesis record PartitionKey as a string.
- Message value will be the Kinesis record Data in bytes (transform accordingly with your SDF as desired).
- Message timestamp will be the Kinesis record ArrivalTimestamp (ms).
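Since the message value is raw bytes, a typical first step is deserializing it in your StreamingDataFrame. A minimal sketch, assuming the Kinesis records carry UTF-8 encoded JSON:

import json

sdf = app.dataframe(source=kinesis)

# Kinesis record Data arrives as raw bytes; decode it before applying further logic
sdf = sdf.apply(lambda value: json.loads(value.decode("utf-8")))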
Processing/Delivery Guarantees
The Kinesis Source offers "at-least-once" guarantees: offsets are managed using an internal Quix Streams changelog topic.
As such, in rare circumstances where topic flushing ends up failing, messages may be processed (produced) more than once.
Topic
The default topic name the Application produces to is source-kinesis_<stream name>.
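To produce to a differently named topic instead, you can pass your own topic alongside the source. A minimal sketch, where the topic name is a hypothetical example:

# "my-kinesis-topic" is a hypothetical name; pass the topic together with the source
topic = app.topic(name="my-kinesis-topic")
sdf = app.dataframe(topic=topic, source=kinesis)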
Testing Locally
Rather than connect to AWS, you can alternatively test your application using a local Kinesis host via Docker:
- Execute in terminal (see the example command after this list).
- Set aws_endpoint_url for KinesisSource OR the AWS_ENDPOINT_URL_KINESIS environment variable to http://localhost:4566
- Set all other aws_ parameters for KinesisSource to any string. They will not be used, but they must still be populated!
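For the first step, the exact command depends on your chosen Kinesis emulator. A minimal sketch, assuming LocalStack (which serves a Kinesis-compatible API on port 4566 by default):

# Assumes LocalStack as the local Kinesis emulator; any Kinesis-compatible
# service listening on port 4566 works the same way
docker run --rm -d --name kinesis-local -p 4566:4566 localstack/localstack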