# MongoDB Sink

> **Info:** This is a Community connector. Test it before using in production.
>
> To learn more about the differences between Core and Community connectors, see the Community and Core Connectors page.
This sink writes data to a MongoDB database and supports several ways of exporting the data.
## How To Install

To use the MongoDB sink, you need to install the required dependencies:
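A likely install command, assuming the MongoDB dependencies ship as a `mongodb` package extra:

```bash
# Assumption: the extra is named "mongodb"; check the Quix Streams docs if this fails
pip install quixstreams[mongodb]
```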
## How It Works

`MongoDBSink` is a streaming sink that publishes messages to MongoDB in batches.

There are a few different ways to handle/export messages with `MongoDBSink`, but the most common (and default) approach is having a 1:1 correspondence between the Kafka message key and the document `_id`.
Regardless of approach, the following is always true:

- The Kafka message `value` fields are written to the document as-is.
- The sink ensures that the order of messages is preserved within each partition. This means that messages are sent to MongoDB in the same order they are received from Kafka for each specific partition.
### Export Behavior

How data is exported with `MongoDBSink` primarily depends on two parameters: `update_method` and `document_matcher`.
#### Default: kafka key == MongoDB `_id`

The default `document_matcher` assumes the message key corresponds to an equivalently named document `_id`, so it attempts to match on that for updating. If no such `_id` exists (and `MongoDBSink` has `upsert=True`), the document will instead be created with that `_id`.

Also by default, only the fields present in the Kafka message will be updated in the document (`update_method="UpdateOne"`). However, you can perform a full document replacement by setting `update_method="ReplaceOne"`, which will drop any fields that are not present in the message.
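To make the difference concrete, here is a sketch of both update methods (the documents are made up for illustration):

```python
# Existing document in MongoDB:
#   {"_id": "CID_12345", "age": 28, "city": "Los Angeles"}
# Incoming Kafka message with key "CID_12345" and value:
#   {"age": 29}

# update_method="UpdateOne" (default) only touches fields present in the message:
#   {"_id": "CID_12345", "age": 29, "city": "Los Angeles"}

# update_method="ReplaceOne" drops fields absent from the message:
#   {"_id": "CID_12345", "age": 29}
```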
#### Using A Custom `_id`

A custom `_id` can be used by simply providing your own `document_matcher` to `MongoDBSink` (which should include `{"_id": <YOUR_VALUE>}`).

If no match is found (and assuming `upsert=True`), the document will instead be created with that `_id`.

If no `document_matcher` or `_id` is specified (and `upsert=True`), MongoDB will create a new document where `_id` is assigned an `ObjectId` (default MongoDB behavior).
**Example:**

```python
from quixstreams.sinks.community.mongodb import MongoDBSink
from quixstreams.sinks.base.item import SinkItem


def match_on_last_name(batch_item: SinkItem):
    return {"_id": batch_item.value["name"]["last"]}


sink = MongoDBSink(
    ...,  # other required stuff
    document_matcher=match_on_last_name,
)
```
#### Alternate behavior: pattern-based updates

The `document_matcher` can alternatively be used to match more than one document at a time; it simply has to return any valid MongoDB "query filter" (what is used by MongoDB's `.find()`). This approach enables updating multiple documents from one message.

To accomplish this, you must additionally set `update_method="UpdateMany"`, otherwise only the first encountered match will be updated.

If no match is made, a new document will instead be created with a random `_id` (assuming `upsert=True`) with the provided updates.

You can see an example with `UpdateMany` pattern matching below.
### Include Message Metadata

You can include topic (topic, partition, offset) and message (key, headers, timestamp) metadata using the flags `add_topic_metadata=True` and `add_message_metadata=True` for `MongoDBSink`.

They will be included as `__{field}` in the document.

Example document with `add_message_metadata=True`:
```python
{
    "field_x": "value_a",
    "field_y": "value_b",
    "__key": b"my_key",
    "__headers": {},
    "__timestamp": 1234567890,
}
```
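For reference, a minimal sketch of enabling both flags (the other arguments are elided placeholders):

```python
mongodb_sink = MongoDBSink(
    ...,  # other required stuff
    add_topic_metadata=True,    # adds __topic, __partition, __offset
    add_message_metadata=True,  # adds __key, __headers, __timestamp
)
```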
### Final Outgoing Value Editing

In case other callables need access to fields you would otherwise exclude from the document, you can optionally provide a callable to `value_selector` that receives the current document as an argument and returns the desired finalized outgoing document.

Note: any of the `add_*_metadata` flags will have already added their data by this point.
**Example:**

```python
from quixstreams.sinks.community.mongodb import MongoDBSink


def edit_doc(my_doc: dict):
    return {k: v for k, v in my_doc.items() if k not in ["age", "zip_code"]}


sink = MongoDBSink(
    ...,  # other required stuff
    value_selector=edit_doc,
)
```
## How To Use

Create an instance of `MongoDBSink` and pass it to the `StreamingDataFrame.sink()` method:

```python
from quixstreams import Application
from quixstreams.sinks.community.mongodb import MongoDBSink

app = Application(broker_address="localhost:9092")
topic = app.topic("topic-name")

# Message structured as:
# key: "CID_12345"
# value: {"name": {"first": "John", "last": "Doe"}, "age": 28, "city": "Los Angeles"}

# Configure the sink
mongodb_sink = MongoDBSink(
    url="mongodb://localhost:27017",
    db="my_mongodb",
    collection="people",
)

sdf = app.dataframe(topic=topic)
sdf.sink(mongodb_sink)

# Resulting MongoDB document:
# {"_id": "CID_12345", "name": {"first": "John", "last": "Doe"}, "age": 28, "city": "Los Angeles"}

if __name__ == "__main__":
    app.run()
```
### An `UpdateMany` example

Imagine we get messages that update various product offerings:

```python
# Kafka message
key = "product_updates"
value = {"product_category": "Shirts", "color_options": "blue,black,red"}
```

We can use a `document_matcher` to find all other products that match this `product_category` ("Shirts") and update them to have these new `color_options`:
```python
mongodb_sink = MongoDBSink(
    url="mongodb://localhost:27017",
    db="my_mongodb",
    collection="clothing",
    # find all other documents with "Shirts" product category
    document_matcher=lambda item: {"product_category": item.value["product_category"]},
    # update every document that document_matcher finds
    update_method="UpdateMany",
)
```
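To sketch the effect (documents are made up for illustration):

```python
# Before (all documents in "clothing" with product_category == "Shirts"):
#   {"_id": ObjectId("..."), "product_category": "Shirts", "color_options": "blue"}
#   {"_id": ObjectId("..."), "product_category": "Shirts", "color_options": "red"}

# After processing the message above, every match is updated:
#   {"_id": ObjectId("..."), "product_category": "Shirts", "color_options": "blue,black,red"}
```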
## Configuration Options

- `url`: MongoDB url; most commonly `mongodb://username:password@host:port`
- `db`: MongoDB database name
- `collection`: MongoDB collection name
- `document_matcher`: How documents are selected to update.
  A callable that accepts a `BatchItem` and returns a MongoDB "Filter Query".
  If no match, will insert if `upsert=True`, where `_id` will be either the included value if specified, else a random `ObjectId`.
  Default: matches on `_id`, with `_id` assumed to be the kafka key.
- `upsert`: Create documents if no matches with `document_matcher`.
  Default: True
- `update_method`: How documents found with `document_matcher` are updated.
  "Update" options will only update fields included in the kafka message.
  The "Replace" option fully replaces the document with the contents of the kafka message.
    - `"UpdateOne"`: Updates the first matching document (usually based on `_id`).
    - `"UpdateMany"`: Updates ALL matching documents (usually NOT based on `_id`).
    - `"ReplaceOne"`: Replaces the first matching document (usually based on `_id`).

  Default: "UpdateOne".
- `add_message_metadata`: include key, timestamp, and headers as `__{field}`.
  Default: False
- `add_topic_metadata`: include topic, partition, and offset as `__{field}`.
  Default: False
- `value_selector`: An optional callable that allows final editing of the outgoing document (right before submitting it).
  Largely used when a field is necessary for `document_matcher`, but not otherwise needed in the document.
  NOTE: metadata is added before this step, so don't accidentally exclude it here!
- Additional keyword arguments are passed to the `MongoClient`.
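Putting several of these options together, a sketch (the values and the matcher are illustrative, not prescriptive):

```python
from quixstreams.sinks.community.mongodb import MongoDBSink

mongodb_sink = MongoDBSink(
    url="mongodb://username:password@localhost:27017",
    db="my_mongodb",
    collection="people",
    # match on a nested field instead of the default _id == kafka key
    document_matcher=lambda item: {"name.last": item.value["name"]["last"]},
    update_method="UpdateMany",  # update every matching document
    upsert=True,                 # create a document when nothing matches
    add_message_metadata=True,   # adds __key, __headers, __timestamp
    # drop the field used only for matching from the stored document
    value_selector=lambda doc: {k: v for k, v in doc.items() if k != "name"},
)
```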
## Error Handling and Delivery Guarantees

The sink provides at-least-once delivery guarantees, which means:

- Messages are published in batches for better performance.
- During checkpointing, the sink waits for all pending publishes to complete.
- If any messages fail to publish after several retries, a `SinkBackpressureError` is raised.
- When a `SinkBackpressureError` occurs:
    - The application will retry the entire batch from the last successful offset.
    - Some messages that were successfully published in the failed batch may be published again.
    - This ensures no messages are lost, but some might be delivered more than once.

This behavior makes the sink reliable, but downstream systems must be prepared to handle duplicate messages. Note that with the default behavior (kafka key == `_id`, `update_method="UpdateOne"`), re-processing a message rewrites the same document with the same fields, so duplicates are usually harmless. If your application requires exactly-once semantics, you'll need to implement deduplication logic in your consumer.
## Testing Locally

You can test your application using a local MongoDB instance via Docker:

1. Execute in terminal:
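   A typical command (a sketch; the official `mongo` image and the container name are assumptions):

   ```bash
   # run a disposable MongoDB container on the default port
   docker run --rm -d --name mongodb -p 27017:27017 mongo
   ```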
2. Connect using the url: `mongodb://localhost:27017`