TAF.Infra.MassTransit
2.2.14
dotnet add package TAF.Infra.MassTransit --version 2.2.14
NuGet\Install-Package TAF.Infra.MassTransit -Version 2.2.14
<PackageReference Include="TAF.Infra.MassTransit" Version="2.2.14" />
<PackageVersion Include="TAF.Infra.MassTransit" Version="2.2.14" />
<PackageReference Include="TAF.Infra.MassTransit" />
paket add TAF.Infra.MassTransit --version 2.2.14
#r "nuget: TAF.Infra.MassTransit, 2.2.14"
#:package TAF.Infra.MassTransit@2.2.14
#addin nuget:?package=TAF.Infra.MassTransit&version=2.2.14
#tool nuget:?package=TAF.Infra.MassTransit&version=2.2.14
TAF.Infra.MassTransit
A comprehensive abstraction layer over MediatR and MassTransit that provides unified Command, Query, and Event handling patterns with support for both local (in-process) and distributed (message broker) execution.
๐ Features
- Unified CQRS Pattern: Clean separation of Commands, Queries, and Events
- Dual Execution Modes:
- Local execution via MediatR
- Distributed execution via MassTransit (RabbitMQ, Azure Service Bus, Amazon SQS)
- Smart Queue Naming: Configurable naming conventions with automatic sanitization and duplicate suffix removal
- Context-Aware: Built-in context propagation for multi-tenant, multi-app scenarios with helper utilities
- Multiple Message Brokers: Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory
- Automatic Registration: Auto-discovery and registration of handlers
- Retry Policies: Configurable retry mechanisms with exponential backoff
- Type Safety: Strongly-typed message contracts and handlers
- Environment Support: Multi-environment queue/topic naming with project and environment identifiers
๐ฆ Installation
dotnet add package TAF.Infra.MassTransit
๐๏ธ Architecture Overview
The library provides three main abstractions:
- Commands: Actions that change system state (fire-and-forget or with response)
- Queries: Read-only operations that return data (always local via MediatR)
- Events: Notifications about things that have happened (distributed via message broker)
Execution Patterns
| Pattern | Local (MediatR) | Distributed (MassTransit) |
|---|---|---|
| Commands | โ
SendAsync() |
โ
PublishAsync() |
| Queries | โ
SendAsync() |
โ (Always local) |
| Events | โ | โ
PublishAsync() |
Queue/Topic Naming Convention
The library automatically generates structured names for queues and topics:
- Commands:
{projectName}-{environment}-command-{commandName} - Events:
{projectName}-{environment}-event-{eventName}
Examples:
CreateUserCommandโmyproject-prod-command-createuserUserCreatedEventโmyproject-prod-event-usercreated
๐ Quick Start
1. Configuration
appsettings.json:
{
"MessageBus": {
"Host": "localhost",
"Port": 5672,
"Username": "guest",
"Password": "guest",
"VirtualHost": "/",
"ExchangeName": "default",
"RetryLimit": 5,
"UseMessageScheduler": true
}
}
Program.cs:
using TAF.Infra.MassTransit;
using TAF.Infra.MassTransit.Enums;
var builder = WebApplication.CreateBuilder(args);
// Basic configuration from appsettings
builder.Services.AddMessageBus(builder.Configuration);
// With custom naming conventions
builder.Services.AddMessageBusWithNaming(
builder.Configuration,
projectName: "MyProject",
environment: "production"
);
// Manual configuration with naming
builder.Services.AddMessageBusWithNaming(
options =>
{
options.Provider = MessageBusConfigurationEnum.RabbitMQ;
options.Settings["Host"] = "localhost";
options.Settings["Username"] = "guest";
options.Settings["Password"] = "guest";
},
projectName: "MyProject",
environment: "dev",
commandQueueTemplate: "{projectName}-{environment}-cmd-{commandName}",
eventTopicTemplate: "{projectName}-{environment}-evt-{eventName}"
);
2. Create Messages
Commands:
using TAF.Infra.MassTransit.Base;
// Fire-and-forget command
public class CreateUserCommand : BaseCommand
{
public string Name { get; set; }
public string Email { get; set; }
}
// Command with response
public class GetUserCommand : BaseCommand<UserResponse>
{
public Guid UserId { get; set; }
}
public class UserResponse
{
public string Name { get; set; }
public string Email { get; set; }
}
Queries:
using TAF.Infra.MassTransit.Base;
public class GetUsersQuery : BaseQuery<List<UserResponse>>
{
public int PageSize { get; set; } = 10;
public int PageNumber { get; set; } = 1;
}
Events:
using TAF.Infra.MassTransit.Base;
public class UserCreatedEvent : BaseEvent
{
public Guid UserId { get; set; }
public string Name { get; set; }
public string Email { get; set; }
}
3. Create Handlers
Command Handler:
using TAF.Infra.MassTransit.Abstractions;
using TAF.Infra.MassTransit.Abstractions.Command;
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
private readonly IUserRepository _userRepository;
private readonly IEventBus _eventBus;
public CreateUserCommandHandler(IUserRepository userRepository, IEventBus eventBus)
{
_userRepository = userRepository;
_eventBus = eventBus;
}
public async Task HandleAsync(CreateUserCommand command, Context context,
CancellationToken cancellationToken = default)
{
var user = new User
{
Name = command.Name,
Email = command.Email
};
await _userRepository.CreateAsync(user);
// Publish event
var userCreatedEvent = new UserCreatedEvent
{
UserId = user.Id,
Name = user.Name,
Email = user.Email
};
await _eventBus.PublishAsync(userCreatedEvent, context, cancellationToken);
}
}
Query Handler:
using TAF.Infra.MassTransit.Abstractions.Query;
public class GetUsersQueryHandler : IQueryHandler<GetUsersQuery, List<UserResponse>>
{
private readonly IUserRepository _userRepository;
public GetUsersQueryHandler(IUserRepository userRepository)
{
_userRepository = userRepository;
}
public async Task<List<UserResponse>> HandleAsync(GetUsersQuery query, Context context,
CancellationToken cancellationToken = default)
{
var users = await _userRepository.GetPagedAsync(query.PageNumber, query.PageSize);
return users.Select(u => new UserResponse { Name = u.Name, Email = u.Email }).ToList();
}
}
Event Handler:
using TAF.Infra.MassTransit.Abstractions.Event;
public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
private readonly IEmailService _emailService;
public UserCreatedEventHandler(IEmailService emailService)
{
_emailService = emailService;
}
public async Task HandleAsync(UserCreatedEvent @event, Context context,
CancellationToken cancellationToken = default)
{
await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);
}
}
4. Usage in Controllers/Services
using TAF.Infra.MassTransit.Helpers;
[ApiController]
[Route("api/[controller]")]
public class UsersController : ControllerBase
{
private readonly ICommandBus _commandBus;
private readonly IQueryBus _queryBus;
public UsersController(ICommandBus commandBus, IQueryBus queryBus)
{
_commandBus = commandBus;
_queryBus = queryBus;
}
[HttpPost]
public async Task<IActionResult> CreateUser([FromBody] CreateUserRequest request)
{
// Create context from HTTP headers (extracts tenant, user info, etc.)
var context = ContextHelper.CreateFromHeaders(Request.Headers);
var command = new CreateUserCommand
{
Name = request.Name,
Email = request.Email
};
// Execute locally via MediatR
await _commandBus.SendAsync<CreateUserCommand, Unit>(command, context);
// OR execute distributed via MassTransit
// Queue: myproject-prod-command-createuser
await _commandBus.PublishAsync(command, context);
return Ok();
}
[HttpGet]
public async Task<IActionResult> GetUsers([FromQuery] int page = 1, [FromQuery] int size = 10)
{
// Create context from parameters
var context = ContextHelper.CreateFromParameters(
userId: User.Identity.Name,
tenantId: "tenant-123",
appId: "web-api",
environmentId: "production"
);
var query = new GetUsersQuery
{
PageNumber = page,
PageSize = size
};
// Queries are always executed locally
var result = await _queryBus.SendAsync<GetUsersQuery, List<UserResponse>>(query, context);
return Ok(result);
}
}
โ๏ธ Configuration Options
Message Broker Providers
services.AddMessageBus(options =>
{
// RabbitMQ (default)
options.Provider = MessageBusConfigurationEnum.RabbitMQ;
// Azure Service Bus
options.Provider = MessageBusConfigurationEnum.AzureServiceBus;
// Amazon SQS
options.Provider = MessageBusConfigurationEnum.AmazonSQS;
// In-Memory (testing)
options.Provider = MessageBusConfigurationEnum.InMemory;
});
Smart Naming Configuration
// Default templates: {projectName}-{environment}-command-{commandName}
services.AddMessageBusWithNaming(
configuration,
projectName: "MyProject", // Default: assembly name
environment: "production", // Default: "dev"
commandQueueTemplate: "{projectName}.{environment}.cmd.{commandName}",
eventTopicTemplate: "{projectName}.{environment}.evt.{eventName}"
);
Naming Examples:
// Class: CreateUserCommand
// Result: myproject-prod-command-createuser
// Class: CreateUserCommandCommand (duplicate suffix)
// Result: myproject-prod-command-createusercommand (removes last "Command")
// Class: UserCreatedEvent
// Result: myproject-prod-event-usercreated
// Class: UserCreatedEventEvent (duplicate suffix)
// Result: myproject-prod-event-usercreatedevent (removes last "Event")
Advanced Configuration
services.Configure<MessageBrokerSettings>(options =>
{
options.Host = "rabbitmq.example.com";
options.Port = 5672;
options.Username = "myuser";
options.Password = "mypassword";
options.VirtualHost = "/myapp";
options.UseSsl = true;
options.PrefetchCount = 50;
// Retry settings
options.MassTransit.RetryPolicy.RetryLimit = 3;
options.MassTransit.RetryPolicy.InitialIntervalMs = 1000;
options.MassTransit.RetryPolicy.MaxIntervalMs = 30000;
});
๐ ๏ธ Context and Multi-Tenancy
The library provides built-in context support for cross-cutting concerns:
Context Helper Utilities
using TAF.Infra.MassTransit.Helpers;
// Create context from HTTP headers (in controllers/middleware)
var context = ContextHelper.CreateFromHeaders(Request.Headers);
// Create context from parameters (in services/background jobs)
var context = ContextHelper.CreateFromParameters(
userId: "user-123",
tenantId: "tenant-456",
appId: "web-api",
environmentId: "production",
correlationId: Guid.NewGuid(),
properties: new Dictionary<string, object>
{
{ "TraceId", "trace-789" },
{ "ClientIP", "192.168.1.1" }
}
);
// Manual context creation
var context = new Context
{
CorrelationId = Guid.NewGuid(),
TenantId = "tenant-123",
AppId = "web-app",
UserId = "user-456",
EnvironmentId = "production"
};
// Add custom properties
context.SetProperty("TraceId", "trace-789");
context.SetProperty("ClientIP", "192.168.1.1");
Context in Middleware
public class ContextMiddleware
{
public async Task InvokeAsync(HttpContext httpContext, RequestDelegate next)
{
// Automatically extract context from headers
var messageContext = ContextHelper.CreateFromHeaders(httpContext.Request.Headers);
// Store in some context provider for later use
using var scope = new ContextScope(messageContext);
await next(httpContext);
}
}
๐ Execution Patterns
Local Execution (MediatR)
- Fast, in-process execution
- Suitable for simple operations
- Automatic transaction boundaries
- Commands and Queries
Distributed Execution (MassTransit)
- Resilient, scalable execution
- Cross-service communication
- Built-in retry and error handling
- Commands and Events only
๐งช Testing Support
// Use In-Memory provider for testing
services.AddMessageBusWithNaming(
options =>
{
options.Provider = MessageBusConfigurationEnum.InMemory;
},
projectName: "TestProject",
environment: "test"
);
๐ง Advanced Features
Custom Base Classes
public class AuditableCommand : BaseCommand
{
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public string CreatedBy { get; set; }
}
public class CreateOrderCommand : AuditableCommand
{
public List<OrderItem> Items { get; set; }
public decimal Total { get; set; }
}
Multiple Handlers
// Multiple event handlers for the same event
public class EmailNotificationHandler : IEventHandler<UserCreatedEvent> { ... }
public class AuditLogHandler : IEventHandler<UserCreatedEvent> { ... }
public class MetricsHandler : IEventHandler<UserCreatedEvent> { ... }
Queue Name Resolution
// The library automatically handles complex naming scenarios:
// Full class name: MyApp.Commands.User.CreateUserCommand
// Generated queue: myapp-prod-command-createuser
// Full class name: MyApp.Events.Order.OrderShippedEvent
// Generated topic: myapp-prod-event-ordershipped
// Duplicate suffix handling:
// CreateUserCommandCommand โ myapp-prod-command-createusercommand
// UserCreatedEventEvent โ myapp-prod-event-usercreatedevent
๐ API Reference
Core Interfaces
ICommandBus- Command execution (local/distributed)IQueryBus- Query execution (local only)IEventBus- Event publishing (distributed)ICommandHandler<T>- Command handler interfaceICommandHandler<T, R>- Command handler with response interfaceIQueryHandler<T, R>- Query handler interfaceIEventHandler<T>- Event handler interface
Base Classes
BaseCommand- Base class for commandsBaseCommand<T>- Base class for commands with responseBaseQuery<T>- Base class for queriesBaseEvent- Base class for events
Helper Classes
ContextHelper- Static helper for context creationMessageNamingConfiguration- Configuration for queue/topic namingContextScope- Scoped context management
Configuration Classes
MessageBusConfiguration- Main configurationMessageBrokerSettings- Broker-specific settingsMessageNamingConfiguration- Naming convention settings
๐ค Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
๐ License
This project is licensed under the MIT License.
๐ Issues
If you encounter any issues, please file them in the GitHub Issues section.
Built with โค๏ธ for distributed systems
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- MassTransit (>= 8.5.2)
- MassTransit.AmazonSQS (>= 8.5.2)
- MassTransit.Azure.ServiceBus.Core (>= 8.5.2)
- MassTransit.RabbitMQ (>= 8.5.2)
- MediatR (>= 13.0.0)
- Microsoft.AspNetCore.Http.Abstractions (>= 2.2.0)
- Microsoft.AspNetCore.Http.Features (>= 5.0.17)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.8)
- Microsoft.Extensions.Configuration.Binder (>= 8.0.2)
- Newtonsoft.Json (>= 13.0.3)
- TAF.Infra.Contract (>= 1.11.5)
NuGet packages (11)
Showing the top 5 NuGet packages that depend on TAF.Infra.MassTransit:
| Package | Downloads |
|---|---|
|
TAF.MetaData.SDK
Professional HTTP-based SDK for TAF Metadata Service. v10.0.0: Clean release - no caching dependencies. Features Clean Architecture, SOLID principles, explicit BusContext parameter support, comprehensive error handling, screen management, batch relations support, and GUID-based lookups. |
|
|
TAF.CRUD.SDK
Official SDK for consuming TAF.CRUD microservice. v6.0.6: Trigger field-criteria now skips fields absent from UPDATE payloads โ prevents false "changed to null" detections when the caller didn't touch a selected field. |
|
|
TAF.Kibana.SDK
Professional SDK for TAF Kibana logging and analytics platform. Version 1.4.6 restores PropertyNameCaseInsensitive=true on the JsonSerializerOptions used by KibanaClient.PostAsync / DeleteAsync. The 1.4.5 dev-merge accidentally dropped this flag (only present on the pre-merge HEAD lineage). Without it, the camelCase naming policy fails to match the kibana service's PascalCase "IsSuccess" key โ OperationResult.IsSuccess silently deserialises as false and audit / global-search calls surface a misleading "No response from server" warning even when the upstream service returned 200. Version 1.4.5 merges dev-branch resilience work into the 1.4.x audit lineage: HttpClient is now configured once via the factory (BaseAddress / Timeout / ApiKey) with Polly retry + circuit-breaker attached as policy handlers, replacing the per-call hand-rolled retry; PostAsync/DeleteAsync surface BrokenCircuitException as a typed warning. The 1.4.x feature surface (audit middleware, AuditService crash fix, QueryString/RequestBody/ResponseBody schema fields) is preserved. Version 1.4.3 fixes a crash in AuditService.CreateAsync, GlobalSearchService.IndexAsync, and GlobalSearchService.DeleteAsync where OperationResult.Value was accessed on a failed result, throwing InvalidOperationException("Cannot access Value of a failed result"). Now checks OperationResult.IsSuccess first and surfaces the server-side Error.Message in the failure-result's ErrorMessage. Version 1.4.0 adds three optional fields to LogEntry / IndexLogRequestModel โ QueryString, RequestBody, ResponseBody. Version 1.3.0 added typed log-search filter inputs and Log Detail DTOs for GET /api/v1/logging/{id}. Version 1.2.0 added working WriteAsync / WriteBulkAsync log shipping. Previous versions cover Audit + GlobalSearch. |
|
|
TAF.Infra.WorkflowAction
Workflow Action abstractions, models, attributes, and helpers for TAF platform. Provides IWorkflowAction interface, ActionContext, ActionDefinition, ActionResult, and validation helpers for building workflow actions and transition hooks. ActionContext now includes OldRecordData โ a pre-update snapshot of the record that fired the trigger, enabling custom actions to compare old vs new field values on Edit events. |
|
|
TAF.FileManager.SDK
HTTP-based SDK for TAF FileManager Service. Provides BusContext extension methods for file operations. Features: context.GetImageAsync(path) for images, context.GetFileAsync(path, fileName) for file downloads, context.UploadFileAsync(stream, fileName, path) for file uploads (returns actual MediaId from TABD_Media), graceful 404 handling (returns null), context header propagation (TenantId, AppId, EnvironmentId). Includes FileManager.Domain for constants and SharedKernel for upload response models. Designed for Base64 image embedding in PDF reports, Excel file downloads, and import result uploads. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated | |
|---|---|---|---|
| 2.2.14 | 105 | 5/20/2026 | |
| 2.2.13 | 159 | 5/19/2026 | |
| 2.2.10 | 130 | 5/14/2026 | |
| 2.2.9 | 217 | 5/5/2026 | |
| 2.2.7 | 399 | 4/22/2026 | |
| 2.2.5 | 440 | 3/1/2026 | |
| 2.2.4 | 584 | 2/10/2026 | |
| 2.2.3 | 1,071 | 2/5/2026 | |
| 2.2.2 | 337 | 1/2/2026 | |
| 2.2.1 | 389 | 12/29/2025 | |
| 2.1.0 | 909 | 12/19/2025 | |
| 2.0.0 | 1,120 | 9/24/2025 | |
| 1.0.4 | 418 | 9/2/2025 | |
| 1.0.3 | 308 | 9/1/2025 | |
| 1.0.2 | 304 | 9/1/2025 | |
| 1.0.1 | 362 | 8/30/2025 | |
| 1.0.0 | 351 | 8/29/2025 |
v2.2.14 - Add context fields to RegisterUserCommand:
- RegisterUserCommand now carries TenantId, AppId, EnvironmentId (Guid?) from the originating BusContext
- Allows Identity service consumer to apply proper tenant/app/env isolation when registering cross-service users
v2.2.13 - Add publish timeout to fail fast when RabbitMQ is unreachable:
- CommandBus.PublishAsync and EventBus.PublishAsync now time out after PublishTimeoutSeconds (default 5s)
- Prevents API from hanging while MassTransit retries the broker connection internally
- IsBrokerException extended to also catch MassTransit.RabbitMqConnectionException (outer exception)
- MessageBrokerSettings.PublishTimeoutSeconds configurable via MessageBus:Settings:PublishTimeoutSeconds
v2.2.12 - RabbitMQ connection failure returns HTTP 503:
- CommandBus.PublishAsync and EventBus.PublishAsync now wrap RabbitMQ.Client exceptions as MessageBrokerException
- ExceptionToErrorMapper maps MessageBrokerException to HTTP 503 SERVICE_UNAVAILABLE
- Previously broker failures returned a generic HTTP 500
v2.2.11 - Re-pack to materialize HeaderConstants.ServiceName in the binary:
- The 2.2.9 release notes added Constants/HeaderConstants.ServiceName in source, but the 2.2.9 / 2.2.10 packed assemblies shipped without the constant (source change post-dated the pack).
- 2.2.11 re-packs the current source so ServiceName is actually present at runtime. No source-level changes from 2.2.10.
v2.2.10 - Cross-service register-user bridge (CRUD โ Identity):
- Added Commands/Identity/RegisterUserCommand.cs (request/response shape)
- Added Commands/Identity/RegisterUserCommandResponse.cs
- Added Constants/ServiceNameConstants.cs (target-service entry-assembly names for queue routing)
v2.2.9 - Added shared log-property key for service identifier:
- Added Constants/HeaderConstants.ServiceName so log-context property keys can be sourced from a single canonical constant across services (Rule 13).
v2.2.7 - Added EnvironmentSyncCommand for Identity env DB fan-out (Phase 2):
- Added Commands/Identity/EnvironmentSyncCommand.cs
- Added Constants/SyncActionsConstants.cs
v2.2.6 - Added Identity sync events for CRUD-to-Identity pipeline:
- Added Events/Identity/IdentityUserCreatedEvent.cs
- Added Events/Identity/IdentityUserUpdatedEvent.cs
- Added Events/Identity/IdentityRoleCreatedEvent.cs
- Added Events/Identity/IdentityRoleUpdatedEvent.cs
- Added Events/Identity/IdentityUserRoleAssignedEvent.cs
- Added Events/Identity/IdentityUserRoleRemovedEvent.cs
- Added Events/Identity/IdentityUserEnvironmentLinkedEvent.cs
- Added Events/Identity/IdentityUserEnvironmentUnlinkedEvent.cs
- Added Events/Identity/IdentitySubscriptionUserCreatedEvent.cs
v2.2.5 - Added SettingsChangedEvent:
- Added Events/Settings/SettingsChangedEvent.cs
- Published by CRUD when TABD_Settings is modified
- Consumed by TAF.Infra.Configuration for instant local cache invalidation across all services
- Carries AppId, EnvironmentId, and optional Key that changed
v2.2.1 - JWT Authentication Patch:
- Removed TenantId extraction from JWT claims (TenantId comes from headers only)
- UserId extraction from JWT "sub" claim remains unchanged
- JWT UserId takes precedence over UserId header when authenticated
v2.2.0 - JWT Authentication Support:
- Added SetFromHttpContext() method to ContextHelper for JWT claim extraction
- Automatic UserId extraction from JWT "sub" claim when user is authenticated
- JWT claims take precedence over HTTP headers for UserId
- Backward compatible - existing SetFromHeaders() method unchanged
- All services using this package can now seamlessly integrate JWT authentication
v2.1.0 - Platform Constants and Default Environment Support:
- Added PlatformConstants with TAB platform identifiers (TAB_TENANT_ID, TAB_APP_ID, TAB_ENVIRONMENT_ID)
- ContextHelper now auto-applies default TAB environment when EnvironmentId header is missing
- Auto-generates CorrelationId if not provided in headers
- All services automatically use TAB platform environment for operations without explicit environment header
v2.0.0 - Initial release featuring:
- Unified CQRS pattern with Commands, Queries, and Events
- Dual execution modes (local via MediatR, distributed via MassTransit)
- Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory brokers
- Built-in context propagation for multi-tenant scenarios
- Automatic handler discovery and registration
- Configurable retry policies and error handling
- Type-safe message contracts and handlers