Mergeable — Generic Aggregate Merge Framework
Multi-channel data entry produces duplicates: the same customer created twice via two CRM imports, a supplier mapped under two external providers, a party captured both as an individual and as their employer. Without a merge primitive those duplicates corrupt every aggregate that references them — invoices issued against the wrong party, ledger fragmented across two balances, mappings in conflict.
Granit.Mergeable is a generic merge framework: define an IMergeable<TSelf>
aggregate, register one IReferenceRewriter<TSelf> per module that holds an FK to
it, and the orchestrator handles the rest — preview, validation, advisory locking,
bulk-SQL fan-out, tombstone, audit, integration event. The first consumer is
Granit.Parties.Mergeable; the same shape applies to Granit.Catalog.Product
deduplication and any future Lead / Opportunity / Asset aggregate.
How it works
The orchestrator (EfMergeService<TAggregate> in
Granit.Mergeable.EntityFrameworkCore) runs every merge inside a single
TransactionScope at IsolationLevel.Serializable — under the framework’s
single-PostgreSQL deployment assumption, every participating DbContext enrols
into the same physical transaction (no DTC). Rewriter failure rolls back
everything atomically.

    sequenceDiagram
        participant Endpoint as POST /parties/{id}/merge
        participant Orch as EfMergeService<TAggregate>
        participant Idem as merge_idempotency
        participant Lock as pg_advisory_xact_lock
        participant Adapter as IMergeableAggregateAdapter<T>
        participant Rewriters as IReferenceRewriter<T>... (DI list)
        participant Outbox as Wolverine Outbox
        Endpoint->>Orch: MergeAsync(MergeRequest)
        Orch->>Idem: Lookup (key, hash)
        alt Hit + same hash
            Idem-->>Orch: cached MergeResult
            Orch-->>Endpoint: 200 (replay)
        else
            Orch->>Lock: BEGIN tx (Serializable) + advisory lock per tenant
            Orch->>Adapter: LoadAsync(survivor) + LoadAsync(loser)
            Orch->>Orch: validate invariants + GetConflicts
            alt DryRun
                Rewriters-->>Orch: CountAsync (no UPDATE)
                Orch->>Lock: ROLLBACK
                Orch-->>Endpoint: MergeResult { DryRun=true, Conflicts, Counts }
            else Live
                Orch->>Orch: survivor.MergeFrom(loser, choices)
                Orch->>Adapter: ApplyTombstone(loser)
                Orch->>Adapter: CollapseChainTombstonesAsync
                Rewriters-->>Orch: RewriteAsync (bulk UPDATE)
                Orch->>Adapter: PersistMergedPairAsync
                Orch->>Outbox: PartyMergedEto enqueued (same SaveChanges)
                Orch->>Lock: COMMIT
                Orch->>Idem: UPSERT (best-effort, post-commit)
                Orch-->>Endpoint: 200 MergeResult
            end
        end

Two key design points:
- Locking is tenant-wide. A pg_advisory_xact_lock keyed on hashtext('party-merge-' || tenantId) serialises concurrent merges within a tenant; merges across tenants stay parallel. Layered on top, SELECT … FOR UPDATE on both aggregate rows protects against intra-tenant interleaving (e.g. two admins picking the same loser).
- Audit log is not rewritten. Bulk-SQL rewriters target persisted typed FK columns. The audit log is intentionally immutable: the query layer joins on parties.MergedIntoId to render “Party X (merged into Y on …)”. Same goes for in-flight payloads (Wolverine outbox/inbox, scheduled jobs, webhooks): they are resolved on receipt via ResolveCurrentAsync rather than rewritten in place. See tombstone follow-through.
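The transaction envelope described above can be sketched with System.Transactions alone. This is a minimal sketch under stated assumptions, not the EfMergeService source: MergeEnvelope, RunAsync and the TenantLockSql constant are hypothetical names, and the `::text` cast assumes tenantId is passed as a uuid parameter.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

// Demo: the callback runs inside the ambient Serializable transaction.
IsolationLevel seen = await MergeEnvelope.RunAsync(
    () => Task.FromResult(Transaction.Current!.IsolationLevel));
Console.WriteLine(seen);

// Hypothetical helper, not part of Granit.Mergeable.
public static class MergeEnvelope
{
    // Assumed SQL shape for the tenant-wide lock; it would be the first
    // statement executed inside the transaction, and is released on COMMIT/ROLLBACK.
    public const string TenantLockSql =
        "SELECT pg_advisory_xact_lock(hashtext('party-merge-' || @tenantId::text))";

    public static async Task<T> RunAsync<T>(Func<Task<T>> body)
    {
        var options = new TransactionOptions { IsolationLevel = IsolationLevel.Serializable };

        // AsyncFlowOption.Enabled keeps the ambient transaction across awaits.
        using var scope = new TransactionScope(
            TransactionScopeOption.Required, options, TransactionScopeAsyncFlowOption.Enabled);

        T result = await body().ConfigureAwait(false);

        // Without Complete(), disposing the scope rolls everything back,
        // which is the behaviour a failing rewriter relies on.
        scope.Complete();
        return result;
    }
}
```

A dry run would follow the same path but skip the call to Complete(), so the scope rolls back on disposal.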
Authoring a mergeable aggregate
Two interfaces, both in the framework:

    flowchart LR
        A[Aggregate] -->|state-only marker| B[IHasMergeTombstone]
        A -->|behavior + state| C[IMergeable<TSelf>]
        B -->|EF auto-applies| D[MergedIntoId column<br/>+ MergedAt column<br/>+ index<br/>+ MergeTombstone query filter]
        C -->|inherits| B
        style A fill:#0ea5e9,color:#fff
        style C fill:#6366f1,color:#fff

IHasMergeTombstone — state only
Marker for the tombstone state, decoupled from the merge service so EF Core,
listings, query filters, and a future un-merge endpoint can target it without
referencing Granit.Mergeable:

    public interface IHasMergeTombstone
    {
        Guid? MergedIntoId { get; }        // null on survivor; survivor.Id on loser
        DateTimeOffset? MergedAt { get; }
    }

Implementing it is enough to get all the EF plumbing for free. ApplyGranitConventions detects the interface and:
- Adds a nullable MergedIntoId (Guid?) and MergedAt (DateTimeOffset?) column.
- Indexes MergedIntoId (powers chain-collapse and admin “merged-out” listings).
- Registers the named query filter GranitFilterNames.MergeTombstone with expression e.MergedIntoId == null, so every standard query excludes tombstoned rows by default, the same way ISoftDeletable excludes soft-deleted ones.
No per-module mapping code, no migration template to copy.
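The effect of the MergeTombstone filter can be illustrated in memory. The sketch below is an assumption-laden illustration, not what ApplyGranitConventions does internally: the interface is a local copy of the one shown above, and PartyRow and TombstoneFilter are hypothetical names. Only the predicate e.MergedIntoId == null comes from the documentation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

var survivorId = Guid.NewGuid();
var rows = new List<PartyRow>
{
    new PartyRow("Acme Ltd", MergedIntoId: null, MergedAt: null),
    new PartyRow("ACME Limited", MergedIntoId: survivorId, MergedAt: DateTimeOffset.UtcNow),
};

// Compiled and applied in memory here; EF Core would translate the same
// expression into a WHERE clause instead.
Func<PartyRow, bool> visible = TombstoneFilter.Build<PartyRow>().Compile();
foreach (PartyRow row in rows.Where(visible))
    Console.WriteLine(row.Name);

// Local copy of the marker interface so the sketch compiles on its own.
public interface IHasMergeTombstone
{
    Guid? MergedIntoId { get; }
    DateTimeOffset? MergedAt { get; }
}

// Hypothetical row type used only for this illustration.
public sealed record PartyRow(string Name, Guid? MergedIntoId, DateTimeOffset? MergedAt)
    : IHasMergeTombstone;

public static class TombstoneFilter
{
    // Same predicate the text gives for the named filter: live rows only.
    public static Expression<Func<T, bool>> Build<T>() where T : IHasMergeTombstone
        => e => e.MergedIntoId == null;
}
```

Only the survivor row passes the predicate; the tombstoned row stays hidden until a caller explicitly disables the filter.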
IMergeable<TSelf> — behavior + state

    public interface IMergeable<TSelf> : IHasMergeTombstone
        where TSelf : Entity, IMergeable<TSelf>
    {
        IReadOnlyList<FieldConflict> GetConflicts(TSelf loser);
        void MergeFrom(TSelf loser, MergeFieldChoices choices);
    }

GetConflicts returns a list of FieldConflict(FieldPath, SurvivorValue, LoserValue, Default): empty when survivor and loser are scalar-equal, one entry
per differing scalar field otherwise. The orchestrator surfaces the list as
MergeResult.Conflicts for the preview UI.
MergeFrom folds the loser into the survivor using MergeFieldChoices —
overrides for individual fields, defaulting to the recommended WinnerSide from
GetConflicts when unspecified. Implementations throw MergeException for
hard invariants (tenant mismatch, kind mismatch, default-currency mismatch,
already-archived aggregates). Hard invariants cannot be overridden by an admin
choice — they translate to HTTP 422 at the endpoint boundary.
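The interplay between GetConflicts, the per-field defaults and an admin override can be sketched on a much smaller aggregate. Everything here is a stand-in: Contact is hypothetical, FieldConflict and WinnerSide only mirror the shapes named above, and the MergeFieldChoices constructor taking a dictionary is an assumption (the real type is built through MergeFieldChoices.NewBuilder()).

```csharp
using System;
using System.Collections.Generic;

var survivor = new Contact("Acme Ltd", "https://acme.example");
var loser = new Contact("ACME Limited", "https://acme.example");

// Preview step: only Name differs, so a single conflict is reported.
IReadOnlyList<FieldConflict> conflicts = survivor.GetConflicts(loser);
Console.WriteLine(conflicts.Count);

// The admin overrides the default for Name; Website keeps its default.
var choices = new MergeFieldChoices(
    new Dictionary<string, WinnerSide> { ["Name"] = WinnerSide.Loser });
survivor.MergeFrom(loser, choices);
Console.WriteLine(survivor.Name);

public enum WinnerSide { Survivor, Loser }

public sealed record FieldConflict(
    string FieldPath, object? SurvivorValue, object? LoserValue, WinnerSide Default);

// Stand-in for the framework type; only ResolveOrDefault appears in the docs.
public sealed class MergeFieldChoices(IReadOnlyDictionary<string, WinnerSide> overrides)
{
    public WinnerSide ResolveOrDefault(string fieldPath, WinnerSide fallback)
        => overrides.TryGetValue(fieldPath, out WinnerSide side) ? side : fallback;
}

// Hypothetical aggregate, far smaller than Party.
public sealed class Contact(string name, string website)
{
    public string Name { get; private set; } = name;
    public string Website { get; private set; } = website;

    public IReadOnlyList<FieldConflict> GetConflicts(Contact loser)
    {
        var conflicts = new List<FieldConflict>();
        if (Name != loser.Name)
            conflicts.Add(new FieldConflict("Name", Name, loser.Name, WinnerSide.Survivor));
        if (Website != loser.Website)
            conflicts.Add(new FieldConflict("Website", Website, loser.Website, WinnerSide.Survivor));
        return conflicts;
    }

    public void MergeFrom(Contact loser, MergeFieldChoices choices)
    {
        // An unspecified field falls back to its documented default side.
        if (choices.ResolveOrDefault("Name", WinnerSide.Survivor) == WinnerSide.Loser)
            Name = loser.Name;
        if (choices.ResolveOrDefault("Website", WinnerSide.Survivor) == WinnerSide.Loser)
            Website = loser.Website;
    }
}
```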
Party — reference implementation

    public class Party : AuditedAggregateRoot, IMultiTenant, IHasMetadata, IMergeable<Party>
    {
        public Guid? MergedIntoId { get; private set; }
        public DateTimeOffset? MergedAt { get; private set; }

        public IReadOnlyList<FieldConflict> GetConflicts(Party loser) =>
            // Compare Name, Website, TaxStatus, ExternalMappings, Metadata… Empty when equal.
            BuildConflictList(loser);

        public void MergeFrom(Party loser, MergeFieldChoices choices)
        {
            // Hard invariants: non-overridable, translate to 422.
            if (loser.TenantId != TenantId) throw new MergeException("tenant mismatch");
            if (loser.Kind != Kind) throw new MergeException("kind mismatch");
            if (loser.DefaultCurrency != DefaultCurrency) throw new MergeException("currency mismatch");

            // Per-field choices: defaults documented per field, override at merge time.
            if (choices.ResolveOrDefault("Name", WinnerSide.Survivor) == WinnerSide.Loser)
                Rename(loser.Name);

            // Roles always union (bitwise OR, so no conflict is possible).
            AddRoles(loser.Roles);

            // Metadata: merge dictionaries with per-key overrides ("Metadata.segment" → Loser).
            MergeMetadataFrom(loser, choices);
        }

        // Internal mutation point used by IMergeableAggregateAdapter.ApplyTombstone, so the
        // orchestrator never touches private setters via reflection.
        internal void MarkAsMergedInto(Guid survivorId, DateTimeOffset mergedAt)
        {
            MergedIntoId = survivorId;
            MergedAt = mergedAt;
            Status = PartyStatus.Archived;
        }
    }

Authoring a reference rewriter
One per module that holds a typed FK to the mergeable aggregate. The rewriter
lives inline in the module’s existing *.EntityFrameworkCore package. Early
plans called for a dedicated *.Mergeable package per module, but a 50-line SQL
class does not justify a new csproj + DI extension + test project + CI shard
wiring. The architecture test
(MergeableConventionTests) accepts both *.EntityFrameworkCore
and *.Mergeable placements.

    namespace Granit.Invoicing.EntityFrameworkCore.Internal;

    internal sealed class InvoicePartyReferenceRewriter(
        IDbContextFactory<InvoicingDbContext> contextFactory) : IReferenceRewriter<Party>
    {
        public string Description => "Invoice.PartyId"; // Surfaced in audit + preview UI

        public async Task<int> RewriteAsync(
            Guid survivorId, Guid loserId, CancellationToken cancellationToken)
        {
            await using InvoicingDbContext db = await contextFactory
                .CreateDbContextAsync(cancellationToken).ConfigureAwait(false);

            PartyId loser = PartyId.Create(loserId);
            PartyId survivor = PartyId.Create(survivorId);

            return await db.Invoices
                .Where(i => i.PartyId == loser)
                .ExecuteUpdateAsync(s => s.SetProperty(i => i.PartyId, survivor), cancellationToken)
                .ConfigureAwait(false);
        }

        public async Task<int> CountAsync(
            Guid survivorId, Guid loserId, CancellationToken cancellationToken)
        {
            await using InvoicingDbContext db = await contextFactory
                .CreateDbContextAsync(cancellationToken).ConfigureAwait(false);

            PartyId loser = PartyId.Create(loserId);

            return await db.Invoices
                .Where(i => i.PartyId == loser)
                .CountAsync(cancellationToken)
                .ConfigureAwait(false);
        }
    }

Two methods, both bulk SQL:
| Method | Purpose | SQL shape |
|---|---|---|
| RewriteAsync | Live merge: re-points FK | UPDATE … SET partyid = @survivor WHERE partyid = @loser |
| CountAsync | Dry-run preview: no mutation | SELECT COUNT(*) FROM … WHERE partyid = @loser |
Always ExecuteUpdateAsync, never the change tracker. Two reasons:
- Performance. Tables with millions of rows (invoice_line_items, usage events) would overwhelm an EF-tracked rewrite, which has to materialise every affected row. A set-based UPDATE never loads entities into the process and typically completes sub-second on a warm connection.
- EF child-collection trap. Loading the loser’s children with the change tracker, then re-attaching them to the survivor, conflicts with EF’s PK and shadow-FK tracking. Bulk SQL bypasses the tracker entirely.
Register the rewriter in the module’s existing
AddGranit{Module}EntityFrameworkCore(...) extension:

    public static IHostApplicationBuilder AddGranitInvoicingEntityFrameworkCore(
        this IHostApplicationBuilder builder, Action<DbContextOptionsBuilder> configure)
    {
        // ... existing DbContext + interceptor wiring ...
        builder.Services.AddReferenceRewriter<Party, InvoicePartyReferenceRewriter>();
        return builder;
    }

The orchestrator discovers all IReferenceRewriter<Party> registrations from DI
at merge time and fans out into them inside the same TransactionScope.
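The fan-out step can be pictured as a loop over the registered rewriters that calls CountAsync on a dry run and RewriteAsync on a live merge. This is a minimal sketch, assuming the rewriters are invoked one after another; MergeFanOut and InMemoryRewriter are hypothetical names, and the interface is a local copy of the contract shown above so the block compiles without the framework or a DI container.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var rewriters = new IReferenceRewriter<Party>[]
{
    new InMemoryRewriter("Invoice.PartyId", matchingRows: 3),
    new InMemoryRewriter("Subscription.PartyId", matchingRows: 1),
};

const bool dryRun = true;
IReadOnlyDictionary<string, int> counts = await MergeFanOut.RunAsync(
    rewriters, Guid.NewGuid(), Guid.NewGuid(), dryRun, CancellationToken.None);

foreach ((string description, int affected) in counts)
    Console.WriteLine($"{description}: {affected}");

public sealed class Party { }

// Local copy of the contract so the sketch is self-contained.
public interface IReferenceRewriter<TAggregate>
{
    string Description { get; }
    Task<int> RewriteAsync(Guid survivorId, Guid loserId, CancellationToken cancellationToken);
    Task<int> CountAsync(Guid survivorId, Guid loserId, CancellationToken cancellationToken);
}

// Hypothetical test double standing in for a bulk-SQL rewriter.
public sealed class InMemoryRewriter(string description, int matchingRows) : IReferenceRewriter<Party>
{
    private int _remaining = matchingRows;

    public string Description => description;

    public Task<int> CountAsync(Guid survivorId, Guid loserId, CancellationToken cancellationToken)
        => Task.FromResult(_remaining);

    public Task<int> RewriteAsync(Guid survivorId, Guid loserId, CancellationToken cancellationToken)
    {
        int affected = _remaining;
        _remaining = 0; // every loser reference now points at the survivor
        return Task.FromResult(affected);
    }
}

public static class MergeFanOut
{
    public static async Task<IReadOnlyDictionary<string, int>> RunAsync<TAggregate>(
        IEnumerable<IReferenceRewriter<TAggregate>> rewriters,
        Guid survivorId, Guid loserId, bool dryRun, CancellationToken cancellationToken)
    {
        var counts = new Dictionary<string, int>();

        // Sequential in this sketch; an exception from any rewriter propagates
        // to the caller, which is what lets the surrounding transaction roll back.
        foreach (IReferenceRewriter<TAggregate> rewriter in rewriters)
        {
            counts[rewriter.Description] = dryRun
                ? await rewriter.CountAsync(survivorId, loserId, cancellationToken).ConfigureAwait(false)
                : await rewriter.RewriteAsync(survivorId, loserId, cancellationToken).ConfigureAwait(false);
        }

        return counts;
    }
}
```

Keying the counts on Description matches how the text says the value is surfaced in the audit trail and the preview UI.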
Authoring an aggregate adapter
The orchestrator delegates aggregate-specific concerns to
IMergeableAggregateAdapter<TAggregate>, implemented by the consuming module
(Granit.Parties.Mergeable.PartyMergeableAggregateAdapter):

    public interface IMergeableAggregateAdapter<TAggregate>
        where TAggregate : Entity, IMergeable<TAggregate>
    {
        Task<TAggregate?> LoadAsync(Guid id, CancellationToken cancellationToken);
        Task PersistMergedPairAsync(TAggregate survivor, TAggregate loser, CancellationToken cancellationToken);
        void ApplyTombstone(TAggregate loser, Guid survivorId, DateTimeOffset mergedAt);
        Task<int> CollapseChainTombstonesAsync(Guid newSurvivorId, Guid oldSurvivorId, CancellationToken cancellationToken);
    }

Why an adapter rather than reaching into the aggregate via reflection:
- LoadAsync is responsible for disabling the IHasMergeTombstone filter when the merge orchestrator needs to observe a previously-merged loser (chain merges, recovery scenarios). The default filter would hide it.
- ApplyTombstone routes through the aggregate’s internal mutation point (e.g. Party.MarkAsMergedInto), keeping the framework on the public side of the encapsulation boundary.
- CollapseChainTombstonesAsync rewrites earlier tombstones when a chain merge happens (A→B, then later B→C rewrites A.MergedIntoId from B to C). As a result, single-hop ResolveCurrentAsync always reaches the final survivor.
- PersistMergedPairAsync isolates the SaveChanges call so the orchestrator can stay agnostic of which DbContext owns the aggregate.
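The chain-collapse rule is easiest to see on a toy store. The sketch below models tombstones as an in-memory map and is only an illustration of the A→B, B→C example above: TombstoneStore is a hypothetical name, and the SQL shape mentioned in the comment is an assumption about how an adapter might implement CollapseChainTombstonesAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

Guid a = Guid.NewGuid(), b = Guid.NewGuid(), c = Guid.NewGuid();
var store = new TombstoneStore();

store.Merge(survivorId: b, loserId: a);   // A→B
store.Merge(survivorId: c, loserId: b);   // B→C, which also re-points A to C

Console.WriteLine(store.ResolveCurrent(a) == c);

public sealed class TombstoneStore
{
    // loser id → survivor id; ids absent from the map are live aggregates.
    private readonly Dictionary<Guid, Guid> _mergedInto = new();

    public void Merge(Guid survivorId, Guid loserId)
    {
        _mergedInto[loserId] = survivorId;              // tombstone the loser
        CollapseChainTombstones(survivorId, loserId);   // keep every chain one hop long
    }

    // In-memory analogue of an assumed bulk statement of the form
    // UPDATE … SET MergedIntoId = @new WHERE MergedIntoId = @old.
    public int CollapseChainTombstones(Guid newSurvivorId, Guid oldSurvivorId)
    {
        List<Guid> earlier = _mergedInto
            .Where(pair => pair.Value == oldSurvivorId)
            .Select(pair => pair.Key)
            .ToList();   // snapshot first: the map is mutated below

        foreach (Guid id in earlier)
            _mergedInto[id] = newSurvivorId;

        return earlier.Count;
    }

    // Single hop: follows at most one pointer, never recurses.
    public Guid ResolveCurrent(Guid id)
        => _mergedInto.TryGetValue(id, out Guid survivor) ? survivor : id;
}
```

Because the collapse runs at merge time, readers pay for one lookup regardless of how many merges preceded it.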
Idempotency
Two layers, on purpose. The HTTP layer
(Granit.Http.Idempotency middleware) absorbs network retries with the same
Idempotency-Key. The DB layer protects against an SDK that regenerates the key
between retries of the same logical intent:

    CREATE TABLE granit.merge_idempotency (
        id            uuid         PRIMARY KEY,
        key           text         NOT NULL,
        request_hash  text         NOT NULL,  -- SHA-256(JSON(MergeRequest))
        survivor_id   uuid         NOT NULL,
        loser_id      uuid         NOT NULL,
        result_json   text         NOT NULL,  -- cached MergeResult<T> payload
        created_at    timestamptz  NOT NULL,
        UNIQUE (key, request_hash)
    );

The orchestrator looks up (key, request_hash) before opening the transaction:
| Lookup outcome | Behaviour |
|---|---|
| Hit, same hash | Return the cached MergeResult — no DB work, no double rewrite |
| Hit, different hash | Throw MergeException → 409 Conflict (key reused for a different request) |
| Miss | Proceed; UPSERT the entry post-commit (best-effort) |
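The three lookup outcomes can be sketched as a pure function over the stored hash. This is one possible reading of the table, offered under assumptions: the lookup is modelled as "find by key, then compare hashes", MergeRequestSketch is a hypothetical subset of MergeRequest, and the in-memory dictionary stands in for the merge_idempotency table. Only the SHA-256-over-JSON hashing comes from the schema comment.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text.Json;

var cache = new Dictionary<string, string>();   // idempotency key → request hash of the first use
var request = new MergeRequestSketch(
    Guid.NewGuid(), Guid.NewGuid(), DryRun: false, Reason: "duplicate import");

Console.WriteLine(Idempotency.Classify(cache, "key-1", request));   // first use of the key
cache["key-1"] = Idempotency.Hash(request);                         // post-commit UPSERT
Console.WriteLine(Idempotency.Classify(cache, "key-1", request));   // network retry
Console.WriteLine(Idempotency.Classify(cache, "key-1", request with { Reason = "other" }));

public enum LookupOutcome { Miss, Replay, Conflict }

// Hypothetical subset of MergeRequest, enough to show the hashing.
public sealed record MergeRequestSketch(Guid SurvivorId, Guid LoserId, bool DryRun, string Reason);

public static class Idempotency
{
    // SHA-256 over the UTF-8 JSON form; record properties serialise in
    // declaration order, so equal requests produce equal hashes.
    public static string Hash(MergeRequestSketch request)
    {
        byte[] json = JsonSerializer.SerializeToUtf8Bytes(request);
        return Convert.ToHexString(SHA256.HashData(json));
    }

    public static LookupOutcome Classify(
        IReadOnlyDictionary<string, string> cache, string key, MergeRequestSketch request)
    {
        if (!cache.TryGetValue(key, out string? storedHash))
            return LookupOutcome.Miss;        // proceed with the merge

        return storedHash == Hash(request)
            ? LookupOutcome.Replay            // return the cached MergeResult
            : LookupOutcome.Conflict;         // key reused for a different request → 409
    }
}
```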
Entries have a 24 h retention window. Garbage collection is a future background job, so for now the table grows linearly with merge volume, which is single-digit per tenant per day in steady state.
A host that prefers a single shared database over the isolated module DbContext can fold the table into its own model:

    // In the host application's DbContext.OnModelCreating:
    modelBuilder.ConfigureMergeableModule();

Tombstone follow-through
Bulk SQL rewrites cover persisted typed FK columns. A PartyId may also live
in JSON-serialised payloads SQL cannot rewrite cleanly:
| Category | Where | Why no SQL rewrite |
|---|---|---|
| Wolverine messages | wolverine_outbox, wolverine_inbox | Schema-less JSON payload, PartyId at any depth |
| Scheduled jobs | scheduling_deferred_actions | Same |
| Pending webhooks | webhook_envelopes | Same |
| Queued notifications | notifications_pending | Same |
| Already-published events | External brokers | Out of our control |
| Audit log | audit_entries.EntityId | Immutable history — MUST NOT rewrite |
Consumers resolve the PartyId on receipt rather than relying on the rewrite:

    public class FinaliseInvoiceHandler
    {
        public static async Task HandleAsync(
            FinaliseInvoiceCommand cmd,
            IPartyReader reader,
            IDataFilter dataFilter,
            IInvoicingService invoicing,
            CancellationToken ct)
        {
            // Single-hop resolution. Chain merges (A→B→C) are collapsed at merge time,
            // so MergedIntoId always points to the final survivor and no recursion is needed.
            PartyId resolved = await cmd.PartyId.ResolveCurrentAsync(reader, dataFilter, ct);

            await invoicing.FinaliseAsync(cmd.InvoiceId, resolved, ct);
        }
    }

The rule “any handler / job that accepts a PartyId must call
ResolveCurrentAsync” is currently a convention — the architecture test that
enforces it requires either a Roslyn analyser or an IL-level scan (NetArchTest
sees method signatures, not method bodies) and is tracked as
#1409.
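What a single-hop resolution might look like is sketched below. It is a simplified illustration, not the framework's ResolveCurrentAsync: it works on raw Guid values rather than PartyId, and ITombstoneReader, InMemoryTombstoneReader and TombstoneResolution are hypothetical names (the real call takes an IPartyReader and an IDataFilter).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Guid survivor = Guid.NewGuid(), loser = Guid.NewGuid();
var reader = new InMemoryTombstoneReader(new Dictionary<Guid, Guid?>
{
    [survivor] = null,      // live row
    [loser] = survivor,     // tombstoned row pointing at the survivor
});

// A message queued before the merge still carries the loser's id.
Guid resolved = await loser.ResolveCurrentAsync(reader, CancellationToken.None);
Console.WriteLine(resolved == survivor);

// Hypothetical read port for this sketch.
public interface ITombstoneReader
{
    // An implementation has to bypass the MergeTombstone filter,
    // otherwise tombstoned rows would be invisible to the lookup.
    Task<Guid?> FindMergedIntoIdAsync(Guid id, CancellationToken cancellationToken);
}

public sealed class InMemoryTombstoneReader(IReadOnlyDictionary<Guid, Guid?> rows) : ITombstoneReader
{
    public Task<Guid?> FindMergedIntoIdAsync(Guid id, CancellationToken cancellationToken)
        => Task.FromResult(rows.TryGetValue(id, out Guid? mergedInto) ? mergedInto : null);
}

public static class TombstoneResolution
{
    // One lookup only: chain collapse at merge time guarantees the pointer
    // already targets the final survivor.
    public static async Task<Guid> ResolveCurrentAsync(
        this Guid id, ITombstoneReader reader, CancellationToken cancellationToken)
    {
        Guid? mergedInto = await reader
            .FindMergedIntoIdAsync(id, cancellationToken)
            .ConfigureAwait(false);

        return mergedInto ?? id;
    }
}
```

An id that is unknown to the reader resolves to itself in this sketch; a production implementation may prefer to fail instead.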
Registering the framework on its own:

    [DependsOn(
        typeof(GranitMergeableModule),
        typeof(GranitMergeableEntityFrameworkCoreModule))]
    public class AppModule : GranitModule { }

    // Idempotency-cache DbContext (or ConfigureMergeableModule() on a shared DbContext)
    builder.AddGranitMergeableEntityFrameworkCore(o =>
        o.UseNpgsql(builder.Configuration.GetConnectionString("Default")));

Registering the framework together with the Parties consumer:

    [DependsOn(
        typeof(GranitPartiesModule),
        typeof(GranitPartiesEndpointsModule),
        typeof(GranitPartiesMergeableModule))] // adds adapter + Party-internal rewriters
    public class AppModule : GranitModule { }

    builder.AddGranitPartiesEntityFrameworkCore(o =>
        o.UseNpgsql(builder.Configuration.GetConnectionString("Default")));
    builder.AddGranitMergeableEntityFrameworkCore(o =>
        o.UseNpgsql(builder.Configuration.GetConnectionString("Default")));

    // Adapter + Party-children + Party-parent rewriters.
    // Cross-module rewriters (Invoice/Subscription/BalanceAccount) auto-register from
    // their own AddGranit{Module}EntityFrameworkCore calls.
    builder.AddGranitPartiesMergeable();

    app.MapGranitParties(); // exposes POST /parties/{id}/merge + preview

API surface (Granit.Mergeable)
| Type | Role |
|---|---|
| IHasMergeTombstone | State marker: auto-mapped column / index / query filter |
| IMergeable<TSelf> | Behaviour + state: GetConflicts, MergeFrom |
| IReferenceRewriter<TAggregate> | Per-module bulk-SQL FK rewriter |
| IMergeableAggregateAdapter<TAggregate> | Per-aggregate Load / Persist / Tombstone / ChainCollapse |
| IMergeService<TAggregate> | Orchestrator contract: MergePreviewAsync, MergeAsync |
| MergeRequest | (SurvivorId, LoserId, Choices, DryRun, Reason, IdempotencyKey) |
| MergeResult<T> | (Merged, Conflicts, RewriteCounts, DryRun) |
| MergeFieldChoices | Per-field overrides, with a fluent builder via MergeFieldChoices.NewBuilder() |
| WinnerSide | Survivor (default) or Loser |
| FieldConflict | One row of the preview diff |
| MergeException | Hard-invariant violation → HTTP 422 |
| AddReferenceRewriter<TAggregate, TRewriter>() | DI registration helper |
Architecture tests
MergeableConventionTests (in tests/Granit.ArchitectureTests) catches drift on
the framework at build time:
- Curated whitelist actually ships rewriters. Every assembly in the expected list (currently Granit.Parties.Mergeable, Granit.Invoicing.EntityFrameworkCore, Granit.Subscriptions.EntityFrameworkCore, Granit.CustomerBalance.EntityFrameworkCore) contains at least one IReferenceRewriter<Party> implementation. Catches the silent failure of an inlined rewriter being accidentally removed.
- Every aggregate carrying PartyId outside Granit.Parties has a rewriter. Reflection scan: every concrete Entity subclass with a typed PartyId property is mapped back to its sibling *.EntityFrameworkCore or *.Mergeable assembly, which must ship a rewriter. Catches “added a new aggregate carrying PartyId and forgot the rewriter”, the failure mode that leaves orphan FK references after a merge.
- Every IReferenceRewriter<Party> lives in a persistence package. Rewriters must live in *.EntityFrameworkCore or *.Mergeable, keeping the SQL / persistence concern next to the DbContext that owns the FK.
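The first and third checks boil down to a reflection scan plus a naming rule. The sketch below shows that shape with plain System.Reflection; it is not the MergeableConventionTests source, ConventionScan is a hypothetical name, and the stub types exist only so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

Assembly assembly = typeof(InvoicePartyReferenceRewriter).Assembly;
List<Type> rewriters = ConventionScan.FindRewriters(assembly, typeof(Party)).ToList();

Console.WriteLine(rewriters.Count);
Console.WriteLine(ConventionScan.IsPersistenceAssembly("Granit.Invoicing.EntityFrameworkCore"));

// Stub types standing in for the framework and a module rewriter.
public sealed class Party { }
public interface IReferenceRewriter<TAggregate> { }
public sealed class InvoicePartyReferenceRewriter : IReferenceRewriter<Party> { }

public static class ConventionScan
{
    // Concrete classes in the assembly that implement IReferenceRewriter<aggregateType>.
    public static IEnumerable<Type> FindRewriters(Assembly assembly, Type aggregateType)
    {
        Type contract = typeof(IReferenceRewriter<>).MakeGenericType(aggregateType);
        return assembly.GetTypes()
            .Where(t => t is { IsClass: true, IsAbstract: false } && contract.IsAssignableFrom(t));
    }

    // Placement rule from the text: rewriters live in *.EntityFrameworkCore or *.Mergeable.
    public static bool IsPersistenceAssembly(string assemblyName)
        => assemblyName.EndsWith(".EntityFrameworkCore", StringComparison.Ordinal)
        || assemblyName.EndsWith(".Mergeable", StringComparison.Ordinal);
}
```

A test built on this would assert that FindRewriters returns at least one type for every whitelisted assembly, and that each returned type's assembly name satisfies IsPersistenceAssembly.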
See also
- ADR-037, Party merge framework: design decisions, trade-offs, non-goals
- Parties, central party-management aggregate: the first consumer
- Query Filters: the MergeTombstone filter alongside SoftDelete, Active, MultiTenant
- Persistence overview: ApplyGranitConventions and the auto-applied column conventions
- Entity Lifecycle Events: *Eto integration events outbox pattern, used by PartyMergedEto