Myth.Flow.Actions
3.0.4-preview.9
See the version list below for details.
A powerful .NET library implementing CQRS and Event-Driven Architecture patterns with seamless integration to Myth.Flow pipelines. Built for scalability with support for multiple message brokers, caching strategies, and enterprise-grade resilience features.
Action-First Pipeline API
The Action-First API starts a pipeline from the action itself (a command, query, or event) instead of a context object, which removes the context-mapping boilerplate:
// OLD: Context-based (lots of boilerplate)
Pipeline.Start(context)
    .Process<Context, Command>(ctx => new Command { ... }, (ctx, result) => ctx.Result = result)
// NEW: Action-First (clean and direct)
Pipeline.Start(new Command { ... }, serviceProvider)
    .Process<Command, Result>()
Benefits:
- 70% less boilerplate code - No context classes needed
- Type-safe transformations - Direct object-to-object pipeline flow
- Intuitive developer experience - Actions ready for execution
- Fluent cache configuration - x => x.UseCache("key", TimeSpan.FromMinutes(5))
- Utility pipelines - Start without parameters for functional scenarios
Features
- Action-First API: Revolutionary pipeline approach with zero context boilerplate
- CQRS Pattern: Clean separation of Commands, Queries, and Events
- Pipeline Integration: Fluent integration with Myth.Flow for composable workflows
- Multiple Message Brokers: InMemory (dev/test), Kafka, and RabbitMQ support
- Query Caching: Built-in caching with Memory and Redis providers with fluent configuration
- Event-Driven Architecture: Publish/subscribe with multiple handler support
- Resilience Patterns: Retry policies with exponential backoff, circuit breakers, and dead letter queues
- Auto-Discovery: Automatic handler registration via assembly scanning
- OpenTelemetry Integration: Built-in distributed tracing and observability
- Type Safety: Fully typed APIs with compile-time safety
Installation
dotnet add package Myth.Flow.Actions
Optional Dependencies
# For Kafka support
dotnet add package Confluent.Kafka
# For RabbitMQ support
dotnet add package RabbitMQ.Client
# For Redis distributed caching
dotnet add package Microsoft.Extensions.Caching.StackExchangeRedis
Quick Start
1. Configure Services
using Myth.Flow.Actions.Extensions;
using Myth.Flow.Actions.Settings;
builder.Services.AddFlowActions(config =>
{
config.BrokerType = MessageBrokerType.InMemory;
config.TelemetryEnabled = true;
config.CachingEnabled = true;
config.AssembliesToScan.Add(typeof(Program).Assembly);
});
2. Define Commands, Queries, and Events
Command
using Myth.Interfaces;
using Myth.Models;
public record CreateUserCommand : ICommand<Guid>
{
public required string Email { get; init; }
public required string Name { get; init; }
}
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand, Guid>
{
private readonly IUserRepository _repository;
public CreateUserCommandHandler(IUserRepository repository)
{
_repository = repository;
}
public async Task<CommandResult<Guid>> HandleAsync(
CreateUserCommand command,
CancellationToken cancellationToken = default)
{
var user = new User
{
Id = Guid.NewGuid(),
Email = command.Email,
Name = command.Name
};
await _repository.AddAsync(user, cancellationToken);
return CommandResult<Guid>.Success(user.Id);
}
}
Query
public record GetUserQuery : IQuery<UserDto>
{
public required Guid UserId { get; init; }
}
public class GetUserQueryHandler : IQueryHandler<GetUserQuery, UserDto>
{
private readonly IUserRepository _repository;
public GetUserQueryHandler(IUserRepository repository)
{
_repository = repository;
}
public async Task<QueryResult<UserDto>> HandleAsync(
GetUserQuery query,
CancellationToken cancellationToken = default)
{
var user = await _repository.GetByIdAsync(query.UserId, cancellationToken);
if (user == null)
return QueryResult<UserDto>.Failure("User not found");
var dto = new UserDto
{
Id = user.Id,
Email = user.Email,
Name = user.Name
};
return QueryResult<UserDto>.Success(dto);
}
}
Event
public record UserCreatedEvent : DomainEvent
{
public required Guid UserId { get; init; }
public required string Email { get; init; }
}
public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
private readonly IEmailService _emailService;
public UserCreatedEventHandler(IEmailService emailService)
{
_emailService = emailService;
}
public async Task HandleAsync(
UserCreatedEvent @event,
CancellationToken cancellationToken = default)
{
await _emailService.SendWelcomeEmailAsync(@event.Email, cancellationToken);
}
}
3. Use Action-First Pipeline
Simple Pipeline Example
using Myth.Flow.Actions;
// Direct action execution - no context needed!
var result = await Pipeline
.Start(new CreateUserCommand { Email = "user@example.com", Name = "John Doe" }, serviceProvider)
.Process<CreateUserCommand, Guid>()
.ExecuteAsync();
if (result.IsSuccess)
{
Console.WriteLine($"User created with ID: {result.Value}");
}
Complex Workflow Example
// Chain operations with transformations
var result = await Pipeline
.Start(new CreateUserCommand { Email = "user@example.com", Name = "John Doe" }, serviceProvider)
.Process<CreateUserCommand, Guid>() // Command → Guid
.Transform(userId => new GetUserQuery { UserId = userId }) // Guid → Query
// Note: `userId` in the cache key below must be a variable in the enclosing scope;
// the Transform lambda parameter above is not visible at this point.
.Query<GetUserQuery, UserDto>(x => x.UseCache($"user:{userId}", TimeSpan.FromMinutes(10))) // Query with cache
.Transform(user => new UserCreatedEvent { UserId = user.Id, Email = user.Email }) // User → Event
.Publish<UserCreatedEvent>() // Publish event
.ExecuteAsync();
if (result.IsSuccess)
{
Console.WriteLine("User creation workflow completed successfully!");
}
Utility Pipeline (Empty Start)
// Start without initial data for utility functions
var result = await PipelineExtensions
.Start(serviceProvider)
.Transform(() => new GetActiveUsersQuery())
.Query<GetActiveUsersQuery, List<UserDto>>()
.Transform(users => new GenerateReportCommand { Users = users })
.Process<GenerateReportCommand, ReportDto>()
.ExecuteAsync();
Intermediate Pipeline Steps
The Action-First API now supports all Myth.Flow pipeline methods for adding custom logic, validation, telemetry, and resilience patterns between action operations:
Validation and Custom Steps
var result = await Pipeline
.Start(new CreateUserCommand { Email = "user@example.com", Name = "John Doe" }, serviceProvider) // Name is a required member
// Validate input before processing
.Step<IValidationService>((validator, state) => {
validator.ValidateEmail(state.CurrentRequest!.Email);
return state;
})
// Add custom business logic
.StepAsync<IUserService>(async (userService, state) => {
await userService.CheckUserLimitsAsync(state.CurrentRequest!.Email);
return state;
})
.Process<CreateUserCommand, Guid>()
.ExecuteAsync();
Side Effects and Logging
var result = await Pipeline
.Start(new GetUserQuery { UserId = userId }, serviceProvider)
// Log the start of the operation
.Tap<ILogger>((logger, state) =>
logger.LogInformation("Querying user {UserId}", state.CurrentRequest!.UserId))
.Query<GetUserQuery, UserDto>()
// Log after successful query
.TapAsync<IMetricsService>(async (metrics, state) =>
await metrics.RecordQueryExecutionAsync("GetUser"))
.ExecuteAsync();
Conditional Execution
var result = await Pipeline
.Start(new ProcessOrderCommand { OrderId = orderId }, serviceProvider)
// Only validate payment if order requires it
.When(state => state.CurrentRequest!.RequiresPayment, builder =>
builder.StepAsync<IPaymentService>((payment, state) =>
payment.ValidatePaymentMethodAsync(state.CurrentRequest!.PaymentInfo)))
.Process<ProcessOrderCommand, OrderResult>()
.ExecuteAsync();
Resilience and Telemetry
var result = await Pipeline
.Start(new CallExternalApiQuery { Endpoint = "users" }, serviceProvider)
// Add telemetry tracking
.WithTelemetry("ExternalApiCall")
// Configure retry policy for external calls
.WithRetry(maxAttempts: 3, backoffMs: 1000)
.Query<CallExternalApiQuery, ApiResponse>()
.ExecuteAsync();
Complex Workflows with Multiple Steps
var result = await Pipeline
.Start(new CreateOrderCommand { CustomerId = customerId, Items = items }, serviceProvider)
// Validate customer
.StepAsync<ICustomerService>(async (service, state) => {
await service.ValidateCustomerAsync(state.CurrentRequest!.CustomerId);
return state;
})
// Check inventory
.StepAsync<IInventoryService>(async (service, state) => {
await service.ReserveItemsAsync(state.CurrentRequest!.Items);
return state;
})
// Log before processing
.Tap<ILogger>((logger, state) =>
logger.LogInformation("Processing order for customer {CustomerId}",
state.CurrentRequest!.CustomerId))
// Process the order
.Process<CreateOrderCommand, OrderResult>()
// Send notification after success
.TapAsync<INotificationService>(async (notifications, state) =>
await notifications.SendOrderConfirmationAsync(state.CurrentRequest!))
.ExecuteAsync();
Available Intermediate Methods
- Step<TService>() - Synchronous operations with dependency injection
- StepAsync<TService>() - Asynchronous operations with dependency injection
- StepResult<TService>() - Operations returning Result<T> for error handling
- StepResultAsync<TService>() - Async operations returning Result<T>
- Tap<TService>() - Side effects (logging, metrics, events) with DI
- TapAsync<TService>() - Async side effects with DI
- When(predicate, configure) - Conditional pipeline execution
- WithRetry(maxAttempts, backoffMs) - Retry policies with exponential backoff
- WithTelemetry(operationName) - OpenTelemetry distributed tracing
All methods maintain the fluent API design and can be chained together for complex workflows while preserving type safety and the Action-First approach.
Configuration
InMemory Broker
services.AddFlowActions(config =>
{
config.BrokerType = MessageBrokerType.InMemory;
config.AssembliesToScan.Add(typeof(Program).Assembly);
});
Kafka
services.AddFlowActions(config =>
{
config.BrokerType = MessageBrokerType.Kafka;
config.BrokerConfigurationFactory = () => new KafkaOptions
{
BootstrapServers = "localhost:9092",
GroupId = "my-service",
ClientId = "my-service-instance-1",
EnableAutoCommit = false,
SessionTimeoutMs = 30000,
AutoOffsetReset = "earliest"
};
config.TelemetryEnabled = true;
config.AssembliesToScan.Add(typeof(Program).Assembly);
});
RabbitMQ
services.AddFlowActions(config =>
{
config.BrokerType = MessageBrokerType.RabbitMQ;
config.BrokerConfigurationFactory = () => new RabbitMQOptions
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/",
ExchangeName = "my-service-events",
ExchangeType = "topic"
};
config.CachingEnabled = true;
config.CacheConfiguration = cache =>
{
cache.ProviderType = CacheProviderType.Distributed;
cache.ConnectionString = "localhost:6379";
cache.DefaultTtl = TimeSpan.FromMinutes(5);
};
config.AssembliesToScan.AddRange(AppDomain.CurrentDomain.GetAssemblies());
});
Action-First Pipeline API
Start Pipeline
// Start with a request object
PipelineExtensions.Start(command, serviceProvider)
PipelineExtensions.Start(query, serviceProvider)
PipelineExtensions.Start(@event, serviceProvider) // `event` is a C# keyword, so the identifier needs the @ prefix
// Start without initial data (for utility functions)
PipelineExtensions.Start(serviceProvider)
Process (Commands)
// Command without response (when TCommand : ICommand)
.Process<TCommand>()
// Command with typed response (when TCommand : ICommand<TResponse>)
.Process<TCommand, TResponse>()
Query (Read Operations)
// Query without caching
.Query<TQuery, TResponse>()
// Query with cache configuration using fluent API
.Query<TQuery, TResponse>(x => x
.UseCache("cache-key", TimeSpan.FromMinutes(10))
.WithSlidingExpiration())
// Query with simple cache configuration
.Query<TQuery, TResponse>(x => x.UseCache($"key:{someId}", TimeSpan.FromMinutes(5)))
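The difference between a fixed TTL and `.WithSlidingExpiration()` can be easy to miss. The following is a minimal sketch of the two expiration modes as they are conventionally defined, assuming the library follows the usual semantics. `TtlCacheSketch` and all of its members are hypothetical names for illustration and are not part of Myth.Flow.Actions; the sketch is also not thread-safe.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical in-memory cache illustrating absolute vs. sliding expiration.
// This is not the library's cache provider.
public sealed class TtlCacheSketch
{
    private sealed record Entry(object Value, TimeSpan Ttl, bool Sliding, DateTime ExpiresAt);

    private readonly Dictionary<string, Entry> _entries = new();
    private readonly Func<DateTime> _clock;

    // The clock is injectable so the behavior can be checked without real waiting.
    public TtlCacheSketch(Func<DateTime>? clock = null) =>
        _clock = clock ?? (() => DateTime.UtcNow);

    public void Set(string key, object value, TimeSpan ttl, bool sliding = false) =>
        _entries[key] = new Entry(value, ttl, sliding, _clock() + ttl);

    public bool TryGet(string key, out object? value)
    {
        value = null;
        if (!_entries.TryGetValue(key, out var entry)) return false;

        var now = _clock();
        if (now >= entry.ExpiresAt)
        {
            _entries.Remove(key); // expired: behave like a cache miss
            return false;
        }

        // Sliding expiration pushes the deadline forward on every hit.
        if (entry.Sliding) _entries[key] = entry with { ExpiresAt = now + entry.Ttl };
        value = entry.Value;
        return true;
    }
}
```

With a 10-minute TTL, an absolute entry read at minute 6 still expires at minute 10, while a sliding entry read at minute 6 stays valid until minute 16.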
Publish (Events)
// Publish event (when TEvent : IEvent)
.Publish<TEvent>()
Transform
// Transform current request to new type
.Transform<TNext>(current => new TNext { /* ... */ })
// Async transformation
.TransformAsync<TNext>(async current => await CreateNextAsync(current))
// Conditional transformation
.TransformIf<TNext>(
condition: current => current.IsValid,
transform: current => new TNext { /* ... */ })
// Conditional with true/false branches
.TransformIf<TNext>(
condition: current => current.Type == "Premium",
transformTrue: current => new PremiumAction { /* ... */ },
transformFalse: current => new StandardAction { /* ... */ })
// Empty pipeline transformation (when starting without data)
.Transform<TRequest>(() => new TRequest { /* ... */ })
.TransformAsync<TRequest>(async () => await CreateRequestAsync())
Direct Dispatcher Usage
For scenarios where you don't need the full pipeline:
public class UserService
{
private readonly IDispatcher _dispatcher;
public UserService(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
}
public async Task<Guid> CreateUserAsync(string email, string name)
{
// Execute command
var command = new CreateUserCommand { Email = email, Name = name };
var result = await _dispatcher.DispatchCommandAsync<CreateUserCommand, Guid>(command);
if (result.IsFailure)
throw new InvalidOperationException(result.ErrorMessage);
// Publish event
await _dispatcher.PublishEventAsync(new UserCreatedEvent
{
UserId = result.Data,
Email = email
});
return result.Data;
}
public async Task<UserDto?> GetUserAsync(Guid userId)
{
// Execute query with caching
var query = new GetUserQuery { UserId = userId };
var cacheOptions = new CacheOptions
{
Enabled = true,
CacheKey = $"user:{userId}",
Ttl = TimeSpan.FromMinutes(10)
};
var result = await _dispatcher.DispatchQueryAsync<GetUserQuery, UserDto>(
query,
cacheOptions);
return result.IsSuccess ? result.Data : null;
}
}
Resilience Features
Retry Policy
using Myth.Flow.Resilience;
var retryPolicy = new RetryPolicy(
maxAttempts: 3,
baseBackoffMs: 1000,
exponentialBackoff: true,
logger: logger);
var result = await retryPolicy.ExecuteAsync(async () =>
{
return await externalService.CallAsync();
});
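The internals of `RetryPolicy` are not shown in this README, so the following is only a sketch of what exponential backoff conventionally means for `maxAttempts: 3` and `baseBackoffMs: 1000`, assuming the delay doubles after each failed attempt. `BackoffSketch` and its members are hypothetical names and are not part of Myth.Flow.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of Myth.Flow: illustrates a doubling backoff schedule.
public static class BackoffSketch
{
    // Delay before retry number `retry` (1-based): base * 2^(retry - 1).
    public static TimeSpan DelayFor(int retry, int baseBackoffMs) =>
        TimeSpan.FromMilliseconds(baseBackoffMs * Math.Pow(2, retry - 1));

    // Runs `operation` up to `maxAttempts` times, waiting between failed attempts.
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation, int maxAttempts, int baseBackoffMs)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Not the last attempt: wait, then loop and try again.
                // On the last attempt the filter is false and the exception propagates.
                await Task.Delay(DelayFor(attempt, baseBackoffMs));
            }
        }
    }
}
```

Under these assumptions, three attempts with a 1000 ms base wait 1000 ms after the first failure and 2000 ms after the second; a third failure is rethrown to the caller.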
Circuit Breaker
var circuitBreaker = new CircuitBreakerPolicy(
failureThreshold: 5,
openDuration: TimeSpan.FromSeconds(30),
logger: logger);
var result = await circuitBreaker.ExecuteAsync(async () =>
{
return await unreliableService.CallAsync();
});
// Check circuit state
if (circuitBreaker.State == CircuitState.Open)
{
// Circuit is open, service calls are blocked
}
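For readers unfamiliar with the pattern, here is a minimal sketch of the state machine a circuit breaker typically implements: Closed, then Open after a number of consecutive failures, then HalfOpen once the open period has elapsed. `CircuitSketch` and `SketchState` are hypothetical names; how `CircuitBreakerPolicy` actually counts failures or handles the half-open trial is an assumption here, not something the README specifies.

```csharp
#nullable enable
using System;

public enum SketchState { Closed, Open, HalfOpen }

// Hypothetical illustration of the circuit-breaker state machine; not the library's implementation.
public sealed class CircuitSketch
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _openDuration;
    private readonly Func<DateTime> _clock;
    private int _failures;
    private DateTime _openedAt;

    public SketchState State { get; private set; } = SketchState.Closed;

    public CircuitSketch(int failureThreshold, TimeSpan openDuration, Func<DateTime>? clock = null)
    {
        _failureThreshold = failureThreshold;
        _openDuration = openDuration;
        _clock = clock ?? (() => DateTime.UtcNow);
    }

    // Returns true if a call may proceed; moves Open -> HalfOpen once the open period has elapsed.
    public bool AllowRequest()
    {
        if (State == SketchState.Open && _clock() - _openedAt >= _openDuration)
            State = SketchState.HalfOpen;
        return State != SketchState.Open;
    }

    // A successful call resets the failure count and closes the circuit.
    public void RecordSuccess()
    {
        _failures = 0;
        State = SketchState.Closed;
    }

    public void RecordFailure()
    {
        _failures++;
        // A failed trial call in HalfOpen, or too many consecutive failures, opens the circuit.
        if (State == SketchState.HalfOpen || _failures >= _failureThreshold)
        {
            State = SketchState.Open;
            _openedAt = _clock();
        }
    }
}
```

A caller would check `AllowRequest()` before invoking the protected service and report the outcome with `RecordSuccess()` or `RecordFailure()`.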
Dead Letter Queue
services.AddFlowActions(config =>
{
config.BrokerType = MessageBrokerType.InMemory;
config.BrokerConfigurationFactory = () => new InMemoryBrokerOptions
{
EnableDeadLetterQueue = true,
MaxRetries = 3
};
});
// Access dead letter queue
public class MonitoringService
{
private readonly DeadLetterQueue _dlq;
public MonitoringService(DeadLetterQueue dlq)
{
_dlq = dlq;
}
public IEnumerable<DeadLetterMessage> GetFailedMessages()
{
return _dlq.GetAll();
}
public void RetryFailedMessage()
{
if (_dlq.TryDequeue(out var message))
{
// Retry processing the failed message
}
}
}
Telemetry & Observability
OpenTelemetry Integration
services.AddFlowActions(config =>
{
config.TelemetryEnabled = true;
config.BrokerType = MessageBrokerType.InMemory;
config.AssembliesToScan.Add(typeof(Program).Assembly);
});
// Activities are automatically created with the following names:
// - Command.{CommandName}
// - Query.{QueryName}
// - Event.{EventName}
// - EventBus.Publish.{EventName}
// - EventHandler.{HandlerName}
Activity Tags
Each activity includes relevant tags:
- pipeline.input.type: The context type name
- cache.hit: Whether the query result was served from cache
- Additional custom tags from metadata
Advanced Patterns
Multiple Event Handlers
All handlers for an event execute in parallel:
public class UserCreatedEmailHandler : IEventHandler<UserCreatedEvent>
{
public async Task HandleAsync(UserCreatedEvent @event, CancellationToken ct)
{
// Send welcome email
}
}
public class UserCreatedAnalyticsHandler : IEventHandler<UserCreatedEvent>
{
public async Task HandleAsync(UserCreatedEvent @event, CancellationToken ct)
{
// Track analytics
}
}
public class UserCreatedNotificationHandler : IEventHandler<UserCreatedEvent>
{
public async Task HandleAsync(UserCreatedEvent @event, CancellationToken ct)
{
// Send push notification
}
}
// All three handlers execute concurrently when event is published
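Concurrent handler execution of this kind is usually built on `Task.WhenAll`. The sketch below shows that fan-out in isolation, assuming handlers are plain delegates; `FanOutSketch` is a hypothetical name, and the library's event bus may differ in ordering, scoping, and error handling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical fan-out helper; not the library's event bus.
public static class FanOutSketch
{
    // Starts every handler for the same event, then awaits them all together.
    public static Task PublishAsync<TEvent>(
        TEvent @event,
        IEnumerable<Func<TEvent, CancellationToken, Task>> handlers,
        CancellationToken cancellationToken = default)
    {
        // ToList() forces each handler to start before any of them is awaited.
        var running = handlers
            .Select(handler => handler(@event, cancellationToken))
            .ToList();

        // The returned task completes when all handlers finish; if any handler fails, it faults.
        return Task.WhenAll(running);
    }
}
```

Because the handlers run concurrently, they should not depend on each other's side effects or on a particular completion order.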
High-Value Order Processing
// Action-first pipeline with conditional logic using TransformIf
var result = await PipelineExtensions
.Start(new ValidateOrderCommand { OrderId = orderId }, serviceProvider)
.Process<ValidateOrderCommand, OrderDto>()
.TransformIf<FraudCheckCommand>(
order => order.TotalAmount > 1000,
order => new FraudCheckCommand { OrderId = order.Id })
.Process<FraudCheckCommand>()
.Transform(fraudResult => new ProcessPaymentCommand { OrderId = orderId })
.Process<ProcessPaymentCommand>()
.Transform(paymentResult => new OrderCompletedEvent { OrderId = orderId })
.Publish<OrderCompletedEvent>()
.ExecuteAsync();
Order to Shipment Workflow
// Direct transformations between different action types
var result = await PipelineExtensions
.Start(new CreateOrderCommand
{
Items = items,
CustomerId = customerId,
ShippingAddress = address
}, serviceProvider)
.Process<CreateOrderCommand, Guid>() // Create order → OrderId
.Transform(orderId => new GetOrderQuery { OrderId = orderId }) // OrderId → Query
.Query<GetOrderQuery, OrderDto>() // Get full order details
.Transform(order => new CreateShipmentCommand // Order → Shipment command
{
OrderId = order.Id,
ShipmentId = Guid.NewGuid(),
Address = order.ShippingAddress,
Items = order.Items
})
.Process<CreateShipmentCommand, ShipmentDto>() // Process shipment
.Transform(shipment => new ShipmentCreatedEvent // Shipment → Event
{
OrderId = shipment.OrderId,
ShipmentId = shipment.Id,
TrackingNumber = shipment.TrackingNumber
})
.Publish<ShipmentCreatedEvent>() // Notify about shipment
.ExecuteAsync();
Report Generation Pipeline
// Utility pipeline starting without initial data
var result = await PipelineExtensions
.Start(serviceProvider)
.Transform(() => new GetMonthlyOrdersQuery { Month = DateTime.Now.Month })
.Query<GetMonthlyOrdersQuery, List<OrderDto>>(x => x.UseCache("monthly-orders", TimeSpan.FromHours(1)))
.Transform(orders => new GenerateReportCommand
{
Orders = orders,
ReportType = ReportType.Monthly,
GeneratedBy = currentUserId
})
.Process<GenerateReportCommand, ReportDto>()
.Transform(report => new ReportGeneratedEvent
{
ReportId = report.Id,
GeneratedBy = currentUserId
})
.Publish<ReportGeneratedEvent>()
.ExecuteAsync();
Testing
Testing Action-First Pipelines
using Xunit;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Myth.Flow.Actions.Extensions;
public class UserPipelineTests
{
private readonly IServiceProvider _serviceProvider;
public UserPipelineTests()
{
var services = new ServiceCollection();
services.AddLogging();
services.AddFlow();
services.AddFlowActions(config =>
{
config.UseInMemory()
.EnableCaching(cache => cache.ProviderType = CacheProviderType.Memory)
.ScanAssemblies(typeof(CreateUserCommand).Assembly);
});
services.AddScoped<IUserRepository, InMemoryUserRepository>();
services.AddScoped<IEmailService, FakeEmailService>();
_serviceProvider = services.BuildServiceProvider();
}
[Fact]
public async Task CreateUser_WithActionFirstAPI_ShouldSucceed()
{
// Arrange
var command = new CreateUserCommand
{
Email = "test@example.com",
Name = "Test User"
};
// Act - Using action-first API
var result = await PipelineExtensions
.Start(command, _serviceProvider)
.Process<CreateUserCommand, Guid>()
.ExecuteAsync();
// Assert
result.IsSuccess.Should().BeTrue();
result.Value.Should().NotBe(Guid.Empty);
}
[Fact]
public async Task CompleteUserWorkflow_ShouldChainOperations()
{
// Arrange
var command = new CreateUserCommand
{
Email = "workflow@example.com",
Name = "Workflow User"
};
// Act - Chain multiple operations
var result = await PipelineExtensions
.Start(command, _serviceProvider)
.Process<CreateUserCommand, Guid>() // Create user
.Transform(userId => new GetUserQuery { UserId = userId }) // Transform to query
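// Note: `userId` in the cache key below must be a variable in the enclosing scope;
// the Transform lambda parameter above is not visible at this point.
.Query<GetUserQuery, UserDto>(x => x.UseCache($"user:{userId}", TimeSpan.FromMinutes(5))) // Get user with cache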
.Transform(user => new UserCreatedEvent { UserId = user.Id, Email = user.Email }) // Transform to event
.Publish<UserCreatedEvent>() // Publish event
.ExecuteAsync();
// Assert
result.IsSuccess.Should().BeTrue();
result.Value.Should().NotBeNull();
}
[Fact]
public async Task EmptyPipeline_WithTransforms_ShouldWork()
{
// Act - Start without initial data
var result = await PipelineExtensions
.Start(_serviceProvider)
.Transform(() => new GetActiveUsersQuery())
.Query<GetActiveUsersQuery, List<UserDto>>()
.ExecuteAsync();
// Assert
result.IsSuccess.Should().BeTrue();
result.Value.Should().NotBeNull();
}
[Fact]
public async Task ConditionalWorkflow_ShouldExecuteBasedOnCondition()
{
// Arrange
var command = new CreateUserCommand
{
Email = "premium@example.com",
Name = "Premium User"
};
// Act - Conditional transformation
var result = await PipelineExtensions
.Start(command, _serviceProvider)
.Process<CreateUserCommand, Guid>()
.Transform(userId => new GetUserQuery { UserId = userId })
.Query<GetUserQuery, UserDto>()
.TransformIf<SendWelcomeEmailCommand>(
user => user.Email.Contains("premium"),
user => new SendWelcomeEmailCommand { Email = user.Email, IsPremium = true })
.Process<SendWelcomeEmailCommand>()
.ExecuteAsync();
// Assert
result.IsSuccess.Should().BeTrue();
}
}
Testing Individual Handlers
public class CreateUserCommandHandlerTests
{
[Fact]
public async Task Handle_WithValidCommand_ShouldReturnSuccess()
{
// Arrange
var repository = new InMemoryUserRepository();
var handler = new CreateUserCommandHandler(repository);
var command = new CreateUserCommand
{
Email = "test@example.com",
Name = "Test User"
};
// Act
var result = await handler.HandleAsync(command);
// Assert
result.IsSuccess.Should().BeTrue();
result.Data.Should().NotBe(Guid.Empty);
}
}
Architecture
┌────────────────────────────────────────────────────────────┐
│                     Myth.Flow Pipeline                     │
├────────────────────────────────────────────────────────────┤
│ .Process() │ .Query() │ .Publish() │ .When() │ .Tap()      │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│                        IDispatcher                         │
├────────────────────────────────────────────────────────────┤
│ DispatchCommandAsync │ DispatchQueryAsync │ PublishEvent   │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ Command Handlers   │ Query Handlers │ IEventBus            │
│ (Write Operations) │ (Read + Cache) │ (Pub/Sub)            │
└────────────────────────────────────────────────────────────┘
                              ▼
              ┌────────────────────────────────┐
              │         IMessageBroker         │
              ├────────────────────────────────┤
              │  InMemory │ Kafka │ RabbitMQ   │
              └────────────────────────────────┘
                              ▼
              ┌────────────────────────────────┐
              │         Event Handlers         │
              ├────────────────────────────────┤
              │  Parallel execution per event  │
              └────────────────────────────────┘
Best Practices
- Commands: Use for state-changing operations, imperative naming (CreateUser, UpdateOrder)
- Queries: Use for read operations, leverage caching, prefix with Get/Find
- Events: Use for decoupled communication, past tense naming (UserCreated, OrderProcessed)
- Handlers: Keep focused and testable, single responsibility principle
- Pipeline: Chain operations logically, use .When() for conditional flows
- Testing: Use InMemory broker for fast, isolated unit tests
- Production: Use Kafka/RabbitMQ with retry policies and dead letter queues
- Caching: Cache expensive queries, use appropriate TTL values
- Telemetry: Enable for production to track command/query/event flows
- Result Pattern: Always check IsSuccess before accessing Data
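The last practice above is easiest to see with a concrete result type. The sketch below is a hypothetical stand-in (`SketchResult<T>` is not a library type, and the real `CommandResult<T>` and `QueryResult<T>` may expose different members or behave differently on failure); it assumes that reading `Data` from a failed result is an error.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for a result type; not the library's CommandResult/QueryResult.
public sealed class SketchResult<T>
{
    private readonly T? _data;

    public bool IsSuccess { get; }
    public string? ErrorMessage { get; }

    // Reading Data on a failed result throws, which is why callers check IsSuccess first.
    public T Data => IsSuccess
        ? _data!
        : throw new InvalidOperationException(ErrorMessage ?? "Result has no data.");

    private SketchResult(bool isSuccess, T? data, string? errorMessage)
    {
        IsSuccess = isSuccess;
        _data = data;
        ErrorMessage = errorMessage;
    }

    public static SketchResult<T> Success(T data) => new(true, data, null);

    public static SketchResult<T> Failure(string errorMessage) => new(false, default, errorMessage);
}
```

A caller would branch on `IsSuccess` and read `Data` only on the success path, using `ErrorMessage` otherwise.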
Naming Conventions
- Commands: {Verb}{Noun}Command (CreateUserCommand, UpdateOrderCommand)
- Queries: {Get|Find}{Noun}Query (GetUserQuery, FindOrdersQuery)
- Events: {Noun}{PastTenseVerb}Event (UserCreatedEvent, OrderProcessedEvent)
- Handlers: {Request}Handler (CreateUserCommandHandler, UserCreatedEventHandler)
- Results: Use CommandResult, QueryResult with proper success/failure handling
Contributing
Contributions are welcome! Please follow the existing code style and add tests for new features.
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Related Projects
- Myth.Flow - Core pipeline orchestration framework
- Myth.Commons - Common utilities and extensions
- Myth.Repository - Repository pattern implementation
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Confluent.Kafka (>= 2.11.1)
  - Microsoft.Extensions.Caching.Memory (>= 8.0.1)
  - Microsoft.Extensions.Caching.StackExchangeRedis (>= 8.0.2)
  - Microsoft.Extensions.DependencyInjection (>= 8.0.1)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
  - Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 8.0.2)
  - Myth.Commons (>= 3.0.4-preview.9)
  - Myth.Flow (>= 3.0.4-preview.9)
  - RabbitMQ.Client (>= 7.1.2)
  - System.Diagnostics.DiagnosticSource (>= 8.0.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 4.3.0-preview.2 | 123 | 12/22/2025 |
| 4.2.1-preview.1 | 612 | 12/2/2025 |
| 4.2.0 | 411 | 11/30/2025 |
| 4.2.0-preview.1 | 66 | 11/29/2025 |
| 4.1.0 | 291 | 11/27/2025 |
| 4.1.0-preview.3 | 127 | 11/27/2025 |
| 4.1.0-preview.2 | 129 | 11/27/2025 |
| 4.1.0-preview.1 | 131 | 11/26/2025 |
| 4.0.1 | 150 | 11/22/2025 |
| 4.0.1-preview.8 | 150 | 11/22/2025 |
| 4.0.1-preview.7 | 153 | 11/22/2025 |
| 4.0.1-preview.6 | 135 | 11/22/2025 |
| 4.0.1-preview.5 | 199 | 11/21/2025 |
| 4.0.1-preview.4 | 208 | 11/21/2025 |
| 4.0.1-preview.3 | 212 | 11/21/2025 |
| 4.0.1-preview.2 | 237 | 11/21/2025 |
| 4.0.1-preview.1 | 248 | 11/21/2025 |
| 4.0.0 | 388 | 11/20/2025 |
| 4.0.0-preview.3 | 342 | 11/19/2025 |
| 4.0.0-preview.2 | 89 | 11/15/2025 |
| 4.0.0-preview.1 | 108 | 11/15/2025 |
| 3.10.0 | 161 | 11/15/2025 |
| 3.0.5-preview.15 | 151 | 11/14/2025 |
| 3.0.5-preview.14 | 219 | 11/12/2025 |
| 3.0.5-preview.13 | 226 | 11/12/2025 |
| 3.0.5-preview.12 | 223 | 11/11/2025 |
| 3.0.5-preview.11 | 227 | 11/11/2025 |
| 3.0.5-preview.10 | 225 | 11/11/2025 |
| 3.0.5-preview.9 | 216 | 11/10/2025 |
| 3.0.5-preview.8 | 84 | 11/8/2025 |
| 3.0.5-preview.7 | 88 | 11/8/2025 |
| 3.0.5-preview.6 | 88 | 11/8/2025 |
| 3.0.5-preview.5 | 89 | 11/8/2025 |
| 3.0.5-preview.4 | 61 | 11/7/2025 |
| 3.0.5-preview.3 | 131 | 11/4/2025 |
| 3.0.5-preview.2 | 136 | 11/4/2025 |
| 3.0.5-preview.1 | 134 | 11/4/2025 |
| 3.0.4 | 186 | 11/3/2025 |
| 3.0.4-preview.19 | 74 | 11/2/2025 |
| 3.0.4-preview.18 | 76 | 11/1/2025 |
| 3.0.4-preview.17 | 74 | 11/1/2025 |
| 3.0.4-preview.16 | 81 | 11/1/2025 |
| 3.0.4-preview.15 | 71 | 10/31/2025 |
| 3.0.4-preview.14 | 145 | 10/31/2025 |
| 3.0.4-preview.13 | 136 | 10/30/2025 |
| 3.0.4-preview.12 | 126 | 10/23/2025 |
| 3.0.4-preview.11 | 124 | 10/23/2025 |
| 3.0.4-preview.10 | 125 | 10/23/2025 |
| 3.0.4-preview.9 | 119 | 10/23/2025 |
| 3.0.4-preview.8 | 124 | 10/22/2025 |
| 3.0.4-preview.6 | 120 | 10/21/2025 |
| 3.0.4-preview.5 | 120 | 10/21/2025 |
| 3.0.4-preview.4 | 121 | 10/20/2025 |
| 3.0.4-preview.3 | 126 | 10/20/2025 |
| 3.0.4-preview.2 | 43 | 10/18/2025 |