Fluvio MQTT Connector
Official Infinyon MQTT connector
Reads record from MQTT topic and writes to Fluvio topic.
Supports MQTT V3.1.1 and V5 protocols.
See docs here. Tutorial for MQTT to SQL Pipeline.
Option | default | type | description |
---|---|---|---|
timeout | 60s | Duration | mqtt broker connect timeout in seconds and nanoseconds |
url | - | SecretString | MQTT url which includes schema, domain, port and credentials such as username and password. |
topic | - | String | mqtt topic to subscribe and source events from |
client_id | UUID V4 | String | mqtt client ID. Using same client id in different connectors may close connection |
payload_output_type | binary | String | controls how the output of payload field is produced |
url
option with type SecretString
can be set as raw string value:
url: "mqtt://test.mosquitto.org/"
or, as a reference to a secret with the given name:
url:
secret:
name: "URL_SECRET_NAME"
JSON Serialized string with fields mqtt_topic
and payload
Value | Output |
---|---|
binary | Array of bytes |
json | UTF-8 JSON Serialized String |
This is an example of connector config file:
# sample-config.yaml
apiVersion: 0.1.0
meta:
version: 0.2.5
name: my-mqtt-connector
type: mqtt-source
topic: mqtt-topic
create-topic: true
mqtt:
url: "mqtt://test.mosquitto.org/"
topic: "mqtt-to-fluvio"
timeout:
secs: 30
nanos: 0
payload_output_type: json
Run connector locally using cdk
tool (from root directory or any sub-directory):
fluvio install cdk
cdk deploy start --config sample-config.yaml
cdk deploy list # to see the status
cdk deploy log my-mqtt-connector # to see connector's logs
Install MQTT Client such as
# for mac , this takes while....
brew install mosquitto
Insert records:
mosquitto_pub -h test.mosquitto.org -t mqtt-to-fluvio -m '{"device": {"device_id":1, "name":"device1"}}'
The produced record in Fluvio topic will be:
{
"mqtt_topic": "mqtt-to-fluvio",
"payload": {
"device": {
"device_id": 1,
"name": "device1"
}
}
}
Fluvio MQTT Source Connector supports Transformations. Records can be modified before sending to Fluvio topic.
The previous example can be extended to add extra transformations to outgoing records:
# sample-config.yaml
apiVersion: 0.1.0
meta:
version: 0.2.5
name: my-mqtt-connector
type: mqtt-source
topic: mqtt-topic
create-topic: true
mqtt:
url: "mqtt://test.mosquitto.org/"
topic: "mqtt-to-fluvio"
timeout:
secs: 30
nanos: 0
payload_output_type: json
transforms:
- uses: infinyon/jolt@0.1.0
with:
spec:
- operation: shift
spec:
payload:
device: "device"
- operation: default
spec:
source: "mqtt-connector"
The object device
in the resulting record will be “unwrapped” and the addition field source
with value mqtt-connector
will be added.
Read more about JSON to JSON transformations.