Adaptare 2.2.1

dotnet add package Adaptare --version 2.2.1

Adaptare

A lightweight, transport-agnostic .NET message queue abstraction library for sending, receiving, and processing messages across different transports (Direct/in-process, NATS, RabbitMQ).

Overview

Adaptare provides a small set of core abstractions that let you implement and register message handlers and processors without coupling your application logic to a specific transport.

Key abstractions:

  • IMessageSender — Publish, Send, or Request messages (the multiplexer is registered at startup).
  • IMessageReceiver<TSubscriptionSettings> — Subscribe to messages for a specific transport.
  • IMessageExchange — Match a subject/header (MatchAsync) and return a transport-specific IMessageSender.
  • IMessageHandler<T> / IMessageProcessor<T,TR> — Server-side handler/processor.
  • ISubscribeRegistration / IMessageQueueBackgroundRegistration — Background registration and subscription lifecycle management.

Call AddMessageQueue() during startup to register the multiplexer sender, then add transports with their respective extension methods.

Quick start

dotnet restore
dotnet build -c Release
dotnet test -c Release --no-restore

There is a Direct transport sample under samples/Adaptare.Sample.Direct demonstrating a minimal in-process flow.

Getting started (DI and startup)

Adaptare uses a builder pattern that integrates with Microsoft DI. Register the MessageQueue in your application startup:

builder.Services
    .AddMessageQueue();

Select transports and register handlers/processors with the transport-specific extensions. Direct transport example:

builder.Services
    .AddMessageQueue()
    .AddDirectMessageQueue(cfg => cfg
        .AddProcessor<ProcessorType>("subject1")
        .AddHandler<HandlerType>("subject2")
        .AddReplyHandler<HandlerType2>("subject3"))
    .AddDirectGlobPatternExchange("*");

NATS and RabbitMQ have analogous Add{Transport}MessageQueue(...) extension points.
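
To illustrate what a glob pattern exchange such as AddDirectGlobPatternExchange("*") might do when deciding whether a subject belongs to it, here is a minimal sketch. GlobSubjectMatcher is a hypothetical helper, not part of Adaptare, and it assumes a simple dialect in which * matches any run of characters and ? matches exactly one; the library's actual glob rules may differ.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration only; not an Adaptare type.
public static class GlobSubjectMatcher
{
    // Assumption: '*' matches any run of characters and '?' matches exactly one.
    public static bool IsMatch(string pattern, string subject)
    {
        // Escape regex metacharacters first, then re-enable the two glob wildcards.
        var regex = "\\A"
            + Regex.Escape(pattern).Replace("\\*", ".*").Replace("\\?", ".")
            + "\\z";

        return Regex.IsMatch(
            subject,
            regex,
            RegexOptions.CultureInvariant | RegexOptions.Singleline);
    }
}
```

Under these assumptions, the pattern "*" accepts every subject, which is why a catch-all exchange registered with it routes all traffic to that transport.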

AddMessageQueue() registers a MultiplexerMessageSender as IMessageSender. The multiplexer uses the configured exchanges (MessageExchangeOptions.Exchanges) and selects the first matching IMessageExchange by invoking MatchAsync.

Message semantics

API methods on IMessageSender express different messaging semantics:

  • PublishAsync: fire-and-forget (one-way).
  • RequestAsync<TMessage, TReply>: request/reply; waits for a TReply. If the reply header contains MQ_Fail, a MessageProcessFailException is thrown at the caller.
  • SendAsync: transport-specific semantics; some transports (for example, NATS) support sending raw bytes and receiving replies; others may throw NotSupportedException.

Example usage:

var messageSender = serviceProvider.GetRequiredService<IMessageSender>();
await messageSender.PublishAsync("subject1", requestBytes, headers, cancellationToken);

var reply = await messageSender.RequestAsync<byte[], MyReply>("subject1", requestBytes, headers);

Headers, tracing, and failures

Use MessageHeaderValue to carry message metadata. The library provides TraceContextPropagator to inject and extract distributed tracing context from headers (Activity/TraceContext propagation).
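
As a rough picture of what injecting and extracting trace context through headers involves, the sketch below uses only System.Diagnostics types from the .NET base library. TraceHeaderSketch is a hypothetical stand-in and not Adaptare's TraceContextPropagator; it also assumes headers can be modeled as a plain string dictionary and that the W3C header names traceparent and tracestate are used, neither of which is confirmed by this README.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical stand-in for illustration; not Adaptare's TraceContextPropagator.
public static class TraceHeaderSketch
{
    // Assumption: W3C Trace Context header names.
    public const string TraceParentKey = "traceparent";
    public const string TraceStateKey = "tracestate";

    // Copies the activity's W3C identifiers into the outgoing headers.
    public static void Inject(Activity activity, IDictionary<string, string> headers)
    {
        if (activity == null
            || activity.IdFormat != ActivityIdFormat.W3C
            || activity.Id == null)
        {
            return; // nothing to propagate
        }

        headers[TraceParentKey] = activity.Id;

        if (!string.IsNullOrEmpty(activity.TraceStateString))
        {
            headers[TraceStateKey] = activity.TraceStateString;
        }
    }

    // Rebuilds the parent context on the receiving side, if one was sent.
    public static bool TryExtract(
        IDictionary<string, string> headers,
        out ActivityContext context)
    {
        context = default;

        if (!headers.TryGetValue(TraceParentKey, out var traceParent))
        {
            return false;
        }

        headers.TryGetValue(TraceStateKey, out var traceState);
        return ActivityContext.TryParse(traceParent, traceState, out context);
    }
}
```

A receiver would typically pass the extracted ActivityContext as the parent when starting its own activity, so that the handler's span joins the sender's trace.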

Common header keys are defined in MessageHeaderValueConsts:

  • MQ_Ask_Id, MQ_Reply_Id, MQ_Reply, and MQ_Fail.

If a remote handler needs to indicate failure, it sets MQ_Fail on the reply header. The request caller then receives MessageProcessFailException.
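
The failure convention described above can be pictured with the following sketch. It does not use Adaptare types: SketchProcessFailException and ReplyInspector are hypothetical stand-ins, headers are modeled as a plain string dictionary, and it assumes the header key is the literal string "MQ_Fail" with the failure reason as its value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for MessageProcessFailException.
public sealed class SketchProcessFailException : Exception
{
    public SketchProcessFailException(string reason) : base(reason) { }
}

// Hypothetical caller-side check; not part of Adaptare.
public static class ReplyInspector
{
    // Assumption: the key is the literal "MQ_Fail" and its value is the reason.
    public const string FailKey = "MQ_Fail";

    // Returns the payload for a successful reply, or throws if the
    // remote handler flagged a failure in the reply headers.
    public static byte[] Unwrap(
        IReadOnlyDictionary<string, string> replyHeaders,
        byte[] payload)
    {
        if (replyHeaders.TryGetValue(FailKey, out var reason))
        {
            throw new SketchProcessFailException(reason);
        }

        return payload;
    }
}
```

In application code this means a RequestAsync call is usually wrapped in a try/catch for MessageProcessFailException wherever a remote failure should be handled rather than propagated.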

Subscriptions and ack behavior

Subscriptions are registered with ISubscribeRegistration, and background registrations create actual consumers. Typical RabbitMQ scenarios:

  • SubscribeRegistration<TMessage, THandler> — Basic handler with AutoAck set to true or false. If AutoAck=false, the consumer must call BasicAck on success and can call BasicNack on failure.
  • AcknowledgeSubscribeRegistration<TMessage, THandler> — Provides advanced ack control using IAcknowledgeMessage<T> and IAcknowledgeMessageHandler<T> (Ack, Nak, progress reports, aborts).

Background registration implementations include NatsBackgroundRegistration and RabbitMQBackgroundRegistration.
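
The manual acknowledgement flow (AutoAck=false) can be sketched as follows. The types here are hypothetical and deliberately independent of both Adaptare and the RabbitMQ client: ISketchAckChannel stands in for whatever exposes BasicAck/BasicNack, and the requeue-on-failure policy is an assumption chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a channel exposing ack/nack operations.
public interface ISketchAckChannel
{
    void Ack(ulong deliveryTag);
    void Nack(ulong deliveryTag, bool requeue);
}

public static class ManualAckConsumer
{
    // Runs the handler, then acknowledges on success or rejects on failure.
    public static async Task ConsumeAsync(
        ISketchAckChannel channel,
        ulong deliveryTag,
        ReadOnlyMemory<byte> body,
        Func<ReadOnlyMemory<byte>, Task> handler)
    {
        try
        {
            await handler(body).ConfigureAwait(false);
            channel.Ack(deliveryTag);
        }
        catch (Exception)
        {
            // Assumption: failed messages are requeued for another attempt.
            channel.Nack(deliveryTag, requeue: true);
        }
    }
}

// Records calls so the flow can be observed without a broker.
public sealed class RecordingAckChannel : ISketchAckChannel
{
    public List<string> Log { get; } = new List<string>();

    public void Ack(ulong deliveryTag) => Log.Add("ack:" + deliveryTag);

    public void Nack(ulong deliveryTag, bool requeue)
        => Log.Add("nack:" + deliveryTag + ":" + (requeue ? "requeue" : "drop"));
}
```

Acknowledging only after the handler completes is what gives at-least-once processing; acknowledging before the handler runs would risk losing a message if processing fails.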

Adding transports or exchanges

To add a new exchange or transport:

  1. Implement IMessageExchange:
    • MatchAsync(subject, header) — when this exchange should handle the subject.
    • GetMessageSenderAsync(subject, IServiceProvider) — return a transport-specific IMessageSender.
  2. Register your exchange via MessageQueueConfiguration.AddExchange(exchange) or implement an Add{Transport}MessageQueue extension.

Note that the multiplexer uses the first matching exchange, so registration order affects routing. Use MessageQueueConfiguration.PushExchange if you need to prioritize an exchange.
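
Why registration order matters can be seen in a small first-match routing sketch. The types below are hypothetical and simplified: ISketchExchange is not Adaptare's IMessageExchange (its MatchAsync takes only a subject, no header), and FirstMatchRouter only mimics the selection rule described here, not the real MultiplexerMessageSender.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for an exchange.
public interface ISketchExchange
{
    string Name { get; }
    ValueTask<bool> MatchAsync(string subject);
}

// Matches any subject that starts with a fixed prefix.
public sealed class PrefixExchange : ISketchExchange
{
    private readonly string m_prefix;

    public PrefixExchange(string name, string prefix)
    {
        Name = name;
        m_prefix = prefix;
    }

    public string Name { get; }

    public ValueTask<bool> MatchAsync(string subject)
        => new ValueTask<bool>(subject.StartsWith(m_prefix, StringComparison.Ordinal));
}

public static class FirstMatchRouter
{
    // Returns the first exchange, in registration order, whose MatchAsync
    // accepts the subject; returns null when nothing matches.
    public static async Task<ISketchExchange> SelectAsync(
        IReadOnlyList<ISketchExchange> exchanges,
        string subject)
    {
        foreach (var exchange in exchanges)
        {
            if (await exchange.MatchAsync(subject).ConfigureAwait(false))
            {
                return exchange;
            }
        }

        return null;
    }
}
```

With a specific exchange listed before a catch-all, specific subjects reach the specific exchange; reversing the order would let the catch-all shadow it, which is the situation prioritizing an exchange is meant to address.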

Serializers

Each transport exposes a serializer registry (for example, IRabbitMQSerializerRegistry and INatsSerializerRegistry). Use GetSerializer<T> and GetDeserializer<T> for typed serialization and deserialization.

Testing

  • Tests use xUnit + NSubstitute and are located under Adaptare.*.UnitTests.
  • During unit tests, use AddFake{Transport}MessageQueue to avoid starting background registrations or creating real network connections (see AddFakeNatsMessageQueue and AddFakeRabbitMessageQueue).

Troubleshooting & common issues

  • MessageSenderNotFoundException: thrown when no exchange matches the subject — verify you registered an exchange (or used Add*GlobPatternExchange) and check exchange order.
  • MessageProcessFailException: thrown when a reply header contains MQ_Fail.
  • RabbitMQ ack issues: if AutoAck = false, ensure BasicAck is called when processing succeeds; otherwise messages stay unacknowledged and are redelivered once the channel or connection closes.
  • NATS connectivity issues: verify INatsConnectionManager configuration and registerName settings.

Contributing & code style

See .github/copilot-instructions.md for Copilot guidance and contributor guidelines. Short code style pointers:

  • PascalCase for types, I prefix for interfaces.
  • Private static fields start with _, non-static private fields with m_ (project convention).
  • Tabs for indentation (size 4), lines limited to 100 characters, CRLF line endings.

When adding a new transport or feature, prefer adding an IMessageExchange implementation and writing appropriate tests.


Snippets

Sending messages:

var messageSender = serviceProvider.GetRequiredService<IMessageSender>();

var request = new SendMessageType
{
    // fill request payload
};

await messageSender.PublishAsync("subject1", request.ToByteArray(), cancellationToken).ConfigureAwait(false);

// or

var responseData = await messageSender.RequestAsync<byte[]>("subject1", request.ToByteArray(), cancellationToken).ConfigureAwait(false);

// parse response if typed
var response = ReceiveType.Parser.ParseFrom(responseData.Span);

Receiving messages:

internal class ProcessorType : IMessageProcessor
{
    public async ValueTask<ReadOnlyMemory<byte>> HandleAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
    {
        var request = SendMessageType.Parser.ParseFrom(data.Span);

        // ...process request and populate the reply message
        var response = new ReceiveType();

        return response.ToByteArray();
    }
}

internal class HandlerType : IMessageHandler
{
    public async ValueTask HandleAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
    {
        var request = SendMessageType.Parser.ParseFrom(data.Span);

        // ...process request
    }
}

License

MIT

Target frameworks

  • Compatible (included in package): net8.0, net9.0, net10.0.
  • Computed as compatible for each of the above: the -android, -browser, -ios, -maccatalyst, -macos, -tvos, and -windows platform variants.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Adaptare:

  • Adaptare.RabbitMQ
  • Adaptare.Nats
  • Adaptare.Direct

Each package carries the same description: "Adaptare is a library developed to abstract the sending, receiving, and processing of message transmission."

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.2.1 136 6/8/2026
2.2.0 512 12/12/2025
2.1.0 315 11/24/2025
2.0.2 621 7/28/2025
2.0.1 218 7/28/2025
2.0.0 362 5/18/2025
1.0.1-alpha 230 5/17/2025
1.0.0 263 1/12/2025