
Mergeable — Generic Aggregate Merge Framework

Multi-channel data entry produces duplicates: the same customer created twice via two CRM imports, a supplier mapped under two external providers, a party captured both as an individual and as their employer. Without a merge primitive those duplicates corrupt every aggregate that references them — invoices issued against the wrong party, ledger fragmented across two balances, mappings in conflict.

Granit.Mergeable is a generic merge framework: define an IMergeable<TSelf> aggregate, register one IReferenceRewriter<TSelf> per module that holds an FK to it, and the orchestrator handles the rest — preview, validation, advisory locking, bulk-SQL fan-out, tombstone, audit, integration event. The first consumer is Granit.Parties.Mergeable; the same shape applies to Granit.Catalog.Product deduplication and any future Lead / Opportunity / Asset aggregate.

The orchestrator (EfMergeService<TAggregate> in Granit.Mergeable.EntityFrameworkCore) runs every merge inside a single TransactionScope at IsolationLevel.Serializable. Under the framework’s single-PostgreSQL deployment assumption, every participating DbContext enrols into the same physical transaction (no DTC), so a failure in any rewriter rolls back the whole merge atomically.
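A minimal sketch of the transaction envelope, using only System.Transactions from the base class library. MergeTransaction and RunAsync are hypothetical names for illustration; the real orchestrator also opens DbContexts, takes the advisory lock and calls the rewriters inside the scope. What the sketch shows is the failure contract: Complete() runs only after the whole merge body succeeds, so any exception leaves the scope uncompleted and Dispose() rolls it back.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

// Hypothetical helper, not part of Granit.Mergeable: illustrates the scope settings only.
public static class MergeTransaction
{
    // Serializable isolation, with async flow enabled so Transaction.Current survives awaits.
    public static TransactionScope Begin() => new(
        TransactionScopeOption.Required,
        new TransactionOptions
        {
            IsolationLevel = IsolationLevel.Serializable,
            Timeout = TransactionManager.DefaultTimeout,
        },
        TransactionScopeAsyncFlowOption.Enabled);

    // All-or-nothing: if mergeBody throws, Complete() is never called and Dispose() rolls back.
    public static async Task<int> RunAsync(Func<Task<int>> mergeBody)
    {
        using TransactionScope scope = Begin();
        int affected = await mergeBody();
        scope.Complete();
        return affected;
    }
}
```

TransactionScopeAsyncFlowOption.Enabled matters because the rewriters are async: without it, the ambient transaction does not flow across await continuations.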

sequenceDiagram
    participant Endpoint as POST /parties/{id}/merge
    participant Orch as EfMergeService<TAggregate>
    participant Idem as merge_idempotency
    participant Lock as pg_advisory_xact_lock
    participant Adapter as IMergeableAggregateAdapter<T>
    participant Rewriters as IReferenceRewriter<T>... (DI list)
    participant Outbox as Wolverine Outbox

    Endpoint->>Orch: MergeAsync(MergeRequest)
    Orch->>Idem: Lookup (key, hash)
    alt Hit + same hash
        Idem-->>Orch: cached MergeResult
        Orch-->>Endpoint: 200 (replay)
    else
        Orch->>Lock: BEGIN tx (Serializable) + advisory lock per tenant
        Orch->>Adapter: LoadAsync(survivor) + LoadAsync(loser)
        Orch->>Orch: validate invariants + GetConflicts
        alt DryRun
            Rewriters-->>Orch: CountAsync (no UPDATE)
            Orch->>Lock: ROLLBACK
            Orch-->>Endpoint: MergeResult { DryRun=true, Conflicts, Counts }
        else Live
            Orch->>Orch: survivor.MergeFrom(loser, choices)
            Orch->>Adapter: ApplyTombstone(loser)
            Orch->>Adapter: CollapseChainTombstonesAsync
            Rewriters-->>Orch: RewriteAsync (bulk UPDATE)
            Orch->>Adapter: PersistMergedPairAsync
            Orch->>Outbox: PartyMergedEto enqueued (same SaveChanges)
            Orch->>Lock: COMMIT
            Orch->>Idem: UPSERT (best-effort, post-commit)
            Orch-->>Endpoint: 200 MergeResult
        end
    end

Two key design points:

  • Locking is tenant-wide. A pg_advisory_xact_lock keyed on hashtext('party-merge-' || tenantId) serialises concurrent merges within a tenant; merges across tenants stay parallel. Layered on top, SELECT … FOR UPDATE on both aggregate rows protects against intra-tenant interleaving (e.g. two admins picking the same loser).
  • Audit log is not rewritten. Bulk-SQL rewriters target persisted typed FK columns only. The audit log is intentionally immutable: the query layer joins on parties.MergedIntoId to render “Party X (merged into Y on …)”. The same applies to in-flight payloads (Wolverine outbox/inbox, scheduled jobs, webhooks): they are resolved on receipt via ResolveCurrentAsync rather than rewritten in place. See the tombstone follow-through section below.
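The locking semantics above (serialise within a tenant, run in parallel across tenants) can be illustrated in-process. In this sketch a SemaphoreSlim per tenant stands in for `SELECT pg_advisory_xact_lock(hashtext('party-merge-' || tenantId))`; TenantMergeLocks and its methods are hypothetical. Unlike the transaction-scoped PostgreSQL lock, which is released at COMMIT or ROLLBACK, the semaphore is released explicitly in a finally block.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical in-process analogue of the per-tenant advisory lock; illustration only.
public sealed class TenantMergeLocks
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();

    // Same key text the SQL hashes: one lock per tenant, shared by every merge in that tenant.
    public static string KeyFor(Guid tenantId) => "party-merge-" + tenantId;

    public SemaphoreSlim For(Guid tenantId) =>
        _locks.GetOrAdd(KeyFor(tenantId), _ => new SemaphoreSlim(1, 1));

    // Runs the merge body while holding its tenant's lock; other tenants are never blocked.
    public T RunExclusive<T>(Guid tenantId, Func<T> mergeBody)
    {
        SemaphoreSlim gate = For(tenantId);
        gate.Wait();
        try
        {
            return mergeBody();
        }
        finally
        {
            gate.Release();
        }
    }
}
```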

Two interfaces, both in the framework:

flowchart LR
    A[Aggregate] -->|state-only marker| B[IHasMergeTombstone]
    A -->|behavior + state| C[IMergeable&lt;TSelf&gt;]
    B -->|EF auto-applies| D[MergedIntoId column<br/>+ MergedAt column<br/>+ index<br/>+ MergeTombstone query filter]
    C -->|inherits| B

    style A fill:#0ea5e9,color:#fff
    style C fill:#6366f1,color:#fff

IHasMergeTombstone is the marker for the tombstone state. It is decoupled from the merge service so that EF Core, listings, query filters and a future un-merge endpoint can target it without referencing Granit.Mergeable:

public interface IHasMergeTombstone
{
    Guid? MergedIntoId { get; }   // null on survivor; survivor.Id on loser
    DateTimeOffset? MergedAt { get; }
}

Implementing it is enough to get all the EF plumbing for free — ApplyGranitConventions detects the interface and:

  • Adds nullable MergedIntoId (Guid?) and MergedAt (DateTimeOffset?) columns.
  • Indexes MergedIntoId (powers chain-collapse and admin “merged-out” listings).
  • Registers the named query filter GranitFilterNames.MergeTombstone with expression e.MergedIntoId == null — every standard query excludes tombstoned rows by default, the same way ISoftDeletable excludes soft-deleted ones.

No per-module mapping code, no migration template to copy.
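The filter itself is EF Core configuration, but its predicate is easy to see in isolation. A minimal in-memory sketch, assuming a hypothetical Customer record and hypothetical WhereNotTombstoned / WhereTombstoned helpers: the first applies the same `MergedIntoId == null` predicate as GranitFilterNames.MergeTombstone, the second is what an admin "merged-out" listing sees once that filter is disabled.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public interface IHasMergeTombstone
{
    Guid? MergedIntoId { get; }
    DateTimeOffset? MergedAt { get; }
}

// Hypothetical entity used only for this illustration.
public sealed record Customer(Guid Id, string Name, Guid? MergedIntoId = null, DateTimeOffset? MergedAt = null)
    : IHasMergeTombstone;

public static class MergeTombstoneQueries
{
    // Same predicate as the MergeTombstone query filter: rows that were never merged away.
    public static IEnumerable<T> WhereNotTombstoned<T>(this IEnumerable<T> source)
        where T : IHasMergeTombstone => source.Where(e => e.MergedIntoId == null);

    // The inverse, as seen by a "merged-out" listing with the filter disabled.
    public static IEnumerable<T> WhereTombstoned<T>(this IEnumerable<T> source)
        where T : IHasMergeTombstone => source.Where(e => e.MergedIntoId != null);
}
```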

IMergeable<TSelf> extends the marker with the merge behaviour:

public interface IMergeable<TSelf> : IHasMergeTombstone
    where TSelf : Entity, IMergeable<TSelf>
{
    IReadOnlyList<FieldConflict> GetConflicts(TSelf loser);
    void MergeFrom(TSelf loser, MergeFieldChoices choices);
}

GetConflicts returns a list of FieldConflict(FieldPath, SurvivorValue, LoserValue, Default) — empty when survivor and loser are scalar-equal; one entry per differing scalar field otherwise. The orchestrator surfaces the list as MergeResult.Conflicts for the preview UI.
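A sketch of the GetConflicts contract (empty list when equal, one entry per differing scalar field), on a hypothetical three-field PartySnapshot record rather than the real Party aggregate. The FieldConflict record mirrors the documented FieldConflict(FieldPath, SurvivorValue, LoserValue, Default); the per-field defaults, including preferring the loser's Website when the survivor has none, are illustrative assumptions.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public enum WinnerSide { Survivor, Loser }

// Mirrors the documented shape FieldConflict(FieldPath, SurvivorValue, LoserValue, Default).
public sealed record FieldConflict(string FieldPath, object? SurvivorValue, object? LoserValue, WinnerSide Default);

// Hypothetical, trimmed-down view of a party with three scalar fields.
public sealed record PartySnapshot(string Name, string? Website, string TaxStatus);

public static class PartyConflicts
{
    public static IReadOnlyList<FieldConflict> GetConflicts(PartySnapshot survivor, PartySnapshot loser)
    {
        var conflicts = new List<FieldConflict>();

        Compare("Name", survivor.Name, loser.Name, WinnerSide.Survivor);
        // Assumed default: take the loser's website when the survivor has none.
        Compare("Website", survivor.Website, loser.Website,
            survivor.Website is null ? WinnerSide.Loser : WinnerSide.Survivor);
        Compare("TaxStatus", survivor.TaxStatus, loser.TaxStatus, WinnerSide.Survivor);

        return conflicts;

        // Equal values produce no entry, so two identical parties yield an empty list.
        void Compare<T>(string path, T survivorValue, T loserValue, WinnerSide defaultSide)
        {
            if (!EqualityComparer<T>.Default.Equals(survivorValue, loserValue))
                conflicts.Add(new FieldConflict(path, survivorValue, loserValue, defaultSide));
        }
    }
}
```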

MergeFrom folds the loser into the survivor using MergeFieldChoices — overrides for individual fields, defaulting to the recommended WinnerSide from GetConflicts when unspecified. Implementations throw MergeException for hard invariants (tenant mismatch, kind mismatch, default-currency mismatch, already-archived aggregates). Hard invariants cannot be overridden by an admin choice — they translate to HTTP 422 at the endpoint boundary.
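How a per-field override interacts with the default can be sketched with a stand-in MergeFieldChoices. NewBuilder() and ResolveOrDefault are named on this page; the Use(...) and Build() builder methods, the None instance and the dictionary storage are assumptions, not the framework's actual API.

```csharp
using System;
using System.Collections.Generic;

public enum WinnerSide { Survivor, Loser }

// Stand-in for the framework's MergeFieldChoices (illustration only).
// NewBuilder() and ResolveOrDefault are from the docs; Use(), Build() and None are hypothetical.
public sealed class MergeFieldChoices
{
    private readonly IReadOnlyDictionary<string, WinnerSide> _overrides;

    private MergeFieldChoices(IReadOnlyDictionary<string, WinnerSide> overrides) => _overrides = overrides;

    // No overrides at all: every field resolves to its default.
    public static MergeFieldChoices None { get; } = new(new Dictionary<string, WinnerSide>());

    public static Builder NewBuilder() => new();

    // An explicit admin choice wins; otherwise fall back to the field's documented default.
    public WinnerSide ResolveOrDefault(string fieldPath, WinnerSide defaultSide) =>
        _overrides.TryGetValue(fieldPath, out WinnerSide side) ? side : defaultSide;

    public sealed class Builder
    {
        private readonly Dictionary<string, WinnerSide> _overrides = new(StringComparer.Ordinal);

        public Builder Use(string fieldPath, WinnerSide side)
        {
            _overrides[fieldPath] = side;
            return this;
        }

        // Copies the map so later Use() calls cannot mutate an already-built instance.
        public MergeFieldChoices Build() => new(new Dictionary<string, WinnerSide>(_overrides));
    }
}
```

An admin who picks the loser's value for "Metadata.segment" would build `MergeFieldChoices.NewBuilder().Use("Metadata.segment", WinnerSide.Loser).Build()`; every other field falls back to the default that MergeFrom passes in.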

public class Party : AuditedAggregateRoot, IMultiTenant, IHasMetadata, IMergeable<Party>
{
    public Guid? MergedIntoId { get; private set; }
    public DateTimeOffset? MergedAt { get; private set; }

    public IReadOnlyList<FieldConflict> GetConflicts(Party loser) =>
        // Compare Name, Website, TaxStatus, ExternalMappings, Metadata…; empty when equal.
        BuildConflictList(loser);

    public void MergeFrom(Party loser, MergeFieldChoices choices)
    {
        // Hard invariants: non-overridable, translate to 422.
        if (loser.TenantId != TenantId) throw new MergeException("tenant mismatch");
        if (loser.Kind != Kind) throw new MergeException("kind mismatch");
        if (loser.DefaultCurrency != DefaultCurrency) throw new MergeException("currency mismatch");

        // Per-field choices: defaults documented per field, overridable at merge time.
        if (choices.ResolveOrDefault("Name", WinnerSide.Survivor) == WinnerSide.Loser)
            Rename(loser.Name);

        // Roles always union (bitwise OR, so no conflict is possible).
        AddRoles(loser.Roles);

        // Metadata: merge dictionaries with per-key overrides ("Metadata.segment" → Loser).
        MergeMetadataFrom(loser, choices);
    }

    // Internal mutation point used by IMergeableAggregateAdapter.ApplyTombstone; the
    // orchestrator never touches private setters via reflection.
    internal void MarkAsMergedInto(Guid survivorId, DateTimeOffset mergedAt)
    {
        MergedIntoId = survivorId;
        MergedAt = mergedAt;
        Status = PartyStatus.Archived;
    }
}

Each module that holds a typed FK to the mergeable aggregate ships one IReferenceRewriter<TAggregate>. The rewriter lives inline in the module’s existing *.EntityFrameworkCore package. Early plans called for a dedicated *.Mergeable package per module, but a 50-line SQL class does not justify a new csproj, DI extension, test project and CI shard wiring. The architecture test (MergeableConventionTests) accepts both *.EntityFrameworkCore and *.Mergeable placements.

namespace Granit.Invoicing.EntityFrameworkCore.Internal;

internal sealed class InvoicePartyReferenceRewriter(
    IDbContextFactory<InvoicingDbContext> contextFactory) : IReferenceRewriter<Party>
{
    public string Description => "Invoice.PartyId"; // Surfaced in audit + preview UI

    public async Task<int> RewriteAsync(
        Guid survivorId, Guid loserId, CancellationToken cancellationToken)
    {
        await using InvoicingDbContext db = await contextFactory
            .CreateDbContextAsync(cancellationToken).ConfigureAwait(false);

        PartyId loser = PartyId.Create(loserId);
        PartyId survivor = PartyId.Create(survivorId);

        return await db.Invoices
            .Where(i => i.PartyId == loser)
            .ExecuteUpdateAsync(s => s.SetProperty(i => i.PartyId, survivor), cancellationToken)
            .ConfigureAwait(false);
    }

    public async Task<int> CountAsync(
        Guid survivorId, Guid loserId, CancellationToken cancellationToken)
    {
        await using InvoicingDbContext db = await contextFactory
            .CreateDbContextAsync(cancellationToken).ConfigureAwait(false);

        PartyId loser = PartyId.Create(loserId);

        return await db.Invoices
            .Where(i => i.PartyId == loser)
            .CountAsync(cancellationToken)
            .ConfigureAwait(false);
    }
}

Two methods, both bulk SQL:

| Method | Purpose | SQL shape |
|---|---|---|
| RewriteAsync | Live merge: re-points the FK | UPDATE … SET partyid = @survivor WHERE partyid = @loser |
| CountAsync | Dry-run preview: no mutation | SELECT COUNT(*) FROM … WHERE partyid = @loser |

Always use ExecuteUpdateAsync, never the change tracker. Two reasons:

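  1. Performance. Tables with millions of rows (invoice_line_items, usage events) cannot be rewritten through the change tracker: every matching row would be materialised in memory and updated with its own UPDATE statement. Bulk SQL issues one set-based UPDATE without materialising any entities; with an index on the FK column, its cost tracks the number of rows that reference the loser rather than the table size, and it completes sub-second on a warm connection.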
  2. EF child-collection trap. Loading the loser’s children with the change tracker, then re-attaching them to the survivor, conflicts with EF’s PK and shadow-FK tracking. Bulk SQL bypasses the tracker entirely.

Register the rewriter in the module’s existing AddGranit{Module}EntityFrameworkCore(...) extension:

public static IHostApplicationBuilder AddGranitInvoicingEntityFrameworkCore(
    this IHostApplicationBuilder builder, Action<DbContextOptionsBuilder> configure)
{
    // ... existing DbContext + interceptor wiring ...
    builder.Services.AddReferenceRewriter<Party, InvoicePartyReferenceRewriter>();
    return builder;
}

The orchestrator discovers all IReferenceRewriter<Party> registrations from DI at merge time and fans out into them inside the same TransactionScope.
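A synchronous, in-memory sketch of that fan-out. IReferenceRewriter here is a simplified stand-in (the real IReferenceRewriter<TAggregate> is async and takes a CancellationToken), InMemoryFkRewriter models one FK column as an array, and RewriterFanOut is hypothetical. It shows the dry-run/live split from the method table: the preview calls Count and mutates nothing, the live merge calls Rewrite, and both return per-rewriter counts keyed by Description, as MergeResult.RewriteCounts does. The transaction and lock are omitted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified, synchronous stand-in for IReferenceRewriter<TAggregate>.
public interface IReferenceRewriter
{
    string Description { get; }
    int Count(Guid survivorId, Guid loserId);
    int Rewrite(Guid survivorId, Guid loserId);
}

// Hypothetical in-memory "table": each array slot is one row's party FK.
public sealed class InMemoryFkRewriter : IReferenceRewriter
{
    private readonly Guid[] _partyIds;

    public InMemoryFkRewriter(string description, Guid[] partyIds)
    {
        Description = description;
        _partyIds = partyIds;
    }

    public string Description { get; }
    public IReadOnlyList<Guid> Rows => _partyIds;

    // Dry run: SELECT COUNT(*) ... WHERE partyid = @loser.
    public int Count(Guid survivorId, Guid loserId) => _partyIds.Count(id => id == loserId);

    // Live: UPDATE ... SET partyid = @survivor WHERE partyid = @loser; returns rows affected.
    public int Rewrite(Guid survivorId, Guid loserId)
    {
        int affected = 0;
        for (int i = 0; i < _partyIds.Length; i++)
        {
            if (_partyIds[i] != loserId) continue;
            _partyIds[i] = survivorId;
            affected++;
        }
        return affected;
    }
}

public static class RewriterFanOut
{
    // Calls every registered rewriter; results keyed by Description.
    public static IReadOnlyDictionary<string, int> Run(
        IEnumerable<IReferenceRewriter> rewriters, Guid survivorId, Guid loserId, bool dryRun) =>
        rewriters.ToDictionary(
            r => r.Description,
            r => dryRun ? r.Count(survivorId, loserId) : r.Rewrite(survivorId, loserId));
}
```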

The orchestrator delegates aggregate-specific concerns to IMergeableAggregateAdapter<TAggregate>, implemented by the consuming module (Granit.Parties.Mergeable.PartyMergeableAggregateAdapter):

public interface IMergeableAggregateAdapter<TAggregate>
    where TAggregate : Entity, IMergeable<TAggregate>
{
    Task<TAggregate?> LoadAsync(Guid id, CancellationToken cancellationToken);
    Task PersistMergedPairAsync(TAggregate survivor, TAggregate loser, CancellationToken cancellationToken);
    void ApplyTombstone(TAggregate loser, Guid survivorId, DateTimeOffset mergedAt);
    Task<int> CollapseChainTombstonesAsync(Guid newSurvivorId, Guid oldSurvivorId, CancellationToken cancellationToken);
}

Why an adapter rather than reaching into the aggregate via reflection:

  • LoadAsync is responsible for disabling the MergeTombstone query filter when the orchestrator needs to observe a previously merged loser (chain merges, recovery scenarios); with the default filter on, that row would be invisible.
  • ApplyTombstone routes through the aggregate’s internal mutation point (e.g. Party.MarkAsMergedInto) — keeping the framework on the public side of the encapsulation boundary.
  • CollapseChainTombstonesAsync rewrites earlier tombstones when a chain merge happens (A→B, then later B→C rewrites A.MergedIntoId from B to C), so a single-hop ResolveCurrentAsync always reaches the final survivor.
  • PersistMergedPairAsync isolates the SaveChanges call so the orchestrator can stay agnostic of which DbContext owns the aggregate.
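A sketch of why chain collapse makes single-hop resolution sufficient, using a hypothetical in-memory TombstoneStore (party id → MergedIntoId). Merge mirrors ApplyTombstone followed by CollapseChainTombstonesAsync(newSurvivorId, oldSurvivorId); how the real adapter implements the collapse is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory tombstone column: party id -> MergedIntoId (null while live).
public sealed class TombstoneStore
{
    private readonly Dictionary<Guid, Guid?> _mergedInto = new();

    public void Add(Guid id) => _mergedInto[id] = null;

    // Tombstone the loser, then re-point every earlier tombstone that pointed at it.
    // Returns how many chained tombstones were collapsed.
    public int Merge(Guid survivorId, Guid loserId)
    {
        _mergedInto[loserId] = survivorId;
        List<Guid> chained = _mergedInto.Where(kv => kv.Value == loserId).Select(kv => kv.Key).ToList();
        foreach (Guid id in chained) _mergedInto[id] = survivorId;
        return chained.Count;
    }

    // Single hop, no loop: after collapse every tombstone points at a live survivor.
    public Guid ResolveCurrent(Guid id) => _mergedInto[id] ?? id;
}
```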

Idempotency is enforced at two layers, on purpose. The HTTP layer (Granit.Http.Idempotency middleware) absorbs network retries that carry the same Idempotency-Key. The DB layer protects against an SDK that regenerates the key between retries of the same logical intent:

CREATE TABLE granit.merge_idempotency (
    id           uuid        PRIMARY KEY,
    key          text        NOT NULL,
    request_hash text        NOT NULL,  -- SHA-256(JSON(MergeRequest))
    survivor_id  uuid        NOT NULL,
    loser_id     uuid        NOT NULL,
    result_json  text        NOT NULL,  -- cached MergeResult<T> payload
    created_at   timestamptz NOT NULL,
    UNIQUE (key, request_hash)
);
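The request_hash column holds SHA-256(JSON(MergeRequest)). A minimal sketch with System.Text.Json and System.Security.Cryptography (.NET 5 or later), assuming a simplified MergeRequest whose Choices are a field-path → side map. The hex encoding and the SortedDictionary, which makes the same choices always serialise (and therefore hash) identically regardless of insertion order, are assumptions rather than documented behaviour.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text.Json;

// Simplified request mirroring the documented fields; Choices is a hypothetical
// field path -> "Survivor"/"Loser" map, sorted so serialisation order is stable.
public sealed record MergeRequest(
    Guid SurvivorId,
    Guid LoserId,
    SortedDictionary<string, string> Choices,
    bool DryRun,
    string? Reason,
    string IdempotencyKey);

public static class RequestHash
{
    // SHA-256 over the UTF-8 JSON of the request, hex-encoded for the request_hash column.
    public static string Compute(MergeRequest request)
    {
        byte[] json = JsonSerializer.SerializeToUtf8Bytes(request);
        return Convert.ToHexString(SHA256.HashData(json));
    }
}
```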

Before opening the transaction, the orchestrator looks up the key and compares the stored request_hash with the hash of the incoming request:

| Lookup outcome | Behaviour |
|---|---|
| Hit, same hash | Return the cached MergeResult: no DB work, no double rewrite |
| Hit, different hash | Throw MergeException → 409 Conflict (key reused for a different request) |
| Miss | Proceed; UPSERT the entry post-commit (best-effort) |
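The decision table reduces to a few lines. A sketch against a hypothetical in-memory IdempotencyCache (the real store is the granit.merge_idempotency table): lookup is by key, and the stored request_hash decides between replay and conflict.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public enum IdempotencyOutcome { Proceed, Replay, Conflict }

// Hypothetical cached row: a subset of the merge_idempotency columns.
public sealed record IdempotencyEntry(string Key, string RequestHash, string ResultJson);

public sealed class IdempotencyCache
{
    private readonly Dictionary<string, IdempotencyEntry> _byKey = new(StringComparer.Ordinal);

    // Miss -> Proceed; same key + same hash -> Replay; same key + different hash -> Conflict (409).
    public IdempotencyOutcome Lookup(string key, string requestHash, out string? cachedResultJson)
    {
        cachedResultJson = null;
        if (!_byKey.TryGetValue(key, out IdempotencyEntry? entry)) return IdempotencyOutcome.Proceed;
        if (entry.RequestHash != requestHash) return IdempotencyOutcome.Conflict;
        cachedResultJson = entry.ResultJson;
        return IdempotencyOutcome.Replay;
    }

    // Called only after COMMIT; if this write is lost, the next retry simply misses the cache.
    public void Upsert(string key, string requestHash, string resultJson) =>
        _byKey[key] = new IdempotencyEntry(key, requestHash, resultJson);
}
```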

Entries are meant to be kept for 24 h, but the background garbage-collection job that would enforce this is not built yet. Until it ships, the table grows linearly with merge volume, which is single-digit merges per tenant per day in steady state.

A host that prefers a single shared DbContext over the module’s isolated idempotency DbContext can fold the table into its own model:

// In the host application's DbContext.OnModelCreating:
modelBuilder.ConfigureMergeableModule();

Bulk SQL rewrites cover persisted typed FK columns only. A PartyId may also live in JSON-serialised payloads that SQL cannot rewrite cleanly:

| Category | Where | Why no SQL rewrite |
|---|---|---|
| Wolverine messages | wolverine_outbox, wolverine_inbox | Schema-less JSON payload; PartyId can appear at any depth |
| Scheduled jobs | scheduling_deferred_actions | Same |
| Pending webhooks | webhook_envelopes | Same |
| Queued notifications | notifications_pending | Same |
| Already-published events | External brokers | Out of our control |
| Audit log | audit_entries.EntityId | Immutable history: MUST NOT rewrite |

Consumers resolve the PartyId on receipt rather than relying on the rewrite:

public class FinaliseInvoiceHandler
{
    public static async Task HandleAsync(
        FinaliseInvoiceCommand cmd,
        IPartyReader reader,
        IDataFilter dataFilter,
        IInvoicingService invoicing,
        CancellationToken ct)
    {
        // Single-hop resolution. Chain merges (A→B→C) are collapsed at merge time,
        // so MergedIntoId always points to the final survivor; no recursion needed.
        PartyId resolved = await cmd.PartyId.ResolveCurrentAsync(reader, dataFilter, ct);
        await invoicing.FinaliseAsync(cmd.InvoiceId, resolved, ct);
    }
}

The rule “any handler or job that accepts a PartyId must call ResolveCurrentAsync” is currently a convention only. Enforcing it needs either a Roslyn analyser or an IL-level scan, because NetArchTest sees method signatures, not method bodies; that work is tracked as #1409.

[DependsOn(
    typeof(GranitMergeableModule),
    typeof(GranitMergeableEntityFrameworkCoreModule))]
public class AppModule : GranitModule { }

// Host startup: idempotency-cache DbContext (or ConfigureMergeableModule() on a shared DbContext)
builder.AddGranitMergeableEntityFrameworkCore(o =>
    o.UseNpgsql(builder.Configuration.GetConnectionString("Default")));

| Type | Role |
|---|---|
| IHasMergeTombstone | State marker: auto-mapped column, index, query filter |
| IMergeable<TSelf> | Behaviour + state: GetConflicts, MergeFrom |
| IReferenceRewriter<TAggregate> | Per-module bulk-SQL FK rewriter |
| IMergeableAggregateAdapter<TAggregate> | Per-aggregate Load / Persist / Tombstone / ChainCollapse |
| IMergeService<TAggregate> | Orchestrator contract: MergePreviewAsync, MergeAsync |
| MergeRequest | (SurvivorId, LoserId, Choices, DryRun, Reason, IdempotencyKey) |
| MergeResult<T> | (Merged, Conflicts, RewriteCounts, DryRun) |
| MergeFieldChoices | Per-field overrides; fluent builder via MergeFieldChoices.NewBuilder() |
| WinnerSide | Survivor (default) or Loser |
| FieldConflict | One row of the preview diff |
| MergeException | Hard-invariant violation → HTTP 422 |
| AddReferenceRewriter<TAggregate, TRewriter>() | DI registration helper |

MergeableConventionTests (in tests/Granit.ArchitectureTests) catches framework drift whenever the test suite runs:

  • Curated whitelist actually ships rewriters. Every assembly in the expected list (currently Granit.Parties.Mergeable, Granit.Invoicing.EntityFrameworkCore, Granit.Subscriptions.EntityFrameworkCore, Granit.CustomerBalance.EntityFrameworkCore) contains at least one IReferenceRewriter<Party> implementation. Catches the silent failure of an inlined rewriter being accidentally removed.
  • Every aggregate carrying PartyId outside Granit.Parties has a rewriter. Reflection scan: every concrete Entity subclass with a typed PartyId property is mapped back to its sibling *.EntityFrameworkCore or *.Mergeable assembly, which must ship a rewriter. Catches “added a new aggregate carrying PartyId and forgot the rewriter” — the failure mode that leaves orphan FK references after a merge.
  • Every IReferenceRewriter<Party> lives in a persistence package. Rewriters must live in *.EntityFrameworkCore or *.Mergeable — keeping the SQL / persistence concern next to the DbContext that owns the FK.
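A reduced sketch of the second check, the reflection scan for aggregates carrying a typed PartyId. Everything here is a stand-in so the block compiles alone: PartyId, Entity, Party, IReferenceRewriter<T> and the two aggregates are minimal hypothetical types. Because everything lives in one assembly, rewriters are matched to entities by the "Entity.Property" prefix of their Description, not by sibling *.EntityFrameworkCore / *.Mergeable assembly as the real test does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Minimal hypothetical stand-ins so the sketch compiles on its own.
public readonly record struct PartyId(Guid Value);
public abstract class Entity { public Guid Id { get; init; } }
public interface IReferenceRewriter<TAggregate> { string Description { get; } }
public sealed class Party : Entity { }

// Hypothetical aggregates carrying a typed PartyId; only Invoice has a rewriter.
public sealed class Invoice : Entity { public PartyId PartyId { get; init; } }
public sealed class Subscription : Entity { public PartyId PartyId { get; init; } }

public sealed class InvoicePartyReferenceRewriter : IReferenceRewriter<Party>
{
    public string Description => "Invoice.PartyId";
}

public static class PartyReferenceScan
{
    public static IReadOnlyList<Type> FindUncoveredCarriers(Assembly assembly)
    {
        Type[] types = assembly.GetTypes();

        // Concrete entities (other than Party itself) exposing a typed PartyId property.
        IEnumerable<Type> carriers = types.Where(t =>
            t.IsClass && !t.IsAbstract && t != typeof(Party)
            && typeof(Entity).IsAssignableFrom(t)
            && t.GetProperties().Any(p => p.PropertyType == typeof(PartyId)));

        // Simplification: a rewriter covers the entity named before the first '.' of its Description.
        HashSet<string> covered = types
            .Where(t => t.IsClass && !t.IsAbstract && typeof(IReferenceRewriter<Party>).IsAssignableFrom(t))
            .Select(t => ((IReferenceRewriter<Party>)Activator.CreateInstance(t)!).Description.Split('.')[0])
            .ToHashSet();

        return carriers.Where(t => !covered.Contains(t.Name)).ToList();
    }
}
```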