StreamFlow 10.0.0

Install options:

  • .NET CLI: dotnet add package StreamFlow --version 10.0.0
  • Package Manager Console (Visual Studio): NuGet\Install-Package StreamFlow -Version 10.0.0
  • PackageReference (copy into the project file): <PackageReference Include="StreamFlow" Version="10.0.0" />
  • Paket CLI: paket add StreamFlow --version 10.0.0
  • F# Interactive / Polyglot Notebooks: #r "nuget: StreamFlow, 10.0.0"
  • Cake Addin: #addin nuget:?package=StreamFlow&version=10.0.0
  • Cake Tool: #tool nuget:?package=StreamFlow&version=10.0.0

StreamFlow - Just another RabbitMQ handler

Justification

I know there are multiple cool RabbitMQ wrappers in the wild (MassTransit, EasyNetQ and many more) which do the job much better than this library. However, I was never able to use them as-is without overriding multiple methods or classes, so I decided to make my own wrapper for my own needs.

Install via NuGet

To use the RabbitMQ transport in your project, install it directly from NuGet.

To install the package, run the following command in the Package Manager Console:

PM> Install-Package StreamFlow.RabbitMq

Concepts

All names used on the RabbitMQ server can be customized by implementing IRabbitMqConventions and registering it in dependency injection. The default behavior works like this:

  • exchange name is created using request class name
  • queue name is created using <request class name>:<request handler name>[:<service id>][:<consumer group name>]
  • error queue name is created using <request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error
A consumer group is a group of consumers that share the same group id. When a topic is consumed by consumers in the same group, every record is delivered to only one consumer. If all consumer instances have the same consumer group, the records are effectively load-balanced over the consumer instances.

A service id identifies a service implementation. For example, if we have multiple services such as customers and orders, they will have different service ids. The simplest approach is to use a short, memorable identifier like "customers" or "orders".

The purpose of such an id is to distinguish message handlers which can have exactly the same name and handle exactly the same message. Since these live in different services, load balancing between them is not acceptable in this case.

It can also be treated as a consumer group, but in this case it applies to the whole service.
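For example, with the default conventions, a PingRequest message handled by PingRequestConsumer in a service using the (hypothetical) service id "orders" and consumer group "gr1" resolves to:

exchange:    PingRequest
queue:       PingRequest:PingRequestConsumer:orders:gr1
error queue: PingRequest:PingRequestConsumer:orders:gr1:Error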

Component Lifetimes

Connection

The connection object is registered as a singleton. However, there is a separation between consumer and publisher connections, which means a single service instance has at most two connections to the RabbitMQ server.

IPublisher

Registered with a singleton lifetime. Internally it uses a bounded Channel with a capacity of 100,000 items. If publications happen faster than this collection can be processed, you might receive a RabbitMqPublisherException with Reason: InternalQueueIsFull.

Under the hood the publisher uses asynchronous publisher acknowledgments, which means every PublishAsync call waits (asynchronously) for server confirmation. See the RabbitMQ documentation for a more detailed explanation (https://www.rabbitmq.com/confirms.html#publisher-confirms) and for more detailed examples of the behavior (https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html). In summary: if you await each PublishAsync call, you wait for a confirmation of each publication, as in this example:

public async Task PublishMessagesAsync<T>(T[] messages)
{
    foreach (var message in messages)
    {
        try
        {
            await _publisher.PublishAsync(message);
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}

The example above might be relatively slow, but it is very simple to implement. To improve publishing performance, you can use an approach similar to this one:

public async Task PublishMessagesAsync<T>(T[] messages)
{
    var tasks = new Task[messages.Length];
    for (var i = 0; i < messages.Length; ++i)
    {
        var message = messages[i];
        tasks[i] = _publisher.PublishAsync(message);
    }
    
    foreach (var task in tasks)
    {
        try
        {
            await task;
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}

NOTE: if you do not specify a cancellation token (or pass the default one), and no timeout is specified in the publish options, the library automatically adds a 60-second timeout. If the application does not receive a confirmation within this time, the message is marked as cancelled.
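In practice that means an awaited publish can end in cancellation rather than plain success or failure, so it is worth handling that case explicitly. A minimal sketch using only the PublishAsync call shown above (the assumption here is that the timeout surfaces as an OperationCanceledException; check how your version reports it):

public async Task PublishOrLogAsync<T>(T message)
{
    try
    {
        // with no cancellation token and no timeout in publish options,
        // the library applies its default 60 second timeout to this call
        await _publisher.PublishAsync(message);
    }
    catch (OperationCanceledException)
    {
        // assumption: the built-in timeout surfaces as a cancellation -
        // the message was not confirmed within 60 seconds
        Console.WriteLine("Publish timed out");
    }
    catch (RabbitMqPublisherException e)
    {
        // e.g. Reason: InternalQueueIsFull (see the IPublisher section above)
        Console.WriteLine($"Publish failed: {e.Message}");
    }
}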

Middleware

Each middleware is registered as transient: a new middleware instance is created for every received or published message.

Consumers

Every consumer is a singleton. However, for each received message a dedicated scope is created and disposed at the end of processing.

Handlers

Handlers are registered as scoped instances.
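Assuming the IConsumer<T> implementations are what is registered as scoped here, a handler can take its dependencies through the constructor and have them resolved from the per-message scope. A minimal sketch, reusing the PingRequest message from the example below and a hypothetical scoped application service IPingAuditor (not part of StreamFlow):

// hypothetical application service registered as scoped
public interface IPingAuditor
{
    Task RecordAsync(DateTime timestamp, CancellationToken cancellationToken);
}

public class AuditingPingRequestConsumer : IConsumer<PingRequest>
{
    private readonly IPingAuditor _auditor;

    // resolved from the per-message scope, disposed when processing ends
    public AuditingPingRequestConsumer(IPingAuditor auditor)
    {
        _auditor = auditor;
    }

    public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
    {
        return _auditor.RecordAsync(message.Body.Timestamp, cancellation);
    }
}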

Super simple to use

Define a message:

public class PingRequest
{
    public DateTime Timestamp { get; set; }
}

Publish message:

public class PingController
{
    private readonly IPublisher _publisher;
    
    public PingController(IPublisher publisher)
    {
        _publisher = publisher;
    }
    
    public async Task SendPing()
    {
        await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow });
    }
}

Define message consumer:

public class PingRequestConsumer : IConsumer<PingRequest>
{
    public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
    {
        Console.WriteLine(message.Body.Timestamp);
        return Task.CompletedTask;
    }
}

Configure using ASP.NET Core:

services.AddStreamFlow(transport =>
{
    transport
        .UsingRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .StartConsumerHostedService()
        )
        .Consumers(builder => builder
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr1"))
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr2"))
        )
        .ConfigureConsumerPipe(builder => builder
            .Use<LogAppIdMiddleware>()
        )
        .ConfigurePublisherPipe(builder => builder
            .Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
        );
});

The code above registers the StreamFlow classes, configures the RabbitMQ connection and instructs ASP.NET Core to start a hosted service which runs the configured consumers. It configures 2 consumer groups, launching 5 consumer instances for each group.
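For context, this registration normally sits in the application entry point. A minimal sketch using standard ASP.NET Core hosting (only AddStreamFlow and the builder methods already shown above come from StreamFlow):

// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddStreamFlow(transport =>
{
    transport
        .UsingRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .StartConsumerHostedService())
        .Consumers(consumers => consumers
            .Add<PingRequest, PingRequestConsumer>());
});

var app = builder.Build();
app.Run();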

ConfigureConsumerPipe and ConfigurePublisherPipe register middleware-like actions which are executed when a message is received or published, respectively. The code of these middlewares:

// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly ILogger<LogAppIdMiddleware> _logger;

    public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
    {
        _logger = logger;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        if (!string.IsNullOrWhiteSpace(context.AppId))
        {
            _logger.LogInformation("AppId: {AppId}", context.AppId);
        }

        return next(context);
    }
}

// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly string _appId;

    public SetAppIdMiddleware(string appId)
    {
        _appId = appId;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        context.WithAppId(_appId);
        return next(context);
    }
}

Even though these example middlewares are very simple, in real-world scenarios middleware can implement very powerful behaviors (see the retry sketch after this list):

  • retries
  • exception handling
  • logging
  • metrics
  • ...
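As an illustration, a naive consumer-side retry middleware could look like the sketch below; the attempt count and fixed delay are arbitrary choices, and a real implementation would likely add backoff and limit which exceptions are retried:

public class RetryMiddleware : IStreamFlowMiddleware
{
    private const int MaxAttempts = 3;

    private readonly ILogger<RetryMiddleware> _logger;

    public RetryMiddleware(ILogger<RetryMiddleware> logger)
    {
        _logger = logger;
    }

    public async Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // run the rest of the pipeline, including the consumer itself
                await next(context);
                return;
            }
            catch (Exception e) when (attempt < MaxAttempts)
            {
                _logger.LogWarning(e, "Attempt {Attempt} failed, retrying", attempt);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }
}

It is registered exactly like LogAppIdMiddleware above, e.g. .ConfigureConsumerPipe(builder => builder.Use<RetryMiddleware>()).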

MediatR

There is an integration with the MediatR library which can greatly simplify consumer implementations.

Simply install the package:

Install-Package StreamFlow.RabbitMq.MediatR

The library expects that MediatR itself is already configured and available in the dependency injection container.

MediatR Notifications

The request class must implement the INotification interface, for example:

public class PingNotification : INotification
{
}

And then it can be used in StreamFlow:

    ....
    .Consumers(builder => builder
        .AddNotification<PingNotification>()
    )
    ....
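The received message is then handled by an ordinary MediatR notification handler (standard MediatR API, shown here only for completeness):

public class PingNotificationHandler : INotificationHandler<PingNotification>
{
    public Task Handle(PingNotification notification, CancellationToken cancellationToken)
    {
        Console.WriteLine("Ping notification received");
        return Task.CompletedTask;
    }
}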

MediatR Request without response

The request class must implement the IRequest interface, for example:

public class PingRequest : IRequest
{
}

And then it can be used in StreamFlow:

    ....
    .Consumers(builder => builder
        .AddRequest<PingRequest>()
    )
    ....

MediatR Request with response

The request class must implement the IRequest<TResponse> interface, for example:

public class PongResponse
{
}

public class PingPongRequest : IRequest<PongResponse>
{
}

And then it can be used in StreamFlow:

    ....
    .Consumers(builder => builder
        .AddRequest<PingPongRequest, PongResponse>()
    )
    ....
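The response itself comes from a standard MediatR request handler, for example:

public class PingPongRequestHandler : IRequestHandler<PingPongRequest, PongResponse>
{
    public Task<PongResponse> Handle(PingPongRequest request, CancellationToken cancellationToken)
    {
        return Task.FromResult(new PongResponse());
    }
}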

However, in this case you get additional behavior: the response is sent back using the RabbitMQ publisher bus. For this to work you also need to enable the publisher host (see the EnablePublisherHost call):

services.AddStreamFlow(transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .EnableConsumerHost(consumer => consumer.Prefetch(5))
            .WithPrometheusMetrics()
            .WithPublisherOptions(publisher => publisher
                .EnablePublisherHost()
            )
        );
});

Metrics

The library supports various metrics which provide insights into publishers and consumers. Out of the box there is an implementation for Prometheus metrics, but it is definitely not limited to it.

In order to enable metrics, add Prometheus package:

PM> Install-Package StreamFlow.RabbitMq.Prometheus

And enable metrics when configuring RabbitMQ (see: WithPrometheusMetrics):

services.AddStreamFlow(streamFlowOptions, transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection(options.Host, options.Username, options.Password, options.VirtualHost)
            .WithPrometheusMetrics()
            ....
        );
});
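The counters and histograms still need to be scraped. A minimal sketch assuming the standard prometheus-net.AspNetCore integration is used to expose a /metrics endpoint (MapMetrics is prometheus-net API, not part of StreamFlow):

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// services.AddStreamFlow(...) with WithPrometheusMetrics() as shown above

var app = builder.Build();

// prometheus-net endpoint serving the streamflow_* metrics listed below
app.MapMetrics();

app.Run();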

Prometheus metrics

  • streamflow_messages_published

    histogram, represents published message counts and durations (seconds)

    labels: exchange, state [completed or failed]

  • streamflow_bus_messages_published

    histogram, represents published message counts and durations (seconds) when using publisher host

    labels: exchange, state [completed or failed]

  • streamflow_messages_publishing_events

    histogram, represents various events happening during publishing process, like channel creation, preparations, serialization, transaction commit and so on, helps to see if there are any bottlenecks in publisher process

    labels: exchange, event

  • streamflow_messages_publishing_errors

    counter, represents exceptions that occurred during message publishing

    labels: exchange

  • streamflow_bus_messages_publishing_errors

    counter, represents exceptions that occurred during message publishing when the publisher host is used

    labels: exchange

  • streamflow_messages_consumed

    histogram, consumed message count and consumer durations (seconds)

    labels: exchange, queue, state

  • streamflow_messages_consumed_errors

    counter, represents exceptions thrown while consuming messages

    labels: exchange, queue

  • streamflow_messages_bus_publishing

    counter, shows how many messages are published using publisher host

  • streamflow_messages_bus_publishing_errors

    counter, represents publishing errors in the publisher host; since the publisher host channel is bounded, it will start increasing when the publisher host receives more messages than it is able to actually send to RabbitMQ

  • streamflow_publisher_pool_size

    counter, when using publisher pooling, shows pool size (publishers created but currently not used)

  • streamflow_publisher_pool_in_use

    counter, when using publisher pooling, shows how many publisher instances are currently in use

Error Handling

Every application deals with errors, and RabbitMQ handlers can certainly run into exceptional cases. In case of an unhandled exception, an IRabbitMqErrorHandler instance is created which by default sends the message to an error queue. Although you can define your own error queue name by overriding the IRabbitMqConventions interface, the default behavior simply adds an ":Error" suffix to the consumer group queue name and sends the message to that queue with the exception details in a message header. Application developers or support staff can later decide what to do with those messages.

Product compatibility and target framework versions

Compatible target frameworks included in the package:

  • net8.0 (no dependencies)
  • net9.0 (no dependencies)

Additional computed target frameworks: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows. Learn more about Target Frameworks and .NET Standard.

NuGet packages (2)

Showing the top 2 NuGet packages that depend on StreamFlow:

  • StreamFlow.RabbitMq - RabbitMQ.Client wrapper
  • StreamFlow.Outbox.EntityFrameworkCore - RabbitMQ.Client wrapper

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
10.0.0 187 12/11/2024
9.1.0 9,871 4/19/2023
9.0.3 735 3/30/2023
9.0.2 324 3/29/2023
9.0.1 307 3/29/2023
9.0.0 327 3/29/2023
8.0.0 1,312 2/18/2023
7.4.0 522 2/11/2023
7.3.1 2,905 11/2/2022
7.3.0 852 10/28/2022
7.2.1 765 10/25/2022
7.2.0 683 10/25/2022
7.1.4 1,157 10/11/2022
7.1.3 740 10/11/2022
7.1.2 742 10/11/2022
7.1.1 727 10/10/2022
7.1.0 804 10/9/2022
7.0.0 708 10/9/2022
6.8.1 3,674 5/20/2022
6.8.0 732 5/20/2022
6.7.2 1,096 2/24/2022
6.7.1 973 1/23/2022
6.7.0 950 1/17/2022
6.6.1 854 1/13/2022
6.6.0 940 1/12/2022
6.5.1 442 12/20/2021
6.5.0 423 12/20/2021
6.4.0 415 12/20/2021
6.3.4 417 12/14/2021
6.3.3 414 12/14/2021
6.3.2 420 12/14/2021
6.3.1 434 12/14/2021
6.3.0 501 12/14/2021
6.2.1 449 12/9/2021
6.2.0 418 12/9/2021
6.1.2 432 12/8/2021
6.1.1 438 12/8/2021
6.1.0 456 12/8/2021
6.0.0 690 12/6/2021
5.2.0 714 11/23/2021
5.1.0 706 11/23/2021
5.0.0 459 11/14/2021
4.1.0 967 10/14/2021
4.0.3 855 9/20/2021
4.0.2 881 9/19/2021
4.0.1 813 9/19/2021
4.0.0 830 9/19/2021
3.9.0 868 9/19/2021
3.8.0 943 9/12/2021
3.7.0 914 9/12/2021
3.5.0 816 9/7/2021
3.4.0 821 9/7/2021
3.3.0 778 9/7/2021
3.2.0 774 9/7/2021
3.1.0 748 9/2/2021
3.0.0 406 9/1/2021
2.5.0 569 5/4/2021
2.3.0 390 5/4/2021
2.2.0 429 4/27/2021
2.1.0 415 4/26/2021
2.0.0 397 4/26/2021