Fluens.Messaging
0.7.4
Inbox/outbox messaging pattern for modular applications.
Installation
dotnet add package Fluens.Messaging
Usage
Setup
// Host setup
fluensBuilder.AddMessaging();
// Module setup (inside IModuleSetup.RegisterServices)
// Consumers are auto-discovered from the module assembly
module.AddMessaging<OrdersDbContext>(msg =>
{
    msg.Publishes<OrderCreatedMessage>();
});
DbContext Configuration
Every module DbContext registered via AddMessaging<TDbContext> MUST inherit from the abstract
FluensDbContext base (ADR-0020). The base wires the messaging EF configuration, registers the
outbox interceptor idempotently, pre-declares the messaging DbSet<>s, and exposes the per-context
OutboxFlushStack LIFO consumed by the stateless OutboxSaveChangesInterceptor (push in
SavingChangesAsync, pop in both SavedChangesAsync and SaveChangesFailedAsync — so nested
SaveChangesAsync calls keep the stack balanced and the post-commit transport wake-up is neither
lost nor duplicated). Override MessagingSchema per module; subclasses overriding OnModelCreating
or OnConfiguring MUST call base.* first so messaging wiring survives.
internal sealed class OrdersDbContext : FluensDbContext
{
    protected override string MessagingSchema => "orders";

    public OrdersDbContext(DbContextOptions<OrdersDbContext> options) : base(options) { }

    public DbSet<Order> Orders => Set<Order>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder); // applies messaging configurations
        modelBuilder.HasDefaultSchema(MessagingSchema);
        modelBuilder.ApplyConfigurationsFromAssembly(typeof(OrdersDbContext).Assembly);
    }
}
// In AddDbContext registration — AddOutboxInterceptor is optional (the base context registers it
// idempotently in OnConfiguring), but a fluent registration site keeps the wiring visible.
services.AddDbContext<OrdersDbContext>((sp, options) =>
{
    options.AddOutboxInterceptor(sp);
});
ApplyMessagingConfigurations / ApplyIngressConfiguration map every column to an explicit snake_case name (message_type, processed_at, …), matching the Fluens.Migrations schema. The mapping is self-contained: UseSnakeCaseNamingConvention() is not required for the messaging tables.
Publishing
public record OrderCreatedMessage(Guid OrderId) : IMessage;
// Publish is synchronous — message is stored in outbox during SaveChangesAsync
publisher.Publish(new OrderCreatedMessage(order.ID));
await dbContext.SaveChangesAsync(); // Outbox interceptor writes message
Consuming
public class OrderCreatedConsumer : IMessageConsumer<OrderCreatedMessage>
{
    public async Task ConsumeAsync(
        OrderCreatedMessage message,
        MessageContext context,
        CancellationToken ct)
    {
        // Process the message
    }
}
Architecture: Publish (sync, lock-free ConcurrentQueue in-memory) → Outbox (EF SaveChanges interceptor on a FluensDbContext) → Transport (background worker, per-handler fan-out; bounded-parallel two-phase distribution — a parallel network I/O phase with a per-delivery timeout, ADR-0022; local-directed branch on the fluens-directed-module envelope header for ADR-0019 outbox-routed faults) → Inbox (non-awaiting InboxOperator dispatcher + persistent per-lane LaneWorkers owned by LaneRegistry) → Consumer. Concurrent IMessagePublisher.Publish from a fan-out / Task.WhenAll handler within the same scope is safe (ADR-0021). Failed messages are retried in two stages (in-memory then DB-scheduled) then moved to a dead_letters table; a directed Fault<TMessage> is published through the same outbox path (ADR-0019).
Partitioned consumption with persistent per-lane workers (ADR-0010 / ADR-0011 / ADR-0018)
Every module that calls AddMessaging<TDbContext> (with TDbContext : FluensDbContext) runs through
the partitioned inbox operator — no opt-in. PartitionCount defaults to 1 (fully sequential per
handler); a higher value spreads each handler's messages across partitionKey mod N lanes that run
concurrently while messages sharing a partition key stay strictly ordered. InboxOperator is a
non-awaiting dispatcher (ADR-0018): one cycle fetches the due batch, plans lanes via the pure
InboxLanePlanner, and routes each candidate onto its persistent LaneWorker<TDbContext> via
LaneRegistry<TDbContext>.TryDispatchAsync (non-blocking TryWrite only — WriteAsync is forbidden
on the dispatch path); the operator does NOT Task.WhenAll lane completion (a full cycle returns in
O(batch size) wall time, independent of lane work). A worker whose channel is full surfaces false
and the row stays pending — the DB is the durable buffer. Lanes are owned by a
ConcurrentDictionary<LaneKey, LaneWorker<TDbContext>> with lazy spawn on first dispatch, periodic
idle self-removal (default 5 min — the planner's fairness window), and a bounded graceful drain on
host shutdown (default 30s).
module.AddMessaging<OrdersDbContext>(msg =>
{
    msg.Publishes<OrderCreatedMessage>();
    msg.WithPartitionCount(4); // 4 concurrent lanes per handler
});
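The non-blocking dispatch rule described above can be illustrated with plain System.Threading.Channels. This is a minimal sketch, not the library's LaneRegistry: the integer lane key, the channel capacity of 2, and the TryDispatch helper are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;

// Hypothetical stand-in for a lane registry: one bounded channel per lane key,
// created lazily on first dispatch.
var lanes = new ConcurrentDictionary<int, Channel<string>>();

// Non-blocking dispatch: TryWrite only, never WriteAsync.
// A false result means the lane is full and the row simply stays pending in the database.
bool TryDispatch(int laneKey, string row)
{
    // Capacity 2 is illustrative; the default full mode (Wait) makes TryWrite return false when full.
    var lane = lanes.GetOrAdd(laneKey, _ => Channel.CreateBounded<string>(2));
    return lane.Writer.TryWrite(row);
}

bool first = TryDispatch(0, "row-1");
bool second = TryDispatch(0, "row-2");
bool third = TryDispatch(0, "row-3");  // lane 0 is full and nothing is reading yet
bool other = TryDispatch(1, "row-4");  // a different lane has its own buffer

Console.WriteLine($"{first} {second} {third} {other}");
```

Because a rejected row is never lost (it is still a pending row in the inbox table), the dispatcher can move on immediately and pick the row up again on a later cycle.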
Namespace: the message-contract types below (Envelope, the IPartitionedByInt / IPartitionedByHash partition markers, the IPriorityCarrier priority marker, and the [MessageTtl] attribute) live in the Fluens.Messaging.Messaging namespace, so add using Fluens.Messaging.Messaging; alongside using Fluens.Messaging;. The root IMessage marker stays in Fluens.Messaging.
A message opts into ordered partitioning by implementing a partition marker (otherwise it is unpartitioned and runs on slot 0):
public sealed record OrderCreatedMessage(int OrderId, int CustomerId) : IPartitionedByInt
{
    public int GetPartitionKey() => CustomerId; // per-customer ordering
}
// or a stable string key (folded with FNV-1a — never string.GetHashCode):
public sealed record InvoiceIssued(string TenantId) : IPartitionedByHash
{
    public string GetPartitionKey() => TenantId;
}
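To make the "partitionKey mod N" mapping concrete, the sketch below folds a string key with FNV-1a and reduces both kinds of key to a lane index. It is an illustration only: the 32-bit hash width, the UTF-8 encoding, the non-negative modulo for integer keys, and the helper names are assumptions, not the library's documented internals.

```csharp
using System;
using System.Text;

// 32-bit FNV-1a over the UTF-8 bytes of the key (hash width and encoding are assumptions).
static uint Fnv1a32(string key)
{
    const uint offsetBasis = 2166136261;
    const uint prime = 16777619;
    uint hash = offsetBasis;
    foreach (byte b in Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;                       // XOR the byte in first (the "1a" variant)...
        hash = unchecked(hash * prime);  // ...then multiply, wrapping at 32 bits
    }
    return hash;
}

// Hypothetical lane selection for a string key: stable across processes, unlike string.GetHashCode.
static int LaneForHash(string key, int partitionCount) =>
    (int)(Fnv1a32(key) % (uint)partitionCount);

// Hypothetical lane selection for an integer key; the double modulo keeps negative keys in range.
static int LaneForInt(int key, int partitionCount) =>
    ((key % partitionCount) + partitionCount) % partitionCount;

Console.WriteLine(LaneForHash("tenant-42", 4));
Console.WriteLine(LaneForInt(1001, 4));
```

The point of a fixed hash such as FNV-1a is that the same key always maps to the same lane, even after a restart or on another node, which is what keeps messages sharing a partition key strictly ordered.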
Delivery priority (IPriorityCarrier): a message declares a non-Normal delivery priority by
implementing the IPriorityCarrier marker. At publish auto-fill the pure PriorityResolver resolves the
priority (an IPriorityCarrier yields its own GetPriority(); any other message resolves to
MessagePriorities.Normal) and stamps it onto Envelope.Priority and the denormalized priority column,
so a carrier's declared priority reaches the inbox row:
public sealed record FraudAlert(int OrderId) : IPriorityCarrier
{
    public MessagePriorities GetPriority() => MessagePriorities.High; // higher is preferred within its lane
}
Within a partition lane, ordering prefers higher priority (Priority DESC) but applies an
anti-starvation fairness rule: a lower-priority message that has waited beyond the fairness threshold
(default 5 minutes) is admitted ahead of pending higher-priority messages, so a long-waiter is never
indefinitely deferred behind a stream of fresh high-priority messages. The priority that orders a lane is the
value carried on the inbox row (stamped at publish time), not re-evaluated at consume time.
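The fairness rule can be pictured as a two-band sort over one lane's pending rows. The following is a sketch under stated assumptions rather than the library's planner: rows are modelled as tuples, priority is a plain integer, a long MintSeq stands in for the GUIDv7 mint order, and the "oldest first" order among starved rows is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Orders one lane: rows that waited past the threshold go first (oldest first),
// the rest follow by priority descending, then by mint order.
static List<string> PlanLane(
    IReadOnlyList<(string Name, int Priority, long MintSeq, DateTimeOffset WaitingSince)> rows,
    DateTimeOffset asOf,
    TimeSpan fairnessThreshold)
{
    var starved = rows
        .Where(r => asOf - r.WaitingSince > fairnessThreshold)
        .OrderBy(r => r.WaitingSince);
    var fresh = rows
        .Where(r => asOf - r.WaitingSince <= fairnessThreshold)
        .OrderByDescending(r => r.Priority)
        .ThenBy(r => r.MintSeq);
    return starved.Concat(fresh).Select(r => r.Name).ToList();
}

var now = DateTimeOffset.UtcNow;
var lane = new List<(string Name, int Priority, long MintSeq, DateTimeOffset WaitingSince)>
{
    ("normal-fresh", 1, 1L, now.AddSeconds(-20)),
    ("low-starved", 0, 2L, now.AddMinutes(-10)), // waited longer than the 5-minute threshold
    ("high-a", 2, 3L, now.AddSeconds(-5)),
    ("high-b", 2, 4L, now.AddSeconds(-1)),
};

var order = PlanLane(lane, now, TimeSpan.FromMinutes(5));
Console.WriteLine(string.Join(", ", order));
```

In this sample the low-priority row that has waited ten minutes is admitted ahead of both fresh high-priority rows, which is the behaviour the fairness threshold is meant to guarantee.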
Transactional inbox + outbox-routed faults: each message is processed by its lane worker in a fresh
scope; the consumer's business write, the inbox acknowledgement (ProcessedAt), and any directed
Fault<TMessage> outbox row staged via IMessagePublisher.Publish (ADR-0019) commit together in one
SaveChangesAsync, so a crash can never leave one without the other (exactly-once business effect under
at-least-once delivery). For a BusinessFaultException the original consume transaction rolls back and
the lane worker opens a fresh scope to commit ACK + fault outbox row in a single SaveChangesAsync. Fan-out
is per handler — one inbox row per (MessageId, HandlerType), so retry and dead-letter are isolated per
subscribing handler.
Two-stage recoverability: a transiently failed handler is retried in-memory [0.1, 0.3, 0.5, 1.0]s
on a fresh scope held inside the lane worker (the rolled-back attempt leaves no business effect; stage-1
attempts are NOT persisted as a journal — only the terminal dead-letter AttemptHistory roll-up is,
ADR-0018), then on a DB-persisted back-off schedule [1, 2, 3, 5]s (the worker reads the next channel
item; the DB-bound operator's due-time fetch gate is the head-of-line skip), and finally moved to
dead_letters — the dead-letter row (envelope + full attempt history + failure code + exception type) is
inserted and the originating inbox row is deleted in one transaction, plus a system.terminal-failure
Fault<TMessage> is staged on the outbox in the same SaveChangesAsync, so the move is atomic and the
freed MessageId can be re-injected by a replay without a composite-PK collision. The poison policy decides
whether the lane worker keeps draining (DeadLetterAndContinue) or exits (HaltPartition).
Permanent-failure short-circuit (ADR-0023 / ADR-0025 / ADR-0026): a deterministic resolution failure —
an unresolvable message type, an undeserializable payload, an undeserializable envelope, or a missing consumer
registration — can never succeed on retry, so it is not put through the two retry stages. Such a failure is
thrown as a typed exception implementing the marker IPermanentConsumeFailure — the infrastructure-thrown
family is MessageTypeResolutionException, MessagePayloadDeserializationException,
EnvelopeDeserializationException (ADR-0026 — raised by the consume-path DeserializeEnvelopeOrThrow on a
malformed envelope blob), and ConsumerNotRegisteredException; the lane worker catches them on a guarded arm
placed before the general catch and forwards them as RetryPolicy.Decide(permanent: true), which records a
single attempt and routes the message straight to dead-letter on the first failure — skipping the entire
in-memory + DB-scheduled progression (so a message destined to fail does not burn the full ≈12 s eight-attempt
budget first). Envelope-corruption terminals stamp FailureCode = "system.envelope-corruption" on the
dead-letter row (distinct from the generic system.terminal-failure) so dashboards can filter envelope
corruption as a separate category (ADR-0026). Transient failures are unaffected — they still walk both stages.
The dead-letter move is itself resilient to an unresolvable type or envelope: building the
system.terminal-failure fault re-resolves the original message type / payload and re-deserializes the
envelope, so each step is guarded — an unbuildable-type, malformed-payload, or malformed-envelope message skips
only the (non-essential) system fault (counted on fault.publish_skipped) while the dead-letter move always
proceeds and preserves the message's diagnostics (MessageType string + raw payload + envelope).
Handler code MAY signal a deterministic, non-retryable failure by throwing
PermanentConsumeFailureException (ADR-0025) — a public sealed exception that implements the now-public
IPermanentConsumeFailure marker. The same lane-worker catch arm classifies it as Permanent: true, so it
reaches dead-letter on the first attempt without burning the ≈12 s retry budget. User-defined exception
types MAY also implement IPermanentConsumeFailure directly when a domain-specific exception name reads
better than the generic one (e.g. PayloadSchemaUnsupportedException : Exception, IPermanentConsumeFailure).
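The two retry stages and the permanent-failure short-circuit can be summarised as a small decision function. This is a hypothetical sketch, not the library's RetryPolicy: the Decide signature, the action names, and the assumption that one initial attempt is followed by up to eight retries are illustrative; only the delay values come from the text above.

```csharp
using System;
using System.Linq;

// Delays taken from the two stages described above.
var inMemoryDelays = new[] { 100, 300, 500, 1000 }
    .Select(ms => TimeSpan.FromMilliseconds((double)ms)).ToArray();
var dbDelays = new[] { 1, 2, 3, 5 }
    .Select(s => TimeSpan.FromSeconds((double)s)).ToArray();

// failedAttempts counts failures so far (1 = the first attempt has just failed).
(string Action, TimeSpan Delay) Decide(int failedAttempts, bool permanent)
{
    // A permanent failure skips both stages and dead-letters immediately.
    if (permanent) return ("dead-letter", TimeSpan.Zero);

    if (failedAttempts <= inMemoryDelays.Length)
        return ("retry-in-memory", inMemoryDelays[failedAttempts - 1]);

    int dbIndex = failedAttempts - inMemoryDelays.Length - 1;
    if (dbIndex < dbDelays.Length)
        return ("retry-db-scheduled", dbDelays[dbIndex]);

    return ("dead-letter", TimeSpan.Zero);
}

// Total back-off a transient failure accumulates before it is dead-lettered.
var totalBackoff = inMemoryDelays.Concat(dbDelays)
    .Aggregate(TimeSpan.Zero, (sum, delay) => sum + delay);

Console.WriteLine(Decide(1, permanent: false));
Console.WriteLine(Decide(5, permanent: false));
Console.WriteLine(Decide(9, permanent: false));
Console.WriteLine(Decide(1, permanent: true));
Console.WriteLine(totalBackoff);
```

The listed delays add up to 12.9 s of pure back-off, the same order of magnitude as the ≈12 s budget quoted above; a permanent failure avoids all of it.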
When to throw which exception (ADR-0025 decision guide)
BusinessFaultException and PermanentConsumeFailureException are not interchangeable — they encode
two distinct failure semantics and produce two distinct durable outcomes. Pick by the nature of the
failure, not by the desired routing:
| Exception | Failure nature | Outcome | Use when |
|---|---|---|---|
| BusinessFaultException (or ctx.Fault(code, details?)) | Business "refused": the message is semantically valid but the domain deliberately declines | ACK + publish a directed business Fault<TMessage> with Info.Code = <your code> and Info.Exception = null. No DLQ. No retry. | Account locked, order already fulfilled, idempotent duplicate, quota exhausted: the producer reads the fault as a business outcome and routes it as domain feedback. |
| PermanentConsumeFailureException (or any user-defined Exception implementing IPermanentConsumeFailure) | Technical "impossible": no amount of retry would ever succeed | DLQ on the first attempt + publish a directed system Fault<TMessage> with Info.Code = "system.terminal-failure" and Info.Exception = ExceptionInfo(Type, Message). | Foreign-key violation referencing a missing parent, arg-out-of-range from a malformed-but-deserializable payload, invariant violation from a corrupted message: an operator inspects and replays after a data fix. |
public async Task ConsumeAsync(OrderCreatedMessage msg, MessageContext ctx, CancellationToken ct)
{
    if (await IsDuplicateAsync(msg.OrderId, ct))
        throw new BusinessFaultException("order.duplicate"); // ACK + business fault, no DLQ

    if (!await ParentCustomerExistsAsync(msg.CustomerId, ct))
        throw new PermanentConsumeFailureException(
            $"customer {msg.CustomerId} missing: message can never be processed"); // first-attempt DLQ
}
Throwing PermanentConsumeFailureException for a legitimate business refusal lands the message in DLQ and
the producer reads it as a system terminal failure (system.terminal-failure), not a business outcome —
which is the wrong contract toward the producer. Conversely, throwing a bare ArgumentException /
DbUpdateException / InvalidOperationException for a genuinely deterministic failure walks the full
≈12 s eight-attempt retry budget before reaching the DLQ it was always destined for; reach for
PermanentConsumeFailureException (or a typed IPermanentConsumeFailure) in that case.
Consume interceptors: implement IConsumeInterceptor (or IConsumeInterceptor<TMessage>) for
Russian-doll middleware that wraps the handler inside the lane's scope and transaction (auto-discovered
by AddModuleMessaging via Scrutor):
internal sealed class LoggingConsumeInterceptor : IConsumeInterceptor
{
    public async Task OnConsume(MessageContext ctx, Func<Task> next)
    {
        // before the handler …
        await next();
        // after the handler …
    }
}
Dead-letter maintenance, recovery, and outbox control (ADR-0011, surfaces split by ADR-0014)
The per-module operational surface is split by responsibility (ADR-0014) into three SRP-aligned interfaces, each
scoped to a single module DbContext and consumed by the dashboard endpoints:
- IMessagingMaintenance (read / diagnostics): query dead-letters, report inbox lag.
- IMessagingRecovery (replay / retry): single + bulk/filtered dead-letter replay (forward-looking name absorbs future manual retry, scheduled re-attempts, replay-with-modifications).
- IOutboxControl (outbox control-plane): mark a pending outbox row expired in place (forward-looking name absorbs future force-resend, cancel, reschedule).
// Query dead-letters (an empty filter returns all; set dimensions combine with AND)
var filter = new DeadLetterFilter
{
    MessageType = "OrderCreatedMessage",
    FailureCode = "system.terminal-failure",
    FailedAfter = DateTimeOffset.UtcNow.AddHours(-1),
};
IReadOnlyList<DeadLetterSummary> failures = await maintenance.QueryDeadLettersAsync(filter, ct);
int lag = await maintenance.GetInboxLagAsync(ct); // unprocessed inbox rows
int single = await recovery.ReplayAsync(deadLetterId, ct); // 1 when replayed, 0 when absent / already replayed
int bulk = await recovery.ReplayManyAsync(filter, ct); // count of dead-letters replayed
int expired = await outboxControl.ExpireOutboxAsync(outboxId, ct); // mark a pending outbox row dead in place (never deleted)
DeadLetterSummary projects only the diagnostic why-columns (MessageId, HandlerType, MessageType,
OriginalSchema/OriginalTable, PartitionKey, FailureCode, ExceptionType, Error, RetryCount,
FailedAt, ReplayedAt) — it never exposes the internal dead-letter entity or its envelope / payload
blobs.
Replay re-injects, never mutates in place. A replayed dead-letter re-enters the inbox as a fresh
pending row carrying the same MessageId and HandlerType as the original failure plus a fresh tail
(RetryCount = 0, NextRetryAt = null, unprocessed) and the dead-letter's recorded OriginalSchema; the
source dead_letters row is then marked ReplayedAt, both in one transaction. The re-inject is
collision-free because the terminal dead-letter move had already deleted the originating inbox row, freeing
the composite (MessageId, HandlerType) primary key, so the replayed row lands at the partition tail
dedup-free (ADR-0011). ReplayAsync is idempotent (a missing or already-replayed dead-letter is a no-op);
ReplayManyAsync replays every not-yet-replayed dead-letter matching the filter.
Faults and directed delivery (ADR-0012 / ADR-0019)
A failing message can surface back to the producer as a first-class Fault<TMessage> message that
carries the original message verbatim plus a diagnostic FaultInfo. Because it is itself an IMessage,
it rides the same durable outbox/inbox/mesh path and is consumed with IMessageConsumer<Fault<T>>. The
publish is outbox-routed: the lane worker calls the scoped IMessagePublisher.Publish inside the
consume scope; the staged fault outbox row commits in the same SaveChangesAsync as the business
write + inbox ACK, so an ACK without a published fault is impossible (ADR-0019). The closed-generic
Fault<TMessage> is constructed by the internal FaultBuilder via DI-aware ActivatorUtilities.CreateInstance
plus reflective required init assignment (ADR-0017); the directed origin module rides in the fluens-directed-module publish-options header and LocalFanOut routes the fault to one origin-module subscriber via DirectedFanOut (bypassing type fan-out, no relay). The pre-ADR-0019 FaultDispatcher type and the operator-supplied FaultPublisher delegate seam are gone.
public sealed record OrderRejectedDetails(string Reason) : IMessage;
// Producer side — observe faults for OrderCreatedMessage
internal sealed class OrderFaultConsumer : IMessageConsumer<Fault<OrderCreatedMessage>>
{
    public Task ConsumeAsync(Fault<OrderCreatedMessage> fault, MessageContext ctx, CancellationToken ct)
    {
        var original = fault.Message; // the OrderCreatedMessage that failed
        var code = fault.Info.Code; // e.g. "order.rejected" or "system.terminal-failure"
        var details = fault.Info.Details<OrderRejectedDetails>(serializer, typeNameSerializer);
        var exception = fault.Info.Exception; // ExceptionInfo?, non-null only for a system fault
        return Task.CompletedTask;
    }
}
A consumer raises a business fault — the message was understood but cannot be processed for a domain
reason — either by calling ctx.Fault(...) (commits any business write it already made, then ACKs and
publishes) or by throwing BusinessFaultException (rolls back, re-ACKs, then publishes). Both take the
ACK-and-publish path (no retry) and produce a fault with no captured exception:
public async Task ConsumeAsync(OrderCreatedMessage msg, MessageContext ctx, CancellationToken ct)
{
    if (!await CanFulfil(msg, ct))
        ctx.Fault("order.rejected", new OrderRejectedDetails("out of stock")); // ACK + publish, no retry
}
A system fault is published automatically when an ordinary exception exhausts both retry stages
(RetryPolicy, ADR-0011): the lane emits a Fault<TMessage> stamped with the reserved code
system.terminal-failure and an ExceptionInfo snapshot of the captured exception. Only a system fault
carries a non-null Info.Exception.
A fault is directed — routed to exactly one target, not broadcast to every Fault<T> subscriber. The
target is the explicit Envelope.FaultTo when set, otherwise the message origin (SourceApp +
SourceModule). The cross-application leg reuses the existing mesh ingress (POST /mesh/deliver): the
directed module rides inside the envelope's fluens-directed-module header, so no new wire endpoint is
added and the no-relay rule still holds.
Configuration
MessagingOptions is bound from the "Fluens:Messaging" configuration section:
| Property | Default | Description |
|---|---|---|
| OutboxBatchSize | 500 | Max outbox messages fetched per transport cycle. Tuned for the modular-monolith primary use case (ADR-0036) where the local-only fast-path commits one batch per subscriber-module DbContext and benefits from a larger fetch. Pathological cycle time stays bounded by the drain-then-sleep loop (ADR-0033) and the MaxDrainDurationSeconds soft-cap (default 30s), so a larger default cannot starve graceful shutdown. (Was 100 pre-ADR-0036.) |
| InboxBatchSize | 100 | Max inbox messages fetched per processing cycle |
| InboxProcessorIntervalSeconds | 30 | Fallback polling interval (seconds) for the inbox operator worker; immediate wake-up on new inbox rows is driven by IInboxNotifier. |
| TransportProcessorIntervalSeconds | 60 | Polling interval in seconds for the outbox transport processor |
| TransportDeliveryTimeoutSeconds | 30 | Per-delivery timeout budget (seconds) applied to each outbound mesh delivery attempt (ADR-0022). Must be > 0 and stay < TransportProcessorIntervalSeconds so a cycle cannot overrun its own interval. |
| TransportMaxConcurrentDeliveries | 10 | Maximum concurrent outbound deliveries the transport processor runs in parallel (the delivery-semaphore bound, ADR-0022). Must be > 0; set to 1 for strictly-sequential delivery. |
| MessageTtlSeconds | 86400 | Default time-to-live (24h) applied to every outbox message; overridable per message via [MessageTtl] |
| SentRetentionDays | 7 | Retention for terminal (sent) outbox rows before housekeeping deletes them |
| ProcessedRetentionDays | 7 | Retention for processed inbox rows before housekeeping deletes them |
| IngressRetentionDays | 7 | Retention for processed ingress staging rows before housekeeping deletes them |
| DeadLettersRetentionDays | 30 | Retention for replayed dead-letter rows before housekeeping deletes them |
| MaxDrainDurationSeconds | 30 | Soft-cap (seconds) bounding the drain-then-sleep loop in the three hot-path *OnceAsync workers (transport, inbox, ingress) so a pathological drain cycle cannot starve graceful shutdown (ADR-0033). Must be > 0. Checked at the top of each drain iteration BEFORE the next fetch, never inside per-message I/O. |
{
  "Fluens": {
    "Messaging": {
      "OutboxBatchSize": 500,
      "InboxBatchSize": 100,
      "InboxProcessorIntervalSeconds": 30,
      "TransportProcessorIntervalSeconds": 60,
      "TransportDeliveryTimeoutSeconds": 30,
      "TransportMaxConcurrentDeliveries": 10,
      "MessageTtlSeconds": 86400,
      "SentRetentionDays": 7,
      "ProcessedRetentionDays": 7,
      "IngressRetentionDays": 7,
      "DeadLettersRetentionDays": 30,
      "MaxDrainDurationSeconds": 30
    }
  }
}
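The "Must be" constraints stated in the property descriptions can be checked before deployment. The helper below is a hypothetical sketch: it is not part of Fluens.Messaging, its parameter names merely mirror the option names, and whether the library performs an equivalent check at startup is not stated here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical validator for the constraints documented on the transport and drain options.
static List<string> Validate(
    int transportProcessorIntervalSeconds,
    int transportDeliveryTimeoutSeconds,
    int transportMaxConcurrentDeliveries,
    int maxDrainDurationSeconds)
{
    var errors = new List<string>();
    if (transportDeliveryTimeoutSeconds <= 0)
        errors.Add("TransportDeliveryTimeoutSeconds must be > 0");
    if (transportDeliveryTimeoutSeconds >= transportProcessorIntervalSeconds)
        errors.Add("TransportDeliveryTimeoutSeconds must stay < TransportProcessorIntervalSeconds");
    if (transportMaxConcurrentDeliveries <= 0)
        errors.Add("TransportMaxConcurrentDeliveries must be > 0");
    if (maxDrainDurationSeconds <= 0)
        errors.Add("MaxDrainDurationSeconds must be > 0");
    return errors;
}

// The documented defaults satisfy every constraint.
var defaults = Validate(60, 30, 10, 30);

// A delivery timeout equal to the polling interval would let a cycle overrun its own interval.
var overrun = Validate(60, 60, 10, 30);

Console.WriteLine(defaults.Count);
Console.WriteLine(string.Join("; ", overrun));
```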
Throughput characteristics
MessageTransportProcessor distinguishes two distribution paths per drain iteration (ADR-0036). The fetched batch is split via the pure static OutboxDistributionPlanner.IsLocalOnly(remoteSubscribers, converged) (a messageType → subscribers lookup cached per drain iteration) and each partition runs under its own commit cadence.
Local-only batch path — applies to every OutboxMessage whose IRemoteSubscriptionRegistry.GetRemoteSubscribers(messageType) returns an empty owed-app set under a converged registry (IsLocalOnly == true). The whole sub-batch runs through one private DistributeLocalBatchAsync call with a single per-iteration BatchedLocalInboxWriter: PREPARE accumulates per-handler InboxMessage rows across every message into one cached DbContext per subscriber-module DbContextType (dedup via AnyAsync on the composite (MessageId, HandlerType) PK per ADR-0010), then FlushAsync commits one SaveChangesAsync per cached subscriber-module DbContext and wakes each written module's inbox processor on a finally-free path past every commit (writer-then-wake symmetry); PERSISTENCE re-reads every source OutboxMessage tracked by Id in a fresh EF scope, calls MarkAsSent on each, and commits one batch SaveChangesAsync for the whole sub-batch. So a 500-message local-only batch fanning to two subscriber modules costs one PREPARE SaveChangesAsync per subscriber-module DbContext + one PERSIST SaveChangesAsync for the publisher's outbox — not 500 + 500 + 500. The EmptyRemoteSubscriptionRegistry (in-process-only apps) is always converged so a pure modular monolith takes this path from the very first batch.
Mesh path — applies when IsLocalOnly == false (has remote subscribers OR registry unconverged). DistributeMessageAsync runs ADR-0022 Rule 5 unchanged: one SaveChangesAsync per message in PERSISTENCE, with per-destination network ACK semantics protected verbatim through the existing PREPARE → parallel I/O → PERSISTENCE three-scope shape. Batching mesh deliveries is forbidden because a hung peer's Hold would block healthy peers in the same batch and the per-destination convergence / TTL semantics depend on the per-message commit boundary. An unconverged-with-empty owed-app set deliberately routes here so the source row holds via PlanOutboxTerminal([], converged: false) → Hold, preserving the ADR-0007 cold-start no-loss invariant.
A mixed batch (some local-only + some mesh) runs both partitions within the same drain iteration; ordering within a priority band is preserved by the upstream ADR-0035 priority-aware FIFO fetch. The local-only path's inbox-wake set (localBatchNotified) is deliberately separate from the mesh path's outer notifiedModules so each subscriber-module is woken exactly once per drain iteration.
Crash safety. A PREPARE-success + PERSIST-fail crash on the local-only path leaves inbox handler-rows committed AND source outbox rows pending; the next drain iteration re-fetches, AnyAsync dedup short-circuits the duplicate inbox writes (zero new rows), the next PERSIST attempt commits the batched MarkAsSent. No data loss, no duplicates — the composite (MessageId, HandlerType) PK is the authoritative idempotency guard, identical to the per-message path's crash recovery just at batch granularity.
Tuning for modular-monolith
For a pure modular monolith (no mesh adapter installed — EmptyRemoteSubscriptionRegistry is registered) every batch takes the local-only fast-path. The recommended MessagingOptions.OutboxBatchSize = 500 (the new ADR-0036 default, up from 100) trades a slightly larger PREPARE working-set for far fewer SaveChangesAsync round-trips per drain iteration. The drain-then-sleep loop's MaxDrainDurationSeconds soft-cap (default 30s) caps the worst-case cycle wall-time regardless of batch size, so increasing OutboxBatchSize cannot starve graceful shutdown. If a deployment runs the mesh adapter and most published messages have remote subscribers, the larger default still costs only the fetch — the mesh path's per-message commit cadence is unchanged.
Polling and wake-up cadence
Fluens.Messaging does NOT poll on a fixed timer in the steady state — the three hot-path workers
combine writer-then-wake signaling with a per-cycle drain-then-sleep loop so that under
load they walk the durable backlog continuously, and the configured polling intervals
(InboxProcessorIntervalSeconds, TransportProcessorIntervalSeconds) act only as fallbacks.
Writer-then-wake. Every code path that writes a row a hot-path worker is waiting on first
persists the row (SaveChangesAsync), then signals the worker via the matching notifier
(ITransportNotifier.WakeUp after OutboxSaveChangesInterceptor flushes the outbox;
IInboxNotifier.WakeUp(moduleName) after MessageTransportProcessor / IngressDistributionProcessor
fans out to a subscriber inbox). The two non-fan-out inbox-write seams — ScopedLocalInboxWriter
(the shared ILocalInboxWriter consumed by both transport + ingress fan-out, per-handler write +
commit) and DeadLetterReplayer (operator-initiated single + bulk replay re-injecting a fresh
inbox row inside BeginTransactionAsync / CommitAsync) — follow the same writer-then-wake
guarantee: each calls IInboxNotifier.WakeUp(moduleName) exactly once after the row is durably
committed (past CommitAsync on the replayer, past SaveChangesAsync on the inbox writer) so an
operator-initiated replay or a directly-routed fan-out row does not wait the
InboxProcessorIntervalSeconds (default 30s) timer fallback before dispatch. The wake call sits
on a finally-free path: a throw from the in-transaction write propagates without reaching the
wake — no-wake-on-rollback is enforced by construction, not by try/finally discipline
(ADR-0033). The BackgroundWorker channel is bounded,
Channel.CreateBounded<byte>(new BoundedChannelOptions(3) { FullMode = DropWrite, SingleReader = true }),
and WakeUp writes are best-effort TryWrite(0): under sustained publish pressure the channel
silently coalesces and drops surplus signals, so the worker MUST NOT rely on receiving exactly one
wake-up per persisted row.
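The coalescing behaviour of such a wake-up channel is easy to reproduce in isolation. The sketch below uses only System.Threading.Channels with the options quoted above; the loop of ten signals and the variable names are illustrative assumptions.

```csharp
using System;
using System.Threading.Channels;

// Same shape as the wake-up channel described above: capacity 3, surplus writes dropped.
var wake = Channel.CreateBounded<byte>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropWrite,
    SingleReader = true,
});

// Ten rows are persisted and each writer signals once; the worker is busy and not reading yet.
for (int i = 0; i < 10; i++)
{
    wake.Writer.TryWrite(0); // best-effort: once 3 signals are buffered, further ones are dropped
}

// When the worker finally looks, only the buffered signals are left.
int buffered = 0;
while (wake.Reader.TryRead(out _))
{
    buffered++;
}

Console.WriteLine(buffered);
```

Ten writes leave at most three signals to read, so a worker that handled exactly one batch per signal would fall behind; this is why each wake-up has to trigger a full drain of the durable backlog.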
Drain-then-sleep (ADR-0033). Each hot-path *OnceAsync method
(MessageTransportProcessor.DistributeOnceAsync per module, InboxOperator.ProcessOnceAsync,
IngressDistributionProcessor.DistributeOnceAsync) wraps its fetch + work body in a while (true)
drain loop and only exits when its source signals "drained":
| Worker | Drain-loop continuation gate |
|---|---|
| MessageTransportProcessor | batch.Count == OutboxBatchSize (exit on a strictly smaller fetch) |
| IngressDistributionProcessor | rows.Count == IngressBatchSize (exit on a strictly smaller fetch; IngressBatchSize is the Fluens.Web.Messaging adapter's MessagingMeshOptions.IngressBatchSize) |
| InboxOperator | !anyRejected AND batch.Count == InboxBatchSize, NOT acceptedCount == batch.Count, because the pure InboxLanePlanner legitimately skips lanes whose head is not yet due, so accepted-count equality is unreachable on a healthy drain and would collapse the loop back to single-batch-per-tick |
The soft-cap drainDuration < MaxDrainDurationSeconds (default 30s) is checked at the top of each
iteration BEFORE the next fetch — never inside a per-message scope nor inside the transport I/O
phase (the per-delivery TransportDeliveryTimeoutSeconds budget remains the inner I/O bound).
CancellationToken.IsCancellationRequested is honored between iterations so graceful shutdown
preempts the drain at the iteration boundary. Per-message / per-batch persistence (transport's
per-message SaveChangesAsync, ingress's MarkProcessedAsync, inbox's per-lane LaneWorker
commits) guarantees the next iteration's fetch never re-sees the just-processed rows.
This pattern closes the silent-loss vector under load: when the channel drops a WakeUp, the
worker still drains its source in the current cycle instead of falling back to the polling
interval (60s for transport, 30s for inbox / ingress), so a 100k-row backlog clears in
O(backlog / batchSize) iterations instead of O(backlog × interval / batchSize). The soft-cap
bounds the worst-case graceful-shutdown latency: a drain cycle yields to cancellation within
MaxDrainDurationSeconds rather than running unbounded against a pathological backlog. See
ADR-0033 for the architectural
decision and rules.
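The drain-then-sleep shape described above can be sketched with an in-memory queue standing in for the durable backlog. This is an illustration under assumptions, not the library's worker code: DrainOnce, the Queue<int> source, and the omitted processing step are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

// Fetches batches until the source returns a strictly smaller batch,
// the soft cap elapses, or cancellation is requested. Returns the number of fetches.
static int DrainOnce(Queue<int> source, int batchSize, TimeSpan maxDrainDuration, CancellationToken ct)
{
    var elapsed = Stopwatch.StartNew();
    int iterations = 0;
    while (true)
    {
        // Soft cap and cancellation are checked before the next fetch, never mid-batch.
        if (ct.IsCancellationRequested || elapsed.Elapsed >= maxDrainDuration) break;

        var batch = new List<int>(batchSize);
        while (batch.Count < batchSize && source.TryDequeue(out int row)) batch.Add(row);
        iterations++;

        // ... process and persist the batch here so the next fetch never re-sees these rows ...

        if (batch.Count < batchSize) break; // strictly smaller fetch: the source is drained
    }
    return iterations;
}

var backlog = new Queue<int>(Enumerable.Range(0, 1050));
int fetches = DrainOnce(backlog, 500, TimeSpan.FromSeconds(30), CancellationToken.None);

// An already-cancelled token stops the loop before the first fetch.
int cancelledFetches = DrainOnce(
    new Queue<int>(Enumerable.Range(0, 10)), 500, TimeSpan.FromSeconds(30), new CancellationToken(canceled: true));

Console.WriteLine($"{fetches} fetches, {backlog.Count} rows left, {cancelledFetches} fetches when cancelled");
```

A backlog of 1050 rows with a batch size of 500 clears in three fetches (500, 500, 50) within a single cycle, without waiting for any polling interval between them.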
Priority-aware FIFO fetch (ADR-0035)
The two hot-path fetch sites order pending rows by Priority DESC first and GUIDv7 mint-time
second so the priority DESC slot of the partial pending indexes is live and cross-message_type
head-of-line starvation is structurally eliminated.
- Outbox transport fetch (MessageTransportProcessor.FetchUnpublishedMessagesAsync): orders the pending batch via OutboxFetchOrdering.PendingPriorityFifo → OrderByDescending(m => m.Priority).ThenBy(m => m.Id), where Id is the Guid.CreateVersion7() publish-time mint of the outbox row, so a strict-FIFO tie-break within a priority band rides on the publisher's mint-time order. This replaces the pre-ADR-0035 OrderBy(m => m.Id), which left the priority DESC slot of ix_outbox_pending dead and let a single message_type accumulating a mint-time-earlier backlog drain every OutboxBatchSize fetch indefinitely while newer types of equal or higher priority starved. The Where predicate (the ADR-0009 scheduling gate) is unchanged.
- Inbox operator fetch (InboxOperator.FetchPendingAsync): orders the pending batch via InboxFetchOrdering.PendingDueFifo → OrderByDescending(m => m.Priority).ThenBy(m => m.MessageId), where MessageId is the publisher-side GUIDv7 carried verbatim through the outbox / mesh / inbox seam. This replaces the pre-ADR-0035 OrderBy(m => m.ReceivedAt), which left the priority DESC slot of ix_inbox_pending dead and starved newer high-priority message_types at the fetch site that the within-lane InboxLanePlanner could not see (the planner only orders rows the batch admitted; a starved row that never makes the batch is never promoted). ReceivedAt is the receiver-side wall-clock write time and is distinct from the publisher's mint time, so the strict-FIFO tie-break must come from MessageId. The within-lane planner's anti-starvation rule (starved-first oldest-first, then Priority DESC, then GUIDv7 mint-time via big-endian byte comparison, ADR-0018) is unchanged and continues to operate post-fetch over whatever rows the new priority-aware fetch admits: a starved Low-priority row is still promoted within its lane.
See ADR-0035 for the
architectural decision, the partial-index alignment trade-off, and the
IngressDistributionProcessor.FetchUnprocessedAsync follow-up scope note.
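The ordering rule itself is small enough to try in memory. The sketch below applies the same OrderByDescending(Priority).ThenBy(Id) shape to a handful of tuples; the row shape, the integer priorities, and the message names are assumptions made for illustration (the real entities and the database-side index behavior are not shown here). Explicit timestamps are passed to Guid.CreateVersion7 so the mint order is deterministic.

```csharp
using System;
using System.Linq;

// Fixed base instant so the GUIDv7 mint order does not depend on the wall clock.
var t0 = new DateTimeOffset(2026, 1, 1, 12, 0, 0, TimeSpan.Zero);

// Hypothetical pending rows: only Priority and Id take part in the ordering.
var pending = new[]
{
    (Name: "report-new", Priority: 0, Id: Guid.CreateVersion7(t0.AddSeconds(1))),
    (Name: "payment",    Priority: 5, Id: Guid.CreateVersion7(t0.AddSeconds(2))),
    (Name: "report-old", Priority: 0, Id: Guid.CreateVersion7(t0)),
};

// Priority first (highest wins), then mint time within the same priority band.
var ordered = pending
    .OrderByDescending(r => r.Priority)
    .ThenBy(r => r.Id)
    .Select(r => r.Name)
    .ToArray();

Console.WriteLine(string.Join(", ", ordered)); // payment, report-old, report-new
```

The later-minted but higher-priority row jumps the queue, while the two equal-priority rows keep their mint-time order.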
Message Envelope (ADR-0008 / ADR-0026)
Every message carries a single serialized Envelope rather than a bare execution-context blob. The
envelope nests the execution Context exactly once (correlation / activity / trace / message /
causation / user ids — never duplicated) and adds messaging metadata: an informational SchemaVersion
(see below), SourceApp, SourceModule, ContentType, an optional PartitionKey, a scheduled
AvailableAt, a delivery Priority, optional ReplyTo / FaultTo routing addresses, and
user-supplied Headers. It round-trips to / from byte[] via the AOT-safe source-generated
EnvelopeSerializerContext. The same serialized envelope is carried unchanged across publishing →
transport → consuming and over the mesh wire (no re-serialization). The PartitionKey, AvailableAt,
and Priority fields are also promoted to denormalized columns (ADR-0009) so the database can sort /
filter on them.
SchemaVersion (ADR-0026): Envelope.SchemaVersion is an int stamped at publish time, default
1 for envelopes constructed by current code. Pre-existing blobs that pre-date ADR-0026 omit the
field and deserialize as SchemaVersion = 0 (System.Text.Json's default for int); the additive-only
forward-compatibility policy treats 0 and 1 as semantically equivalent — the rollout is
transparent, no migration, no rebuilds. The property is informational only: no runtime code path
branches on it; the field exists so dashboards and logs can surface "running older code than producer"
during a rolling deploy. Schema evolution stays additive-only: new fields are appended as
optional (nullable, or a non-null primitive with a safe default), and existing fields are never
removed, renamed, or retyped. Bump SchemaVersion on every additive change so producer-version
drift stays observable.
Envelope corruption is a permanent consume failure: if an inbox envelope blob cannot be
deserialized on the consume path (a JsonException from EnvelopeSerializerContext.Envelope), the
lane worker raises EnvelopeDeserializationException : IPermanentConsumeFailure (mirroring the
ADR-0023 typed exceptions) and the existing guarded IPermanentConsumeFailure catch arm routes the
message straight to the dead-letter on the first attempt (no retry — retrying cannot un-corrupt
the bytes). The dead-letter row stamps FailureCode = "system.envelope-corruption" (distinct from
the generic system.terminal-failure) and ExceptionType = nameof(EnvelopeDeserializationException),
so an envelope-corruption uptick is filterable as a separate category. The ingress seam
(IngressDistributionProcessor.ResolveDirectedModule) does NOT escalate envelope corruption to a
dead-letter (no ingress dead-letter table) — it catches locally, increments the
ingress.directed_drop counter with reason = envelope_invalid, and marks the row processed (the
lane-worker seam is the only escalation path).
Scheduling
Set Envelope.AvailableAt to schedule a message for later delivery. The transport processor's fetch
only selects rows whose availability window has opened (available_at IS NULL OR available_at <= now()),
so a scheduled message is invisible to transport until then. Its TTL is measured from the availability
instant: ExpiresAt = (AvailableAt ?? CreatedAt) + TTL.
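As a quick worked example of the two rules above, the sketch below evaluates the visibility gate and the expiry formula for a message scheduled one hour after it was written. The helper names `IsVisibleToTransport` and `ComputeExpiresAt` are hypothetical; only the predicate and the formula come from the text.

```csharp
using System;

// Mirrors the fetch gate: available_at IS NULL OR available_at <= now().
static bool IsVisibleToTransport(DateTimeOffset? availableAt, DateTimeOffset now) =>
    availableAt is null || availableAt <= now;

// ExpiresAt = (AvailableAt ?? CreatedAt) + TTL.
static DateTimeOffset ComputeExpiresAt(DateTimeOffset createdAt, DateTimeOffset? availableAt, TimeSpan ttl) =>
    (availableAt ?? createdAt) + ttl;

var createdAt = new DateTimeOffset(2026, 1, 1, 12, 0, 0, TimeSpan.Zero);
DateTimeOffset? availableAt = createdAt.AddHours(1);
var ttl = TimeSpan.FromMinutes(5);

Console.WriteLine(IsVisibleToTransport(availableAt, createdAt));             // False
Console.WriteLine(IsVisibleToTransport(availableAt, createdAt.AddHours(1))); // True
Console.WriteLine(ComputeExpiresAt(createdAt, availableAt, ttl).ToString("O"));
// 2026-01-01T13:05:00.0000000+00:00

// An unscheduled message is visible immediately and expires relative to CreatedAt.
Console.WriteLine(ComputeExpiresAt(createdAt, null, ttl).ToString("O"));
// 2026-01-01T12:05:00.0000000+00:00
```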
Retention and housekeeping
A per-module HousekeepingWorker batch-deletes only terminal messaging rows past their retention
window — sent outbox rows (SentRetentionDays), processed inbox rows (ProcessedRetentionDays), and
replayed dead-letter rows (DeadLettersRetentionDays). An in-flight row is never deleted, so
housekeeping can never lose an undelivered message. Deletes run in bounded batches and reuse the
transport interval as the cleanup cadence.
Metrics
Fluens.Messaging emits a single BCL meter (System.Diagnostics.Metrics) named Fluens.Messaging
(MessagingMetrics) for the consume/transport/ingress/fault-publish pipeline. Every instrument
stamps a stratified tag scheme (ADR-0026) so a single dashboard can drill from "which module is
hot" to the specific failing handler / message / failure category:
| Instrument | Kind | Tags | Meaning |
|---|---|---|---|
| inbox.lag | ObservableGauge<long> | module | The absolute count of unprocessed inbox rows (ProcessedAt IS NULL) for each module; a true COUNT, not a +1/-1 delta |
| consume.duration | Histogram<double> (ms) | module, handler_type | How long a consumer took to process a message |
| retry.count | Counter<long> | module, handler_type, message_type | Handler retries scheduled by the two-stage retry policy |
| deadletter.count | Counter<long> | module, handler_type, message_type, failure_code | Messages moved to dead-letters; failure_code discriminates system.terminal-failure (retry exhaustion) from system.envelope-corruption (ADR-0026) |
| delivery.latency | Histogram<double> (ms) | destination_app | Successful per-destination remote delivery time |
| ingress.directed_drop | Counter<long> | directed_module, message_type, reason | Directed-delivery drops at the ingress seam. reason vocabulary: no_subscriber (named origin module has no local subscriber) or envelope_invalid (the staged envelope could not be deserialized; the ingress seam has no dead-letter table, so the row is marked processed and the drop is observable only as this counter + a warning log) |
| fault.publish_skipped | Counter<long> | module, reason | Skipped auxiliary fault publishes at LaneWorker.StageFaultOutboxRow. reason vocabulary: envelope_invalid (the fault-build envelope deserialize returned null), origin_unknown (the resolved origin module is null/empty), or message_unresolvable (the original message type or payload could not be resolved or deserialized). The underlying dead-letter / business-fault commit still proceeds; only the diagnostic Fault<TMessage> publish is skipped |
| messaging.drain.iterations | Counter<long> | worker_type, terminal_reason | Cumulative drained-batch count from the three hot-path drain-then-sleep workers (ADR-0034). Each emission adds the just-completed cycle's iteration count (not a fixed +1): a single MessagingMetrics.RecordDrainCycle call adds N, where N is the cycle's drained-batch count |
| messaging.drain.duration | Histogram<double> (ms) | worker_type, terminal_reason | Wall-clock duration of one drain cycle on the three hot-path workers (ADR-0034), recorded as TimeSpan.TotalMilliseconds so the bucket scheme aligns with consume.duration / delivery.latency |
inbox.lag is fed by a per-module hosted InboxLagObserver<TDbContext> (registered automatically by
AddModuleMessaging): each cycle it opens a fresh DI scope, COUNTs the module's unprocessed inbox rows, and
writes the absolute count into the gauge's per-module registry via MessagingMetrics.SetInboxLag. Because
the value is absolute (not accumulated), the gauge is restart-safe and self-correcting — a restart re-seeds the
registry on the first cycle, and the observable callback re-reads the registry on every collection. The observer
reuses InboxProcessorIntervalSeconds as its cadence and drops its module from the registry on shutdown.
MessagingMetrics.RecordConsume(duration, module, handlerType) records only the consume.duration histogram
(it never touches inbox.lag); the LaneWorker calls it exactly once per successfully-committed message, after
the transactional SaveChangesAsync — the retry, dead-letter, and BusinessFaultException paths throw before
reaching it, so they never record.
Drain-cycle telemetry (ADR-0034). The paired messaging.drain.iterations + messaging.drain.duration
instruments are emitted from a single public static entry point
MessagingMetrics.RecordDrainCycle(string workerType, long iterations, TimeSpan duration, string terminalReason)
so the two KeyValuePair<string, object?> tag locals are built once inside the helper body and passed to both
Counter.Add and Histogram.Record — tag drift between the paired emissions is structurally impossible. The
tag vocabulary is fixed and shared across all three worker types:
- worker_type ∈ {transport, inbox, ingress}: the hot-path worker emitting the cycle (MessageTransportProcessor / InboxOperator / IngressDistributionProcessor respectively; each hard-codes its own string literal).
- terminal_reason ∈ {drained, near_empty, time_cap, rejected}: why the drain cycle stopped. The first three values are emitted by every worker; the rejected value is unique to the inbox (worker_type = "inbox") and is reserved for the InboxLanePlanner lane-full continuation-gate carve-out. LaneWorker's BoundedChannelFullMode.DropWrite makes the TryWrite == false path structurally unreachable in current code, so live emissions today collapse that carve-out into drained (the documented invariant remains for a future channel-mode change to Wait). A rejected measurement on a non-inbox worker is an instrumentation bug.
Per-drain-cycle cadence — exactly one RecordDrainCycle call per drain cycle. MessageTransportProcessor.DistributeOnceAsync
emits per module inside its outer foreach module walk (each module owns its own drain loop); InboxOperator.ProcessOnceAsync
and IngressDistributionProcessor.DistributeOnceAsync emit once per *OnceAsync invocation. A cycle that fetched zero
rows still emits exactly one sample (iterations = 1, terminal_reason = "near_empty") so dashboards never see a
missing-sample artefact. NO module tag, NO message_type tag — per-module backlog visibility stays on inbox.lag;
the drain instruments hold a fixed 3 × 3 = 9 series ceiling per instrument (or 3 × 4 = 12 once rejected becomes
live again), independent of module / message-type count.
A representative OTLP emission of the paired instruments — a transport drain cycle that fetched 7 batches before the source emptied, took 480ms, and exited on the natural-drain default:
{
"instrument": "messaging.drain.iterations",
"kind": "Counter<long>",
"value": 7,
"tags": { "worker_type": "transport", "terminal_reason": "drained" }
}
{
"instrument": "messaging.drain.duration",
"kind": "Histogram<double>",
"unit": "ms",
"value": 480.0,
"tags": { "worker_type": "transport", "terminal_reason": "drained" }
}
The two measurements share identical tag values by construction (single-entry RecordDrainCycle helper),
so a dashboard query that filters one instrument by (worker_type, terminal_reason) aligns one-to-one with
the other. See ADR-0034 for the full rationale,
the rejected-value structural-unreachability proof, and the cardinality trade-off.
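The single-entry-point idea can be sketched with the BCL metrics API directly. The block below is an illustration under assumptions, not the library's MessagingMetrics class: the meter name `Sketch.Messaging` is made up, and the `MeterListener` exists only so the example can print what was recorded. The point it demonstrates is that both instruments receive the same two tag values because they are built once.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical meter; the instrument names follow the table above.
var meter = new Meter("Sketch.Messaging");
var iterationsCounter = meter.CreateCounter<long>("messaging.drain.iterations");
var durationHistogram = meter.CreateHistogram<double>("messaging.drain.duration", unit: "ms");

// Single entry point: the tag pair is built once and handed to both instruments.
void RecordDrainCycle(string workerType, long iterations, TimeSpan duration, string terminalReason)
{
    var workerTag = new KeyValuePair<string, object?>("worker_type", workerType);
    var reasonTag = new KeyValuePair<string, object?>("terminal_reason", terminalReason);
    iterationsCounter.Add(iterations, workerTag, reasonTag);
    durationHistogram.Record(duration.TotalMilliseconds, workerTag, reasonTag);
}

// Formats one measurement with its tags so the two emissions can be compared.
static string Describe(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object?>> tags)
{
    var parts = new List<string>();
    foreach (var tag in tags) parts.Add($"{tag.Key}={tag.Value}");
    return $"{instrument.Name} {value} [{string.Join(",", parts)}]";
}

// In-process listener used only to observe the sketch's own measurements.
var seen = new List<string>();
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (ReferenceEquals(instrument.Meter, meter)) l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((inst, value, tags, state) => seen.Add(Describe(inst, value, tags)));
listener.SetMeasurementEventCallback<double>((inst, value, tags, state) => seen.Add(Describe(inst, value, tags)));
listener.Start();

RecordDrainCycle("transport", 7, TimeSpan.FromMilliseconds(480), "drained");
foreach (var line in seen) Console.WriteLine(line);
```

Keeping the tag construction inside one helper means a caller cannot pass different tag values to the counter and the histogram by accident.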
Distributed tracing (ADR-0027)
Fluens.Messaging also owns a static ActivitySource named "Fluens.Messaging" (MessagingActivitySource —
single point of truth for the source name + span construction, mirroring MessagingMetrics's static-class
shape) that produces publish (Producer), consume (Consumer), and delivery (Producer) spans on the three messaging hot paths. W3C trace context
propagates end-to-end through the existing Envelope.Context.ActivityId (the traceparent string) plus the
new nullable Context.TraceState (an additive ADR-0008 extension under ADR-0026's forward-compat policy —
legacy envelopes deserialize TraceState = null and parse cleanly).
| Span | Kind | Name template | Started in | Key attributes |
|---|---|---|---|---|
| publish | Producer | "{messageType} publish" | OutboxSaveChangesInterceptor.FlushPendingToOutbox (per pending message, before context build) | messageType, messageId, partitionKey, availableAt |
| consume | Consumer | "{messageType} process" | LaneWorker.TryProcessOnceAsync (per attempt, parent rebuilt from envelope via MessagingActivitySource.TryParseParent) | messageType, messageId, handlerType, laneSlot, retryCount |
| delivery | Producer | "{destinationApp} send" | MessageTransportProcessor.DeliverAsync (for each per-destination push, parent of the HttpClient client span) | destinationApp, messageId, fluens.delivery.outcome (Acked, Hold, or TimedOut) |
Retry / dead-letter / permanent-failure decisions surface as AddEvent on the failing attempt's consume span
(never as separate spans — Stage-2 1–5s pauses between attempts cannot be represented as one paused OTel span):
- fluens.retry.scheduled: emitted in ScheduleRetryAsync before the ExecuteUpdateAsync; tags stage (1 in-memory / 2 DB-scheduled) + next_retry_at.
- fluens.deadletter.move: emitted in MoveToDeadLetterAndPublishSystemFaultAsync before the SaveChangesAsync; tag failure_code (system.terminal-failure or system.envelope-corruption per ADR-0026).
- fluens.permanent_failure: emitted when the lane worker classifies an attempt as permanent (any IPermanentConsumeFailure per ADR-0023, including handler-thrown PermanentConsumeFailureException per ADR-0025).
A BusinessFaultException is a domain signal, not a failure — the consume span ends with status Unset / Ok
and no RecordException (only retry-exhaustion + ADR-0023 permanent failure record the captured exception).
A malformed legacy envelope can never crash the lane worker: MessagingActivitySource.TryParseParent is a total
function that returns default(ActivityContext) on null / empty / malformed traceparent and the consume span
simply starts a fresh trace.
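A total parse function of this kind can be approximated with the BCL's ActivityContext.TryParse. The sketch below is an assumption about the shape, not the actual MessagingActivitySource.TryParseParent source; `TryParseParentSketch` is a hypothetical name. It never throws and returns default(ActivityContext) for null, empty, or malformed input.

```csharp
using System;
using System.Diagnostics;

// Hypothetical stand-in for a total parent-context parser:
// any unusable traceparent yields default(ActivityContext) instead of an exception.
static ActivityContext TryParseParentSketch(string? traceParent, string? traceState) =>
    !string.IsNullOrWhiteSpace(traceParent)
    && ActivityContext.TryParse(traceParent, traceState, out var parsed)
        ? parsed
        : default;

// A well-formed W3C traceparent keeps the publisher's trace id.
var ok = TryParseParentSketch("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", null);
Console.WriteLine(ok.TraceId); // 0af7651916cd43dd8448eb211c80319c

// Malformed or missing input falls back to the default context, so a consumer
// that starts its span with this parent simply begins a fresh trace.
Console.WriteLine(TryParseParentSketch("garbage", null) == default(ActivityContext)); // True
Console.WriteLine(TryParseParentSketch(null, null) == default(ActivityContext));      // True
```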
Consumer opt-in — source-name registration stays consumer-driven (identical to the existing Fluens.Cqrs
pattern): list "Fluens.Messaging" in ObservabilityOptions.AdditionalSources so the
AddObservability()-wired OTLP exporter subscribes to the source. Forgetting it is a silent no-op (the helpers
return null, no spans export, no exception thrown).
{
"Fluens": {
"Observability": {
"Enabled": true,
"OtlpEndpoint": "http://otel-collector:4317",
"AdditionalSources": [ "Fluens.Messaging" ]
}
}
}
The receiver-side MeshEndpoints.POST /mesh/deliver does NOT start a Fluens-owned consume span — it persists
the envelope verbatim to IngressOutbox; the consume span starts later in LaneWorker.TryProcessOnceAsync on
the receiver side via the same MessagingActivitySource.TryParseParent path, linking to the original publisher
trace. The mesh HttpClient instrumentation already wired by AddObservability() produces the cross-process
client span as a child of the per-destination delivery span — no Fluens-owned wire-level span is added beyond
the outer DeliverAsync producer span. See ADR-0027 for the full rationale, alternatives, and rollback plan.
Message TTL (no-loss semantics)
Every outbox message carries an absolute expiry. The source OutboxMessage.ExpiresAt is computed as (AvailableAt ?? CreatedAt) + TTL at the moment the message is written to the outbox (so a scheduled message's TTL is measured from its availability instant). The TTL is MessagingOptions.MessageTtlSeconds by default, overridable per message type with the [MessageTtl] attribute:
[MessageTtl(300)] // this message type expires 5 minutes after it is written
public record OrderCreatedMessage(Guid OrderId) : IMessage;
When a message exceeds its TTL it is marked expired in place (ExpiredAt + the last error are recorded) and never delivered again — the outbox row is never deleted. A message is therefore always either delivered or visibly dead, never silently lost.
The source-row expiry above is publish-time absolute and stays so. Each per-destination OutboxDelivery row, however, gets a fan-out-relative expiry (ADR-0024): when the transport processor fans the message out it stamps the delivery row's ExpiresAt via OutboxDistributionPlanner.PlanDeliveryExpiry, which reconstructs the original ttl (ExpiresAt - (AvailableAt ?? CreatedAt)) and re-anchors it to the fan-out instant (now + ttl) instead of copying the source row's absolute deadline. So a delivery row born after a slow-convergence window (e.g. Consul-KV unavailable at startup) starts its full ttl budget at fan-out rather than being born already past ExpiresAt and expired on the same cycle with zero delivery attempts. A genuinely past-TTL source (negative/zero reconstructed ttl) still expires immediately — that is correct.
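The re-anchoring arithmetic can be checked with a few lines. The function below is a sketch of the calculation described above, not the real OutboxDistributionPlanner.PlanDeliveryExpiry; its name and parameter list are assumptions. It reconstructs the ttl from the source row and adds it to the fan-out instant.

```csharp
using System;

// Hypothetical re-anchoring helper: ttl = ExpiresAt - (AvailableAt ?? CreatedAt), result = fanOutAt + ttl.
static DateTimeOffset PlanDeliveryExpirySketch(
    DateTimeOffset createdAt,
    DateTimeOffset? availableAt,
    DateTimeOffset sourceExpiresAt,
    DateTimeOffset fanOutAt)
{
    var ttl = sourceExpiresAt - (availableAt ?? createdAt);
    return fanOutAt + ttl;
}

var createdAt = new DateTimeOffset(2026, 1, 1, 12, 0, 0, TimeSpan.Zero);
var sourceExpiresAt = createdAt.AddMinutes(5);   // 5-minute TTL, unscheduled message
var fanOutAt = createdAt.AddMinutes(20);         // fan-out delayed by slow convergence

// Copying the source deadline would produce a row that is already 15 minutes past expiry.
Console.WriteLine(sourceExpiresAt < fanOutAt); // True

// Re-anchoring gives the delivery row its full 5-minute budget from the fan-out instant.
var deliveryExpiresAt = PlanDeliveryExpirySketch(createdAt, null, sourceExpiresAt, fanOutAt);
Console.WriteLine(deliveryExpiresAt.ToString("O")); // 2026-01-01T12:25:00.0000000+00:00

// A source row whose reconstructed ttl is zero or negative still expires immediately.
var alreadyDead = PlanDeliveryExpirySketch(createdAt, null, createdAt.AddMinutes(-1), fanOutAt);
Console.WriteLine(alreadyDead <= fanOutAt); // True
```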
Graceful Shutdown
Both MessageTransportProcessor and InboxOperatorWorker support cooperative cancellation, and
LaneRegistry<TDbContext> registers an IHostApplicationLifetime.ApplicationStopping callback that
fires its ShutdownAsync with a bounded graceful-drain window (default 30s, ADR-0018):
- Batch loops check the cancellation token between iterations; unprocessed messages are picked up on next startup (at-least-once)
- OperationCanceledException during shutdown is not treated as a processing failure: no retry count increment, no dead letter moves
- Messages interrupted by shutdown retain their current retry count; the inbox row stays pending and re-runs on the next process start
- LaneRegistry.ShutdownAsync is shutdown, not a drain primitive: integration harnesses and graceful-stop code MUST drain pending rows by polling the operator's own pending filter (processed_at IS NULL AND (next_retry_at IS NULL OR next_retry_at <= now)) BEFORE calling ShutdownAsync, and never use ShutdownAsync as the drain gate itself
Deployment Scope (ADR-0015)
Each Fluens application instance is the sole process running its messaging module(s) and the sole
owner of the module's database / schema — multi-instance horizontal scaling of the same module is
not supported. Horizontal scale-out MUST be achieved by partitioning the workload across modules
(each module instance owns a dedicated DB or schema) — never by running two replicas pointed at the
same inbox / outbox / ingress_outbox. Fluens cannot detect a misconfigured second instance; the
deployment topology is the operator's responsibility.
Cross-application mesh (ports)
Fluens.Messaging is non-web — it contains no gRPC, protobuf, or service-discovery code. Cross-application delivery is expressed through two port abstractions that ride on the existing outbox/inbox durability (at-least-once, MessageId dedup, retry, dead-letter, context propagation):
- IRemoteMessageTransport: outbound port. It pushes a batch of already-serialized messages (RemoteMessage carries byte[] payload, type-name string, byte[] envelope) to a single peer endpoint and returns the per-message acks (RemoteDeliveryResult).
- IRemoteSubscriptionRegistry: read port over the messageType → {app, endpoint} map learned from peers; returns the remote subscribers a message must be pushed to. RemoteSubscriberInfo.Endpoint is nullable: a null endpoint means the owed app has no healthy instance right now, so its delivery row is held (waits for a live instance), never dropped. Also exposes bool HasConverged: during startup an empty owed-app set is ambiguous (no peers vs. peers not yet learned), so the sender fans an outbox row into per-destination delivery rows only after the registry has converged (the mesh adapter flips it on its first peer refresh; the in-process no-op default is always converged). While unconverged the outbox row stays pending for retry instead of being marked delivered.
Per-destination delivery (ADR-0007 / ADR-0024): delivery is tracked with one durable outbox_delivery row per (OutboxId, DestinationApp) pair rather than the all-or-nothing terminal flag. Each row's ExpiresAt is fan-out-relative — re-anchored to the fan-out instant via OutboxDistributionPlanner.PlanDeliveryExpiry (now + reconstructed ttl, ADR-0024), not copied from the source row's publish-time absolute deadline (see Message TTL above). Each row is then delivered (acked → DeliveredAt), held (no healthy endpoint), or TTL-expired in place (ExpiredAt + a fluens.messaging.outbox.delivery.expired metric tagged with destination_app). The source outbox row is marked sent (SentAt) only when every owed row is delivered, ExpiredAt when all rows are terminal and at least one expired, and stays pending otherwise — so a transiently-absent peer can no longer mark the message delivered forever.
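The source-row rule in the last sentence reduces to a small classification over the owed delivery rows. The sketch below is illustrative only: the tuple shape, the `Row` and `ClassifySource` helpers, and the string labels are assumptions, and the convergence gate that decides whether rows exist at all is outside its scope.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical delivery-row shape: one entry per owed destination app.
static (string App, DateTimeOffset? DeliveredAt, DateTimeOffset? ExpiredAt) Row(
    string app, DateTimeOffset? deliveredAt, DateTimeOffset? expiredAt) => (app, deliveredAt, expiredAt);

// sent    : every owed row is delivered
// expired : every row is terminal and at least one expired
// pending : at least one row is neither delivered nor expired (for example, held)
static string ClassifySource(IReadOnlyList<(string App, DateTimeOffset? DeliveredAt, DateTimeOffset? ExpiredAt)> owed)
{
    if (owed.Any(d => d.DeliveredAt is null && d.ExpiredAt is null)) return "pending";
    return owed.Any(d => d.ExpiredAt is not null) ? "expired" : "sent";
}

var now = new DateTimeOffset(2026, 1, 1, 12, 0, 0, TimeSpan.Zero);

var allDelivered = new[] { Row("billing", now, null), Row("shipping", now, null) };
var oneHeld      = new[] { Row("billing", now, null), Row("shipping", null, null) };
var oneExpired   = new[] { Row("billing", now, null), Row("shipping", null, now) };

Console.WriteLine(ClassifySource(allDelivered)); // sent
Console.WriteLine(ClassifySource(oneHeld));      // pending
Console.WriteLine(ClassifySource(oneExpired));   // expired
```

The held case is the one the per-destination design exists for: a peer with no healthy endpoint keeps the source row pending instead of letting it be marked sent.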
Bounded-parallel two-phase distribution (ADR-0022): within a cycle the transport processor distributes each message in three scopes — a sequential prepare scope (local fan-out + per-destination delivery-row creation + classification into pure work items, touching EF only), a parallel I/O phase that pushes every deliverable row concurrently with no DbContext open (bounded by TransportMaxConcurrentDeliveries via a single processor-wide semaphore, each push wrapped in a TransportDeliveryTimeoutSeconds budget), and a sequential persistence scope that re-reads the tracked rows and commits the outcomes in one SaveChangesAsync per message. A hung or slow peer therefore no longer blocks healthy peers in the same batch (head-of-line blocking is removed), every delivery has a bounded time budget, and concurrent connections are capped. A per-delivery timeout (or any failed push / missing ack) maps to a hold — the row stays pending and is retried next cycle, never expired and never dead-lettered. Setting TransportMaxConcurrentDeliveries = 1 reproduces strictly-sequential delivery.
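The parallel I/O phase can be sketched with a SemaphoreSlim as the concurrency cap and a linked CancellationTokenSource as the per-delivery budget. This is a simplified illustration, not the transport processor's code: `PushWithBudgetAsync` and the delegate-based pushes are hypothetical, the outcome strings borrow the Acked / Hold / TimedOut vocabulary from the tracing table, and no DbContext or persistence scope is modeled.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical bounded push: waits for a slot, then runs one push under its own time budget.
static async Task<string> PushWithBudgetAsync(
    Func<CancellationToken, Task<bool>> push,
    SemaphoreSlim gate,
    TimeSpan budget,
    CancellationToken stopping)
{
    await gate.WaitAsync(stopping);
    try
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(stopping);
        cts.CancelAfter(budget);
        try
        {
            // A missing ack is a hold: the row stays pending for the next cycle.
            return await push(cts.Token) ? "Acked" : "Hold";
        }
        catch (OperationCanceledException) when (!stopping.IsCancellationRequested)
        {
            return "TimedOut"; // budget exhausted; treated like a hold by the caller
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            return "Hold"; // failed push; retried next cycle, never expired or dead-lettered
        }
    }
    finally
    {
        gate.Release();
    }
}

// Usage: three destinations, at most two concurrent pushes, 500 ms budget each.
using var gate = new SemaphoreSlim(2); // stands in for TransportMaxConcurrentDeliveries
var pushes = new Func<CancellationToken, Task<bool>>[]
{
    async ct => { await Task.Delay(10, ct); return true; },                   // healthy peer
    async ct => { await Task.Delay(Timeout.Infinite, ct); return true; },     // hung peer
    ct => Task.FromException<bool>(new InvalidOperationException("refused")), // failing peer
};

var outcomes = await Task.WhenAll(
    pushes.Select(p => PushWithBudgetAsync(p, gate, TimeSpan.FromMilliseconds(500), CancellationToken.None)));

Console.WriteLine(string.Join(", ", outcomes)); // Acked, TimedOut, Hold
```

The hung peer consumes one slot for at most its budget, so the healthy and failing peers complete without waiting on it. Constructing the semaphore with a count of 1 gives strictly sequential delivery.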
The core registers no-op defaults (NullRemoteMessageTransport, EmptyRemoteSubscriptionRegistry) via TryAddSingleton, so in-process-only applications keep working unchanged. The HTTP/Protobuf mesh adapter is Fluens.Web.Messaging, which overrides these ports; install it to enable P2P delivery between applications.
Receiver ingress and the no-relay rule
A batch received from a peer is staged atomically in the single IngressOutbox table (shared schema, MessageId primary key = end-to-end dedup) before the receiver acks. Local fan-out into module inboxes is a separate idempotent step performed by IngressDistributionProcessor. A received message is terminal: it is distributed to local inboxes only and never forwarded back over the network. This no-relay guarantee is structural — the ingress processor has no transport dependency, so relaying is impossible by construction.
A directed message (ADR-0012 — e.g. a cross-application Fault<T>) reuses this same ingress: its origin-module target rides inside the serialized envelope's fluens-directed-module header, which IngressDistributionProcessor reads to route the message to exactly one origin module (bypassing type fan-out) — no new wire endpoint, and still local-only (no relay).
IngressDbContext (the dedicated context owning the ingress_outbox table) and IngressDistributionProcessor are defined here in the core but registered by the Fluens.Web.Messaging adapter, which supplies the database provider pointed at the shared schema.
License
This project is licensed under the MIT License.