SimplyWorks.Bus
8.1.12
dotnet add package SimplyWorks.Bus --version 8.1.12
NuGet\Install-Package SimplyWorks.Bus -Version 8.1.12
<PackageReference Include="SimplyWorks.Bus" Version="8.1.12" />
<PackageVersion Include="SimplyWorks.Bus" Version="8.1.12" />
<PackageReference Include="SimplyWorks.Bus" />
paket add SimplyWorks.Bus --version 8.1.12
#r "nuget: SimplyWorks.Bus, 8.1.12"
#:package SimplyWorks.Bus@8.1.12
#addin nuget:?package=SimplyWorks.Bus&version=8.1.12
#tool nuget:?package=SimplyWorks.Bus&version=8.1.12
SimplyWorks.Bus
A lightweight .NET 8 message bus library built on top of RabbitMQ, designed for event-driven microservice architectures in ASP.NET Core.
The library ships as three complementary NuGet packages:
| Package | Purpose |
|---|---|
SimplyWorks.Bus |
Core runtime — publishing, consuming, retries, dead-letter, tracing |
SimplyWorks.Bus.RabbitMqExtensions |
Public contracts, consumer interfaces, dashboard data models |
SimplyWorks.Bus.RabbitMqViewer |
Built-in HTMX operations dashboard (dark + light mode, zero JS framework) |
Table of Contents
- Features
- Installation
- Quick Start
- Publishing Messages
- Consuming Messages
- Delayed Publishing
- Broadcasting & Listeners
- Per-Queue Configuration
- Extended Consumer Options (IConsumeExtended)
- Monitoring — IConsumerReader
- Error Queue Inspection — IErrorQueueReader
- Operational Events Pipeline
- Dashboard Data Service — IBusDashboardDataService
- Custom Alert Thresholds — IAlertEvaluator
- Building a Custom Dashboard
- Operations Viewer Dashboard
- Full BusOptions Reference
- Architecture
- Testing
- Dependencies
Features
- 🚌 Simple Message Publishing — typed and string-based
IPublish - ⏱️ Delayed Publishing — deliver to a single consumer after an exact delay via
IDelayedPublish; uses the RabbitMQ Delayed Message Exchange plugin when available, TTL buckets otherwise - 📡 Broadcasting — fan-out to all application instances via
IBroadcast/IListen<T> - 🔄 Automatic Retries — configurable per-queue retry counts and delay with dead-letter routing
- 🎯 Typed Consumers —
IConsume<T>with strongly-typed message deserialization - ⚙️ Extended Consumer Options — per-consumer prefetch and priority via
IConsumeExtended - 🔐 JWT Propagation — user context forwarded through message headers across services
- 📊 Queue Monitoring — live queue depth, rates, and consumer counts via
IConsumerReader - 🔍 Error Queue Inspection — peek retry and dead-letter queues via
IErrorQueueReader - 📈 Operational Events — structured lifecycle events with
ActivitySourcetracing andMetermetrics - 🖥️ Built-in Dashboard — HTMX admin UI with dark/light mode toggle, form-based login, CSS refresh animations, and critical status tooltips via
SimplyWorks.Bus.RabbitMqViewer - 🏗️ Custom Dashboard API — all data exposed through
IBusDashboardDataServiceinSimplyWorks.Bus.RabbitMqExtensions— build your own UI without the viewer package - 🧪 Testing Support — mock publisher for unit testing
Installation
# Core — always required
dotnet add package SimplyWorks.Bus
# Public contracts + monitoring interfaces (no RabbitMQ.Client dependency)
dotnet add package SimplyWorks.Bus.RabbitMqExtensions
# Optional: built-in operations dashboard
dotnet add package SimplyWorks.Bus.RabbitMqViewer
Quick Start
1. Connection string
Add RabbitMQ connection string to your appsettings.json:
{
"ConnectionStrings": {
"RabbitMQ": "amqp://guest:guest@localhost:5672/"
}
}
2. Service registration
In your Startup.cs or Program.cs:
services.AddBus(config =>
{
config.ApplicationName = "MyApp";
// Optional JWT configuration
config.Token.Key = Configuration["Token:Key"];
config.Token.Issuer = Configuration["Token:Issuer"];
config.Token.Audience = Configuration["Token:Audience"];
});
services.AddBusPublish(); // registers IPublish, IBroadcast
services.AddBusConsume(); // registers IHostedService consumer + scans calling assembly
Publishing Messages
public class OrderController : ControllerBase
{
private readonly IPublish _publish;
private readonly IBroadcast _broadcast;
public OrderController(IPublish publish, IBroadcast broadcast)
{
_publish = publish;
_broadcast = broadcast;
}
[HttpPost]
public async Task<IActionResult> Create(CreateOrderRequest req)
{
// Routed to consumers subscribed to OrderCreated
await _publish.Publish(new OrderCreated { OrderId = Guid.NewGuid() });
// Fan-out to all connected application instances
await _broadcast.Broadcast(new OrderNotification { Message = "New order" });
return Ok();
}
}
String-based publish (useful for dynamic routing):
await _publish.Publish("OrderCreated", jsonPayload);
await _publish.Publish("OrderCreated", payloadBytes);
Consuming Messages
Typed consumer
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
public async Task Process(OrderCreated message)
{
// Auto-acked on success, rejected/retried on exception
}
}
Multi-message string consumer
public class GenericConsumer : IConsume
{
public Task<IEnumerable<string>> GetMessageTypeNames()
=> Task.FromResult<IEnumerable<string>>(new[] { "OrderCreated", "OrderCancelled" });
public async Task Process(string typeName, string message) { ... }
}
Optional failure handler
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
public async Task Process(OrderCreated message) { ... }
// Called on every failure (including retries) — optional
public async Task OnFail(Exception ex) { ... }
}
Accessing request context
public class SecureConsumer : IConsume<SecureMessage>
{
private readonly RequestContext _ctx;
public SecureConsumer(RequestContext ctx) => _ctx = ctx;
public async Task Process(SecureMessage msg)
{
var user = _ctx.User;
var correlationId = _ctx.CorrelationId;
var remaining = _ctx.GetValue("RemainingRetries");
}
}
Consumer registration
services.AddBusConsume(); // scan calling assembly
services.AddBusConsume(typeof(OrderCreatedConsumer).Assembly); // specific assembly
Delayed Publishing
IDelayedPublish delivers a message to one specific consumer queue after a delay. Unlike IPublish — which routes by message type and fans out to every consumer bound to that type — delayed publishing routes by the target consumer's class name, so only that one queue receives the message.
IDelayedPublish is registered automatically when you call AddBusPublish().
Usage
public class OrderController : ControllerBase
{
private readonly IDelayedPublish _delayed;
public OrderController(IDelayedPublish delayed) => _delayed = delayed;
[HttpPost("schedule")]
public async Task<IActionResult> Schedule()
{
// Deliver to OrderReminderConsumer.Process(OrderReminder) in 30 minutes
await _delayed.PublishDelayed(
new OrderReminder { OrderId = "123" },
consumerName: nameof(OrderReminderConsumer),
delay: TimeSpan.FromMinutes(30));
return Accepted();
}
}
The consumerName is the exact class name of the target consumer. Combined with the message type name, it forms the naked queue name {ConsumerClass}.{MessageType} that uniquely identifies the consumer queue.
// Raw overload — useful when consumer and message types are not referenced in the calling assembly
await _delayed.PublishDelayed(
messageTypeName: "OrderReminder",
body: jsonString,
nakedQueueName: "orderreminderconsumer.orderreminder",
delay: TimeSpan.FromMinutes(30));
Passing TimeSpan.Zero (or negative) skips the delay entirely and delivers immediately to the targeted consumer.
Why not IPublish with a delay?
IPublish.Publish routes by message type name on the process exchange. Every consumer queue bound to that message type receives a copy — that is intentional fan-out. IDelayedPublish routes by the consumer's naked queue name on a separate direct exchange, so the message reaches exactly one queue regardless of how many other consumers handle the same message type.
How delivery works
The library detects at startup whether the broker has the RabbitMQ Delayed Message Exchange plugin enabled, and picks the appropriate strategy automatically. You write the same calling code either way.
Strategy 1 — Delayed Message Exchange plugin (exact delays)
When the plugin is available (BusOptions.DelayedPluginAvailable == true), the library declares an x-delayed-message exchange (v3.{env}.delay.x) and publishes with an x-delay header set to the requested milliseconds. The plugin holds the message broker-side until the delay elapses, then routes it directly to the target consumer queue.
Publisher ──► v3.{env}.delay.x (x-delayed-message, direct)
│ holds for exactly N ms
▼
v3.{env}.{app}.{ConsumerClass}.{MessageType} ← target consumer queue
This is the preferred path — delays are precise to the millisecond.
Strategy 2 — TTL delay buckets (fallback when plugin is absent)
When the plugin is not installed, the library uses RabbitMQ's native TTL + dead-letter mechanism. Requested delays are rounded up to the nearest pre-configured bucket (default ladder: 1 s, 5 s, 15 s, 30 s, 60 s, 5 min, 15 min, 30 min, 1 h). A pair of exchanges and a queue are created on demand for each bucket duration used.
Publisher
│ publish(routingKey = "orderreminderconsumer.orderreminder")
▼
v3.{env}.delay.in.1800s ← fanout entry exchange
│ (delivers to TTL queue regardless of routing key,
│ but preserves the original routing key on the message)
▼
v3.{env}.delay.1800s ← TTL queue (x-message-ttl = 1800000 ms)
│
│ TTL expires → dead-letter with original routing key intact
▼
v3.{env}.delay ← direct router exchange
│ routes by "orderreminderconsumer.orderreminder"
▼
v3.{env}.{app}.orderreminderconsumer.orderreminder ← target consumer queue
Why the fanout entry exchange?
Publishing directly into the TTL queue (via the default exchange) would replace the routing key with the queue name, losing the consumer target. The fanout entry exchange delivers to the TTL queue without touching the routing key, so it survives intact all the way to the dead-letter route on the direct router.
Bucket queues are declared lazily on first use. Each bucket is created at most once per process lifetime. The bucket ladder is configurable:
services.AddBus(config =>
{
// Custom bucket ladder — delays round up to nearest value (seconds)
config.DelayBucketsSeconds = new[] { 5, 30, 60, 300, 3600 };
});
A delay larger than all configured buckets creates a one-off bucket for that exact duration.
Exchange topology summary
| Exchange | Type | Purpose |
|---|---|---|
v3.{env}.delay |
Direct | Final router — consumer queues bind here by naked queue name |
v3.{env}.delay.x |
x-delayed-message | Plugin path — holds messages until delay elapses |
v3.{env}.delay.in.{N}s |
Fanout | TTL fallback — entry point for each bucket duration |
Each consumer queue gets an extra binding to v3.{env}.delay (and v3.{env}.delay.x when the plugin is present) during ConsumersService startup. No other topology changes are needed.
Failure handling after delivery
Once a delayed message is delivered to the consumer queue, the normal retry and dead-letter machinery takes over. If the consumer throws, the message is rejected to its .retry queue, retried up to DefaultRetryCount times, and moved to .bad on exhaustion — exactly the same as a non-delayed message.
Broadcasting & Listeners
Broadcasts are fan-out messages delivered to every running instance simultaneously.
// Send
await _broadcast.Broadcast(new PricingUpdated { Version = 42 });
// Trigger live consumer refresh across all instances
await _broadcast.RefreshConsumers();
// Receive
public class PricingUpdatedListener : IListen<PricingUpdated>
{
public async Task Process(PricingUpdated message) { ... }
public async Task OnFail(Exception ex) { ... } // optional
}
services.AddBusListen(); // scan calling assembly
services.AddBusListen(typeof(PricingUpdatedListener).Assembly); // specific assembly
Per-Queue Configuration
Fine-tune individual queues via BusOptions.AddQueueOption. The key is "{ConsumerClass}.{MessageType}" (case-insensitive).
services.AddBus(config =>
{
config.AddQueueOption(
"OrderCreatedConsumer.OrderCreated",
prefetch: 10,
retryCount: 3,
retryAfterSeconds: 30);
// Enable priority queue (0–10 range)
config.AddQueueOption(
"PaymentConsumer.PaymentProcessed",
priority: 10);
});
Extended Consumer Options (IConsumeExtended)
For consumers that need per-instance prefetch or priority without touching global config:
// Typed consumer with runtime options
public class PriorityOrderConsumer : IConsumeExtended<OrderCreated>
{
public async Task Process(OrderCreated message) { ... }
public Task<ConsumerOptions> GetConsumerOptions() =>
Task.FromResult(new ConsumerOptions { Prefetch = 8, Priority = 5 });
}
// Multi-message consumer with per-type options
public class MultiConsumer : IConsumeExtended
{
public Task<IEnumerable<string>> GetMessageTypeNames() =>
Task.FromResult<IEnumerable<string>>(new[] { "TypeA", "TypeB" });
public Task<IDictionary<string, ConsumerOptions>> GetMessageTypeNamesWithOptions() =>
Task.FromResult<IDictionary<string, ConsumerOptions>>(
new Dictionary<string, ConsumerOptions>
{
["TypeA"] = new ConsumerOptions { Prefetch = 4 },
["TypeB"] = new ConsumerOptions { Prefetch = 16, Priority = 3 }
});
public async Task Process(string typeName, string message) { ... }
}
IConsumeExtended consumers are hot-reloadable — call IBroadcast.RefreshConsumers() to apply updated options across all running instances without restart.
Monitoring — IConsumerReader
Registered automatically by AddBus(). Returns live queue statistics from the RabbitMQ Management API with configurable caching.
// All consumers
var all = await _reader.GetAllConsumersCount();
// Typed consumer + message type
var order = await _reader.GetConsumerCount<OrderConsumer, OrderCreated>();
// Multi-message consumer by message name
var generic = await _reader.GetConsumerCount<GenericConsumer>("OrderCreated");
// All queues for a consumer class
var byClass = await _reader.GetConsumerCount<OrderConsumer>();
ConsumerCount fields:
| Field | Description |
|---|---|
Name |
Consumer class name |
MessageName |
Message type name |
TotalNodes |
Active consumer instances |
ProcessingCount |
Messages in-flight (unacknowledged) |
QueueCount |
Messages ready in main queue |
RetryCount |
Messages in retry queue |
FailedCount |
Messages in dead-letter queue |
Priority |
Consumer priority level |
Prefetch |
QoS prefetch count |
IncomingRate |
Publish rate (msg/s) |
ProcessingRate |
Deliver rate to consumers (msg/s) |
AckRate |
Acknowledge rate (msg/s) |
services.AddBus(config =>
{
config.MonitoringCacheSeconds = 10; // cache duration 3–60 s, default 5
config.ManagementUrl = "http://localhost:15672"; // defaults from AMQP URI
config.ManagementUsername = "guest";
config.ManagementPassword = "guest";
config.VirtualHost = "/";
});
Error Queue Inspection — IErrorQueueReader
Peek at messages in retry or dead-letter queues without removing them.
// Typed consumer
var failed = await _errorReader.Peek<OrderConsumer, OrderCreated>(ErrorQueueType.Bad, count: 20);
// Multi-message consumer
var retrying = await _errorReader.Peek<GenericConsumer>("OrderCreated", ErrorQueueType.Retry);
// Raw queue name
var raw = await _errorReader.PeekByQueueName("v3.production.orderconsumer.ordercreated.bad");
ErrorMessage properties:
| Property | Description |
|---|---|
RawBody |
Original JSON payload |
Exchange |
Exchange where message was published |
RoutingKey |
Routing key used at publish |
Properties |
All AMQP properties |
Headers |
Extracted AMQP headers |
CorrelationId |
Correlation ID if set |
ExceptionHistory |
All recorded exceptions oldest-first |
LastException |
Most recent exception string |
Operational Events Pipeline
SW.Bus emits strongly-typed lifecycle events that flow through a lock-free buffered pipeline. The goal is observability and diagnostics, not logging.
Events emitted
| Event | Trigger |
|---|---|
PublishStarted |
Message about to be published |
PublishCompleted |
Publish succeeded (includes duration ms, payload bytes) |
PublishFailed |
Publish threw an exception |
MessageProcessingStarted |
Consumer received and started processing |
MessageProcessingCompleted |
Processed successfully (includes duration ms) |
MessageProcessingFailed |
Consumer threw an exception (type, message, stack trace) |
MessageRetryScheduled |
Message rejected back to retry queue |
MessageMovedToDeadLetter |
Message exhausted retries |
ConsumerConnected |
Consumer channel attached to a queue |
ConsumerDisconnected |
Consumer channel shut down |
QueueBackpressureDetected |
Queue depth exceeded QueueBackpressureThreshold |
Every event carries: TimestampUtc, SchemaVersion, EventName, MachineName, Environment, ApplicationName, Exchange, QueueName, ConsumerName, MessageType, MessageId, CorrelationId, CausationId, TraceId, SpanId, DeliveryTag.
Pipeline architecture
Consumer / Publisher hot path
│ (non-blocking TryWrite)
▼
BoundedChannel<IOperationalEvent> ← configurable capacity, drop-oldest on full
│
▼ (BackgroundService, configurable flush interval)
OperationalEventDispatcher
├── InMemoryOperationalEventStore (ring buffer — always present)
└── [your IOperationalEventBatchSink registrations]
Plugging in a custom external sink
using SW.Bus.RabbitMqExtensions;
public class ElasticsearchSink : IOperationalEventBatchSink
{
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken cancellationToken = default)
{
await _esClient.BulkAsync(events, cancellationToken);
}
}
// Sinks stack additively — InMemoryStore is always active alongside yours
services.AddSingleton<IOperationalEventBatchSink, ElasticsearchSink>();
OpenTelemetry tracing
services.AddOpenTelemetry()
.WithTracing(b => b
.AddSource("SimplyWorks.Bus") // ActivitySource name
.AddOtlpExporter());
Prometheus / OpenTelemetry metrics (meter name: "SimplyWorks.Bus")
services.AddOpenTelemetry()
.WithMetrics(b => b
.AddMeter("SimplyWorks.Bus")
.AddPrometheusExporter());
| Instrument | Type | Description |
|---|---|---|
sw_bus_publish_started_total |
Counter | Publish attempts |
sw_bus_publish_completed_total |
Counter | Successful publishes |
sw_bus_publish_failed_total |
Counter | Failed publishes |
sw_bus_processing_started_total |
Counter | Messages picked up |
sw_bus_processing_completed_total |
Counter | Messages processed successfully |
sw_bus_processing_failed_total |
Counter | Messages that threw exceptions |
sw_bus_retry_scheduled_total |
Counter | Messages sent to retry queue |
sw_bus_dead_letter_total |
Counter | Messages moved to dead-letter |
sw_bus_operational_event_dropped_total |
Counter | Events dropped (buffer full) |
sw_bus_processing_latency_ms |
Histogram | Consumer processing time |
sw_bus_publish_latency_ms |
Histogram | Publish time |
Pipeline configuration
services.AddBus(config =>
{
config.OperationalEventsEnabled = true; // default
config.OperationalEventsBufferCapacity = 8192; // channel buffer before drop
config.OperationalEventsBatchSize = 256; // events per flush batch
config.OperationalEventsFlushIntervalMs = 1000; // ms between flushes
config.OperationalEventsDropOldest = true; // drop strategy when full
config.OperationalEventsSchemaVersion = "1.0"; // stamped on every event
config.OperationalEventsStoreCapacity = 10000; // in-memory ring buffer size
});
Dashboard Data Service — IBusDashboardDataService
All dashboard data is exposed through IBusDashboardDataService (defined in SimplyWorks.Bus.RabbitMqExtensions, implemented and registered by SimplyWorks.Bus). Inject it directly to build your own custom dashboard, REST API, or health probe — no dependency on SimplyWorks.Bus.RabbitMqViewer needed. See Building a Custom Dashboard for a complete guide.
public class OpsController : ControllerBase
{
private readonly IBusDashboardDataService _dash;
public OpsController(IBusDashboardDataService dash) => _dash = dash;
[HttpGet("ops/summary")]
public async Task<IActionResult> Summary()
=> Ok(await _dash.GetSummaryAsync());
[HttpGet("ops/consumers")]
public async Task<IActionResult> Consumers()
=> Ok(await _dash.GetConsumerHealthAsync());
[HttpGet("ops/queues")]
public async Task<IActionResult> Queues()
=> Ok(await _dash.GetQueueDetailsAsync());
[HttpGet("ops/retries")]
public async Task<IActionResult> Retries()
=> Ok(await _dash.GetRetryAnalysisAsync());
[HttpGet("ops/dead-letters")]
public async Task<IActionResult> DeadLetters()
=> Ok(await _dash.GetDeadLetterSummaryAsync());
[HttpGet("ops/alerts")]
public async Task<IActionResult> Alerts()
=> Ok(await _dash.GetAlertsAsync());
[HttpGet("ops/events")]
public IActionResult Events(
[FromQuery] string? consumer, [FromQuery] string? messageType,
[FromQuery] string? correlationId, [FromQuery] string? traceId,
[FromQuery] string? eventName, [FromQuery] int limit = 200)
=> Ok(_dash.GetRecentEvents(new OperationalEventFilter(
ConsumerName: consumer, MessageType: messageType,
CorrelationId: correlationId, TraceId: traceId,
EventName: eventName, Limit: limit)));
}
View models:
| Record | Key fields |
|---|---|
DashboardSummary |
TotalConsumers, UnhealthyConsumers, DisconnectedConsumers, TotalQueueDepth, TotalRetryBacklog, TotalDeadLetterBacklog, TotalIncomingRate, TotalAckRate, ActiveAlerts, LastUpdatedUtc |
ConsumerHealthView |
All ConsumerCount fields + QueueName, IsBackpressured, HealthStatus (AlertSeverity) |
QueueDetailView |
Main/retry/dead-letter queue names and depths, consumer count, rates |
RetryAnalysisView |
Per-consumer retry backlog ordered by size with severity |
DeadLetterSummaryView |
Per-consumer DL count, last exception type/message, last failure timestamp |
DashboardAlert |
Severity (Info/Warning/Critical), Title, Detail, QueueName, ConsumerName, TimestampUtc |
OperationalEventFilter fields (all optional, string fields are case-insensitive substring matches):
ApplicationName, ConsumerName, MessageType, CorrelationId, TraceId, QueueName, EventName (exact), From, To, Limit (default 200)
Custom Alert Thresholds — IAlertEvaluator
Default thresholds are configured on BusOptions:
services.AddBus(config =>
{
config.AlertRetryWarningThreshold = 10; // Warning when retry backlog ≥ N
config.AlertRetryCriticalThreshold = 100; // Critical when retry backlog ≥ N
config.AlertDeadLetterCriticalThreshold = 100; // Critical when DL count ≥ N
config.QueueBackpressureThreshold = 5000; // backpressure warning threshold
});
To apply domain-specific or SLA-based thresholds, replace the default evaluator:
public class MyAlertEvaluator : IAlertEvaluator
{
public IReadOnlyList<DashboardAlert> Evaluate(ConsumerHealthView[] consumers)
{
var alerts = new List<DashboardAlert>();
foreach (var c in consumers)
{
if (c.Name == "PaymentConsumer" && c.FailedCount > 0)
alerts.Add(new DashboardAlert(
AlertSeverity.Critical, "Payment Dead Letter",
$"{c.FailedCount} failed payments require immediate attention.",
c.QueueName, c.Name, DateTime.UtcNow));
}
return alerts;
}
}
// Register before AddBus() — TryAddSingleton means yours wins
services.AddSingleton<IAlertEvaluator, MyAlertEvaluator>();
services.AddBus(...);
Building a Custom Dashboard
If you do not want to use SimplyWorks.Bus.RabbitMqViewer — e.g. you want React, Blazor, an existing admin framework, or a JSON API for a mobile app — everything you need is in SimplyWorks.Bus.RabbitMqExtensions. You never need to install the viewer package.
Package requirements
dotnet add package SimplyWorks.Bus # core runtime (always required)
dotnet add package SimplyWorks.Bus.RabbitMqExtensions # contracts + data service
# SimplyWorks.Bus.RabbitMqViewer is NOT needed
What the library provides
All interfaces below are registered automatically by services.AddBus(...). Just inject what you need.
| Interface | Description |
|---|---|
IBusDashboardDataService |
Aggregate read layer — summary, consumer health, queues, retries, dead letters, alerts, events |
IConsumerReader |
Raw per-consumer queue statistics from the RabbitMQ Management API (with configurable caching) |
IErrorQueueReader |
Peek at messages in retry and dead-letter queues without removing them |
IOperationalEventStore |
Query the in-memory event ring buffer with OperationalEventFilter |
IAlertEvaluator |
Evaluate a ConsumerHealthView[] snapshot and return DashboardAlert objects |
IOperationalEventBatchSink |
Implement to stream event batches to external systems (Elasticsearch, ClickHouse, etc.) |
Minimal REST API example (no viewer package)
// Program.cs
builder.Services.AddBus(config => { config.ApplicationName = "MyApp"; /* ... */ });
builder.Services.AddBusPublish();
builder.Services.AddBusConsume();
var app = builder.Build();
app.MapGet("/ops/summary", async (IBusDashboardDataService d) => await d.GetSummaryAsync());
app.MapGet("/ops/consumers", async (IBusDashboardDataService d) => await d.GetConsumerHealthAsync());
app.MapGet("/ops/queues", async (IBusDashboardDataService d) => await d.GetQueueDetailsAsync());
app.MapGet("/ops/retries", async (IBusDashboardDataService d) => await d.GetRetryAnalysisAsync());
app.MapGet("/ops/dead-letters", async (IBusDashboardDataService d) => await d.GetDeadLetterSummaryAsync());
app.MapGet("/ops/alerts", async (IBusDashboardDataService d) => await d.GetAlertsAsync());
app.MapGet("/ops/events", (IBusDashboardDataService d,
string? consumer, string? messageType,
string? eventName, int limit = 100) =>
d.GetRecentEvents(new OperationalEventFilter(
ConsumerName: consumer, MessageType: messageType,
EventName: eventName, Limit: limit)));
app.Run();
Querying the operational event store directly
public class MyOpsService
{
private readonly IOperationalEventStore _store;
public MyOpsService(IOperationalEventStore store) => _store = store;
public IReadOnlyList<IOperationalEvent> GetRecentFailures(int limit = 50) =>
_store.GetRecent(new OperationalEventFilter(
EventName: "MessageProcessingFailed",
Limit: limit));
public long TotalEventsSinceStartup => _store.TotalReceived;
}
Pattern matching on event records
All events are strongly typed C# records inheriting from OperationalEventBase. Use pattern matching to extract type-specific fields:
foreach (var evt in _store.GetRecent())
{
switch (evt)
{
case MessageProcessingFailed f:
Console.WriteLine($"[FAIL] {f.ConsumerName} — {f.ExceptionType}: {f.ExceptionMessage}");
break;
case MessageProcessingCompleted c:
Console.WriteLine($"[OK] {c.ConsumerName} in {c.ProcessingDurationMs:F1} ms");
break;
case MessageRetryScheduled r:
Console.WriteLine($"[RETRY] {r.ConsumerName} attempt {r.RetryCount}, {r.RemainingRetryCount} remaining");
break;
case MessageMovedToDeadLetter dl:
Console.WriteLine($"[DLQ] {dl.ConsumerName} → {dl.DeadLetterRoutingKey}");
break;
case QueueBackpressureDetected bp:
Console.WriteLine($"[PRESSURE] {bp.QueueName} depth={bp.QueueDepth} threshold={bp.Threshold}");
break;
case ConsumerConnected cc:
Console.WriteLine($"[CONNECT] {cc.ConsumerName} tag={cc.ConsumerTag}");
break;
case ConsumerDisconnected cd:
Console.WriteLine($"[DISCONN] {cd.ConsumerName} reason={cd.Reason}");
break;
case PublishFailed pf:
Console.WriteLine($"[PUB FAIL] {pf.MessageType} — {pf.ExceptionType}");
break;
}
}
Replacing or extending the in-memory event store
Register IOperationalEventBatchSink to stream events alongside the built-in ring buffer:
public class ElasticsearchSink : IOperationalEventBatchSink
{
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken cancellationToken = default)
=> await _esClient.BulkAsync(events, cancellationToken);
}
services.AddSingleton<IOperationalEventBatchSink, ElasticsearchSink>();
services.AddBus(...); // in-memory store and your sink both active
To replace query access with your own persistent store:
public class ClickHouseEventStore : IOperationalEventStore, IOperationalEventBatchSink
{
public long TotalReceived => /* query row count */;
public IReadOnlyList<IOperationalEvent> GetRecent(OperationalEventFilter? filter = null)
=> /* translate filter → SQL → query */;
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken ct = default)
=> await _clickHouse.BulkInsertAsync(events, ct);
}
// Register BEFORE AddBus() — TryAddSingleton means yours wins
services.AddSingleton<ClickHouseEventStore>();
services.AddSingleton<IOperationalEventStore>(sp => sp.GetRequiredService<ClickHouseEventStore>());
services.AddSingleton<IOperationalEventBatchSink>(sp => sp.GetRequiredService<ClickHouseEventStore>());
services.AddBus(...);
Health probe using ConsumerHealthView.HealthStatus
builder.Services.AddHealthChecks()
.AddAsyncCheck("bus-consumers", async (IBusDashboardDataService dash, ct) =>
{
var consumers = await dash.GetConsumerHealthAsync(ct);
var disconnected = consumers.Where(c => c.TotalNodes == 0).ToList();
var critical = consumers.Where(c => c.HealthStatus == AlertSeverity.Critical).ToList();
if (disconnected.Any())
return HealthCheckResult.Unhealthy(
$"{disconnected.Count} consumer(s) disconnected: " +
string.Join(", ", disconnected.Select(c => c.Name)));
if (critical.Any())
return HealthCheckResult.Degraded($"{critical.Count} consumer(s) in critical state.");
return HealthCheckResult.Healthy($"{consumers.Length} consumer(s) all healthy.");
});
Operations Viewer Dashboard
SimplyWorks.Bus.RabbitMqViewer adds a server-rendered operations dashboard built with Pico.css + HTMX. No JavaScript framework required. All tables auto-refresh via HTMX polling with CSS animations.
dotnet add package SimplyWorks.Bus.RabbitMqViewer
Pages
| Route | Content |
|---|---|
/bus-viewer |
Overview — summary cards, consumer health table, active alerts, live event feed |
/bus-viewer/consumers |
Consumer Health — full grid with status badges, rates, prefetch, priority |
/bus-viewer/queues |
Queue Details — main + retry + dead-letter depths and rates per queue |
/bus-viewer/retries |
Retry Analysis — consumers with retry backlogs ordered by severity |
/bus-viewer/dead-letters |
Dead-Letter Inspection — counts, last exception, last failure timestamp |
/bus-viewer/events |
Live Events — filterable operational event stream with inline exception details |
/bus-viewer/login |
Login page — form-based credential entry (no browser credential caching) |
/bus-viewer/logout |
Logout — clears session cookie and redirects to login |
UI features
- 🌗 Dark / Light mode toggle — persisted in
localStorage; theme restored before first paint to prevent flash - ⚠️ Critical status tooltips — hovering a "⚠ Critical" badge shows a bullet list of exactly why the consumer is critical (disconnected nodes, dead-letter count, retry count, backpressure, publish/ack imbalance)
- ✨ Live refresh animations — a blue-purple shimmer bar sweeps the top edge of each table zone while loading; each
<tbody>row slides up and fades in with a 60 ms stagger on data arrival - 🔴 Pulsing live-data dot — a green dot next to "Updated HH:mm:ss" confirms live polling is active
- Data auto-refreshes: consumer/queue/retry tables every 10 s, events every 5 s, dead-letters every 15 s
Registration
// Program.cs / Startup.ConfigureServices — call after AddBus()
builder.Services.AddBusViewer(o => o.UseBasicAuth());
// Middleware pipeline
app.UseStaticFiles(); // serves /_content/SimplyWorks.Bus.RabbitMqViewer/bus-viewer.css
app.UseRouting();
app.UseAuthentication(); // required for RequirePolicy() mode
app.UseAuthorization();
app.UseBusViewer();
app.MapRazorPages(); // discovers BusViewer area automatically
Note:
UseBusViewer()can be placed at any position in the pipeline. WhenUseBasicAuth()is active, anIStartupFilterautomatically inserts the viewer's authentication gate at the very beginning of the pipeline — beforeUseAuthorization— so 403 errors from the host application's authorization policies can never block the viewer.
Authentication
Mode 1 — Form-based login (UseBasicAuth())
Credentials are validated against IConfiguration at request time (supports secret rotation without restart). On success a short-lived session cookie is issued:
- Cookie
.BusViewerAuth, path-scoped to/bus-viewer,HttpOnly,SameSite=Strict - Session-only: deleted when the browser closes, 8-hour absolute maximum
- No browser credential caching (unlike HTTP Basic auth browser dialogs)
- Logout at
/bus-viewer/logoutclears the cookie immediately
builder.Services.AddBusViewer(o => o.UseBasicAuth(
usernameConfigKey: "BusViewer:Username", // default key
passwordConfigKey: "BusViewer:Password")); // default key
{
"BusViewer": { "Username": "ops", "Password": "super-secret" }
}
Environment variable equivalents: BusViewer__Username, BusViewer__Password
Mode 2 — Decoupled / policy (recommended for production)
Delegates entirely to the host's authorization pipeline. Any scheme works (JWT, cookie, OIDC, Windows Auth):
builder.Services.AddAuthorization(o =>
o.AddPolicy("OpsOnly", p => p.RequireRole("ops")));
builder.Services.AddBusViewer(o => o.RequirePolicy("OpsOnly"));
Mode 3 — Anonymous (development only)
Throws InvalidOperationException at startup when IHostEnvironment.IsProduction() is true unless explicitly suppressed:
builder.Services.AddBusViewer(o =>
{
o.AllowAnonymous();
o.AllowAnonymousInProduction = true; // only if behind proxy/network policy
});
Viewer options reference
| Option | Default | Description |
|---|---|---|
Title |
"SW.Bus Operations" |
Header title displayed in the sidebar |
AuthMode |
None |
Set via RequirePolicy(), UseBasicAuth(), or AllowAnonymous() |
PolicyName |
null |
Policy name used when AuthMode = Policy |
UsernameConfigKey |
"BusViewer:Username" |
Config key for the login username |
PasswordConfigKey |
"BusViewer:Password" |
Config key for the login password |
AllowAnonymousInProduction |
false |
Suppress production guard for anonymous mode |
Full BusOptions Reference
services.AddBus(config =>
{
// ── Identity ──────────────────────────────────────────────────────────
config.ApplicationName = "OrderService";
// ── Connection ────────────────────────────────────────────────────────
config.HeartBeatTimeOut = 60; // heartbeat seconds (0 = off)
config.ManagementUrl = "http://localhost:15672"; // defaults from AMQP URI
config.ManagementUsername = "guest";
config.ManagementPassword = "guest";
config.VirtualHost = "/";
// ── Queue defaults ────────────────────────────────────────────────────
config.DefaultQueuePrefetch = 4; // QoS prefetch per consumer
config.DefaultRetryCount = 5; // retries before dead-letter
config.DefaultRetryAfter = 60; // seconds between retries
config.DefaultMaxPriority = 0; // 0 = priority queues disabled
// ── Per-queue overrides ───────────────────────────────────────────────
config.AddQueueOption("OrderConsumer.OrderCreated",
prefetch: 10, retryCount: 3, retryAfterSeconds: 30, priority: 5);
// ── Broadcast listeners ───────────────────────────────────────────────
config.ListenRetryCount = 5;
config.ListenRetryAfter = 60;
// ── Monitoring ────────────────────────────────────────────────────────
config.MonitoringCacheSeconds = 5; // cache management API responses (3–60)
// ── JWT context propagation ───────────────────────────────────────────
config.Token.Key = "your-secret-key";
config.Token.Issuer = "your-issuer";
config.Token.Audience = "your-audience";
// ── Operational events pipeline ───────────────────────────────────────
config.OperationalEventsEnabled = true;
config.OperationalEventsBufferCapacity = 8192; // channel buffer before drop
config.OperationalEventsBatchSize = 256; // events per flush batch
config.OperationalEventsFlushIntervalMs = 1000; // ms between flushes
config.OperationalEventsDropOldest = true; // drop strategy when buffer full
config.OperationalEventsSchemaVersion = "1.0"; // stamped on every event
config.OperationalEventsStoreCapacity = 10000; // in-memory ring buffer size
// ── Alert thresholds ─────────────────────────────────────────────────
config.QueueBackpressureThreshold = 5000; // queue depth → backpressure event
config.AlertRetryWarningThreshold = 10; // retry count → Warning alert
config.AlertRetryCriticalThreshold = 100; // retry count → Critical alert
config.AlertDeadLetterCriticalThreshold = 100; // DL count → Critical alert
});
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ Process Exchange (direct) ──► Consumer Queue │
│ │ on failure │
│ ▼ │
│ Dead-Letter Exchange ──► Retry Queue ──► (TTL) ──► Consumer Q │
│ └──► Bad Queue (exhausted retries) │
│ │
│ Node Exchange (direct) ──► Per-Instance Queue (broadcasts) │
│ │
│ Delay Exchange (direct) ──────────────────────► Consumer Queue │
│ ├── [plugin path] Delay Plugin Exchange (x-delayed-message) │
│ └── [TTL fallback] Delay Entry Exchange (fanout) │
│ └──► Delay Bucket Queue (TTL) │
│ │ on expiry │
│ └──► Delay Exchange ─────► │
└─────────────────────────────────────────────────────────────────┘
▲ │
│ publish │ consume
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ SimplyWorks.Bus Runtime │
│ BasicPublisher ─── ActivitySource ─── OperationalEventPublisher│
│ ConsumerRunner ─── ActivitySource ─── OperationalEventPublisher│
│ ConsumersService ──────────────────── OperationalEventPublisher│
│ │
│ OperationalEventDispatcher (BackgroundService) │
│ ├── InMemoryOperationalEventStore (ring buffer) │
│ └── [custom IOperationalEventBatchSink sinks] │
│ │
│ BusDashboardDataService ── IConsumerReader (+ cache) │
│ ── IOperationalEventStore │
│ ── IAlertEvaluator │
└─────────────────────────────────────────────────────────────────┘
│
▼ (optional)
┌─────────────────────────────────────────────────────────────────┐
│ SimplyWorks.Bus.RabbitMqViewer │
│ /bus-viewer Overview dashboard │
│ /bus-viewer/consumers Consumer health grid │
│ /bus-viewer/queues Queue depths & rates │
│ /bus-viewer/retries Retry analysis │
│ /bus-viewer/dead-letters Dead-letter inspection │
│ /bus-viewer/events Live operational event stream │
└─────────────────────────────────────────────────────────────────┘
Queue naming convention:
{env}.{app}.{ConsumerClass}.{MessageType}
{env}.{app}.{ConsumerClass}.{MessageType}.retry
{env}.{app}.{ConsumerClass}.{MessageType}.bad
Example with ApplicationName = "OrderService" in Development:
v3.development.orderservice.ordercreatedconsumer.ordercreated
v3.development.orderservice.ordercreatedconsumer.ordercreated.retry
v3.development.orderservice.ordercreatedconsumer.ordercreated.bad
Testing
Unit tests
// Replace IPublish with a no-op — no RabbitMQ connection required
services.AddBusPublishMock();
// IBusDashboardDataService, IConsumerReader, IErrorQueueReader are standard
// interfaces — mock them with any mocking library
Integration tests
The SW.Bus.IntegrationTests project spins up real RabbitMQ containers via Testcontainers and tests the full delayed-publishing path against both broker variants. Docker must be running.
dotnet test SW.Bus.IntegrationTests/SW.Bus.IntegrationTests.csproj
What is tested
PluginAvailableTests — starts a container from heidiks/rabbitmq-delayed-message-exchange:3.13.0-management (the plugin is pre-enabled). Asserts that:
BusOptions.DelayedPluginAvailableistrue(plugin was detected at startup).- A message published with a 5-second delay is not delivered before the delay elapses.
- The message is delivered after the delay, routed via the
x-delayed-messageexchange.
PluginUnavailableTests — starts a container from stock rabbitmq:3.13-management (no plugin). Asserts that:
BusOptions.DelayedPluginAvailableisfalse.- A message published with a 5-second delay is not delivered before the delay elapses.
- The message is delivered after the delay, routed via the TTL-bucket fallback.
Both test classes share the same scenario logic in DelayedDeliveryScenario.RunAsync, which boots a full generic host (AddBus + AddBusConsume + AddBusPublish), waits for consumer topology to bind, publishes a delayed message, checks it has not arrived early, then polls until it arrives or a 25-second deadline passes.
Structure
| File | Role |
|---|---|
BusHarness |
Boots a real IHost against a container's connection string; waits for ConsumersService to declare topology |
DelayedConsumer |
IConsume<DelayDto> — target consumer that records receive timestamps in MessageSink |
MessageSink |
Singleton ConcurrentDictionary of message id → received-at timestamp |
DelayedDeliveryScenario |
Shared assertions used by both test classes |
PluginAvailableTests |
Container with the delayed-message plugin |
PluginUnavailableTests |
Stock container, exercises TTL buckets |
Dependencies
| Package | Version | Used for |
|---|---|---|
RabbitMQ.Client |
6.8.1 | AMQP connection and channel management |
EasyNetQ.Management.Client |
3.0.1 | RabbitMQ Management API calls |
Scrutor |
4.2.2 | Assembly scanning for consumers |
SimplyWorks.HttpExtensions |
8.1.1 | JWT and request context propagation |
SimplyWorks.PrimitiveTypes |
8.1.3 | RequestContext, IConsume<T>, etc. |
Contributing
- Fork the repository
- Create a feature branch
- Make your changes with tests
- Submit a pull request
License
MIT — see LICENSE
Support
- Issues: github.com/simplify9/SW-Bus/issues
- Sample app:
SW.Bus.SampleWebproject in this repository
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- EasyNetQ.Management.Client (>= 3.0.1)
- RabbitMQ.Client (>= 6.8.1)
- Scrutor (>= 4.2.2)
- SimplyWorks.Bus.RabbitMqExtensions (>= 8.1.12)
- SimplyWorks.HttpExtensions (>= 8.1.1)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on SimplyWorks.Bus:
| Package | Downloads |
|---|---|
|
ChequeScore.PrimitiveTypes.EFUtils
ChequeScore EF Utils |
|
|
SimplyWorks.Bus.RabbitMqViewer
Built-in HTMX operations dashboard for SimplyWorks.Bus. Mounts a server-rendered dark-mode admin UI (Pico.css + HTMX) at a configurable route with consumer health, queue metrics, retry analysis, dead-letter inspection, and live operational events — zero JavaScript framework required. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 8.1.12 | 111 | 6/15/2026 |
| 8.1.11 | 247 | 5/17/2026 |
| 8.1.10 | 117 | 5/17/2026 |
| 8.1.9 | 128 | 5/11/2026 |
| 8.1.8 | 196 | 4/9/2026 |
| 8.1.7 | 950 | 1/25/2026 |
| 8.1.6 | 148 | 1/25/2026 |
| 8.1.5 | 233 | 1/20/2026 |
| 8.1.4 | 200 | 1/12/2026 |
| 8.1.3 | 167 | 1/8/2026 |
| 8.1.2 | 145 | 1/4/2026 |
| 8.1.1 | 168 | 12/31/2025 |
| 8.1.0 | 439 | 9/16/2025 |
| 8.0.11 | 3,206 | 1/27/2025 |
| 8.0.10 | 696 | 11/7/2024 |
| 8.0.9 | 262 | 10/20/2024 |
| 8.0.4 | 550 | 6/27/2024 |
| 6.0.24 | 2,795 | 12/16/2024 |
| 6.0.23 | 2,851 | 11/7/2024 |
| 6.0.22 | 906 | 10/17/2024 |
See https://github.com/simplify9/SW-Bus/releases for release notes and changelog.