Skip to content

MQTT Transport

Granit.IoT’s MQTT bridge connects your application to any MQTT 3.1.1 or 5.0 broker — Mosquitto, EMQX, HiveMQ, VerneMQ, Scaleway IoT Hub in MQTT mode, AWS IoT Core via forwarding, Azure IoT Hub via the MQTT endpoint, or a self-hosted broker. Two opt-in packages, mTLS via Vault, exponential-backoff reconnection, QoS 0–2.

Use the MQTT bridgeUse the webhook path
You consume from a broker (broker handles device fan-in, auth, retention)Provider natively pushes HTTP (Scaleway Routes, AWS SNS)
You have no inbound port (firewalls, on-prem)You have a public HTTPS endpoint
You need broker-level QoS 2 (exactly-once delivery)Transport-level dedup is enough
You want messages buffered while your app is downProvider retries are sufficient

The two paths share exactly the same downstream pipeline — see Telemetry ingestion. Switching transports never touches your domain code.

PackageRole
Granit.IoT.MqttAbstractions — IIoTMqttBridge, IoTMqttConnectionStatus, message parser, signature validator
Granit.IoT.Mqtt.MqttnetImplementation on top of MQTTnet — broker connection, mTLS, backoff, hosted service

Both are opt-in — they are not pulled in by Granit.Bundle.IoT. Add them explicitly:

Terminal window
dotnet add package Granit.IoT.Mqtt.Mqttnet
using Granit.IoT.Mqtt.Extensions;
using Granit.IoT.Mqtt.Mqttnet.Extensions;
builder.Services
.AddGranit(builder.Configuration)
.AddIoT();
builder.Services.AddGranitIoTMqtt(); // abstractions + parser
builder.Services.AddGranitIoTMqttMqttnet(); // MQTTnet implementation

GranitIoTMqttMqttnetModule registers a hosted service — the broker connection starts when the app boots and stops on shutdown.

{
"IoT": {
"Mqtt": {
"BrokerUri": "mqtts://mqtt.example.com:8883",
"ClientId": "granit-iot",
"DefaultQoS": 1,
"MaxPayloadBytes": 262144,
"KeepAliveSeconds": 60,
"FeatureFlagCacheSeconds": 30,
"MaxPendingMessages": 1000,
"CertificateExpiryWarningMinutes": 5
}
}
}
KeyDefaultPurpose
BrokerUri(required)Must use mqtts://. Plain mqtt:// is rejected — IoT telemetry is never sent in clear
ClientIdgranit-iotMQTT client identifier sent to the broker
DefaultQoS1Default QoS for subscriptions (0 = at-most-once, 1 = at-least-once, 2 = exactly-once)
MaxPayloadBytes262144 (256 KB)Oversize messages dropped, counted on granit.iot.ingestion.signature_rejected
KeepAliveSeconds60Broker keep-alive interval
FeatureFlagCacheSeconds30TTL for the “is the MQTT bridge enabled for tenant X” cache
MaxPendingMessages1000In-flight message buffer; backpressure applied above this
CertificateExpiryWarningMinutes5How early to re-fetch a rotating mTLS cert from Vault
stateDiagram-v2
  [*] --> Stopped
  Stopped --> Starting : StartAsync()
  Starting --> Connected : mTLS OK + CONNACK
  Starting --> Faulted : Auth failure, invalid cert
  Connected --> Reconnecting : Disconnect / keep-alive timeout
  Reconnecting --> Connected : Backoff (1s → 30s max)
  Connected --> Stopped : StopAsync()
  Reconnecting --> Stopped : StopAsync()
  Faulted --> Stopped : StopAsync()

Reconnection uses exponential backoff capped at 30 seconds. The bridge never gives up on a transient failure — only terminal auth or cert errors move it to Faulted.

The MQTTnet bridge loads its client certificate from Granit.Encryption.Vaultnever from disk or configuration. A certificate rotated in Vault is picked up automatically CertificateExpiryWarningMinutes before its NotAfter:

sequenceDiagram
  participant V as Vault
  participant B as MqttnetIoTBridge
  participant M as MQTT broker
  B->>V: Fetch cert (at startup)
  V-->>B: Cert (valid until T)
  B->>M: CONNECT (mTLS)
  M-->>B: CONNACK
  Note over B: Timer fires at T − 5 min
  B->>V: Re-fetch cert
  V-->>B: New cert (valid until T')
  B->>M: Reconnect with new cert

This removes the entire class of “expired TLS cert took down my IoT fleet” incidents.

Consuming telemetry — same pipeline as webhooks

Section titled “Consuming telemetry — same pipeline as webhooks”

The MQTT bridge wires into the exact same ingestion pipeline used by webhook providers. MqttMessageParser produces a ParsedTelemetryBatch, the deduplicator runs against Redis, the outbox publishes TelemetryIngestedEto, and TelemetryIngestedHandler persists — none of the downstream code needs to care whether the message arrived over HTTP or MQTT.

flowchart LR
  D["Devices"] -->|MQTT publish| B["MQTT broker"]
  B -->|subscription| M["MqttnetIoTBridge"]
  M --> P["MqttMessageParser"]
  P --> DEDUP["Redis dedup"]
  DEDUP --> OUT["Wolverine outbox"]
  OUT --> H["TelemetryIngestedHandler"]
  H --> DB["iot_telemetry_points"]
MetricMeaning
granit.iot.telemetry.ingested (source = mqtt)Message processed end-to-end
granit.iot.ingestion.signature_rejected (source = mqtt)Oversize payload, invalid topic, missing auth
granit.iot.ingestion.duplicate_skipped (source = mqtt)Redis dedup hit (duplicate packet ID)

MQTT connection state is also exported as an OpenTelemetry gauge — wire it to Grafana to see Reconnecting storms before users notice.

BrokerStatusNotes
MosquittoTestedReference broker — works with any client cert chain
EMQXTestedSupports both 3.1.1 and 5.0
HiveMQTestedTested with HiveMQ Cloud
VerneMQTestedTested with VerneMQ 1.x cluster
Scaleway IoT Hub (MQTT mode)TestedAlternative to the Scaleway webhook provider
AWS IoT CoreWorksmTLS + per-topic IAM policy. For full provisioning / shadow / jobs see AWS bridge
Azure IoT HubWorksSAS token on CONNECT