Myth.Flow.Actions 3.0.4-preview.9

This is a prerelease version of Myth.Flow.Actions.
There is a newer version of this package available.
See the version list below for details.
dotnet add package Myth.Flow.Actions --version 3.0.4-preview.9
                    
NuGet\Install-Package Myth.Flow.Actions -Version 3.0.4-preview.9
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Myth.Flow.Actions" Version="3.0.4-preview.9" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Myth.Flow.Actions" Version="3.0.4-preview.9" />
                    
Directory.Packages.props
<PackageReference Include="Myth.Flow.Actions" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Myth.Flow.Actions --version 3.0.4-preview.9
                    
#r "nuget: Myth.Flow.Actions, 3.0.4-preview.9"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Myth.Flow.Actions@3.0.4-preview.9
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Myth.Flow.Actions&version=3.0.4-preview.9&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=Myth.Flow.Actions&version=3.0.4-preview.9&prerelease
                    
Install as a Cake Tool

Myth.Flow.Actions

NuGet Version NuGet Version

License

pt-br en

A powerful .NET library implementing CQRS and Event-Driven Architecture patterns with seamless integration to Myth.Flow pipelines. Built for scalability with support for multiple message brokers, caching strategies, and enterprise-grade resilience features.

πŸš€ Action-First Pipeline API

This library features a revolutionary Action-First approach that eliminates context boilerplate and dramatically simplifies pipeline development:

// ❌ OLD: Context-based (lots of boilerplate)
Pipeline.Start(context)
    .Process<Context, Command>(ctx => new Command { ... }, (ctx, result) => ctx.Result = result)

// βœ… NEW: Action-First (clean and direct)
Pipeline.Start(new Command { ... }, serviceProvider)
    .Process<Command, Result>()

Benefits:

  • 70% less boilerplate code - No context classes needed
  • Type-safe transformations - Direct object-to-object pipeline flow
  • Intuitive developer experience - Actions ready for execution
  • Fluent cache configuration - x => x.UseCache("key", TimeSpan.FromMinutes(5))
  • Utility pipelines - Start without parameters for functional scenarios

⭐ Features

  • Action-First API: Revolutionary pipeline approach with zero context boilerplate
  • CQRS Pattern: Clean separation of Commands, Queries, and Events
  • Pipeline Integration: Fluent integration with Myth.Flow for composable workflows
  • Multiple Message Brokers: InMemory (dev/test), Kafka, and RabbitMQ support
  • Query Caching: Built-in caching with Memory and Redis providers with fluent configuration
  • Event-Driven Architecture: Publish/subscribe with multiple handler support
  • Resilience Patterns: Retry policies with exponential backoff, circuit breakers, and dead letter queues
  • Auto-Discovery: Automatic handler registration via assembly scanning
  • OpenTelemetry Integration: Built-in distributed tracing and observability
  • Type Safety: Fully typed APIs with compile-time safety

πŸ“¦ Installation

dotnet add package Myth.Flow.Actions

Optional Dependencies

# For Kafka support
dotnet add package Confluent.Kafka

# For RabbitMQ support
dotnet add package RabbitMQ.Client

# For Redis distributed caching
dotnet add package Microsoft.Extensions.Caching.StackExchangeRedis

πŸš€ Quick Start

1. Configure Services

using Myth.Flow.Actions.Extensions;
using Myth.Flow.Actions.Settings;

builder.Services.AddFlowActions(config =>
{
    config.BrokerType = MessageBrokerType.InMemory;
    config.TelemetryEnabled = true;
    config.CachingEnabled = true;
    config.AssembliesToScan.Add(typeof(Program).Assembly);
});

2. Define Commands, Queries, and Events

Command

using Myth.Interfaces;
using Myth.Models;

public record CreateUserCommand : ICommand<Guid>
{
    public required string Email { get; init; }
    public required string Name { get; init; }
}

public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand, Guid>
{
    private readonly IUserRepository _repository;

    public CreateUserCommandHandler(IUserRepository repository)
    {
        _repository = repository;
    }

    public async Task<CommandResult<Guid>> HandleAsync(
        CreateUserCommand command,
        CancellationToken cancellationToken = default)
    {
        var user = new User
        {
            Id = Guid.NewGuid(),
            Email = command.Email,
            Name = command.Name
        };

        await _repository.AddAsync(user, cancellationToken);

        return CommandResult<Guid>.Success(user.Id);
    }
}

Query

public record GetUserQuery : IQuery<UserDto>
{
    public required Guid UserId { get; init; }
}

public class GetUserQueryHandler : IQueryHandler<GetUserQuery, UserDto>
{
    private readonly IUserRepository _repository;

    public GetUserQueryHandler(IUserRepository repository)
    {
        _repository = repository;
    }

    public async Task<QueryResult<UserDto>> HandleAsync(
        GetUserQuery query,
        CancellationToken cancellationToken = default)
    {
        var user = await _repository.GetByIdAsync(query.UserId, cancellationToken);

        if (user == null)
            return QueryResult<UserDto>.Failure("User not found");

        var dto = new UserDto
        {
            Id = user.Id,
            Email = user.Email,
            Name = user.Name
        };

        return QueryResult<UserDto>.Success(dto);
    }
}

Event

public record UserCreatedEvent : DomainEvent
{
    public required Guid UserId { get; init; }
    public required string Email { get; init; }
}

public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;

    public UserCreatedEventHandler(IEmailService emailService)
    {
        _emailService = emailService;
    }

    public async Task HandleAsync(
        UserCreatedEvent @event,
        CancellationToken cancellationToken = default)
    {
        await _emailService.SendWelcomeEmailAsync(@event.Email, cancellationToken);
    }
}

3. Use Action-First Pipeline

Simple Pipeline Example

using Myth.Flow.Actions;

// Direct action execution - no context needed!
var result = await Pipeline
    .Start(new CreateUserCommand { Email = "user@example.com", Name = "John Doe" }, serviceProvider)
    .Process<CreateUserCommand, Guid>()
    .ExecuteAsync();

if (result.IsSuccess)
{
    Console.WriteLine($"User created with ID: {result.Value}");
}

Complex Workflow Example

// Chain operations with transformations
var result = await Pipeline
    .Start(new CreateUserCommand { Email = "user@example.com", Name = "John Doe" }, serviceProvider)
    .Process<CreateUserCommand, Guid>()                                        // Command β†’ Guid
    .Transform(userId => new GetUserQuery { UserId = userId })                 // Guid β†’ Query
    .Query<GetUserQuery, UserDto>(x => x.UseCache($"user:{userId}", TimeSpan.FromMinutes(10))) // Query with cache
    .Transform(user => new UserCreatedEvent { UserId = user.Id, Email = user.Email })          // User β†’ Event
    .Publish<UserCreatedEvent>()                                               // Publish event
    .ExecuteAsync();

if (result.IsSuccess)
{
    Console.WriteLine("User creation workflow completed successfully!");
}

Utility Pipeline (Empty Start)

// Start without initial data for utility functions
var result = await PipelineExtensions
    .Start(serviceProvider)
    .Transform(() => new GetActiveUsersQuery())
    .Query<GetActiveUsersQuery, List<UserDto>>()
    .Transform(users => new GenerateReportCommand { Users = users })
    .Process<GenerateReportCommand, ReportDto>()
    .ExecuteAsync();

πŸ› οΈ Intermediate Pipeline Steps

The Action-First API now supports all Myth.Flow pipeline methods for adding custom logic, validation, telemetry, and resilience patterns between action operations:

Validation and Custom Steps

var result = await Pipeline
    .Start(new CreateUserCommand { Email = "user@example.com" }, serviceProvider)
    // Validate input before processing
    .Step<IValidationService>((validator, state) => {
        validator.ValidateEmail(state.CurrentRequest!.Email);
        return state;
    })
    // Add custom business logic
    .StepAsync<IUserService>(async (userService, state) => {
        await userService.CheckUserLimitsAsync(state.CurrentRequest!.Email);
        return state;
    })
    .Process<CreateUserCommand, Guid>()
    .ExecuteAsync();

Side Effects and Logging

var result = await Pipeline
    .Start(new GetUserQuery { UserId = userId }, serviceProvider)
    // Log the start of the operation
    .Tap<ILogger>((logger, state) =>
        logger.LogInformation("Querying user {UserId}", state.CurrentRequest!.UserId))
    .Query<GetUserQuery, UserDto>()
    // Log after successful query
    .TapAsync<IMetricsService>(async (metrics, state) =>
        await metrics.RecordQueryExecutionAsync("GetUser"))
    .ExecuteAsync();

Conditional Execution

var result = await Pipeline
    .Start(new ProcessOrderCommand { OrderId = orderId }, serviceProvider)
    // Only validate payment if order requires it
    .When(state => state.CurrentRequest!.RequiresPayment, builder =>
        builder.StepAsync<IPaymentService>((payment, state) =>
            payment.ValidatePaymentMethodAsync(state.CurrentRequest!.PaymentInfo)))
    .Process<ProcessOrderCommand, OrderResult>()
    .ExecuteAsync();

Resilience and Telemetry

var result = await Pipeline
    .Start(new CallExternalApiQuery { Endpoint = "users" }, serviceProvider)
    // Add telemetry tracking
    .WithTelemetry("ExternalApiCall")
    // Configure retry policy for external calls
    .WithRetry(maxAttempts: 3, backoffMs: 1000)
    .Query<CallExternalApiQuery, ApiResponse>()
    .ExecuteAsync();

Complex Workflows with Multiple Steps

var result = await Pipeline
    .Start(new CreateOrderCommand { CustomerId = customerId, Items = items }, serviceProvider)
    // Validate customer
    .StepAsync<ICustomerService>(async (service, state) => {
        await service.ValidateCustomerAsync(state.CurrentRequest!.CustomerId);
        return state;
    })
    // Check inventory
    .StepAsync<IInventoryService>(async (service, state) => {
        await service.ReserveItemsAsync(state.CurrentRequest!.Items);
        return state;
    })
    // Log before processing
    .Tap<ILogger>((logger, state) =>
        logger.LogInformation("Processing order for customer {CustomerId}",
            state.CurrentRequest!.CustomerId))
    // Process the order
    .Process<CreateOrderCommand, OrderResult>()
    // Send notification after success
    .TapAsync<INotificationService>(async (notifications, state) =>
        await notifications.SendOrderConfirmationAsync(state.CurrentRequest!))
    .ExecuteAsync();

Available Intermediate Methods

  • Step<TService>() - Synchronous operations with dependency injection
  • StepAsync<TService>() - Asynchronous operations with dependency injection
  • StepResult<TService>() - Operations returning Result<T> for error handling
  • StepResultAsync<TService>() - Async operations returning Result<T>
  • Tap<TService>() - Side effects (logging, metrics, events) with DI
  • TapAsync<TService>() - Async side effects with DI
  • When(predicate, configure) - Conditional pipeline execution
  • WithRetry(maxAttempts, backoffMs) - Retry policies with exponential backoff
  • WithTelemetry(operationName) - OpenTelemetry distributed tracing

All methods maintain the fluent API design and can be chained together for complex workflows while preserving type safety and the Action-First approach.

πŸ”§ Configuration

InMemory Broker

services.AddFlowActions(config =>
{
    config.BrokerType = MessageBrokerType.InMemory;
    config.AssembliesToScan.Add(typeof(Program).Assembly);
});

Kafka

services.AddFlowActions(config =>
{
    config.BrokerType = MessageBrokerType.Kafka;
    config.BrokerConfigurationFactory = () => new KafkaOptions
    {
        BootstrapServers = "localhost:9092",
        GroupId = "my-service",
        ClientId = "my-service-instance-1",
        EnableAutoCommit = false,
        SessionTimeoutMs = 30000,
        AutoOffsetReset = "earliest"
    };
    config.TelemetryEnabled = true;
    config.AssembliesToScan.Add(typeof(Program).Assembly);
});

RabbitMQ

services.AddFlowActions(config =>
{
    config.BrokerType = MessageBrokerType.RabbitMQ;
    config.BrokerConfigurationFactory = () => new RabbitMQOptions
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "guest",
        Password = "guest",
        VirtualHost = "/",
        ExchangeName = "my-service-events",
        ExchangeType = "topic"
    };
    config.CachingEnabled = true;
    config.CacheConfiguration = cache =>
    {
        cache.ProviderType = CacheProviderType.Distributed;
        cache.ConnectionString = "localhost:6379";
        cache.DefaultTtl = TimeSpan.FromMinutes(5);
    };
    config.AssembliesToScan.AddRange(AppDomain.CurrentDomain.GetAssemblies());
});

πŸ“š Action-First Pipeline API

Start Pipeline

// Start with a request object
PipelineExtensions.Start(command, serviceProvider)
PipelineExtensions.Start(query, serviceProvider)
PipelineExtensions.Start(event, serviceProvider)

// Start without initial data (for utility functions)
PipelineExtensions.Start(serviceProvider)

Process (Commands)

// Command without response (when TCommand : ICommand)
.Process<TCommand>()

// Command with typed response (when TCommand : ICommand<TResponse>)
.Process<TCommand, TResponse>()

Query (Read Operations)

// Query without caching
.Query<TQuery, TResponse>()

// Query with cache configuration using fluent API
.Query<TQuery, TResponse>(x => x
    .UseCache("cache-key", TimeSpan.FromMinutes(10))
    .WithSlidingExpiration())

// Query with simple cache configuration
.Query<TQuery, TResponse>(x => x.UseCache($"key:{someId}", TimeSpan.FromMinutes(5)))

Publish (Events)

// Publish event (when TEvent : IEvent)
.Publish<TEvent>()

Transform

// Transform current request to new type
.Transform<TNext>(current => new TNext { /* ... */ })

// Async transformation
.TransformAsync<TNext>(async current => await CreateNextAsync(current))

// Conditional transformation
.TransformIf<TNext>(
    condition: current => current.IsValid,
    transform: current => new TNext { /* ... */ })

// Conditional with true/false branches
.TransformIf<TNext>(
    condition: current => current.Type == "Premium",
    transformTrue: current => new PremiumAction { /* ... */ },
    transformFalse: current => new StandardAction { /* ... */ })

// Empty pipeline transformation (when starting without data)
.Transform<TRequest>(() => new TRequest { /* ... */ })
.TransformAsync<TRequest>(async () => await CreateRequestAsync())

πŸ” Direct Dispatcher Usage

For scenarios where you don't need the full pipeline:

public class UserService
{
    private readonly IDispatcher _dispatcher;

    public UserService(IDispatcher dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public async Task<Guid> CreateUserAsync(string email, string name)
    {
        // Execute command
        var command = new CreateUserCommand { Email = email, Name = name };
        var result = await _dispatcher.DispatchCommandAsync<CreateUserCommand, Guid>(command);

        if (result.IsFailure)
            throw new InvalidOperationException(result.ErrorMessage);

        // Publish event
        await _dispatcher.PublishEventAsync(new UserCreatedEvent
        {
            UserId = result.Data,
            Email = email
        });

        return result.Data;
    }

    public async Task<UserDto?> GetUserAsync(Guid userId)
    {
        // Execute query with caching
        var query = new GetUserQuery { UserId = userId };
        var cacheOptions = new CacheOptions
        {
            Enabled = true,
            CacheKey = $"user:{userId}",
            Ttl = TimeSpan.FromMinutes(10)
        };

        var result = await _dispatcher.DispatchQueryAsync<GetUserQuery, UserDto>(
            query,
            cacheOptions);

        return result.IsSuccess ? result.Data : null;
    }
}

πŸ›‘οΈ Resilience Features

Retry Policy

using Myth.Flow.Resilience;

var retryPolicy = new RetryPolicy(
    maxAttempts: 3,
    baseBackoffMs: 1000,
    exponentialBackoff: true,
    logger: logger);

var result = await retryPolicy.ExecuteAsync(async () =>
{
    return await externalService.CallAsync();
});

Circuit Breaker

var circuitBreaker = new CircuitBreakerPolicy(
    failureThreshold: 5,
    openDuration: TimeSpan.FromSeconds(30),
    logger: logger);

var result = await circuitBreaker.ExecuteAsync(async () =>
{
    return await unreliableService.CallAsync();
});

// Check circuit state
if (circuitBreaker.State == CircuitState.Open)
{
    // Circuit is open, service calls are blocked
}

Dead Letter Queue

services.AddFlowActions(config =>
{
    config.BrokerType = MessageBrokerType.InMemory;
    config.BrokerConfigurationFactory = () => new InMemoryBrokerOptions
    {
        EnableDeadLetterQueue = true,
        MaxRetries = 3
    };
});

// Access dead letter queue
public class MonitoringService
{
    private readonly DeadLetterQueue _dlq;

    public MonitoringService(DeadLetterQueue dlq)
    {
        _dlq = dlq;
    }

    public IEnumerable<DeadLetterMessage> GetFailedMessages()
    {
        return _dlq.GetAll();
    }

    public void RetryFailedMessage()
    {
        if (_dlq.TryDequeue(out var message))
        {
            // Retry processing the failed message
        }
    }
}

πŸ“Š Telemetry & Observability

OpenTelemetry Integration

services.AddFlowActions(config =>
{
    config.TelemetryEnabled = true;
    config.BrokerType = MessageBrokerType.InMemory;
    config.AssembliesToScan.Add(typeof(Program).Assembly);
});

// Activities are automatically created with the following names:
// - Command.{CommandName}
// - Query.{QueryName}
// - Event.{EventName}
// - EventBus.Publish.{EventName}
// - EventHandler.{HandlerName}

Activity Tags

Each activity includes relevant tags:

  • pipeline.input.type: The context type name
  • cache.hit: Whether the query result was served from cache
  • Additional custom tags from metadata

🎯 Advanced Patterns

Multiple Event Handlers

All handlers for an event execute in parallel:

public class UserCreatedEmailHandler : IEventHandler<UserCreatedEvent>
{
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken ct)
    {
        // Send welcome email
    }
}

public class UserCreatedAnalyticsHandler : IEventHandler<UserCreatedEvent>
{
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken ct)
    {
        // Track analytics
    }
}

public class UserCreatedNotificationHandler : IEventHandler<UserCreatedEvent>
{
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken ct)
    {
        // Send push notification
    }
}

// All three handlers execute concurrently when event is published

High-Value Order Processing

// Action-first pipeline with conditional logic using TransformIf
var result = await PipelineExtensions
    .Start(new ValidateOrderCommand { OrderId = orderId }, serviceProvider)
    .Process<ValidateOrderCommand, OrderDto>()
    .TransformIf<FraudCheckCommand>(
        order => order.TotalAmount > 1000,
        order => new FraudCheckCommand { OrderId = order.Id })
    .Process<FraudCheckCommand>()
    .Transform(fraudResult => new ProcessPaymentCommand { OrderId = orderId })
    .Process<ProcessPaymentCommand>()
    .Transform(paymentResult => new OrderCompletedEvent { OrderId = orderId })
    .Publish<OrderCompletedEvent>()
    .ExecuteAsync();

Order to Shipment Workflow

// Direct transformations between different action types
var result = await PipelineExtensions
    .Start(new CreateOrderCommand
    {
        Items = items,
        CustomerId = customerId,
        ShippingAddress = address
    }, serviceProvider)
    .Process<CreateOrderCommand, Guid>()                           // Create order β†’ OrderId
    .Transform(orderId => new GetOrderQuery { OrderId = orderId }) // OrderId β†’ Query
    .Query<GetOrderQuery, OrderDto>()                              // Get full order details
    .Transform(order => new CreateShipmentCommand                  // Order β†’ Shipment command
    {
        OrderId = order.Id,
        ShipmentId = Guid.NewGuid(),
        Address = order.ShippingAddress,
        Items = order.Items
    })
    .Process<CreateShipmentCommand, ShipmentDto>()                 // Process shipment
    .Transform(shipment => new ShipmentCreatedEvent               // Shipment β†’ Event
    {
        OrderId = shipment.OrderId,
        ShipmentId = shipment.Id,
        TrackingNumber = shipment.TrackingNumber
    })
    .Publish<ShipmentCreatedEvent>()                              // Notify about shipment
    .ExecuteAsync();

Report Generation Pipeline

// Utility pipeline starting without initial data
var result = await PipelineExtensions
    .Start(serviceProvider)
    .Transform(() => new GetMonthlyOrdersQuery { Month = DateTime.Now.Month })
    .Query<GetMonthlyOrdersQuery, List<OrderDto>>(x => x.UseCache("monthly-orders", TimeSpan.FromHours(1)))
    .Transform(orders => new GenerateReportCommand
    {
        Orders = orders,
        ReportType = ReportType.Monthly,
        GeneratedBy = currentUserId
    })
    .Process<GenerateReportCommand, ReportDto>()
    .Transform(report => new ReportGeneratedEvent
    {
        ReportId = report.Id,
        GeneratedBy = currentUserId
    })
    .Publish<ReportGeneratedEvent>()
    .ExecuteAsync();

πŸ§ͺ Testing

Testing Action-First Pipelines

using Xunit;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Myth.Flow.Actions.Extensions;

public class UserPipelineTests
{
    private readonly IServiceProvider _serviceProvider;

    public UserPipelineTests()
    {
        var services = new ServiceCollection();
        services.AddLogging();
        services.AddFlow();
        services.AddFlowActions(config =>
        {
            config.UseInMemory()
                   .EnableCaching(cache => cache.ProviderType = CacheProviderType.Memory)
                   .ScanAssemblies(typeof(CreateUserCommand).Assembly);
        });

        services.AddScoped<IUserRepository, InMemoryUserRepository>();
        services.AddScoped<IEmailService, FakeEmailService>();

        _serviceProvider = services.BuildServiceProvider();
    }

    [Fact]
    public async Task CreateUser_WithActionFirstAPI_ShouldSucceed()
    {
        // Arrange
        var command = new CreateUserCommand
        {
            Email = "test@example.com",
            Name = "Test User"
        };

        // Act - Using action-first API
        var result = await PipelineExtensions
            .Start(command, _serviceProvider)
            .Process<CreateUserCommand, Guid>()
            .ExecuteAsync();

        // Assert
        result.IsSuccess.Should().BeTrue();
        result.Value.Should().NotBe(Guid.Empty);
    }

    [Fact]
    public async Task CompleteUserWorkflow_ShouldChainOperations()
    {
        // Arrange
        var command = new CreateUserCommand
        {
            Email = "workflow@example.com",
            Name = "Workflow User"
        };

        // Act - Chain multiple operations
        var result = await PipelineExtensions
            .Start(command, _serviceProvider)
            .Process<CreateUserCommand, Guid>()                                        // Create user
            .Transform(userId => new GetUserQuery { UserId = userId })                 // Transform to query
            .Query<GetUserQuery, UserDto>(x => x.UseCache($"user:{userId}", TimeSpan.FromMinutes(5))) // Get user with cache
            .Transform(user => new UserCreatedEvent { UserId = user.Id, Email = user.Email })          // Transform to event
            .Publish<UserCreatedEvent>()                                               // Publish event
            .ExecuteAsync();

        // Assert
        result.IsSuccess.Should().BeTrue();
        result.Value.Should().NotBeNull();
    }

    [Fact]
    public async Task EmptyPipeline_WithTransforms_ShouldWork()
    {
        // Act - Start without initial data
        var result = await PipelineExtensions
            .Start(_serviceProvider)
            .Transform(() => new GetActiveUsersQuery())
            .Query<GetActiveUsersQuery, List<UserDto>>()
            .ExecuteAsync();

        // Assert
        result.IsSuccess.Should().BeTrue();
        result.Value.Should().NotBeNull();
    }

    [Fact]
    public async Task ConditionalWorkflow_ShouldExecuteBasedOnCondition()
    {
        // Arrange
        var command = new CreateUserCommand
        {
            Email = "premium@example.com",
            Name = "Premium User"
        };

        // Act - Conditional transformation
        var result = await PipelineExtensions
            .Start(command, _serviceProvider)
            .Process<CreateUserCommand, Guid>()
            .Transform(userId => new GetUserQuery { UserId = userId })
            .Query<GetUserQuery, UserDto>()
            .TransformIf<SendWelcomeEmailCommand>(
                user => user.Email.Contains("premium"),
                user => new SendWelcomeEmailCommand { Email = user.Email, IsPremium = true })
            .Process<SendWelcomeEmailCommand>()
            .ExecuteAsync();

        // Assert
        result.IsSuccess.Should().BeTrue();
    }
}

## Testing Individual Handlers

```csharp
public class CreateUserCommandHandlerTests
{
    [Fact]
    public async Task Handle_WithValidCommand_ShouldReturnSuccess()
    {
        // Arrange
        var repository = new InMemoryUserRepository();
        var handler = new CreateUserCommandHandler(repository);
        var command = new CreateUserCommand
        {
            Email = "test@example.com",
            Name = "Test User"
        };

        // Act
        var result = await handler.HandleAsync(command);

        // Assert
        result.IsSuccess.Should().BeTrue();
        result.Data.Should().NotBe(Guid.Empty);
    }
}

πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Myth.Flow Pipeline                         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  .Process()  β”‚  .Query()  β”‚  .Publish()  β”‚  .When()  β”‚ .Tap()β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”˜
                            β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                       IDispatcher                             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  DispatchCommandAsync  β”‚  DispatchQueryAsync  β”‚  PublishEventβ”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Command Handlers   β”‚  Query Handlers    β”‚    IEventBus      β”‚
β”‚  (Write Operations) β”‚  (Read + Cache)    β”‚  (Pub/Sub)        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                               β–Ό
                            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                            β”‚      IMessageBroker              β”‚
                            β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
                            β”‚ InMemory β”‚ Kafka β”‚ RabbitMQ      β”‚
                            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                               β–Ό
                            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                            β”‚      Event Handlers              β”‚
                            β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
                            β”‚ Parallel execution per event     β”‚
                            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

🎯 Best Practices

  1. Commands: Use for state-changing operations, imperative naming (CreateUser, UpdateOrder)
  2. Queries: Use for read operations, leverage caching, prefix with Get/Find
  3. Events: Use for decoupled communication, past tense naming (UserCreated, OrderProcessed)
  4. Handlers: Keep focused and testable, single responsibility principle
  5. Pipeline: Chain operations logically, use .When() for conditional flows
  6. Testing: Use InMemory broker for fast, isolated unit tests
  7. Production: Use Kafka/RabbitMQ with retry policies and dead letter queues
  8. Caching: Cache expensive queries, use appropriate TTL values
  9. Telemetry: Enable for production to track command/query/event flows
  10. Result Pattern: Always check IsSuccess before accessing Data

πŸ“ Naming Conventions

  • Commands: {Verb}{Noun}Command (CreateUserCommand, UpdateOrderCommand)
  • Queries: {Get|Find}{Noun}Query (GetUserQuery, FindOrdersQuery)
  • Events: {Noun}{PastTenseVerb}Event (UserCreatedEvent, OrderProcessedEvent)
  • Handlers: {Request}Handler (CreateUserCommandHandler, UserCreatedEventHandler)
  • Results: Use CommandResult, QueryResult with proper success/failure handling

🀝 Contributing

Contributions are welcome! Please follow the existing code style and add tests for new features.

πŸ“„ License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
4.3.0-preview.2 123 12/22/2025
4.2.1-preview.1 612 12/2/2025
4.2.0 411 11/30/2025
4.2.0-preview.1 66 11/29/2025
4.1.0 291 11/27/2025
4.1.0-preview.3 127 11/27/2025
4.1.0-preview.2 129 11/27/2025
4.1.0-preview.1 131 11/26/2025
4.0.1 150 11/22/2025
4.0.1-preview.8 150 11/22/2025
4.0.1-preview.7 153 11/22/2025
4.0.1-preview.6 135 11/22/2025
4.0.1-preview.5 199 11/21/2025
4.0.1-preview.4 208 11/21/2025
4.0.1-preview.3 212 11/21/2025
4.0.1-preview.2 237 11/21/2025
4.0.1-preview.1 248 11/21/2025
4.0.0 388 11/20/2025
4.0.0-preview.3 342 11/19/2025
4.0.0-preview.2 89 11/15/2025
4.0.0-preview.1 108 11/15/2025
3.10.0 161 11/15/2025
3.0.5-preview.15 151 11/14/2025
3.0.5-preview.14 219 11/12/2025
3.0.5-preview.13 226 11/12/2025
3.0.5-preview.12 223 11/11/2025
3.0.5-preview.11 227 11/11/2025
3.0.5-preview.10 225 11/11/2025
3.0.5-preview.9 216 11/10/2025
3.0.5-preview.8 84 11/8/2025
3.0.5-preview.7 88 11/8/2025
3.0.5-preview.6 88 11/8/2025
3.0.5-preview.5 89 11/8/2025
3.0.5-preview.4 61 11/7/2025
3.0.5-preview.3 131 11/4/2025
3.0.5-preview.2 136 11/4/2025
3.0.5-preview.1 134 11/4/2025
3.0.4 186 11/3/2025
3.0.4-preview.19 74 11/2/2025
3.0.4-preview.18 76 11/1/2025
3.0.4-preview.17 74 11/1/2025
3.0.4-preview.16 81 11/1/2025
3.0.4-preview.15 71 10/31/2025
3.0.4-preview.14 145 10/31/2025
3.0.4-preview.13 136 10/30/2025
3.0.4-preview.12 126 10/23/2025
3.0.4-preview.11 124 10/23/2025
3.0.4-preview.10 125 10/23/2025
3.0.4-preview.9 119 10/23/2025
3.0.4-preview.8 124 10/22/2025
3.0.4-preview.6 120 10/21/2025
3.0.4-preview.5 120 10/21/2025
3.0.4-preview.4 121 10/20/2025
3.0.4-preview.3 126 10/20/2025
3.0.4-preview.2 43 10/18/2025