Myth.Flow 3.0.4-preview.15

This is a prerelease version of Myth.Flow.
There is a newer version of this package available.
See the version list below for details.

Myth.Flow

A powerful .NET library for building maintainable and testable data processing pipelines with a fluent, chainable interface. Built with enterprise-grade features including automatic retry policies, OpenTelemetry integration, dependency injection, and comprehensive error handling.

⭐ Features

  • Fluent Interface: Simple, chainable API design for readable code
  • Type Safety: Strong typing with context transformation support
  • Automatic Retry: Configurable retry policies with exponential backoff
  • OpenTelemetry Integration: Built-in tracing and observability
  • Constructor Injection: Clean dependency injection via constructors (no service locator)
  • Result Pattern: Railway-oriented programming with Result<T>
  • Error Handling: Comprehensive error handling with custom exceptions
  • Conditional Execution: Execute steps based on context predicates
  • Side Effects: Tap into pipeline for logging, metrics, and events
  • Async/Await: First-class async support throughout

📦 Installation

dotnet add package Myth.Flow

🚀 Quick Start

Basic Usage

public class OrderService
{
    private readonly ValidationService _validationService;
    private readonly PaymentService _paymentService;
    private readonly InventoryService _inventoryService;

    public OrderService(
        ValidationService validationService,
        PaymentService paymentService,
        InventoryService inventoryService)
    {
        _validationService = validationService;
        _paymentService = paymentService;
        _inventoryService = inventoryService;
    }

    public async Task<Result<OrderContext>> ProcessOrderAsync(int orderId)
    {
        var input = new OrderContext { OrderId = orderId };

        var result = await Pipeline.Start(input)
            .StepAsync(ctx => _validationService.ValidateOrderAsync(ctx))
            .StepAsync(ctx => _paymentService.ProcessPaymentAsync(ctx))
            .StepAsync(ctx => _inventoryService.ReserveItemsAsync(ctx))
            .Tap(ctx => Console.WriteLine($"Order {ctx.OrderId} completed"))
            .ExecuteAsync();

        if (result.IsSuccess)
        {
            Console.WriteLine("Order processed successfully!");
        }

        return result;
    }
}

Dependency Injection Setup

Program.cs (Minimal API)

builder.Services.AddFlow(config =>
{
    config.EnableTelemetry = true;
    config.EnableLogging = true;
    config.DefaultRetryAttempts = 3;
    config.DefaultBackoffMs = 100;
});

// Register your services
builder.Services.AddScoped<ValidationService>();
builder.Services.AddScoped<PaymentService>();
builder.Services.AddScoped<InventoryService>();

Using in Controllers/Services

public class OrderController : ControllerBase
{
    private readonly OrderValidationService _validationService;
    private readonly OrderCreationService _creationService;
    private readonly OrderEventService _eventService;

    public OrderController(
        OrderValidationService validationService,
        OrderCreationService creationService,
        OrderEventService eventService)
    {
        _validationService = validationService;
        _creationService = creationService;
        _eventService = eventService;
    }

    [HttpPost("orders")]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        var context = new OrderContext { Request = request };

        var result = await Pipeline.Start(context)
            .WithTelemetry("CreateOrder")
            .WithRetry(maxAttempts: 3, backoffMs: 100)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
            .TapAsync(ctx => _eventService.PublishOrderCreatedAsync(ctx))
            .Transform<OrderResponse>(ctx => new OrderResponse
            {
                OrderId = ctx.CreatedOrder!.Id,
                Status = ctx.CreatedOrder.Status
            })
            .ExecuteAsync();

        if (result.IsFailure)
            return BadRequest(new { error = result.ErrorMessage });

        return Ok(result.Value);
    }
}

🔧 Configuration

Basic Configuration

// Simple configuration
builder.Services.AddFlow();

// Or with configuration options
builder.Services.AddFlow(config => config
    .UseTelemetry()                          // Enable OpenTelemetry tracing
    .UseLogging()                            // Enable logging
    .UseRetry(attempts: 3, backoffMs: 100)   // Default retry policy
    .UseActivitySource("MyApp.Pipeline")     // Custom ActivitySource name
    .UseExceptionFilter<ArgumentException>() // Propagate ArgumentException without handling
    .UseExceptionFilter(typeof(InvalidOperationException)) // Propagate specific exception types
);

Pipeline Configuration

public class MyService
{
    private readonly ProcessingService _processingService;

    public MyService(ProcessingService processingService)
    {
        _processingService = processingService;
    }

    public async Task<Result<MyContext>> ExecuteAsync(MyContext context)
    {
        var result = await Pipeline.Start(context)
            .WithTelemetry("OperationName")          // Set operation name for tracing
            .WithRetry(maxAttempts: 5, backoffMs: 200) // Configure retry for subsequent steps
            .StepAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();

        return result;
    }
}

🔄 Pipeline Steps

Synchronous Steps

public class MyService
{
    private readonly TransformService _transformService;

    public MyService(TransformService transformService)
    {
        _transformService = transformService;
    }

    public MyContext ProcessData(MyContext ctx)
    {
        var result = Pipeline.Start(ctx)
            .Step(context =>
            {
                // Synchronous processing
                context.Data = _transformService.Transform(context.Data);
                return context;
            })
            .Execute();

        return result.Value;
    }
}

Asynchronous Steps

public class MyService
{
    private readonly ProcessingService _processingService;

    public MyService(ProcessingService processingService)
    {
        _processingService = processingService;
    }

    public async Task<Result<MyContext>> ProcessAsync(MyContext context)
    {
        return await Pipeline.Start(context)
            .StepAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();
    }
}

Steps with Result Pattern

public class OrderProcessor
{
    private readonly ValidationService _validationService;

    public OrderProcessor(ValidationService validationService)
    {
        _validationService = validationService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        return await Pipeline.Start(context)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .ExecuteAsync();
    }
}

The Result<T> pattern allows steps to return success or failure:

public async Task<Result<OrderContext>> ValidateAsync(OrderContext context)
{
    if (string.IsNullOrEmpty(context.Request.Email))
        return Result<OrderContext>.Failure("Email is required");

    if (context.Request.Amount <= 0)
        return Result<OrderContext>.Failure("Amount must be positive");

    return Result<OrderContext>.Success(context);
}
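
To picture why a failed validation stops the rest of a chain, here is a minimal railway-style sketch. It assumes that a failure short-circuits the remaining steps. `Outcome<T>` and `Then` are hypothetical names used only for illustration; they are not Myth.Flow's `Result<T>` or its pipeline API.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for a result type; NOT Myth.Flow's Result<T>.
public readonly struct Outcome<T>
{
    public bool IsSuccess { get; }
    public T? Value { get; }
    public string? ErrorMessage { get; }

    private Outcome(bool isSuccess, T? value, string? errorMessage)
    {
        IsSuccess = isSuccess;
        Value = value;
        ErrorMessage = errorMessage;
    }

    public static Outcome<T> Success(T value) => new(true, value, null);

    public static Outcome<T> Failure(string message) => new(false, default, message);

    // Runs the next step only when this outcome succeeded;
    // a failure is passed along untouched and the step is skipped.
    public Outcome<T> Then(Func<T, Outcome<T>> next) =>
        IsSuccess ? next(Value!) : this;
}
```

Chaining `Outcome<int>.Failure("Amount must be positive").Then(...)` never invokes the delegate, so the first error message is the one the caller sees.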

Context Transformation

Transform the pipeline context to a different type:

.Transform<OutputContext>(ctx => new OutputContext
{
    Id = ctx.Entity.Id,
    Name = ctx.Entity.Name
})

.TransformAsync<OutputContext>(async ctx =>
{
    var data = await _service.GetDataAsync(ctx.Id);
    return new OutputContext { Data = data };
})

Side Effects (Tap)

Execute actions without modifying the context:

public class OrderProcessor
{
    private readonly ILogger<OrderProcessor> _logger;
    private readonly EventPublisher _eventPublisher;
    private readonly MetricsService _metricsService;

    public OrderProcessor(
        ILogger<OrderProcessor> logger,
        EventPublisher eventPublisher,
        MetricsService metricsService)
    {
        _logger = logger;
        _eventPublisher = eventPublisher;
        _metricsService = metricsService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        return await Pipeline.Start(context)
            // Simple tap
            .Tap(ctx => Console.WriteLine($"Processing: {ctx.Id}"))

            // Tap with structured logging (ILogger has no LogAsync; its calls are synchronous)
            .Tap(ctx =>
                _logger.LogInformation("Step completed: {Id}", ctx.Id))

            // Tap with event publishing
            .TapAsync(ctx =>
                _eventPublisher.PublishAsync(new OrderCreated(ctx.OrderId)))

            // Tap with metrics
            .Tap(ctx =>
                _metricsService.IncrementCounter("orders_created"))
            .ExecuteAsync();
    }
}

Conditional Execution

public class OrderProcessor
{
    private readonly FraudDetectionService _fraudDetectionService;
    private readonly ApprovalService _approvalService;

    public OrderProcessor(
        FraudDetectionService fraudDetectionService,
        ApprovalService approvalService)
    {
        _fraudDetectionService = fraudDetectionService;
        _approvalService = approvalService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        return await Pipeline.Start(context)
            .When(
                ctx => ctx.Amount > 1000,
                pipeline => pipeline
                    .StepAsync(ctx => _fraudDetectionService.CheckAsync(ctx))
                    .StepAsync(ctx => _approvalService.RequestApprovalAsync(ctx)))
            .ExecuteAsync();
    }
}
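
The idea of a predicate-gated branch can be sketched without the library. The helper below assumes that a context failing the predicate passes through unchanged. `ConditionalSketch` and its `When` method are hypothetical and only mirror the shape of the call above; they are not Myth.Flow's implementation.

```csharp
#nullable enable
using System;

// Hypothetical sketch of predicate-gated steps; NOT Myth.Flow's When().
public static class ConditionalSketch
{
    // Returns a step that applies `branch` only when `predicate` holds,
    // and otherwise hands the context back unchanged.
    public static Func<T, T> When<T>(Func<T, bool> predicate, Func<T, T> branch) =>
        ctx => predicate(ctx) ? branch(ctx) : ctx;
}
```

For example, `ConditionalSketch.When<decimal>(amount => amount > 1000, amount => amount + 1)` leaves `200m` untouched and only transforms amounts above the threshold.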

🔁 Retry Policies

Configure retry behavior for resilient pipelines:

Global Retry Configuration

builder.Services.AddFlow(config =>
{
    config.DefaultRetryAttempts = 3;
    config.DefaultBackoffMs = 100;
});

Per-Pipeline Retry

.WithRetry(maxAttempts: 5, backoffMs: 200)

Retry behavior:

  • Backoff: delay = backoffMs × attemptNumber (as written, this formula grows linearly with the attempt number rather than exponentially)
  • Retries only on exceptions (not on Result.Failure)
  • OperationCanceledException is never retried
  • Individual steps inherit the retry configuration
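
The rules in this list can be sketched in plain C#. This is an illustrative helper, not Myth.Flow's implementation: `RetrySketch`, `RunAsync`, and the injectable `delay` parameter are hypothetical names, and the wait time follows the formula stated in the list.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical helper illustrating the retry rules listed above; NOT part of Myth.Flow.
public static class RetrySketch
{
    public static async Task<T> RunAsync<T>(
        Func<Task<T>> step,
        int maxAttempts,
        int backoffMs,
        Func<int, Task>? delay = null)
    {
        // Callers (for example tests) may replace the real wait; default to Task.Delay.
        delay ??= ms => Task.Delay(ms);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await step();
            }
            catch (OperationCanceledException)
            {
                // Cancellation is never retried.
                throw;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Wait backoffMs × attemptNumber before the next attempt.
                await delay(backoffMs * attempt);
            }
        }
    }
}
```

On the final attempt the exception filter no longer matches, so the last exception propagates to the caller. A step that returns a failed result without throwing is returned as-is and is not retried.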

Retry Example

public class ApiIntegrationService
{
    private readonly ExternalApiService _externalApiService;

    public ApiIntegrationService(ExternalApiService externalApiService)
    {
        _externalApiService = externalApiService;
    }

    public async Task<Result<ApiContext>> CallApiAsync(ApiContext context)
    {
        var result = await Pipeline.Start(context)
            .WithRetry(maxAttempts: 3, backoffMs: 100)
            .StepAsync(ctx => _externalApiService.CallUnreliableApiAsync(ctx)) // Will retry on exceptions
            .ExecuteAsync();

        return result;
    }
}

📊 Observability & Telemetry

OpenTelemetry Integration

The library automatically creates activities for distributed tracing:

public class UserService
{
    private readonly ValidationService _validationService;
    private readonly UserCreationService _creationService;

    public UserService(
        ValidationService validationService,
        UserCreationService creationService)
    {
        _validationService = validationService;
        _creationService = creationService;
    }

    public async Task<Result<UserContext>> CreateUserAsync(UserContext context)
    {
        var result = await Pipeline.Start(context)
            .WithTelemetry("CreateUser")
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
            .ExecuteAsync();

        return result;
    }
}

Each step creates a child activity with tags:

  • pipeline.input.type: Context type name
  • Step-specific tags and timing information
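
For readers unfamiliar with `System.Diagnostics` tracing, the sketch below shows how one activity per step with a `pipeline.input.type` tag could be produced and observed. `TracingSketch`, `RunStep`, and `Listen` are hypothetical names, and the source name `MyApp.Pipeline` is borrowed from the configuration example. How Myth.Flow creates its activities internally is not shown in this README.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical sketch of per-step tracing with System.Diagnostics; NOT Myth.Flow's code.
public static class TracingSketch
{
    private static readonly ActivitySource Source = new("MyApp.Pipeline");

    public static TContext RunStep<TContext>(
        string stepName, TContext context, Func<TContext, TContext> step)
    {
        // StartActivity returns null when no listener samples this source.
        using var activity = Source.StartActivity(stepName);
        activity?.SetTag("pipeline.input.type", typeof(TContext).Name);
        return step(context);
    }

    // Registers a listener that collects every stopped activity from the source.
    public static ActivityListener Listen(List<Activity> stopped)
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "MyApp.Pipeline",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity => stopped.Add(activity)
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }
}
```

In a real application an OpenTelemetry exporter would take the place of the hand-written listener; the listener here only makes the activities visible without extra packages.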

Logging Integration

public class UserCreationService
{
    private readonly ILogger<UserCreationService> _logger;

    public UserCreationService(ILogger<UserCreationService> logger)
    {
        _logger = logger;
    }

    public async Task<Result<UserContext>> CreateAsync(UserContext context)
    {
        _logger.LogInformation("Creating user: {Email}", context.Request.Email);

        try
        {
            // Create user logic
            _logger.LogInformation(
                "User {Email} created successfully with ID: {UserId}",
                context.CreatedUser.Email,
                context.CreatedUser.Id);

            return Result<UserContext>.Success(context);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to create user: {Email}", context.Request.Email);
            return Result<UserContext>.Failure("Failed to create user", ex);
        }
    }
}

Metrics Example

public interface IUserMetrics
{
    void IncrementUserCreated();
    void IncrementUserFailed();
}

public class UserMetricsService : IUserMetrics
{
    private int _usersCreated;
    private int _usersFailed;

    public void IncrementUserCreated() => Interlocked.Increment(ref _usersCreated);
    public void IncrementUserFailed() => Interlocked.Increment(ref _usersFailed);
}

public class UserProcessor
{
    private readonly UserObservabilityService _observabilityService;

    public UserProcessor(UserObservabilityService observabilityService)
    {
        _observabilityService = observabilityService;
    }

    public async Task<Result<UserContext>> ProcessAsync(UserContext context)
    {
        return await Pipeline.Start(context)
            .TapAsync(ctx => _observabilityService.RecordMetricsAsync(ctx))
            .ExecuteAsync();
    }
}

🏗️ Advanced Patterns

Repository Pattern with Transactions

public class UserCreationService
{
    private readonly IUserRepository _repository;
    private readonly IUnitOfWork _unitOfWork;
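    private readonly ILogger<UserCreationService> _logger;

    public UserCreationService(
        IUserRepository repository,
        IUnitOfWork unitOfWork,
        ILogger<UserCreationService> logger)
    {
        _repository = repository;
        _unitOfWork = unitOfWork;
        _logger = logger;
    }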

    public async Task<Result<UserContext>> CreateUserAsync(UserContext context)
    {
        try
        {
            await _unitOfWork.BeginTransactionAsync();

            var user = new User
            {
                Email = context.Request.Email,
                PasswordHash = context.PasswordHash,
                Role = context.Request.Role
            };

            context.CreatedUser = await _repository.CreateAsync(user);
            await _unitOfWork.CommitAsync();

            _logger.LogInformation("User {Email} created", user.Email);
            return Result<UserContext>.Success(context);
        }
        catch (Exception ex)
        {
            await _unitOfWork.RollbackAsync();
            _logger.LogError(ex, "Failed to create user");
            return Result<UserContext>.Failure("Failed to create user", ex);
        }
    }
}

Event-Driven Architecture

public class OrderPipeline
{
    private readonly OrderValidationService _validationService;
    private readonly OrderCreationService _creationService;
    private readonly OrderEventService _eventService;
    private readonly NotificationService _notificationService;
    private readonly MetricsService _metricsService;

    public OrderPipeline(
        OrderValidationService validationService,
        OrderCreationService creationService,
        OrderEventService eventService,
        NotificationService notificationService,
        MetricsService metricsService)
    {
        _validationService = validationService;
        _creationService = creationService;
        _eventService = eventService;
        _notificationService = notificationService;
        _metricsService = metricsService;
    }

    public async Task<Result<OrderResponse>> ProcessOrderAsync(OrderContext context)
    {
        var result = await Pipeline.Start(context)
            .WithTelemetry("ProcessOrder")
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
            .TapAsync(ctx => _eventService.PublishOrderCreatedAsync(ctx))
            .TapAsync(ctx => _notificationService.SendConfirmationEmailAsync(ctx))
            .TapAsync(ctx => _metricsService.RecordOrderCreatedAsync(ctx))
            .Transform<OrderResponse>(ctx => new OrderResponse
            {
                OrderId = ctx.Order.Id,
                Status = ctx.Order.Status,
                CreatedAt = ctx.Order.CreatedAt
            })
            .ExecuteAsync();

        return result;
    }
}

Multi-Step Validation Pipeline

public class UserRegistrationPipeline
{
    private readonly EmailValidationService _emailValidationService;
    private readonly PasswordValidationService _passwordValidationService;
    private readonly RoleValidationService _roleValidationService;
    private readonly UserCreationService _userCreationService;
    private readonly EventPublisher _eventPublisher;
    private readonly EmailService _emailService;
    private readonly MetricsService _metricsService;

    public UserRegistrationPipeline(
        EmailValidationService emailValidationService,
        PasswordValidationService passwordValidationService,
        RoleValidationService roleValidationService,
        UserCreationService userCreationService,
        EventPublisher eventPublisher,
        EmailService emailService,
        MetricsService metricsService)
    {
        _emailValidationService = emailValidationService;
        _passwordValidationService = passwordValidationService;
        _roleValidationService = roleValidationService;
        _userCreationService = userCreationService;
        _eventPublisher = eventPublisher;
        _emailService = emailService;
        _metricsService = metricsService;
    }

    public async Task<Result<UserResponse>> RegisterAsync(RegisterUserRequest request)
    {
        var context = new UserContext { Request = request };

        var result = await Pipeline.Start(context)
            .WithTelemetry("RegisterUser")
            .WithRetry(maxAttempts: 2)

            // Validation steps
            .StepResultAsync(ctx => _emailValidationService.ValidateEmailAsync(ctx))
            .StepResultAsync(ctx => _passwordValidationService.ValidatePasswordAsync(ctx))
            .StepResultAsync(ctx => _roleValidationService.ValidateRoleAsync(ctx))

            // Creation step
            .StepResultAsync(ctx => _userCreationService.CreateUserAsync(ctx))

            // Side effects
            .TapAsync(ctx => _eventPublisher.PublishAsync(new UserRegistered(ctx.CreatedUser.Id)))
            .TapAsync(ctx => _emailService.SendWelcomeEmailAsync(ctx.CreatedUser.Email))
            .TapAsync(ctx => _metricsService.IncrementUserRegistrations())

            // Transform to response
            .Transform<UserResponse>(ctx => new UserResponse
            {
                Id = ctx.CreatedUser.Id,
                Email = ctx.CreatedUser.Email,
                Role = ctx.CreatedUser.Role,
                CreatedAt = ctx.CreatedUser.CreatedAt
            })
            .ExecuteAsync();

        return result;
    }
}

Conditional Processing

public class OrderProcessor
{
    private readonly OrderValidationService _validationService;
    private readonly FraudCheckService _fraudCheckService;
    private readonly ManagerApprovalService _approvalService;
    private readonly ComplianceService _complianceService;
    private readonly CurrencyConversionService _currencyService;
    private readonly PaymentService _paymentService;

    public OrderProcessor(
        OrderValidationService validationService,
        FraudCheckService fraudCheckService,
        ManagerApprovalService approvalService,
        ComplianceService complianceService,
        CurrencyConversionService currencyService,
        PaymentService paymentService)
    {
        _validationService = validationService;
        _fraudCheckService = fraudCheckService;
        _approvalService = approvalService;
        _complianceService = complianceService;
        _currencyService = currencyService;
        _paymentService = paymentService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        var result = await Pipeline.Start(context)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))

            .When(
                ctx => ctx.Order.Amount > 1000,
                pipeline => pipeline
                    .StepAsync(ctx => _fraudCheckService.CheckAsync(ctx))
                    .StepAsync(ctx => _approvalService.RequestApprovalAsync(ctx)))

            .When(
                ctx => ctx.Order.IsInternational,
                pipeline => pipeline
                    .StepAsync(ctx => _complianceService.CheckComplianceAsync(ctx))
                    .StepAsync(ctx => _currencyService.ConvertCurrencyAsync(ctx)))

            .StepResultAsync(ctx => _paymentService.ProcessPaymentAsync(ctx))
            .ExecuteAsync();

        return result;
    }
}

โŒ Error Handling

Result Pattern

The library uses the Result pattern for explicit error handling:

public readonly struct Result<T>
{
    public bool IsSuccess { get; }
    public bool IsFailure { get; }
    public T? Value { get; }
    public string? ErrorMessage { get; }
    public Exception? Exception { get; }
}

Handling Pipeline Results

public class DataProcessor
{
    private readonly ValidationService _validationService;
    private readonly ProcessingService _processingService;

    public DataProcessor(
        ValidationService validationService,
        ProcessingService processingService)
    {
        _validationService = validationService;
        _processingService = processingService;
    }

    public async Task<DataContext?> ProcessDataAsync(DataContext context)
    {
        var result = await Pipeline.Start(context)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();

        if (result.IsSuccess)
        {
            var data = result.Value;
            // Handle success
            return data;
        }
        else
        {
            var errorMessage = result.ErrorMessage;
            var exception = result.Exception;
            // Handle failure
            return null;
        }
    }
}

Exception Types

  • PipelineException: General pipeline execution errors
  • PipelineConfigurationException: Configuration errors (missing services, invalid setup)

Configuration exceptions are fail-fast and are always re-thrown to prevent silent failures.

Exception Filtering

By default, all exceptions are handled internally by the pipeline and returned as failure results. However, you can configure specific exception types to be propagated (thrown) instead of being handled:

// Configure during service registration
builder.Services.AddFlow(config => config
    .UseExceptionFilter<ArgumentException>()                    // Propagate ArgumentException
    .UseExceptionFilter<InvalidOperationException>()            // Propagate InvalidOperationException
    .UseExceptionFilter(typeof(UnauthorizedAccessException))    // Propagate using Type
);

// Example usage
public class ValidationService
{
    public async Task<Result<UserContext>> ValidateAsync(UserContext context)
    {
        return await Pipeline.Start(context)
            .StepAsync(ctx =>
            {
                if (string.IsNullOrEmpty(ctx.Email))
                    throw new ArgumentException("Email is required"); // This will be propagated

                if (ctx.Age < 0)
                    throw new InvalidDataException("Invalid age");     // This will be handled

                return Task.FromResult(ctx);
            })
            .ExecuteAsync(); // ArgumentException will be thrown, InvalidDataException will return failure result
    }
}

Key Features:

  • Exception inheritance is supported (e.g., ArgumentNullException inherits from ArgumentException)
  • Multiple exception types can be configured
  • Configuration applies to all pipelines in the application
  • PipelineConfigurationException and OperationCanceledException are always propagated regardless of configuration
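
The inheritance rule in this list amounts to an "is instance of" check against each configured type. The sketch below illustrates that idea only. `ExceptionFilterSketch`, `Propagate`, and `ShouldPropagate` are hypothetical names, and the library's internal matching logic may differ.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical sketch of inheritance-aware exception filtering; NOT Myth.Flow's implementation.
public sealed class ExceptionFilterSketch
{
    private readonly List<Type> _propagated = new();

    public ExceptionFilterSketch Propagate<TException>() where TException : Exception
    {
        _propagated.Add(typeof(TException));
        return this;
    }

    // True when the exception is a registered type or derives from one.
    public bool ShouldPropagate(Exception ex)
    {
        // Cancellation is always propagated, regardless of configuration.
        if (ex is OperationCanceledException) return true;

        foreach (var type in _propagated)
        {
            if (type.IsInstanceOfType(ex)) return true;
        }

        return false;
    }
}
```

With `Propagate<ArgumentException>()` registered, an `ArgumentNullException` matches because it derives from `ArgumentException`, while an `InvalidDataException` does not and would be converted into a failure result instead.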

Success and Error Callbacks

public class MyProcessor
{
    private readonly MyService _myService;
    private readonly ILogger<MyProcessor> _logger;

    public MyProcessor(MyService myService, ILogger<MyProcessor> logger)
    {
        _myService = myService;
        _logger = logger;
    }

    public async Task<Result<MyContext>> ProcessAsync(MyContext context)
    {
        return await Pipeline.Start(context)
            .StepAsync(
                ctx => _myService.ProcessAsync(ctx),
                onSuccess: ctx => _logger.LogInformation("Step succeeded"),
                onError: ex => _logger.LogError(ex, "Step failed"))
            .ExecuteAsync();
    }
}

🧪 Testing

The pipeline design makes testing straightforward:

[Fact]
public async Task CreateUser_WithValidData_ShouldSucceed()
{
    // Arrange
    var services = new ServiceCollection();
    services.AddSingleton<IUserRepository>(mockRepository);
    services.AddSingleton<IPasswordValidator>(mockPasswordValidator);
    services.AddSingleton<UserValidationService>();
    services.AddSingleton<UserCreationService>();
    services.AddSingleton<UserRegistrationPipeline>();
    services.AddLogging();
    services.AddFlow();

    var serviceProvider = services.BuildServiceProvider();
    var pipeline = serviceProvider.GetRequiredService<UserRegistrationPipeline>();
    var request = new RegisterUserRequest { Email = "test@example.com", Name = "Test User" };

    // Act
    var result = await pipeline.RegisterAsync(request);

    // Assert
    // RegisterAsync returns Result<UserResponse>, so assert on the response itself
    Assert.True(result.IsSuccess);
    Assert.NotNull(result.Value);
    Assert.Equal(request.Email, result.Value!.Email);
}

[Fact]
public async Task ProcessData_WithValidContext_ShouldExecuteAllSteps()
{
    // Arrange
    var validationService = new Mock<ValidationService>();
    var creationService = new Mock<UserCreationService>();

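    validationService
        .Setup(x => x.ValidateAsync(It.IsAny<UserContext>()))
        .ReturnsAsync(Result<UserContext>.Success(new UserContext()));

    // Set up CreateAsync as well so the second step returns an explicit success result
    creationService
        .Setup(x => x.CreateAsync(It.IsAny<UserContext>()))
        .ReturnsAsync(Result<UserContext>.Success(new UserContext()));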

    var processor = new UserProcessor(
        validationService.Object,
        creationService.Object);

    var context = new UserContext { Request = validRequest };

    // Act
    var result = await processor.ProcessAsync(context);

    // Assert
    Assert.True(result.IsSuccess);
    validationService.Verify(x => x.ValidateAsync(It.IsAny<UserContext>()), Times.Once);
    creationService.Verify(x => x.CreateAsync(It.IsAny<UserContext>()), Times.Once);
}

📋 Best Practices

  1. Use Constructor Injection: Inject all required services via constructor for better testability and maintainability
  2. Use Result Pattern: Return Result<T> from services for explicit error handling
  3. Configure Dependency Injection: Register all services in Program.cs/Startup.cs
  4. Enable Telemetry: Use WithTelemetry() for production observability
  5. Configure Retry Policies: Set appropriate retry policies for unreliable operations
  6. Separate Concerns: Keep steps focused on single responsibilities
  7. Use Tap for Side Effects: Keep side effects (logging, metrics, events) separate from main flow
  8. Handle Errors Gracefully: Always check IsSuccess before accessing Value
  9. Add Observability: Integrate logging and metrics for production monitoring
  10. Test Pipeline Steps: Test individual services and complete pipelines separately
  11. Use Conditional Execution: Keep conditional logic readable with When()
  12. Avoid Service Locator: Don't resolve services inside pipeline steps; use constructor injection instead

📊 Response Information

Every pipeline execution returns comprehensive information:

var result = await Pipeline.Start(context)
    // ... pipeline steps ...
    .ExecuteAsync();

Console.WriteLine($"Is Success: {result.IsSuccess}");
Console.WriteLine($"Is Failure: {result.IsFailure}");
Console.WriteLine($"Error Message: {result.ErrorMessage}");

if (result.IsSuccess)
{
    var value = result.Value;
    // Process successful result
}

🔄 Migration Guide

Moving from Service Locator Pattern

If you're upgrading from an earlier version that used the service locator pattern, here's how to migrate your code:

Old Pattern (Service Locator - Deprecated)

var result = await Pipeline.Start(context)
    .StepAsync<ValidationService>((svc, ctx) => svc.ValidateAsync(ctx))
    .StepAsync<ProcessingService>((svc, ctx) => svc.ProcessAsync(ctx))
    .ExecuteAsync();

New Pattern (Constructor Injection)

public class MyPipeline
{
    private readonly ValidationService _validationService;
    private readonly ProcessingService _processingService;

    public MyPipeline(
        ValidationService validationService,
        ProcessingService processingService)
    {
        _validationService = validationService;
        _processingService = processingService;
    }

    public async Task<Result<MyContext>> ExecuteAsync(MyContext context)
    {
        var result = await Pipeline.Start(context)
            .StepAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();

        return result;
    }
}

Benefits of Constructor Injection

  1. Better Testability: Easy to mock dependencies in unit tests
  2. Explicit Dependencies: Clear what services are required
  3. Earlier Failure Detection: Missing dependencies surface when the class is resolved from the container, not midway through a pipeline run
  4. SOLID Principles: Follows Dependency Inversion Principle
  5. IDE Support: Better IntelliSense and code navigation
  6. No Hidden Dependencies: All dependencies visible in constructor

Migration Steps

  1. Identify Services: Find all .Step<TService>(), .StepAsync<TService>(), .Tap<TService>() calls
  2. Add Constructor Parameters: Add these services as constructor parameters
  3. Store as Fields: Save constructor parameters as private readonly fields
  4. Update Pipeline Calls: Remove <TService> generic parameter and use injected fields
  5. Register Services: Ensure all services are registered in DI container
  6. Test: Verify your pipelines work with the new injection approach

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📧 Support

For issues, questions, or contributions, please visit the GitHub repository.

Compatible and additional computed target framework versions:

  • Compatible: net8.0
  • Computed (net8.0): net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows
  • Computed (net9.0): net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows
  • Computed (net10.0): net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Myth.Flow:

Package Downloads
Myth.Flow.Actions

CQRS and event-driven architecture extension for Myth.Flow with dispatcher, event bus, message brokers (Kafka, RabbitMQ), caching, circuit breaker, and dead letter queue for resilient distributed systems.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
4.2.1-preview.1 581 12/2/2025
4.2.0 410 11/30/2025
4.2.0-preview.1 58 11/29/2025
4.1.0 169 11/27/2025
4.1.0-preview.3 125 11/27/2025
4.1.0-preview.2 126 11/27/2025
4.1.0-preview.1 127 11/26/2025
4.0.1 159 11/22/2025
4.0.1-preview.8 142 11/22/2025
4.0.1-preview.7 152 11/22/2025
4.0.1-preview.6 136 11/22/2025
4.0.1-preview.5 199 11/21/2025
4.0.1-preview.4 202 11/21/2025
4.0.1-preview.3 208 11/21/2025
4.0.1-preview.2 231 11/21/2025
4.0.1-preview.1 241 11/21/2025
4.0.0 386 11/20/2025
4.0.0-preview.3 337 11/19/2025
4.0.0-preview.2 86 11/15/2025
4.0.0-preview.1 105 11/15/2025
3.10.0 168 11/15/2025
3.0.5-preview.15 148 11/14/2025
3.0.5-preview.14 221 11/12/2025
3.0.5-preview.13 222 11/12/2025
3.0.5-preview.12 222 11/11/2025
3.0.5-preview.11 224 11/11/2025
3.0.5-preview.10 219 11/11/2025
3.0.5-preview.9 215 11/10/2025
3.0.5-preview.8 87 11/8/2025
3.0.5-preview.7 87 11/8/2025
3.0.5-preview.6 86 11/8/2025
3.0.5-preview.5 88 11/8/2025
3.0.5-preview.4 66 11/7/2025
3.0.5-preview.3 130 11/4/2025
3.0.5-preview.2 132 11/4/2025
3.0.5-preview.1 135 11/4/2025
3.0.4 195 11/3/2025
3.0.4-preview.19 74 11/2/2025
3.0.4-preview.18 73 11/1/2025
3.0.4-preview.17 71 11/1/2025
3.0.4-preview.16 74 11/1/2025
3.0.4-preview.15 67 10/31/2025
3.0.4-preview.14 132 10/31/2025
3.0.4-preview.13 133 10/30/2025
3.0.4-preview.12 125 10/23/2025
3.0.4-preview.11 122 10/23/2025
3.0.4-preview.10 120 10/23/2025
3.0.4-preview.9 121 10/23/2025
3.0.4-preview.8 124 10/22/2025
3.0.4-preview.6 116 10/21/2025
3.0.4-preview.5 116 10/21/2025
3.0.4-preview.4 123 10/20/2025
3.0.4-preview.3 118 10/20/2025
3.0.4-preview.2 50 10/18/2025
3.0.4-preview.1 121 10/7/2025