IoT Telemetry Ingestion

This guide covers Ring 2 of Granit.IoT, the ingestion pipeline. Four packages build a three-stage flow (validate → parse → dedup) that ends in an outbox publish and returns 202 Accepted in under a second at P99, even at 100k devices emitting every 10 seconds.

Teams wiring their first IoT provider hit the same five problems every time:

  • Blocking webhooks. Inline persistence and threshold evaluation slow the HTTP response and trip provider retry timers. The provider retries and you persist twice.
  • Forgotten signature validation. A webhook endpoint without HMAC validation is a public insert into your database.
  • Duplicate messages. MQTT redelivery, provider retries, and network flaps push the same payload multiple times. Without dedup, alerts fire twice.
  • Provider lock-in. Scaleway ships one envelope, AWS another, Azure a third. Handling each in your domain code makes the code rot.
  • Silent thresholds. Alert rules live in spreadsheets or in handler if-chains that can’t be overridden per tenant.

Ring 2 closes all five with a three-stage pipeline plus the Wolverine transactional outbox — validate-and-acknowledge stays synchronous, persist and evaluate happen asynchronously.

flowchart LR
  IN["POST /iot/ingest/{source}"] --> SIG["1. Signature validation<br/>(HMAC-SHA256 / SigV4)"]
  SIG -- valid --> PARSE["2. Provider parser<br/>→ ParsedTelemetryBatch"]
  SIG -- invalid --> R401["401 Unauthorized"]
  PARSE --> DEDUP{"3. Dedup<br/>Redis SETNX<br/>TTL 5 min"}
  DEDUP -- new --> LOOKUP["Device lookup<br/>by serial number"]
  DEDUP -- duplicate --> R202A["202 Accepted (no-op)"]
  LOOKUP -- known --> PUBLISH["TelemetryIngestedEto<br/>→ Wolverine outbox"]
  LOOKUP -- unknown --> UNKNOWN["DeviceUnknownEto<br/>→ Wolverine outbox"]
  PUBLISH --> R202B["202 Accepted"]
  UNKNOWN --> R202B
  PUBLISH -. async .-> HANDLER["TelemetryIngestedHandler"]
  HANDLER --> DB["INSERT TelemetryPoint"]
  HANDLER --> HB["UPDATE Device.LastHeartbeatAt"]
  HANDLER --> TH["Evaluate thresholds"]
  TH --> THEXC["TelemetryThresholdExceededEto<br/>→ Notifications"]

All three stages, the device lookup, and the outbox write run synchronously: they must complete before the endpoint returns 202. Persistence, the heartbeat update, and threshold evaluation run asynchronously behind Wolverine’s transactional outbox, so a slow telemetry insert never blocks the HTTP response.

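The pipeline exposes three extension points. A provider implements the first two; the deduplicator carries no SourceName and is shared across sources: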

public interface IPayloadSignatureValidator
{
    string SourceName { get; } // "scaleway", "awsiotsns", ...

    ValueTask<SignatureValidationResult> ValidateAsync(
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string> headers,
        CancellationToken ct = default);
}

public interface IInboundMessageParser
{
    string SourceName { get; } // matches validator's SourceName

    ValueTask<ParsedTelemetryBatch> ParseAsync(
        ReadOnlyMemory<byte> body,
        CancellationToken ct = default);
}

public interface IInboundMessageDeduplicator
{
    Task<bool> TryAcquireAsync(string sanitizedMessageId, CancellationToken ct = default);
}

IIngestionPipeline.ProcessAsync(source, body, headers, ct) looks up the validator and parser by SourceName (case-insensitive), runs the three stages, and publishes the right ETO. Adding a new provider is two new classes.
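As an illustration of the first of those two classes, here is a minimal sketch of a signature validator for a made-up provider. Everything provider-specific is an assumption: the "acme" source name, the X-Acme-Signature header, and the hex encoding are hypothetical, and SignatureValidationResult is a stand-in because this guide does not show the real type's shape. The interface is copied from above so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the library type; its real members are not shown in this guide.
public readonly record struct SignatureValidationResult(bool IsValid, string? Reason = null);

// Copied from the guide so the sketch is self-contained.
public interface IPayloadSignatureValidator
{
    string SourceName { get; }

    ValueTask<SignatureValidationResult> ValidateAsync(
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string> headers,
        CancellationToken ct = default);
}

// Hypothetical provider: hex-encoded HMAC-SHA256 of the raw body in "X-Acme-Signature".
public sealed class AcmeSignatureValidator : IPayloadSignatureValidator
{
    private readonly byte[] _secret;

    public AcmeSignatureValidator(byte[] secret) => _secret = secret;

    public string SourceName => "acme";

    public ValueTask<SignatureValidationResult> ValidateAsync(
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string> headers,
        CancellationToken ct = default)
    {
        if (!headers.TryGetValue("X-Acme-Signature", out string? hex))
            return ValueTask.FromResult(new SignatureValidationResult(false, "missing signature header"));

        byte[] provided;
        try
        {
            provided = Convert.FromHexString(hex);
        }
        catch (FormatException)
        {
            return ValueTask.FromResult(new SignatureValidationResult(false, "signature is not hex"));
        }

        // Sign the raw bytes exactly as received, before any parsing.
        byte[] expected = HMACSHA256.HashData(_secret, body.Span);

        // Constant-time comparison; returns false when the lengths differ.
        bool ok = CryptographicOperations.FixedTimeEquals(expected, provided);
        return ValueTask.FromResult(new SignatureValidationResult(ok, ok ? null : "signature mismatch"));
    }
}
```

The matching parser would expose the same SourceName so the pipeline can pair the two.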

Whatever the provider ships, the parser produces:

public sealed record ParsedTelemetryBatch(
    string MessageId,
    string DeviceExternalId,
    DateTimeOffset RecordedAt,
    IReadOnlyDictionary<string, double> Metrics,
    string Source,
    IReadOnlyDictionary<string, string>? Tags);

Downstream handlers only ever see this shape — they never care that it came from a Scaleway Base64 envelope or an AWS SNS message.

The ingestion endpoint — POST /iot/ingest/{source}

Mapped by MapGranitIoTIngestionEndpoints():

| Status | Meaning |
| --- | --- |
| 202 Accepted | Signature valid, payload parsed, dispatched to outbox (or duplicate silently accepted) |
| 400 Bad Request | Malformed payload (Base64 fail, missing field, invalid JSON) |
| 401 Unauthorized | Signature validation failed |
| 415 Unsupported Media Type | Non-JSON content type |
| 422 Unprocessable Entity | No parser registered for the {source} path parameter |

The endpoint buffers the request body up to 256 KB (enough for even the chattiest MQTT payload), snapshots headers into a case-insensitive dictionary, and delegates to IIngestionPipeline.ProcessAsync().

Rate limiting is enforced at the route group level via Granit.RateLimiting (iot-ingest policy by default) — configure per-tenant limits under RateLimiting:Policies:iot-ingest in appsettings.json.

Granit.IoT.Ingestion.Scaleway is the production-ready provider for Scaleway IoT Hub. It ships three internal services registered by AddGranitIoTIngestionScaleway():

| Service | Responsibility |
| --- | --- |
| ScalewaySignatureValidator | Validates X-Scaleway-Signature (HMAC-SHA256) via CryptographicOperations.FixedTimeEquals |
| ScalewayMessageParser | Decodes the JSON envelope, Base64-decodes the payload, extracts metrics |
| ScalewayTopicMapper | Extracts the device serial from the MQTT topic (devices/{serial}/... by default) |

{
  "topic": "devices/ACME-TH-001/telemetry",
  "message_id": "550e8400-e29b-41d4-a716-446655440000",
  "payload": "eyJ0ZW1wIjoyMi41LCJodW1pZGl0eSI6NDUuMH0=",
  "qos": 1,
  "timestamp": "2026-05-20T12:34:56Z"
}

The payload Base64-decodes to {"temp":22.5,"humidity":45.0}, which becomes the Metrics dictionary in ParsedTelemetryBatch.
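The decode step can be sketched with nothing but the base class library. This is an illustration of the technique, not the code inside ScalewayMessageParser: the class and method names are hypothetical, and skipping non-numeric properties is an assumption about how non-metric fields are treated.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class ScalewayPayloadSketch
{
    // Base64-decodes the envelope's "payload" field and keeps its numeric properties as metrics.
    public static IReadOnlyDictionary<string, double> DecodeMetrics(string envelopeJson)
    {
        using JsonDocument envelope = JsonDocument.Parse(envelopeJson);

        string base64 = envelope.RootElement.GetProperty("payload").GetString()
            ?? throw new FormatException("payload is null");

        // Throws FormatException on invalid Base64.
        byte[] raw = Convert.FromBase64String(base64);

        using JsonDocument inner = JsonDocument.Parse(raw);

        var metrics = new Dictionary<string, double>();
        foreach (JsonProperty property in inner.RootElement.EnumerateObject())
        {
            // Assumption: only JSON numbers are metrics; strings, booleans, and objects are ignored.
            if (property.Value.ValueKind == JsonValueKind.Number)
                metrics[property.Name] = property.Value.GetDouble();
        }

        return metrics;
    }
}
```

Fed the envelope above, DecodeMetrics returns two entries: temp = 22.5 and humidity = 45.0.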

{
  "IoT": {
    "Ingestion": {
      "Scaleway": {
        "SharedSecret": "__FROM_SECRET_STORE__",
        "TopicDeviceSegmentIndex": 1
      }
    }
  }
}

Options bind via IOptionsMonitor<ScalewayIoTOptions> — rotating the shared secret is a configuration reload, no restart required.
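To make TopicDeviceSegmentIndex concrete, the following sketch shows one plausible way to pull the serial out of a topic. It assumes the index is zero-based, which is inferred from the default of 1 together with the devices/{serial}/... pattern; the class and method names are hypothetical and this is not ScalewayTopicMapper itself.

```csharp
public static class TopicMapperSketch
{
    // Returns the topic segment at the configured index, or null when it is missing or empty.
    public static string? ExtractSerial(string topic, int deviceSegmentIndex)
    {
        string[] segments = topic.Split('/');

        if (deviceSegmentIndex < 0 || deviceSegmentIndex >= segments.Length)
            return null;

        string serial = segments[deviceSegmentIndex];
        return serial.Length == 0 ? null : serial;
    }
}
```

With the sample envelope, ExtractSerial("devices/ACME-TH-001/telemetry", 1) yields ACME-TH-001. Returning null for a short topic lets the caller decide whether that is a malformed payload or an unknown device.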

Granit.IoT.Ingestion.Aws ships three inbound sources:

| Source key | Path | Signature |
| --- | --- | --- |
| awsiotsns | SNS HTTP subscription endpoint | RSA-SHA256 (SNS signing cert, cached behind a CDN-guarded validator) |
| awsiotdirect | Device → API directly (mTLS-fronted) | SigV4 |
| awsiotapigw | Device → API Gateway → app | SigV4 |

All three terminate at the same POST /iot/ingest/{source} route and emit the same ParsedTelemetryBatch. SigV4 is verified against the AWS public get-vanilla test vector byte-for-byte.

Deduplication — transport-level, Redis-backed, fail-open

IdempotencyStoreInboundMessageDeduplicator backs onto Redis via IIdempotencyStore (from Granit.Http.Idempotency). The key is the sanitised transport message ID (trimmed, capped at 128 chars, non-alphanumeric replaced with -), prefixed iot-msg:.

| Parameter | Default | Key |
| --- | --- | --- |
| TTL | 5 minutes | IoT:Ingestion:DeduplicationWindowMinutes |
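The key rules and the fail-open behaviour named in the heading can be sketched as follows. Several points are assumptions: the order of operations (trim, then cap, then replace), the restriction to ASCII letters and digits, and the reading of "fail-open" as "treat the message as new when the store throws". The names are hypothetical, and the store is represented by a plain delegate rather than IIdempotencyStore, whose members this guide does not show.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;

public static class DedupSketch
{
    private const string Prefix = "iot-msg:";
    private const int MaxIdLength = 128;

    // Trim, cap at 128 chars, replace anything that is not an ASCII letter or digit with '-'.
    public static string BuildKey(string transportMessageId)
    {
        string id = transportMessageId.Trim();
        if (id.Length > MaxIdLength)
            id = id[..MaxIdLength];

        var key = new StringBuilder(Prefix, Prefix.Length + id.Length);
        foreach (char c in id)
            key.Append(IsAsciiAlphanumeric(c) ? c : '-');

        return key.ToString();
    }

    private static bool IsAsciiAlphanumeric(char c) =>
        (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9');

    // Fail-open wrapper: a store outage lets the message through instead of dropping telemetry.
    public static async Task<bool> TryAcquireFailOpenAsync(
        Func<string, Task<bool>> tryAcquire, string key)
    {
        try
        {
            return await tryAcquire(key);
        }
        catch (Exception)
        {
            return true; // true = "first time seen", so ingestion continues
        }
    }
}
```

The trade-off in failing open is that a Redis outage can let duplicates reach the handler, so anything downstream that must not run twice still needs its own guard.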

TelemetryIngestedHandler consumes TelemetryIngestedEto from the outbox:

sequenceDiagram
  participant O as Wolverine outbox
  participant H as TelemetryIngestedHandler
  participant TW as ITelemetryWriter
  participant DW as IDeviceWriter
  participant TE as IDeviceThresholdEvaluator
  participant EB as IDistributedEventBus
  O->>H: TelemetryIngestedEto
  H->>TW: AppendAsync(TelemetryPoint)
  H->>DW: UpdateHeartbeatAsync(deviceId, now)
  H->>TE: EvaluateAsync(metrics, ...)
  TE-->>H: TelemetryThresholdExceededEto[]
  loop per breach
    H->>EB: PublishAsync(TelemetryThresholdExceededEto)
  end

Everything in the handler runs inside a single Wolverine transaction — if the threshold publication fails, the telemetry persist rolls back and the message is retried. No half-written state.

IDeviceThresholdEvaluator is the extension point. The default implementation SettingsDeviceThresholdEvaluator reads thresholds via Granit.Settings:

public interface IDeviceThresholdEvaluator
{
    Task<IReadOnlyList<TelemetryThresholdExceededEto>> EvaluateAsync(
        Guid deviceId, Guid? tenantId,
        IReadOnlyDictionary<string, double> metrics,
        DateTimeOffset recordedAt,
        CancellationToken ct = default);
}

Settings key format: IoT:Threshold:{metricName}, for example IoT:Threshold:temperature = 28.5. The cascade order is User → Tenant → Global → Configuration → Default, so an operator can raise the threshold for one noisy tenant without touching the global baseline.
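The cascade amounts to "first level that has a value wins". The sketch below shows that resolution plus a simple breach check. It is a model of the idea rather than SettingsDeviceThresholdEvaluator: the names are hypothetical, the levels are passed in directly instead of being read from Granit.Settings, and treating a breach as "value strictly above the threshold" is an assumption.

```csharp
using System.Collections.Generic;

public static class ThresholdCascadeSketch
{
    // Levels must be passed in cascade order: User, Tenant, Global, Configuration, Default.
    public static double? Resolve(params double?[] levelsInCascadeOrder)
    {
        foreach (double? level in levelsInCascadeOrder)
        {
            if (level.HasValue)
                return level;
        }
        return null; // no level defines a threshold for this metric
    }

    // Returns the names of metrics whose value is strictly above their resolved threshold.
    public static IReadOnlyList<string> FindBreaches(
        IReadOnlyDictionary<string, double> metrics,
        IReadOnlyDictionary<string, double> thresholds)
    {
        var breached = new List<string>();
        foreach (var (name, value) in metrics)
        {
            if (thresholds.TryGetValue(name, out double limit) && value > limit)
                breached.Add(name);
        }
        return breached;
    }
}
```

For example, Resolve(null, 30.0, 28.5, null, 25.0) picks the tenant value 30.0 over the global 28.5, and a metric with no threshold at any level is simply never reported as breached.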

Every pipeline stage emits OpenTelemetry counters. Scrape them with the standard Prometheus exporter registered by Granit.Diagnostics:

| Counter | Tags | Fires when |
| --- | --- | --- |
| granit.iot.telemetry.ingested | tenant_id, source | TelemetryPoint persisted |
| granit.iot.ingestion.signature_rejected | tenant_id, source | HMAC / SigV4 validation failed |
| granit.iot.ingestion.duplicate_skipped | tenant_id, source | Redis dedup hit |
| granit.iot.ingestion.unknown_device | tenant_id, source | Serial not registered |
| granit.iot.ingestion.threshold_exceeded | tenant_id, metric_name | Threshold breached |
| granit.iot.alerts.throttled | tenant_id, metric_name | Notification suppressed by the bridge |
| granit.iot.device.offline_detected | tenant_id | Heartbeat job flagged a device |
| granit.iot.background.telemetry_purged | tenant_id | Purge job deleted rows |
| granit.iot.background.partition_created | partition_name | Partition maintenance job ran |
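For readers adding a counter of their own next to these, the sketch below shows how a tagged counter is typically emitted with System.Diagnostics.Metrics, which OpenTelemetry for .NET collects. The counter name and tag keys come from the table; the meter name and the class are placeholders, since this guide does not state how Granit.IoT registers its meter.

```csharp
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class IngestionMetricsSketch
{
    // Placeholder meter name; the real one is not given in this guide.
    private static readonly Meter IngestionMeter = new("Example.IoT.Ingestion");

    private static readonly Counter<long> DuplicateSkipped =
        IngestionMeter.CreateCounter<long>("granit.iot.ingestion.duplicate_skipped");

    // Increments the counter by one, tagged the same way as the table row.
    public static void RecordDuplicate(string tenantId, string source) =>
        DuplicateSkipped.Add(1,
            new KeyValuePair<string, object?>("tenant_id", tenantId),
            new KeyValuePair<string, object?>("source", source));
}
```

Tag values become label values on the exported series, so keeping them low-cardinality (tenant and source, not device serial) keeps the scrape size manageable.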

Alert routing goes through Granit.Notifications — see Notifications bridge.