Stratara.Outbox.RabbitMQ
3.1.4
dotnet add package Stratara.Outbox.RabbitMQ --version 3.1.4
NuGet\Install-Package Stratara.Outbox.RabbitMQ -Version 3.1.4
<PackageReference Include="Stratara.Outbox.RabbitMQ" Version="3.1.4" />
<PackageVersion Include="Stratara.Outbox.RabbitMQ" Version="3.1.4" />
<PackageReference Include="Stratara.Outbox.RabbitMQ" />
paket add Stratara.Outbox.RabbitMQ --version 3.1.4
#r "nuget: Stratara.Outbox.RabbitMQ, 3.1.4"
#:package Stratara.Outbox.RabbitMQ@3.1.4
#addin nuget:?package=Stratara.Outbox.RabbitMQ&version=3.1.4
#tool nuget:?package=Stratara.Outbox.RabbitMQ&version=3.1.4
Stratara.Outbox.RabbitMQ
License: FSL-1.1-MIT (Functional Source License — source-available; converts to MIT after 2 years). Not OSI-approved OSS.
Outbox-pattern command + event dispatch for the Stratara event-sourced stack with a RabbitMQ / Azure Service Bus message-bus implementation. Contains the write-side dispatchers, the outbox-retry worker, the read-side mediator command worker, the message-bus implementations, and the Redis-backed ProjectionReplayState that coordinates dispatch skip during projection replay.
What's in the box
| Folder | Contents |
|---|---|
Outbox/ |
OutboxOptions, CommandOutboxDispatcher (write-side ICommand fan-out via IMessageBus, falls back to outbox table on bus failure), EventBundleOutboxDispatcher (same for EventBundle), OutboxWorker (hosted service that retries unpublished outbox rows on a polling interval), NullOutboxLock + RedisOutboxLock (IOutboxLock implementations — default no-op for single-instance deployments, Redis-leased distributed lock for multi-replica setups) |
Messaging/ |
RabbitMqBus — IMessageBus over RabbitMQ. Azure Service Bus ships as the sibling Stratara.Outbox.AzureServiceBus package. |
Mediator/ |
MediatorCommandWorker (hosted service that subscribes to the command topic and dispatches into the in-process IMediator) |
Projections/ |
ProjectionReplayState (Redis-backed concrete IProjectionReplayState; dispatchers skip publishing while replay is active) |
DependencyInjection/ |
AddOutboxDispatcher(), AddOutboxWorker(IConfiguration), AddRedisOutboxLock() (opt-in distributed lock), AddProjectionReplayState(), AddMediatorWorker(), AddMessaging() |
Diagnostics/Extensions/ |
LoggerOutboxExtensions, LoggerMessagingExtensions (source-generated logger surfaces) |
Quick start
// In your API host:
builder.AddMessaging(); // IMessageBus + MessagingOptions binding
builder.Services
.AddOutboxDispatcher() // CommandOutboxDispatcher + EventBundleOutboxDispatcher + ProjectionReplayState
.AddOutboxWorker(builder.Configuration); // OutboxWorker hosted service (only if this host owns retries)
// In your command worker:
builder.Services
.AddMediatorWorker(); // MediatorCommandWorker hosted service
The dispatchers consult IProjectionReplayState.IsReplayActive before each publish and skip dispatch (writing to the outbox table only) while a replay is in progress.
Multi-instance outbox workers
AddOutboxWorker registers NullOutboxLock as the default IOutboxLock — a no-op that preserves the single-instance assumption. For multi-replica deployments call AddRedisOutboxLock() afterwards; it replaces the no-op with a Redis-leased lock (SET stratara:outbox:lock NX EX) so only one replica drains at a time:
builder.AddCaching(); // registers IConnectionMultiplexer
builder.Services
.AddOutboxDispatcher()
.AddOutboxWorker(builder.Configuration)
.AddRedisOutboxLock(); // multi-replica safe
The lease defaults to 60 s (OutboxOptions.LockLeaseSeconds). Tune it so it exceeds the worst-case drain duration; otherwise the lock can expire mid-cycle and a peer may start a concurrent drain. Outbox semantics are still at-least-once, so a duplicate publish is recoverable provided handlers stay idempotent.
Dependencies
Stratara.Abstractions— forICommand,IEvent,IMessageBus,ICommandOutboxDispatcher,IEventBundleOutboxDispatcher,IProjectionReplayState,IMessagingIdentifier,IWriteUnitOfWork(used at runtime via the outbox repository).Stratara.Contracts— forEventBundle+CommandEnvelopemessages.Stratara.Mediator—MediatorCommandWorkerdispatches into the in-processIMediator.Stratara.Sessions— dispatcher hydratesCommandEnvelopefrom the current session context.Stratara.Shared— for messaging primitives, resilience pipeline names, mapping helpers, and the diagnostics base.RabbitMQ.Client,StackExchange.Redis(replay-state + optional outbox-lock).Microsoft.Extensions.Hosting.Abstractions+Microsoft.Extensions.Options.ConfigurationExtensions— for hosted services + options binding.
The outbox dispatcher persists rows through
IWriteUnitOfWork.CreateOutboxRepository— that interface lives inStratara.Abstractions, but the concrete implementation comes fromStratara.EventSourcing.EntityFrameworkCore. Reference that package alongside this one to get a working stack.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.8)
- Microsoft.Extensions.DependencyInjection (>= 10.0.8)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.8)
- RabbitMQ.Client (>= 7.2.1)
- StackExchange.Redis (>= 2.13.10)
- Stratara.Abstractions (>= 3.1.4)
- Stratara.Contracts (>= 3.1.4)
- Stratara.Mediator (>= 3.1.4)
- Stratara.Sessions (>= 3.1.4)
- Stratara.Shared (>= 3.1.4)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on Stratara.Outbox.RabbitMQ:
| Package | Downloads |
|---|---|
|
Stratara.Infrastructure
Infrastructure glue for the Stratara framework — authorization decorators, configuration providers, and DI composition helpers that wire Mediator, Outbox, Identity, and EF Core into a hosted app. |
|
|
Stratara.EventSourcing.WorkerDefaults
Worker-host wiring composites for the Stratara event-sourced stack. IHostApplicationBuilder extensions (AddBackendServices, AddCommandWorkerServices, AddEventProjectionWorkerServices, AddSagaWorkerServices, AddOutboxWorkerServices) bundle the per-concern DI calls so each worker host opts in with one line. |
GitHub repositories
This package is not used by any popular GitHub repositories.
### Added
- **Command-workload isolation (heavy-command lane)** — long-running commands can now be routed to a
dedicated worker lane so they cannot starve interactive commands. Mark a command with the new
`Stratara.Abstractions.Mediator.IHeavyCommand` marker and the `ICommandOutboxDispatcher`
automatically publishes it to a separate heavy-command topic (`IMessagingIdentifier.HeavyCommandTopic` /
`HeavyCommandSubscription`, configurable under `Messaging:HeavyCommand`, defaulting to `heavy-command` /
`heavy-command-subscription`). Run a dedicated heavy-command worker with the new
`services.AddHeavyCommandWorker(degreeOfParallelism?)` extension, or the
`builder.AddHeavyCommandWorkerServices(degreeOfParallelism?)` host composite — in the same process as
the interactive worker (two lanes) or in a separately scaled host. Each worker's degree of parallelism
is configurable per lane. `IMessagingIdentifier` gains `HeavyCommandTopic`, `HeavyCommandSubscription`,
and the `GetCommandTopic(Type)` / `GetCommandSubscription(Type)` routing helpers. The interactive lane
(`AddMediatorWorker()`) is unchanged and remains the default; commands not marked heavy keep their
existing routing. If a heavy command is dispatched while no heavy worker is bound, the publish is
rejected and the command is preserved in the outbox until a heavy-command worker comes online — it is
never dropped. Works over both the RabbitMQ and Azure Service Bus message buses (Azure Service Bus
requires the heavy-command topic/subscription to be provisioned, like the existing command topic). New
log-event ID `105_005` (`CommandWorkerLaneStarted`) in `Stratara.Diagnostics`.
- **Observability metrics across the worker pipeline** (`Stratara.Diagnostics`) — the shared
`Stratara.Service` meter now publishes throughput and latency instruments so operators can see how the
event-sourcing pipeline is behaving instead of flying blind on a single counter. New instruments:
`event_source.events.appended` (counter, tagged by `event.type` / `aggregate.type`),
`outbox.published` (counter, tagged by `outbox.kind` = `command` / `event`), `command.duration`
(histogram, ms, tagged by `request.type` / `outcome`), `projection.events.processed` (counter) +
`projection.bundle.duration` (histogram, ms), `saga.events.processed` (counter) +
`saga.bundle.duration` (histogram, ms), and `saga.inflight` (up/down counter). They are recorded by the
event source, command worker, projection worker, saga worker, and outbox worker respectively. Because
projections and sagas are real-time bus subscribers without a persisted checkpoint, these report
**throughput and latency**, not consumer lag. No configuration is required — point any OpenTelemetry
metrics exporter at the `Stratara.Service` meter.
- **Operational health checks for the event store and outbox** (`Stratara.EventSourcing.EntityFrameworkCore`) —
two opt-in readiness checks added to any `IHealthChecksBuilder`: `AddEventStoreHealthCheck()` verifies
the write-side database is reachable, and `AddOutboxHealthCheck(degradedThreshold?, unhealthyThreshold?)`
reports the pending outbox backlog (exposed under the `pending` data key) and escalates to
`Degraded` / `Unhealthy` when the backlog crosses the supplied thresholds. Both are tagged `ready` by
default (so they map to a readiness endpoint, not liveness) and require the Stratara write store to be
registered. The write-store DbContext is now also resolvable as a scoped `IWriteDbContext` service to
support these checks.
- **Polly-backed mediator resilience behavior** (`Stratara.Resilience`) — an opt-in pipeline behavior
wraps the in-process dispatch of a request marked with the new
`Stratara.Abstractions.Resilience.IResilientRequest` in the named Polly pipeline the request selects
(`ResiliencePipelineName`). Register it with the new `AddStrataraResilienceBehavior()` (after
`AddStrataraValidation()` / `AddStrataraTenantIsolation()` so the retry wraps the handler, not the
guards); requests without the marker are unaffected. A new built-in pipeline
`ResilienceNames.ConcurrencyConflict` retries **only** on
`Stratara.Abstractions.Persistence.ConcurrencyConflictException` (5 attempts, short exponential
backoff) so a handler that re-reads and re-applies on an optimistic-concurrency clash succeeds without
bespoke retry loops; it is registered by `AddResiliencePipelines()` alongside the existing message-bus
and dispatcher pipelines. Only mark handlers that are safe to re-run (idempotent or concurrency-guarded).