MQTT Source
Info
This is a Community connector. Test it before using in production.
To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.
This source enables reading from an MQTT broker, dumping it to a
kafka topic using desired StreamingDataFrame-based transformations.
How To Install
To use the MQTT source, you need to install the required dependencies:
How It Works
MQTTSource subscribes to an MQTT topic and produces its messages to a Kafka topic.
Messages are read in a streaming fashion and processed as they arrive, offering real-time data ingestion from MQTT brokers.
The source supports various MQTT protocol versions and provides customizable message handling through key, value, and timestamp setters.
You can learn more details about the expected kafka message format below.
How To Use
To use MQTT Source, hand MQTTSource to app.dataframe().
For more details around various settings, see configuration.
from quixstreams import Application
from quixstreams.sources.community.mqtt import MQTTSource
mqtt_source = MQTTSource(
topic="sensors/temperature",
client_id="my-mqtt-client",
server="mqtt.broker.com",
port=1883,
username="your_username",
password="your_password",
version="3.1.1",
)
app = Application(
broker_address="localhost:9092",
consumer_group="mqtt-consumer",
)
sdf = app.dataframe(source=mqtt_source).print(metadata=True)
# YOUR LOGIC HERE!
if __name__ == "__main__":
app.run()
Configuration
Here are some important configurations to be aware of (see MQTT Source API for all parameters).
Required:
topic: MQTT topic to subscribe to. Use '#' as a wildcard for consuming from a base/prefix (e.g., "my-topic-base/#").client_id: MQTT client identifier.server: MQTT broker server address.port: MQTT broker server port.
Optional:
username: Username for MQTT broker authentication.password: Password for MQTT broker authentication.version: MQTT protocol version ("3.1", "3.1.1", or "5"). Default:"3.1.1"tls_enabled: Whether to use TLS encryption. Default:Trueqos: Quality of Service level (0 or 1; 2 not yet supported). Default:1payload_deserializer: An optional payload deserializer. Useful when payloads are used by key, value, or timestamp setters. Default: JSON deserializerkey_setter: Function to extract message key from MQTT message. Default: Uses "_key" from payload or topic namevalue_setter: Function to extract message value from MQTT message. Default: Raw message payloadtimestamp_setter: Function to extract timestamp from MQTT message. Default: Uses "_timestamp" from payload or message timestampproperties: MQTT properties (MQTT v5 only).on_client_connect_success: Optional callback for successful client authentication.on_client_connect_failure: Optional callback for failed client authentication.
Message Data Format/Schema
This is the default format of messages handled by Application:
-
Message
keywill be extracted using thekey_setterfunction. By default, it uses the "_key" field from the payload (if JSON) or falls back to the MQTT topic name. -
Message
valuewill be extracted using thevalue_setterfunction. By default, it returns the raw MQTT message payload in bytes. -
Message
timestampwill be extracted using thetimestamp_setterfunction. By default, it uses the "_timestamp" field from the payload (if JSON) or falls back to the MQTT message timestamp.
Processing/Delivery Guarantees
MQTTSource processing guarantees depend on the configured QoS level:
- QoS 0: At most once delivery - messages are considered read without any explicit acknowledgement
- QoS 1: At least once delivery - messages are considered read after successful client acknowledgement
NOTE: Kafka-level guarantees are
at-least-once.
Testing Locally
You can test MQTTSource locally using a local MQTT broker like Mosquitto:
-
Run Mosquitto with custom config:
-
Configure
MQTTSourceto connect to it: