Create an MQTT to SQL Data Pipeline
By the end of this tutorial, data will flow from an MQTT broker into a PostgreSQL table.
We'll use 2 connectors:
- Inbound MQTT connector
- Outbound SQL connector
The tutorial also includes an example of combining multiple SmartModules, known as SmartModule chaining.
The outbound connector writes to a PostgreSQL database. It listens to the topic for new records and inserts them into a table.
You can use your own PostgreSQL instance if it can be reached over the internet. Otherwise, you can follow along by creating a PostgreSQL database at a hosting service such as ElephantSQL.
Setup
Start MQTT Connector
This connector expects JSON input from the MQTT broker, on an MQTT topic named ag-mqtt-topic. These parameters will be reflected in the final JSON payload that gets produced to the Fluvio topic mqtt-topic.
MQTT connector config: mqtt.yml
All versions are marked with x.y.z. To find the latest version, run:
fluvio hub connector list
fluvio hub smartmodule list
# mqtt.yml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: fluvio-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
  direction: source
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "ag-mqtt-topic"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
Create MQTT connector
$ fluvio cloud connector create --config mqtt.yml
Install mosquitto - MQTT client
First, install mosquitto. Later steps use it to send JSON to our test MQTT broker.
On macOS, you can install mosquitto with Homebrew using the following command:
brew install mosquitto
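Once the client is installed, a test message can be sent with mosquitto_pub. The following is a minimal sketch, not a required step at this point: the broker host and topic come from mqtt.yml, while the sample payload (a device object with device_id and name) is an assumption chosen to line up with the device.device_id key used by the SQL mappings in this tutorial.

```shell
# Broker and topic as configured in mqtt.yml.
MQTT_HOST="test.mosquitto.org"
MQTT_TOPIC="ag-mqtt-topic"

# Sample payload (assumption): shaped to match the "device.device_id"
# key that the SQL connector mappings read.
MQTT_MESSAGE='{"device": {"device_id": 17, "name": "device17"}}'

# Publish a single message if the client is available.
if command -v mosquitto_pub >/dev/null 2>&1; then
  mosquitto_pub -h "$MQTT_HOST" -t "$MQTT_TOPIC" -m "$MQTT_MESSAGE" \
    || echo "publish failed; check network access to $MQTT_HOST" >&2
else
  echo "mosquitto_pub not found; install mosquitto first" >&2
fi
```

Because test.mosquitto.org is a public broker, other users may publish to the same topic, so unrelated messages can show up in the Fluvio topic.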
Start SQL connector(s)
You can start one or both of the following connectors:
- Connector with no transformation
  - Download SmartModule for example
  - Example connector config
  - Start connector
- Connector with extra JSON to JSON transformation
  - Download SmartModules for example
  - Example connector config
  - Start connector
SQL Connector with no transformation
Download json-sql SmartModule
Example output
$ fluvio hub sm download infinyon/json-sql@x.y.z
downloading infinyon/json-sql@x.y.z to infinyon-json-sql-x.y.z.ipkg
... downloading complete
... checking package
trying connection to fluvio router.dev.infinyon.cloud:9003
... cluster smartmodule install complete
SQL Connector with no transformation config
# sql.yml
apiVersion: 0.1.0
meta:
  name: fluvio-sql-connector
  type: sql-sink
  version: x.y.z
  topic: mqtt-topic
sql:
  url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
  - uses: infinyon/json-sql@x.y.z
    with:
      invoke: insert
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "payload.device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
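The mapping above inserts into a table named topic_message with an int column device_id and a jsonb column record. The tutorial text does not say whether the sink creates this table, so the sketch below assumes it must exist beforehand and creates it with psql. The connection string is the placeholder from the config, and the DATABASE_URL variable name is only illustrative.

```shell
# Placeholder connection string from sql.yml; replace with your own database URL.
DATABASE_URL="${DATABASE_URL:-postgres://user:password@db.postgreshost.example/dbname}"

# Column names and types mirror the map-columns section of the config.
CREATE_TABLE_SQL='CREATE TABLE IF NOT EXISTS topic_message (device_id int, record jsonb);'

if command -v psql >/dev/null 2>&1; then
  psql "$DATABASE_URL" -c "$CREATE_TABLE_SQL" \
    || echo "could not create table; check the connection string" >&2
else
  # Without psql, run the statement in any PostgreSQL client.
  echo "psql not found; run this statement manually:" >&2
  echo "$CREATE_TABLE_SQL"
fi
```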
Start SQL connector with no transformation
$ fluvio cloud connector create --config sql.yml
Connector with JSON to JSON transformation
Download the Jolt and Json-Sql SmartModules used by this example connector
$ fluvio hub sm download infinyon/json-sql@0.2.1
$ fluvio hub smartmodule download infinyon/jolt@0.4.1
Expect the following output:
trying connection to fluvio router.infinyon.cloud:9003
downloading infinyon/json-sql@0.2.1 to infinyon-json-sql-0.2.1.ipkg
... downloading complete
... checking package
... cluster smartmodule install complete
trying connection to fluvio router.infinyon.cloud:9003
downloading infinyon/jolt@0.4.1 to infinyon-jolt-0.4.1.ipkg
... downloading complete
... checking package
... cluster smartmodule install complete
Connector with JSON to JSON transformation config
# sql-chain.yml
apiVersion: 0.1.0
meta:
  name: fluvio-sql-connector-chain
  type: sql-sink
  version: x.y.z
  topic: mqtt-topic
sql:
  url: "postgres://user:password@db.postgreshost.example/dbname"
  rust_log: "sql_sink=INFO,sqlx=WARN"
transforms:
  - uses: infinyon/jolt@x.y.z
    with:
      spec:
        - operation: shift
          spec:
            payload:
              device: "device"
        - operation: default
          spec:
            device:
              type: "mobile"
  - uses: infinyon/json-sql@x.y.z
    with:
      invoke: insert
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
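To make the chain easier to follow, the sketch below imitates the two Jolt operations locally with jq. This is only an approximation for illustration: jq is not part of the connector, and the input record shape (a payload wrapper plus an mqtt_topic field) is an assumption inferred from the json-key paths used in the two configs. The shift step moves payload.device to a top-level device key, and the default step adds type: "mobile" when no type is present, which is why the json-sql mapping here reads device.device_id instead of payload.device.device_id.

```shell
# Assumed shape of a record produced by the MQTT connector.
INPUT='{"mqtt_topic":"ag-mqtt-topic","payload":{"device":{"device_id":17,"name":"device17"}}}'

if command -v jq >/dev/null 2>&1; then
  # Step 1 (shift): keep only payload.device, moved to the top-level "device" key.
  # Step 2 (default): set device.type to "mobile" unless a value is already there.
  # Note: jq's //= also replaces null or false values, which may differ
  # slightly from how the Jolt default operation treats them.
  OUTPUT=$(printf '%s' "$INPUT" | jq -c '{device: .payload.device} | .device.type //= "mobile"')
  echo "$OUTPUT"
else
  echo "jq not found; skipping the local transformation sketch" >&2
fi
```

The printed record no longer has the payload wrapper, and its device object carries the added type field alongside device_id and name.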
Start SQL connector with JSON transformation
$ fluvio cloud connector create --config sql-chain.yml
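After a message has been published to the MQTT topic, one way to check that the pipeline works end to end is to query the target table. This is a minimal sketch that assumes psql is installed and reuses the placeholder connection string from the configs; the DATABASE_URL variable name is illustrative.

```shell
# Placeholder connection string from the connector configs; replace with your own.
DATABASE_URL="${DATABASE_URL:-postgres://user:password@db.postgreshost.example/dbname}"

# Read back the columns populated by the json-sql mapping.
VERIFY_SQL='SELECT device_id, record FROM topic_message;'

if command -v psql >/dev/null 2>&1; then
  psql "$DATABASE_URL" -c "$VERIFY_SQL" \
    || echo "query failed; check the connection string and that the table exists" >&2
else
  echo "psql not found; run this query in any PostgreSQL client:" >&2
  echo "$VERIFY_SQL"
fi
```

If both SQL connectors are running, each MQTT message should appear twice: once with the original record and once with the record reshaped by the Jolt step.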