Apache Iceberg Sink
Info
This is a Community connector. Test it before using in production.
To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.
This sink writes batches of data to an Apache Iceberg table.
By default, the data will include the Kafka message key, value, and timestamp.
Currently, it supports Apache Iceberg hosted in:
- AWS
Supported data catalogs:
- AWS Glue
How the Iceberg Sink Works
IcebergSink is a batching sink.
It batches processed records in memory per topic partition, serializes each batch into Parquet format, and appends it to the Iceberg table, updating the table schema as necessary.
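Conceptually, each flush resembles the sketch below, which uses PyIceberg and PyArrow to append one in-memory batch to a Glue-catalogued table. This is only an illustration of the append pattern, not the connector's actual implementation; the catalog settings, table name, and column names are placeholders.

```python
# Illustrative sketch only: appending one in-memory batch to an Iceberg table
# with PyIceberg + PyArrow. Catalog settings, table name, and columns are
# placeholders, not the connector's internal schema.
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Load the AWS Glue catalog (credentials resolved from the environment here)
catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("glue.sink-test")

# A batch of processed records, e.g. one batch per topic partition
batch = pa.Table.from_pylist(
    [
        {"_key": "user-1", "_timestamp": 1700000000000, "value": 42},
        {"_key": "user-2", "_timestamp": 1700000000500, "value": 17},
    ]
)

# Append the batch; Iceberg writes Parquet data files and commits a new snapshot
table.append(batch)
```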
How To Use Iceberg Sink
Create an instance of IcebergSink and pass it to the StreamingDataFrame.sink() method.
For the full description of expected parameters, see the Iceberg Sink API page.
```python
from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
    aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)

# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
    table_name="glue.sink-test",
    config=iceberg_config,
    data_catalog_spec="aws_glue",
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the IcebergSink
sdf.sink(iceberg_sink)

if __name__ == "__main__":
    # Start the application
    app.run()
```
Retrying Failures
IcebergSink retries failed commits automatically with a random delay of up to 5 seconds.
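The behavior is similar to the retry loop sketched below, with a random back-off of up to 5 seconds between attempts. This is only an illustration of the pattern, not the connector's source; `commit_to_iceberg` and `max_attempts` are hypothetical names.

```python
# Illustration of the retry-with-random-delay pattern only; commit_to_iceberg()
# and max_attempts are hypothetical, not part of the IcebergSink API.
import random
import time

def commit_with_retries(commit_to_iceberg, max_attempts: int = 5) -> None:
    for attempt in range(1, max_attempts + 1):
        try:
            commit_to_iceberg()
            return
        except Exception:
            if attempt == max_attempts:
                raise
            # Wait a random delay of up to 5 seconds before retrying
            time.sleep(random.uniform(0, 5))
```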
Delivery Guarantees
IcebergSink provides at-least-once guarantees, and the results may contain duplicated rows of data if there were errors during processing.
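If duplicates matter downstream, they can be removed when reading the table. A minimal sketch, assuming the written rows carry a message key and timestamp column; the column names below are placeholders for whatever your table actually contains.

```python
# Sketch of downstream de-duplication with PyIceberg + pandas.
# Column names ("_key", "_timestamp") are placeholders, not the sink's
# guaranteed schema.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("glue.sink-test")

# Read the table into pandas and keep one row per key/timestamp pair
df = table.scan().to_pandas()
deduplicated = df.drop_duplicates(subset=["_key", "_timestamp"])
```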