Indiko.Blocks.EventBus.RabbitMQ 2.1.2

dotnet add package Indiko.Blocks.EventBus.RabbitMQ --version 2.1.2
                    
NuGet\Install-Package Indiko.Blocks.EventBus.RabbitMQ -Version 2.1.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.2" />
                    
Directory.Packages.props
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Indiko.Blocks.EventBus.RabbitMQ --version 2.1.2
                    
#r "nuget: Indiko.Blocks.EventBus.RabbitMQ, 2.1.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Indiko.Blocks.EventBus.RabbitMQ@2.1.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.2
                    
Install as a Cake Addin
#tool nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.2
                    
Install as a Cake Tool

Indiko.Blocks.EventBus.RabbitMQ

RabbitMQ-based distributed event bus implementation for microservices and scalable event-driven architectures.

Overview

This package provides a production-ready RabbitMQ implementation of the event bus abstractions, enabling reliable message delivery across distributed systems with features like persistence, acknowledgments, and automatic reconnection.

Features

  • RabbitMQ Integration: Full RabbitMQ messaging support via EasyNetQ
  • Distributed Events: Publish-subscribe across multiple services
  • Durable Messages: Persistent message storage
  • Automatic Reconnection: Handles connection failures gracefully
  • Queue Management: Automatic queue creation and routing
  • Message Acknowledgment: Reliable message delivery
  • Dead Letter Queues: Failed message handling
  • Multiple Consumers: Scale horizontally with competing consumers
  • Auto-Discovery: Automatic handler registration from DI container

Installation

dotnet add package Indiko.Blocks.EventBus.RabbitMQ

Prerequisites

  • RabbitMQ server (3.8+)
  • Management plugin enabled (optional, for monitoring)

Quick Start

Install RabbitMQ

Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
Local Installation

Configure Services

using Indiko.Blocks.EventBus.RabbitMQ;

public class Startup : WebStartup
{
    public override void ConfigureServices(IServiceCollection services)
    {
        base.ConfigureServices(services);
        
        // Configure RabbitMQ event bus
        services.AddRabbitMQEventBus(options =>
        {
            options.Host = Configuration["RabbitMQ:Host"];
            options.Port = Configuration.GetValue<int>("RabbitMQ:Port");
            options.Username = Configuration["RabbitMQ:Username"];
            options.Password = Configuration["RabbitMQ:Password"];
            options.VirtualHost = Configuration["RabbitMQ:VirtualHost"];
            options.ReConnectOnConnectionLost = true;
        });
        
        // Register event handlers
        services.AddScoped<IEventHandler<UserCreatedEvent>, SendWelcomeEmailHandler>();
        services.AddScoped<IEventHandler<OrderPlacedEvent>, ProcessOrderHandler>();
    }
}

Configuration (appsettings.json)

{
  "RabbitMQ": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "PrefetchCount": 10,
    "Timeout": 30,
    "ReConnectOnConnectionLost": true,
    "Product": "MyApplication",
    "Platform": ".NET 10"
  }
}

Define Events

using Indiko.Blocks.EventBus.Abstractions.Interfaces;

public class UserCreatedEvent : IEvent
{
    public Guid UserId { get; set; }
    public string Email { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTime CreatedAt { get; set; }
}

public class OrderPlacedEvent : IEvent
{
    public Guid OrderId { get; set; }
    public Guid UserId { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime OrderDate { get; set; }
}

Implement Handlers

Service A - User Service

public class UserService
{
    private readonly IEventBus _eventBus;
    private readonly IUserRepository _userRepository;

    public async Task<User> CreateUserAsync(CreateUserDto dto)
    {
        var user = new User
        {
            Id = Guid.NewGuid(),
            Email = dto.Email,
            FirstName = dto.FirstName,
            LastName = dto.LastName,
            CreatedAt = DateTime.UtcNow
        };
        
        await _userRepository.AddAsync(user);
        
        // Publish event to RabbitMQ
        await _eventBus.PublishAsync(new UserCreatedEvent
        {
            UserId = user.Id,
            Email = user.Email,
            FirstName = user.FirstName,
            LastName = user.LastName,
            CreatedAt = user.CreatedAt
        });
        
        return user;
    }
}

Service B - Email Service

public class SendWelcomeEmailHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<SendWelcomeEmailHandler> _logger;

    public SendWelcomeEmailHandler(IEmailService emailService, ILogger<SendWelcomeEmailHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation($"Sending welcome email to {event.Email}");
        
        try
        {
            await _emailService.SendWelcomeEmailAsync(
                @event.Email,
                @event.FirstName,
                cancellationToken
            );
            
            _logger.LogInformation($"Welcome email sent to {event.Email}");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"Failed to send welcome email to {event.Email}");
            throw; // RabbitMQ will retry or move to dead letter queue
        }
    }
}

Service C - Analytics Service

public class TrackUserRegistrationHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IAnalyticsService _analyticsService;

    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await _analyticsService.TrackEventAsync(new AnalyticsEvent
        {
            EventType = "UserRegistration",
            UserId = @event.UserId,
            Timestamp = @event.CreatedAt,
            Properties = new Dictionary<string, object>
            {
                { "email_domain", @event.Email.Split('@')[1] }
            }
        });
    }
}

How It Works

Automatic Handler Registration

The RabbitMQ event bus automatically discovers and registers all handlers from the DI container:

public void AddAllHandlersFromServiceProvider(IServiceProvider serviceProvider)
{
    // Scans all assemblies for IEventHandler<TEvent> implementations
    var eventHandlerTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(a => a.GetTypes())
        .Where(t => t.GetInterfaces()
            .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IEventHandler<>)));

    // Registers each handler with RabbitMQ
    foreach (var handlerType in eventHandlerTypes)
    {
        var handler = serviceProvider.GetService(handlerType);
        if (handler != null)
        {
            RegisterEventHandler(handler);
        }
    }
}

Queue Naming Convention

Each event type gets its own queue:

UserCreatedEvent ? UserCreatedEvent_Queue
OrderPlacedEvent ? OrderPlacedEvent_Queue
PaymentProcessedEvent ? PaymentProcessedEvent_Queue

Message Flow

Publisher (Service A)
    ?
[RabbitMQ Exchange]
    ?
[UserCreatedEvent_Queue]
    ?
Consumers (Services B, C, D...)

Connection Management

Auto-Reconnection

private void Advanced_Disconnected(object sender, DisconnectedEventArgs e)
{
    _logger.LogWarning($"Disconnected from RabbitMQ: {e.Reason}");

    if (_options.ReConnectOnConnectionLost)
    {
        _logger.LogInformation("Reconnecting to RabbitMQ...");
        UnRegisterAllEventHandlers();
        AddAllHandlersFromServiceProvider(_serviceProvider);
        _logger.LogInformation("Reconnected to RabbitMQ");
    }
}

Connection Events

_bus.Advanced.Connected += Advanced_Connected;
_bus.Advanced.Disconnected += Advanced_Disconnected;
_bus.Advanced.MessageReturned += Advanced_MessageReturned;

Advanced Configurations

Multiple RabbitMQ Instances

{
  "RabbitMQ": {
    "Host": "rabbitmq-cluster.example.com",
    "Port": 5672,
    "Username": "app-user",
    "Password": "secure-password",
    "VirtualHost": "/production",
    "Timeout": 60,
    "PrefetchCount": 20
  }
}

Connection String Builder

var connectionString = RabbitConnectionStringBuilder
    .Init(options)
    .Build();

// Produces: host=localhost:5672;virtualHost=/;username=guest;password=guest

Manual Handler Registration

public void Configure(IApplicationBuilder app, IEventBus eventBus, IServiceProvider services)
{
    // Manual registration if needed
    var handler = services.GetRequiredService<IEventHandler<UserCreatedEvent>>();
    eventBus.RegisterEventHandler(handler);
}

Error Handling and Retries

Automatic Retries

RabbitMQ automatically retries failed messages based on configuration:

public class ResilientEventHandler : IEventHandler<OrderPlacedEvent>
{
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessOrderAsync(@event);
        }
        catch (TransientException ex)
        {
            // Throw to trigger RabbitMQ retry
            _logger.LogWarning($"Transient error, will retry: {ex.Message}");
            throw;
        }
        catch (PermanentException ex)
        {
            // Log and return (don't throw) to acknowledge message
            _logger.LogError($"Permanent error, message will be dropped: {ex.Message}");
            // Message is acknowledged and won't be retried
        }
    }
}

Dead Letter Queue

Configure dead letter queues for failed messages:

services.AddRabbitMQEventBus(options =>
{
    options.Host = "localhost";
    options.DeadLetterExchange = "dlx";
    options.DeadLetterQueue = "failed_messages";
});

Scaling

Horizontal Scaling

Run multiple instances of your service - RabbitMQ distributes messages:

Service Instance 1 ??
Service Instance 2 ???? [UserCreatedEvent_Queue] ?? RabbitMQ
Service Instance 3 ??

Each message is delivered to only ONE instance (competing consumers).

Message Prefetch

Control how many messages each consumer processes at once:

{
  "RabbitMQ": {
    "PrefetchCount": 10
  }
}

Monitoring

RabbitMQ Management UI

Access at: http://localhost:15672

  • Default credentials: guest/guest
  • View queues, exchanges, message rates
  • Monitor consumer connections

Application Logging

_logger.LogInformation("Connected to RabbitMQ at {host}:{port}", options.Host, options.Port);
_logger.LogDebug("Registered event handler {handler} for {event}", handlerType, eventType);
_logger.LogWarning("Disconnected from RabbitMQ: {reason}", disconnectReason);

Best Practices

  1. Idempotent Handlers: Design handlers to be idempotent (handle duplicates)
  2. Event Versioning: Plan for event schema evolution
  3. Correlation IDs: Include correlation IDs for tracing
  4. Small Events: Keep event payloads small
  5. Error Handling: Distinguish transient vs permanent errors
  6. Connection Pooling: Reuse connections
  7. Monitoring: Set up alerts for queue depths

Deployment

Docker Compose

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq

  user-service:
    build: ./UserService
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    depends_on:
      - rabbitmq

  email-service:
    build: ./EmailService
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    depends_on:
      - rabbitmq

volumes:
  rabbitmq-data:

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        env:
        - name: RabbitMQ__Host
          value: "rabbitmq-service"
        - name: RabbitMQ__Username
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        - name: RabbitMQ__Password
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password

Migration from InMemory

Simply change the registration:

// From
services.AddInMemoryEventBus();

// To
services.AddRabbitMQEventBus(Configuration);

All events and handlers work without modification!

Target Framework

  • .NET 10

Dependencies

  • Indiko.Blocks.EventBus.Abstractions
  • EasyNetQ (8.0+)
  • RabbitMQ.Client (6.0+)

License

See LICENSE file in the repository root.

  • Indiko.Blocks.EventBus.Abstractions - Core event bus abstractions
  • Indiko.Blocks.EventBus.InMemory - In-memory event bus for development
  • Indiko.Blocks.Mediation.Abstractions - CQRS and mediator pattern

Resources

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.2 272 12/18/2025
2.1.1 669 12/2/2025
2.1.0 679 12/2/2025
2.0.0 328 9/17/2025
1.7.23 188 9/8/2025
1.7.22 183 9/8/2025
1.7.21 199 8/14/2025
1.7.20 207 6/23/2025
1.7.19 203 6/3/2025
1.7.18 203 5/29/2025
1.7.17 199 5/26/2025
1.7.15 153 4/12/2025
1.7.14 164 4/11/2025
1.7.13 164 3/29/2025
1.7.12 181 3/28/2025
1.7.11 185 3/28/2025
1.7.10 199 3/28/2025
1.7.9 185 3/28/2025
1.7.8 188 3/28/2025
1.7.5 212 3/17/2025
1.7.4 204 3/16/2025
1.7.3 177 3/16/2025
1.7.2 213 3/16/2025
1.7.1 234 3/11/2025
1.6.8 233 3/11/2025
1.6.7 290 3/4/2025
1.6.6 157 2/26/2025
1.6.5 190 2/20/2025
1.6.4 180 2/20/2025
1.6.3 175 2/5/2025
1.6.2 186 1/24/2025
1.6.1 189 1/24/2025
1.6.0 145 1/16/2025
1.5.2 145 1/16/2025
1.5.1 194 11/3/2024
1.5.0 161 10/26/2024
1.3.2 184 10/24/2024
1.3.0 180 10/10/2024
1.2.5 207 10/9/2024
1.2.4 179 10/8/2024
1.2.1 165 10/3/2024
1.2.0 161 9/29/2024
1.1.1 171 9/23/2024
1.1.0 198 9/18/2024
1.0.33 211 9/15/2024
1.0.28 176 8/28/2024
1.0.27 177 8/24/2024
1.0.26 167 7/7/2024
1.0.25 177 7/6/2024
1.0.24 164 6/25/2024
1.0.23 170 6/1/2024
1.0.22 193 5/14/2024
1.0.21 158 5/14/2024
1.0.20 186 4/8/2024
1.0.19 180 4/3/2024
1.0.18 169 3/23/2024
1.0.17 200 3/19/2024
1.0.16 205 3/19/2024
1.0.15 168 3/11/2024
1.0.14 182 3/10/2024
1.0.13 184 3/6/2024
1.0.12 206 3/1/2024
1.0.11 214 3/1/2024
1.0.10 198 3/1/2024
1.0.9 188 3/1/2024
1.0.8 172 2/19/2024
1.0.7 179 2/17/2024
1.0.6 175 2/17/2024
1.0.5 186 2/17/2024
1.0.4 186 2/7/2024
1.0.3 164 2/6/2024
1.0.1 197 2/6/2024
1.0.0 236 1/9/2024
1.0.0-preview99 175 12/22/2023
1.0.0-preview98 157 12/21/2023
1.0.0-preview97 139 12/21/2023
1.0.0-preview96 166 12/20/2023
1.0.0-preview95 171 12/20/2023
1.0.0-preview94 156 12/18/2023
1.0.0-preview93 290 12/13/2023
1.0.0-preview92 156 12/13/2023
1.0.0-preview91 226 12/12/2023
1.0.0-preview90 142 12/11/2023
1.0.0-preview89 148 12/11/2023
1.0.0-preview88 249 12/6/2023
1.0.0-preview87 198 12/6/2023
1.0.0-preview86 181 12/6/2023
1.0.0-preview85 181 12/6/2023
1.0.0-preview84 169 12/5/2023
1.0.0-preview83 219 12/5/2023
1.0.0-preview82 179 12/5/2023
1.0.0-preview81 166 12/4/2023
1.0.0-preview80 167 12/1/2023
1.0.0-preview77 156 12/1/2023
1.0.0-preview76 170 12/1/2023
1.0.0-preview75 148 12/1/2023
1.0.0-preview74 188 11/26/2023
1.0.0-preview73 176 11/7/2023
1.0.0-preview72 160 11/6/2023
1.0.0-preview71 178 11/3/2023
1.0.0-preview70 164 11/2/2023
1.0.0-preview69 158 11/2/2023
1.0.0-preview68 174 11/2/2023
1.0.0-preview67 147 11/2/2023
1.0.0-preview66 151 11/2/2023
1.0.0-preview65 163 11/2/2023
1.0.0-preview64 187 11/2/2023
1.0.0-preview63 158 11/2/2023
1.0.0-preview62 143 11/1/2023
1.0.0-preview61 161 11/1/2023
1.0.0-preview60 151 11/1/2023
1.0.0-preview59 183 11/1/2023
1.0.0-preview58 172 10/31/2023
1.0.0-preview57 155 10/31/2023
1.0.0-preview56 174 10/31/2023
1.0.0-preview55 157 10/31/2023
1.0.0-preview54 148 10/31/2023
1.0.0-preview53 150 10/31/2023
1.0.0-preview52 155 10/31/2023
1.0.0-preview51 154 10/31/2023
1.0.0-preview50 161 10/31/2023
1.0.0-preview48 153 10/31/2023
1.0.0-preview46 143 10/31/2023
1.0.0-preview45 162 10/31/2023
1.0.0-preview44 151 10/31/2023
1.0.0-preview43 173 10/31/2023
1.0.0-preview42 185 10/30/2023
1.0.0-preview41 170 10/30/2023
1.0.0-preview40 160 10/27/2023
1.0.0-preview39 189 10/27/2023
1.0.0-preview38 157 10/27/2023
1.0.0-preview37 182 10/27/2023
1.0.0-preview36 138 10/27/2023
1.0.0-preview35 156 10/27/2023
1.0.0-preview34 149 10/27/2023
1.0.0-preview33 167 10/26/2023
1.0.0-preview32 175 10/26/2023
1.0.0-preview31 175 10/26/2023
1.0.0-preview30 171 10/26/2023
1.0.0-preview29 155 10/26/2023
1.0.0-preview28 172 10/26/2023
1.0.0-preview27 165 10/26/2023
1.0.0-preview26 186 10/25/2023
1.0.0-preview25 188 10/23/2023
1.0.0-preview24 161 10/23/2023
1.0.0-preview23 158 10/23/2023
1.0.0-preview22 165 10/23/2023
1.0.0-preview21 158 10/23/2023
1.0.0-preview20 163 10/20/2023
1.0.0-preview19 181 10/19/2023
1.0.0-preview18 163 10/18/2023
1.0.0-preview16 169 10/11/2023
1.0.0-preview14 195 10/10/2023
1.0.0-preview13 172 10/10/2023
1.0.0-preview12 152 10/9/2023
1.0.0-preview101 162 1/5/2024