MQTT Transport
Granit.IoT’s MQTT bridge connects your application to any MQTT 3.1.1 or 5.0 broker — Mosquitto, EMQX, HiveMQ, VerneMQ, Scaleway IoT Hub in MQTT mode, AWS IoT Core via forwarding, Azure IoT Hub via the MQTT endpoint, or a self-hosted broker. Two opt-in packages, mTLS via Vault, exponential-backoff reconnection, QoS 0–2.
When to use MQTT instead of webhooks
| Use the MQTT bridge | Use the webhook path |
|---|---|
| You consume from a broker (broker handles device fan-in, auth, retention) | Provider natively pushes HTTP (Scaleway Routes, AWS SNS) |
| You have no inbound port (firewalls, on-prem) | You have a public HTTPS endpoint |
| You need broker-level QoS 2 (exactly-once delivery) | Transport-level dedup is enough |
| You want messages buffered while your app is down | Provider retries are sufficient |
The two paths share exactly the same downstream pipeline — see Telemetry ingestion. Switching transports never touches your domain code.
Packages
| Package | Role |
|---|---|
| Granit.IoT.Mqtt | Abstractions: IIoTMqttBridge, IoTMqttConnectionStatus, message parser, signature validator |
| Granit.IoT.Mqtt.Mqttnet | Implementation on top of MQTTnet: broker connection, mTLS, backoff, hosted service |
Both are opt-in — they are not pulled in by Granit.Bundle.IoT. Add
them explicitly:
```bash
dotnet add package Granit.IoT.Mqtt.Mqttnet
```

Registration

```csharp
using Granit.IoT.Mqtt.Extensions;
using Granit.IoT.Mqtt.Mqttnet.Extensions;

builder.Services
    .AddGranit(builder.Configuration)
    .AddIoT();

builder.Services.AddGranitIoTMqtt();        // abstractions + parser
builder.Services.AddGranitIoTMqttMqttnet(); // MQTTnet implementation
```

GranitIoTMqttMqttnetModule registers a hosted service: the broker
connection starts when the app boots and stops on shutdown.
Configuration — IoT:Mqtt
```json
{
  "IoT": {
    "Mqtt": {
      "BrokerUri": "mqtts://mqtt.example.com:8883",
      "ClientId": "granit-iot",
      "DefaultQoS": 1,
      "MaxPayloadBytes": 262144,
      "KeepAliveSeconds": 60,
      "FeatureFlagCacheSeconds": 30,
      "MaxPendingMessages": 1000,
      "CertificateExpiryWarningMinutes": 5
    }
  }
}
```

| Key | Default | Purpose |
|---|---|---|
| BrokerUri | (required) | Must use mqtts://. Plain mqtt:// is rejected: IoT telemetry is never sent in clear |
| ClientId | granit-iot | MQTT client identifier sent to the broker |
| DefaultQoS | 1 | Default QoS for subscriptions (0 = at-most-once, 1 = at-least-once, 2 = exactly-once) |
| MaxPayloadBytes | 262144 (256 KB) | Oversize messages are dropped and counted on granit.iot.ingestion.signature_rejected |
| KeepAliveSeconds | 60 | Broker keep-alive interval |
| FeatureFlagCacheSeconds | 30 | TTL for the “is the MQTT bridge enabled for tenant X” cache |
| MaxPendingMessages | 1000 | In-flight message buffer; backpressure applied above this |
| CertificateExpiryWarningMinutes | 5 | How early, before expiry, to re-fetch a rotating mTLS cert from Vault |
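
The mqtts:// rule for BrokerUri can be illustrated with a small check. This is a minimal sketch of the documented behavior, not the validator Granit.IoT actually ships; `IsAcceptableBrokerUri` is a hypothetical name introduced here.

```csharp
using System;

// Hypothetical helper (not part of Granit.IoT): accepts only absolute URIs
// whose scheme is mqtts, mirroring the rule stated in the table above.
bool IsAcceptableBrokerUri(string brokerUri) =>
    Uri.TryCreate(brokerUri, UriKind.Absolute, out var uri)
    && string.Equals(uri.Scheme, "mqtts", StringComparison.OrdinalIgnoreCase);

Console.WriteLine(IsAcceptableBrokerUri("mqtts://mqtt.example.com:8883")); // True
Console.WriteLine(IsAcceptableBrokerUri("mqtt://mqtt.example.com:1883"));  // False
```

Failing fast on the scheme at startup is one way to make sure a misconfigured plain-text URI never reaches the connection code.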
Connection lifecycle
```mermaid
stateDiagram-v2
    [*] --> Stopped
    Stopped --> Starting : StartAsync()
    Starting --> Connected : mTLS OK + CONNACK
    Starting --> Faulted : Auth failure, invalid cert
    Connected --> Reconnecting : Disconnect / keep-alive timeout
    Reconnecting --> Connected : Backoff (1s → 30s max)
    Connected --> Stopped : StopAsync()
    Reconnecting --> Stopped : StopAsync()
    Faulted --> Stopped : StopAsync()
```

Reconnection uses exponential backoff capped at 30 seconds. The bridge
never gives up on a transient failure — only terminal auth or cert errors
move it to Faulted.
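
A capped exponential backoff of this kind can be sketched as follows. The 1 s starting delay and the 30 s cap come from the diagram; the doubling factor and the `BackoffDelay` name are assumptions made for illustration, and the bridge's real schedule may differ (for example, it may add jitter).

```csharp
using System;

// Hypothetical schedule: 1 s, 2 s, 4 s, ... doubling per attempt, capped at 30 s.
// The doubling factor is an assumption; only the 1 s start and 30 s cap are documented.
TimeSpan BackoffDelay(int attempt)
{
    // attempt is 0-based; clamp the exponent so the shift can never overflow.
    int exponent = Math.Clamp(attempt, 0, 5);
    double seconds = Math.Min(30, 1 << exponent);
    return TimeSpan.FromSeconds(seconds);
}

for (int attempt = 0; attempt < 7; attempt++)
    Console.WriteLine($"attempt {attempt}: wait {BackoffDelay(attempt).TotalSeconds} s");
```

With these assumptions the delays run 1, 2, 4, 8, 16, 30, 30 seconds, so a long outage settles into one retry every 30 seconds.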
mTLS and Vault integration
The MQTTnet bridge loads its client certificate from
Granit.Encryption.Vault — never from disk or configuration. A
certificate rotated in Vault is picked up automatically
CertificateExpiryWarningMinutes before its NotAfter:
```mermaid
sequenceDiagram
    participant V as Vault
    participant B as MqttnetIoTBridge
    participant M as MQTT broker
    B->>V: Fetch cert (at startup)
    V-->>B: Cert (valid until T)
    B->>M: CONNECT (mTLS)
    M-->>B: CONNACK
    Note over B: Timer fires at T − 5 min
    B->>V: Re-fetch cert
    V-->>B: New cert (valid until T')
    B->>M: Reconnect with new cert
```

This removes the entire class of “expired TLS cert took down my IoT fleet” incidents.
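
The timing in the diagram reduces to simple date arithmetic. The sketch below shows how a refresh delay could be derived from a certificate's NotAfter and CertificateExpiryWarningMinutes; `DelayUntilRefresh` is a hypothetical helper, not the bridge's actual scheduler.

```csharp
using System;

// Hypothetical helper: how long to wait before re-fetching the certificate.
// Returns zero when the warning window has already started.
TimeSpan DelayUntilRefresh(DateTimeOffset now, DateTimeOffset notAfter, int warningMinutes)
{
    TimeSpan delay = (notAfter - TimeSpan.FromMinutes(warningMinutes)) - now;
    return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
}

var now = new DateTimeOffset(2025, 1, 1, 12, 0, 0, TimeSpan.Zero);
var notAfter = now.AddHours(1); // certificate valid until T = 13:00 UTC

Console.WriteLine(DelayUntilRefresh(now, notAfter, 5));                // 00:55:00
Console.WriteLine(DelayUntilRefresh(now.AddMinutes(58), notAfter, 5)); // 00:00:00
```

Clamping to zero covers the case where the app starts with a certificate that is already inside its warning window.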
Consuming telemetry — same pipeline as webhooks
The MQTT bridge wires into the exact same ingestion pipeline used by
webhook providers. MqttMessageParser produces a ParsedTelemetryBatch,
the deduplicator runs against Redis, the outbox publishes
TelemetryIngestedEto, and TelemetryIngestedHandler persists — none of
the downstream code needs to care whether the message arrived over HTTP or
MQTT.
```mermaid
flowchart LR
    D["Devices"] -->|MQTT publish| B["MQTT broker"]
    B -->|subscription| M["MqttnetIoTBridge"]
    M --> P["MqttMessageParser"]
    P --> DEDUP["Redis dedup"]
    DEDUP --> OUT["Wolverine outbox"]
    OUT --> H["TelemetryIngestedHandler"]
    H --> DB["iot_telemetry_points"]
```

Observability
| Metric | Meaning |
|---|---|
| granit.iot.telemetry.ingested (source = mqtt) | Message processed end-to-end |
| granit.iot.ingestion.signature_rejected (source = mqtt) | Oversize payload, invalid topic, missing auth |
| granit.iot.ingestion.duplicate_skipped (source = mqtt) | Redis dedup hit (duplicate packet ID) |
MQTT connection state is also exported as an OpenTelemetry gauge — wire it
to Grafana to see Reconnecting storms before users notice.
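
For readers unfamiliar with how a state gauge is published in .NET, here is a minimal sketch using System.Diagnostics.Metrics, the API that OpenTelemetry for .NET collects from. The meter name, instrument name, and integer state encoding are all assumptions made for illustration; this page does not give the names Granit.IoT actually exports.

```csharp
using System;
using System.Diagnostics.Metrics;

// Hypothetical names: the real meter and instrument names are not documented here.
var meter = new Meter("Example.IoT.Mqtt");

// Assumed encoding: 0 Stopped, 1 Starting, 2 Connected, 3 Reconnecting, 4 Faulted.
int currentState = 2;
meter.CreateObservableGauge("example.mqtt.connection_state", () => currentState);

// A MeterListener stands in for the OpenTelemetry exporter in this sketch.
int observed = -1;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "Example.IoT.Mqtt") l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<int>((instrument, value, tags, state) => observed = value);
listener.Start();

currentState = 3;                       // the bridge drops into Reconnecting
listener.RecordObservableInstruments(); // what an exporter does on each collection
Console.WriteLine(observed);            // 3
```

An observable gauge is read on each collection cycle, so a dashboard sees the state at scrape time, not every transition in between.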
Broker compatibility matrix
| Broker | Status | Notes |
|---|---|---|
| Mosquitto | Tested | Reference broker — works with any client cert chain |
| EMQX | Tested | Supports both 3.1.1 and 5.0 |
| HiveMQ | Tested | Tested with HiveMQ Cloud |
| VerneMQ | Tested | Tested with VerneMQ 1.x cluster |
| Scaleway IoT Hub (MQTT mode) | Tested | Alternative to the Scaleway webhook provider |
| AWS IoT Core | Works | mTLS + per-topic IAM policy. For full provisioning / shadow / jobs see AWS bridge |
| Azure IoT Hub | Works | SAS token on CONNECT |
Anti-patterns to avoid
See also
- Telemetry ingestion: the shared pipeline the bridge feeds
- Device management: the Device aggregate that gets LastHeartbeatAt updated
- Operations: heartbeat timeout detection
- Security overview: framework security baseline including Vault-backed secrets