Myth.Flow 4.2.1-preview.1

This is a prerelease version of Myth.Flow.

Myth.Flow

A powerful .NET library for building maintainable and testable data processing pipelines with a fluent, chainable interface. Built with enterprise-grade features including automatic retry policies, OpenTelemetry integration, global service provider integration, and comprehensive error handling.

⭐ Features

  • Fluent Interface: Simple, chainable API design for readable code
  • Type Safety: Strong typing with context transformation support
  • Automatic Retry: Configurable retry policies with exponential backoff
  • OpenTelemetry Integration: Built-in distributed tracing and observability
  • Global Service Provider: Seamless integration with Myth.Commons centralized DI container
  • Result Pattern: Railway-oriented programming with Result<T>
  • Error Handling: Comprehensive error handling with exception filtering
  • Conditional Execution: Execute steps based on context predicates
  • Side Effects: Tap into pipeline for logging, metrics, and events
  • Async/Await: First-class async support with CancellationToken
  • Zero Boilerplate: No service locator pattern - clean, straightforward code

📦 Installation

dotnet add package Myth.Flow

🚀 Quick Start

Basic Usage

public class OrderService
{
    private readonly ValidationService _validationService;
    private readonly PaymentService _paymentService;
    private readonly InventoryService _inventoryService;

    public OrderService(
        ValidationService validationService,
        PaymentService paymentService,
        InventoryService inventoryService)
    {
        _validationService = validationService;
        _paymentService = paymentService;
        _inventoryService = inventoryService;
    }

    public async Task<Result<OrderContext>> ProcessOrderAsync(int orderId)
    {
        var input = new OrderContext { OrderId = orderId };

        var result = await Pipeline.Start(input)
            .StepAsync(ctx => _validationService.ValidateOrderAsync(ctx))
            .StepAsync(ctx => _paymentService.ProcessPaymentAsync(ctx))
            .StepAsync(ctx => _inventoryService.ReserveItemsAsync(ctx))
            .Tap(ctx => Console.WriteLine($"Order {ctx.OrderId} completed"))
            .ExecuteAsync();

        if (result.IsSuccess)
        {
            Console.WriteLine("Order processed successfully!");
        }

        return result;
    }
}

Setup

ASP.NET Core Applications

For ASP.NET Core applications, use builder.BuildApp() instead of builder.Build() to automatically initialize the global service provider:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddFlow(config => config
    .UseTelemetry()
    .UseLogging()
    .UseRetry(3, 100));

builder.Services.AddScoped<ValidationService>();
builder.Services.AddScoped<PaymentService>();
builder.Services.AddScoped<InventoryService>();

var app = builder.BuildApp();

app.Run();

Console Applications

For console applications or background services, use services.BuildWithGlobalProvider():

var services = new ServiceCollection();

services.AddFlow(config => config
    .UseTelemetry()
    .UseRetry(3, 100));

services.AddScoped<ValidationService>();
services.AddScoped<ProcessingService>();

var serviceProvider = services.BuildWithGlobalProvider();

Using in Controllers/Services

public class OrderController : ControllerBase
{
    private readonly OrderValidationService _validationService;
    private readonly OrderCreationService _creationService;
    private readonly OrderEventService _eventService;

    public OrderController(
        OrderValidationService validationService,
        OrderCreationService creationService,
        OrderEventService eventService)
    {
        _validationService = validationService;
        _creationService = creationService;
        _eventService = eventService;
    }

    [HttpPost("orders")]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        var context = new OrderContext { Request = request };

        var result = await Pipeline.Start(context)
            .WithTelemetry("CreateOrder")
            .WithRetry(maxAttempts: 3, backoffMs: 100)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
            .TapAsync(ctx => _eventService.PublishOrderCreatedAsync(ctx))
            .Transform<OrderResponse>(ctx => new OrderResponse
            {
                OrderId = ctx.CreatedOrder!.Id,
                Status = ctx.CreatedOrder.Status
            })
            .ExecuteAsync();

        if (result.IsFailure)
            return BadRequest(new { error = result.ErrorMessage });

        return Ok(result.Value);
    }
}

🔧 Configuration

Basic Configuration

builder.Services.AddFlow();

Advanced Configuration with Fluent Builder

builder.Services.AddFlow(config => config
    .UseTelemetry()
    .UseLogging()
    .UseRetry(3, 100)
    .UseActivitySource("MyApp.Pipeline")
    .UseExceptionFilter<ArgumentException>()
    .UseExceptionFilter<InvalidOperationException>());

Configuration Options

  • UseTelemetry() / DisableTelemetry(): Enable/disable OpenTelemetry distributed tracing
  • UseLogging() / DisableLogging(): Enable/disable Microsoft.Extensions.Logging integration
  • UseRetry(attempts, backoffMs) / DisableRetry(): Configure default retry policy with exponential backoff
  • UseActivitySource(name, version?): Set custom ActivitySource for telemetry
  • UseExceptionFilter<TException>(): Configure exception types to propagate without handling

Per-Pipeline Configuration

Override global settings for specific pipelines:

public class MyService
{
    private readonly ProcessingService _processingService;

    public MyService(ProcessingService processingService)
    {
        _processingService = processingService;
    }

    public async Task<Result<MyContext>> ExecuteAsync(MyContext context)
    {
        return await Pipeline.Start(context)
            .WithTelemetry("OperationName")
            .WithRetry(maxAttempts: 5, backoffMs: 200)
            .StepAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();
    }
}

Pipeline.Start Options

Start with Default Configuration

var result = await Pipeline.Start(context)
    .StepAsync(ctx => ProcessAsync(ctx))
    .ExecuteAsync();

Start with Custom Configuration

var result = await Pipeline.Start(context, config => {
    config.EnableTelemetry = true;
    config.DefaultRetryAttempts = 5;
})
    .StepAsync(ctx => ProcessAsync(ctx))
    .ExecuteAsync();

🔄 Pipeline Steps

Synchronous Steps

public class MyService
{
    private readonly TransformService _transformService;

    public MyService(TransformService transformService)
    {
        _transformService = transformService;
    }

    public MyContext ProcessData(MyContext ctx)
    {
        var result = Pipeline.Start(ctx)
            .Step(context =>
            {
                // Synchronous processing
                context.Data = _transformService.Transform(context.Data);
                return context;
            })
            .Execute();

        return result.Value;
    }
}

Asynchronous Steps

public class MyService
{
    private readonly ProcessingService _processingService;

    public MyService(ProcessingService processingService)
    {
        _processingService = processingService;
    }

    public async Task<Result<MyContext>> ProcessAsync(MyContext context)
    {
        return await Pipeline.Start(context)
            .StepAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();
    }
}

Steps with Result Pattern

Use StepResult and StepResultAsync for operations that can succeed or fail:

public class OrderProcessor
{
    private readonly ValidationService _validationService;

    public OrderProcessor(ValidationService validationService)
    {
        _validationService = validationService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        return await Pipeline.Start(context)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .ExecuteAsync();
    }
}

The Result<T> pattern allows steps to return success or failure. Failed results are automatically converted to exceptions:

public async Task<Result<OrderContext>> ValidateAsync(OrderContext context)
{
    if (string.IsNullOrEmpty(context.Request.Email))
        return Result<OrderContext>.Failure("Email is required");

    if (context.Request.Amount <= 0)
        return Result<OrderContext>.Failure("Amount must be positive");

    return Result<OrderContext>.Success(context);
}

Steps with CancellationToken Support

Pass cancellation tokens for operations that support cancellation:

public async Task<Result<MyContext>> ProcessAsync(MyContext context, CancellationToken ct)
{
    return await Pipeline.Start(context)
        .StepAsync((ctx, token) => LongRunningOperationAsync(ctx, token))
        .StepResultAsync((ctx, token) => ValidateAsync(ctx, token))
        .ExecuteAsync(ct);
}
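
To make the cancellation flow concrete, here is a minimal sketch in plain C# that does not use Myth.Flow at all. It assumes a hypothetical `LongRunningOperationAsync` step that forwards its token to `Task.Delay`; the names and timings are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical long-running step: Task.Delay observes the token, so the
// operation ends as soon as cancellation is requested.
static async Task<string> LongRunningOperationAsync(string input, CancellationToken token)
{
    await Task.Delay(TimeSpan.FromSeconds(5), token);
    return input.ToUpperInvariant();
}

// Cancel automatically after 100 ms, well before the 5 s delay finishes.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
var cancelled = false;
try
{
    await LongRunningOperationAsync("order-42", cts.Token);
}
catch (OperationCanceledException)
{
    // Task.Delay throws TaskCanceledException, which derives from
    // OperationCanceledException.
    cancelled = true;
}

Console.WriteLine($"cancelled: {cancelled}"); // cancelled: True
```

A step only stops early if it passes the token on to the operations it awaits; a step that ignores the token runs to completion.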

Context Transformation

Transform the pipeline context to a different type. All previous steps are executed before transformation:

public async Task<Result<OrderResponse>> ProcessOrderAsync(OrderContext context)
{
    return await Pipeline.Start(context)
        .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
        .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
        .Transform<OrderResponse>(ctx => new OrderResponse
        {
            Id = ctx.Entity.Id,
            Name = ctx.Entity.Name
        })
        .ExecuteAsync();
}

Async Transformation

.TransformAsync<OutputContext>(async ctx =>
{
    var data = await _service.GetDataAsync(ctx.Id);
    return new OutputContext { Data = data };
})

Side Effects (Tap)

Execute actions without modifying the context. Perfect for logging, metrics, and event publishing:

public class OrderProcessor
{
    private readonly ILogger<OrderProcessor> _logger;
    private readonly EventPublisher _eventPublisher;
    private readonly MetricsService _metricsService;

    public OrderProcessor(
        ILogger<OrderProcessor> logger,
        EventPublisher eventPublisher,
        MetricsService metricsService)
    {
        _logger = logger;
        _eventPublisher = eventPublisher;
        _metricsService = metricsService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        return await Pipeline.Start(context)
            .Tap(ctx => _logger.LogInformation("Processing order {OrderId}", ctx.OrderId))
            .TapAsync(ctx => _eventPublisher.PublishAsync(new OrderCreated(ctx.OrderId)))
            .Tap(ctx => _metricsService.IncrementCounter("orders_created"))
            .ExecuteAsync();
    }
}

Note: Tap steps don't support retry logic and exceptions are propagated immediately.

Conditional Execution

public class OrderProcessor
{
    private readonly FraudDetectionService _fraudDetectionService;
    private readonly ApprovalService _approvalService;

    public OrderProcessor(
        FraudDetectionService fraudDetectionService,
        ApprovalService approvalService)
    {
        _fraudDetectionService = fraudDetectionService;
        _approvalService = approvalService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        return await Pipeline.Start(context)
            .When(
                ctx => ctx.Amount > 1000,
                pipeline => pipeline
                    .StepAsync(ctx => _fraudDetectionService.CheckAsync(ctx))
                    .StepAsync(ctx => _approvalService.RequestApprovalAsync(ctx)))
            .ExecuteAsync();
    }
}
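
The idea behind conditional execution can be sketched without the library: a branch runs only when a predicate on the context holds, and otherwise the context passes through unchanged. The `When` function below is a hypothetical stand-alone combinator written for this illustration, not the Myth.Flow method of the same name.

```csharp
using System;

// Hypothetical combinator (not the Myth.Flow API): wraps a branch so it runs
// only when the predicate holds; otherwise the context is returned unchanged.
static Func<T, T> When<T>(Func<T, bool> predicate, Func<T, T> branch) =>
    ctx => predicate(ctx) ? branch(ctx) : ctx;

// The context is a small (Amount, Flagged) tuple to keep the sketch short.
var reviewLargeOrders = When<(decimal Amount, bool Flagged)>(
    ctx => ctx.Amount > 1000,
    ctx => (ctx.Amount, Flagged: true));

Console.WriteLine(reviewLargeOrders((Amount: 1500m, Flagged: false)).Flagged); // True
Console.WriteLine(reviewLargeOrders((Amount: 250m, Flagged: false)).Flagged);  // False
```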

🔁 Retry Policies

Configure retry behavior for resilient pipelines with exponential backoff.

Global Retry Configuration

builder.Services.AddFlow(config => config
    .UseRetry(3, 100));

Per-Pipeline Retry

Override global retry settings for specific pipelines:

var result = await Pipeline.Start(context)
    .WithRetry(maxAttempts: 5, backoffMs: 200)
    .StepAsync(ctx => UnreliableOperationAsync(ctx))
    .ExecuteAsync();

Retry Behavior

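  • Backoff: delay = backoffMs × attemptNumber, so the wait grows linearly with the attempt number (100 ms, 200 ms, 300 ms for backoffMs = 100), even though the configuration text labels it exponential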
  • Retries only on exceptions: Result.Failure does not trigger retry
  • Never retries: OperationCanceledException and configured exception filters
  • Per-step configuration: Steps added after .WithRetry() inherit that configuration
  • Default: No retry (attempts = 0)
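
The delay rule in the list above can be checked with a few lines of plain C#. This is a minimal sketch that assumes only the stated formula (delay = backoffMs × attemptNumber, attempts numbered from 1); `RetryDelay` is a hypothetical helper, not a Myth.Flow API.

```csharp
using System;

// Hypothetical helper (not a Myth.Flow API): applies the documented rule
// delay = backoffMs × attemptNumber, with attempts numbered from 1.
static TimeSpan RetryDelay(int backoffMs, int attemptNumber)
{
    if (backoffMs < 0) throw new ArgumentOutOfRangeException(nameof(backoffMs));
    if (attemptNumber < 1) throw new ArgumentOutOfRangeException(nameof(attemptNumber));
    return TimeSpan.FromMilliseconds((double)backoffMs * attemptNumber);
}

// Print the wait before each retry for backoffMs = 100.
for (var attempt = 1; attempt <= 3; attempt++)
{
    Console.WriteLine($"retry {attempt}: wait {RetryDelay(100, attempt).TotalMilliseconds} ms");
}
// retry 1: wait 100 ms
// retry 2: wait 200 ms
// retry 3: wait 300 ms
```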

Retry Example

public class ApiIntegrationService
{
    private readonly ExternalApiService _externalApiService;

    public ApiIntegrationService(ExternalApiService externalApiService)
    {
        _externalApiService = externalApiService;
    }

    public async Task<Result<ApiContext>> CallApiAsync(ApiContext context)
    {
        return await Pipeline.Start(context)
            .WithRetry(maxAttempts: 3, backoffMs: 100)
            .StepAsync(ctx => _externalApiService.CallUnreliableApiAsync(ctx))
            .ExecuteAsync();
    }
}

Retry delays for this example (backoffMs = 100): 1st retry after 100 ms, 2nd after 200 ms, 3rd after 300 ms.

📊 Observability & Telemetry

OpenTelemetry Integration

Myth.Flow uses the standard System.Diagnostics.ActivitySource for distributed tracing:

builder.Services.AddFlow(config => config
    .UseTelemetry()
    .UseActivitySource("MyApp.Pipeline", "1.0.0"));

public class UserService
{
    private readonly ValidationService _validationService;
    private readonly UserCreationService _creationService;

    public UserService(
        ValidationService validationService,
        UserCreationService creationService)
    {
        _validationService = validationService;
        _creationService = creationService;
    }

    public async Task<Result<UserContext>> CreateUserAsync(UserContext context)
    {
        return await Pipeline.Start(context)
            .WithTelemetry("CreateUser")
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
            .ExecuteAsync();
    }
}

Activity Structure

  • Root Activity: Created with the operation name from .WithTelemetry()
  • Step Activities: Child activities named Step_{index}_{stepName}
  • Tags:
    • pipeline.input.type: Context type name
    • Step-specific timing and metadata
  • Status: Ok for success, Error with message for failures
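
To inspect activities like these without a full OpenTelemetry exporter, the standard `System.Diagnostics.ActivityListener` can be attached to the source by name. The sketch below is self-contained: it creates its own stand-in `ActivitySource` named "MyApp.Pipeline" and two activities shaped like the structure described above, so the activity names are assumptions made for the demonstration rather than captured library output.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

var stopped = new List<string>();

// Listen to a single ActivitySource by name and record every activity.
using var listener = new ActivityListener
{
    ShouldListenTo = activitySource => activitySource.Name == "MyApp.Pipeline",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
        ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = activity =>
    {
        stopped.Add(activity.OperationName);
        Console.WriteLine($"{activity.OperationName} [{activity.Status}] {activity.Duration.TotalMilliseconds:F1} ms");
    }
};
ActivitySource.AddActivityListener(listener);

// Stand-in source so the sketch runs on its own; in an application the
// pipeline would create these activities instead.
using var source = new ActivitySource("MyApp.Pipeline", "1.0.0");
using (var root = source.StartActivity("CreateUser"))
{
    using var step = source.StartActivity("Step_0_Validate");
    step?.SetStatus(ActivityStatusCode.Ok);
    root?.SetStatus(ActivityStatusCode.Ok);
}
```

The child activity stops before the root, so the step line is printed first. `StartActivity` returns null when no listener samples the source, which is why the null-conditional calls are used.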

Logging Integration

public class UserCreationService
{
    private readonly ILogger<UserCreationService> _logger;

    public UserCreationService(ILogger<UserCreationService> logger)
    {
        _logger = logger;
    }

    public async Task<Result<UserContext>> CreateAsync(UserContext context)
    {
        _logger.LogInformation("Creating user: {Email}", context.Request.Email);

        try
        {
            // Create user logic
            _logger.LogInformation(
                "User {Email} created successfully with ID: {UserId}",
                context.CreatedUser.Email,
                context.CreatedUser.Id);

            return Result<UserContext>.Success(context);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to create user: {Email}", context.Request.Email);
            return Result<UserContext>.Failure("Failed to create user", ex);
        }
    }
}

Metrics Example

public interface IUserMetrics
{
    void IncrementUserCreated();
    void IncrementUserFailed();
}

public class UserMetricsService : IUserMetrics
{
    private int _usersCreated;
    private int _usersFailed;

    public void IncrementUserCreated() => Interlocked.Increment(ref _usersCreated);
    public void IncrementUserFailed() => Interlocked.Increment(ref _usersFailed);
}

public class UserProcessor
{
    private readonly UserObservabilityService _observabilityService;

    public UserProcessor(UserObservabilityService observabilityService)
    {
        _observabilityService = observabilityService;
    }

    public async Task<Result<UserContext>> ProcessAsync(UserContext context)
    {
        return await Pipeline.Start(context)
            .TapAsync(ctx => _observabilityService.RecordMetricsAsync(ctx))
            .ExecuteAsync();
    }
}

🏗️ Advanced Patterns

Repository Pattern with Transactions

public class UserCreationService
{
    private readonly IUserRepository _repository;
    private readonly IUnitOfWork _unitOfWork;
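    private readonly ILogger<UserCreationService> _logger;

    public UserCreationService(
        IUserRepository repository,
        IUnitOfWork unitOfWork,
        ILogger<UserCreationService> logger)
    {
        _repository = repository;
        _unitOfWork = unitOfWork;
        _logger = logger;
    }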

    public async Task<Result<UserContext>> CreateUserAsync(UserContext context)
    {
        try
        {
            await _unitOfWork.BeginTransactionAsync();

            var user = new User
            {
                Email = context.Request.Email,
                PasswordHash = context.PasswordHash,
                Role = context.Request.Role
            };

            context.CreatedUser = await _repository.CreateAsync(user);
            await _unitOfWork.CommitAsync();

            _logger.LogInformation("User {Email} created", user.Email);
            return Result<UserContext>.Success(context);
        }
        catch (Exception ex)
        {
            await _unitOfWork.RollbackAsync();
            _logger.LogError(ex, "Failed to create user");
            return Result<UserContext>.Failure("Failed to create user", ex);
        }
    }
}

Event-Driven Architecture

public class OrderPipeline
{
    private readonly OrderValidationService _validationService;
    private readonly OrderCreationService _creationService;
    private readonly OrderEventService _eventService;
    private readonly NotificationService _notificationService;
    private readonly MetricsService _metricsService;

    public OrderPipeline(
        OrderValidationService validationService,
        OrderCreationService creationService,
        OrderEventService eventService,
        NotificationService notificationService,
        MetricsService metricsService)
    {
        _validationService = validationService;
        _creationService = creationService;
        _eventService = eventService;
        _notificationService = notificationService;
        _metricsService = metricsService;
    }

    public async Task<Result<OrderResponse>> ProcessOrderAsync(OrderContext context)
    {
        var result = await Pipeline.Start(context)
            .WithTelemetry("ProcessOrder")
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _creationService.CreateAsync(ctx))
            .TapAsync(ctx => _eventService.PublishOrderCreatedAsync(ctx))
            .TapAsync(ctx => _notificationService.SendConfirmationEmailAsync(ctx))
            .TapAsync(ctx => _metricsService.RecordOrderCreatedAsync(ctx))
            .Transform<OrderResponse>(ctx => new OrderResponse
            {
                OrderId = ctx.Order.Id,
                Status = ctx.Order.Status,
                CreatedAt = ctx.Order.CreatedAt
            })
            .ExecuteAsync();

        return result;
    }
}

Multi-Step Validation Pipeline

public class UserRegistrationPipeline
{
    private readonly EmailValidationService _emailValidationService;
    private readonly PasswordValidationService _passwordValidationService;
    private readonly RoleValidationService _roleValidationService;
    private readonly UserCreationService _userCreationService;
    private readonly EventPublisher _eventPublisher;
    private readonly EmailService _emailService;
    private readonly MetricsService _metricsService;

    public UserRegistrationPipeline(
        EmailValidationService emailValidationService,
        PasswordValidationService passwordValidationService,
        RoleValidationService roleValidationService,
        UserCreationService userCreationService,
        EventPublisher eventPublisher,
        EmailService emailService,
        MetricsService metricsService)
    {
        _emailValidationService = emailValidationService;
        _passwordValidationService = passwordValidationService;
        _roleValidationService = roleValidationService;
        _userCreationService = userCreationService;
        _eventPublisher = eventPublisher;
        _emailService = emailService;
        _metricsService = metricsService;
    }

    public async Task<Result<UserResponse>> RegisterAsync(RegisterUserRequest request)
    {
        var context = new UserContext { Request = request };

        var result = await Pipeline.Start(context)
            .WithTelemetry("RegisterUser")
            .WithRetry(maxAttempts: 2)

            // Validation steps
            .StepResultAsync(ctx => _emailValidationService.ValidateEmailAsync(ctx))
            .StepResultAsync(ctx => _passwordValidationService.ValidatePasswordAsync(ctx))
            .StepResultAsync(ctx => _roleValidationService.ValidateRoleAsync(ctx))

            // Creation step
            .StepResultAsync(ctx => _userCreationService.CreateUserAsync(ctx))

            // Side effects
            .TapAsync(ctx => _eventPublisher.PublishAsync(new UserRegistered(ctx.CreatedUser.Id)))
            .TapAsync(ctx => _emailService.SendWelcomeEmailAsync(ctx.CreatedUser.Email))
            .TapAsync(ctx => _metricsService.IncrementUserRegistrations())

            // Transform to response
            .Transform<UserResponse>(ctx => new UserResponse
            {
                Id = ctx.CreatedUser.Id,
                Email = ctx.CreatedUser.Email,
                Role = ctx.CreatedUser.Role,
                CreatedAt = ctx.CreatedUser.CreatedAt
            })
            .ExecuteAsync();

        return result;
    }
}

Conditional Processing

public class OrderProcessor
{
    private readonly OrderValidationService _validationService;
    private readonly FraudCheckService _fraudCheckService;
    private readonly ManagerApprovalService _approvalService;
    private readonly ComplianceService _complianceService;
    private readonly CurrencyConversionService _currencyService;
    private readonly PaymentService _paymentService;

    public OrderProcessor(
        OrderValidationService validationService,
        FraudCheckService fraudCheckService,
        ManagerApprovalService approvalService,
        ComplianceService complianceService,
        CurrencyConversionService currencyService,
        PaymentService paymentService)
    {
        _validationService = validationService;
        _fraudCheckService = fraudCheckService;
        _approvalService = approvalService;
        _complianceService = complianceService;
        _currencyService = currencyService;
        _paymentService = paymentService;
    }

    public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
    {
        var result = await Pipeline.Start(context)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))

            .When(
                ctx => ctx.Order.Amount > 1000,
                pipeline => pipeline
                    .StepAsync(ctx => _fraudCheckService.CheckAsync(ctx))
                    .StepAsync(ctx => _approvalService.RequestApprovalAsync(ctx)))

            .When(
                ctx => ctx.Order.IsInternational,
                pipeline => pipeline
                    .StepAsync(ctx => _complianceService.CheckComplianceAsync(ctx))
                    .StepAsync(ctx => _currencyService.ConvertCurrencyAsync(ctx)))

            .StepResultAsync(ctx => _paymentService.ProcessPaymentAsync(ctx))
            .ExecuteAsync();

        return result;
    }
}

❌ Error Handling

Result Pattern

The library uses the Result pattern for explicit error handling:

public readonly struct Result<T>
{
    public bool IsSuccess { get; }
    public bool IsFailure { get; }
    public T? Value { get; }
    public string? ErrorMessage { get; }
    public Exception? Exception { get; }
}
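
To show how a type with these members supports railway-oriented chaining, here is a minimal stand-in. `SketchResult<T>` and its `Then` method are hypothetical names invented for this illustration. The type mirrors the members listed above and the `Success`/`Failure` factories used in the earlier examples, but it is not the Myth.Flow implementation.

```csharp
#nullable enable
using System;

// Two-step railway: the second step runs only if the first one succeeded.
var ok = SketchResult<int>.Success(20)
    .Then(x => SketchResult<int>.Success(x + 1));
var failed = SketchResult<int>.Failure("Amount must be positive")
    .Then(x => SketchResult<int>.Success(x + 1));

Console.WriteLine(ok.IsSuccess ? $"value: {ok.Value}" : $"error: {ok.ErrorMessage}");
// value: 21
Console.WriteLine(failed.IsSuccess ? $"value: {failed.Value}" : $"error: {failed.ErrorMessage}");
// error: Amount must be positive

// Stand-in type for illustration only (not the library's source).
public readonly struct SketchResult<T>
{
    private SketchResult(bool isSuccess, T? value, string? errorMessage, Exception? exception)
    {
        IsSuccess = isSuccess;
        Value = value;
        ErrorMessage = errorMessage;
        Exception = exception;
    }

    public bool IsSuccess { get; }
    public bool IsFailure => !IsSuccess;
    public T? Value { get; }
    public string? ErrorMessage { get; }
    public Exception? Exception { get; }

    public static SketchResult<T> Success(T value) => new(true, value, null, null);

    public static SketchResult<T> Failure(string message, Exception? exception = null) =>
        new(false, default, message, exception);

    // Hypothetical chaining helper: skips the next step after a failure.
    public SketchResult<T> Then(Func<T, SketchResult<T>> next) =>
        IsSuccess ? next(Value!) : this;
}
```

Because the failure short-circuits, the error message from the first failing step is the one the caller sees.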

Handling Pipeline Results

public class DataProcessor
{
    private readonly ValidationService _validationService;
    private readonly ProcessingService _processingService;

    public DataProcessor(
        ValidationService validationService,
        ProcessingService processingService)
    {
        _validationService = validationService;
        _processingService = processingService;
    }

    public async Task<DataContext?> ProcessDataAsync(DataContext context)
    {
        var result = await Pipeline.Start(context)
            .StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
            .StepResultAsync(ctx => _processingService.ProcessAsync(ctx))
            .ExecuteAsync();

        if (result.IsSuccess)
        {
            var data = result.Value;
            // Handle success
            return data;
        }
        else
        {
            var errorMessage = result.ErrorMessage;
            var exception = result.Exception;
            // Handle failure
            return null;
        }
    }
}

Exception Types

  • PipelineException: General pipeline execution errors
  • PipelineConfigurationException: Configuration errors (missing services, invalid setup)

Configuration exceptions are fail-fast and are always re-thrown to prevent silent failures.

Exception Filtering

By default, all exceptions are handled internally by the pipeline and returned as failure results. However, you can configure specific exception types to be propagated (thrown) instead of being handled:

// Configure during service registration
builder.Services.AddFlow(config => config
    .UseExceptionFilter<ArgumentException>()                    // Propagate ArgumentException
    .UseExceptionFilter<InvalidOperationException>()            // Propagate InvalidOperationException
    .UseExceptionFilter(typeof(UnauthorizedAccessException))    // Propagate using Type
);

// Example usage
public class ValidationService
{
    public async Task<Result<UserContext>> ValidateAsync(UserContext context)
    {
        return await Pipeline.Start(context)
            .StepAsync(ctx =>
            {
                if (string.IsNullOrEmpty(ctx.Email))
                    throw new ArgumentException("Email is required"); // This will be propagated

                if (ctx.Age < 0)
                    throw new InvalidDataException("Invalid age");     // This will be handled

                return Task.FromResult(ctx);
            })
            .ExecuteAsync(); // ArgumentException will be thrown, InvalidDataException will return failure result
    }
}

Key Features:

  • Exception inheritance is supported (e.g., ArgumentNullException inherits from ArgumentException)
  • Multiple exception types can be configured
  • Configuration applies to all pipelines in the application
  • PipelineConfigurationException and OperationCanceledException are always propagated regardless of configuration
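
The inheritance rule in the list above can be sketched with standard reflection. This is an assumption about how such a check could work, not Myth.Flow's actual code, and `ShouldPropagate` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical check (not Myth.Flow's code): an exception propagates when its
// runtime type is, or derives from, any configured filter type.
static bool ShouldPropagate(Exception ex, IEnumerable<Type> filters) =>
    filters.Any(filter => filter.IsInstanceOfType(ex));

var filters = new[] { typeof(ArgumentException), typeof(InvalidOperationException) };

// ArgumentNullException derives from ArgumentException, so it matches.
Console.WriteLine(ShouldPropagate(new ArgumentNullException("email"), filters));  // True
// InvalidDataException matches neither filter, so it would be handled.
Console.WriteLine(ShouldPropagate(new InvalidDataException("Invalid age"), filters)); // False
```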

Success and Error Callbacks

public class MyProcessor
{
    private readonly MyService _myService;
    private readonly ILogger<MyProcessor> _logger;

    public MyProcessor(MyService myService, ILogger<MyProcessor> logger)
    {
        _myService = myService;
        _logger = logger;
    }

    public async Task<Result<MyContext>> ProcessAsync(MyContext context)
    {
        return await Pipeline.Start(context)
            .StepAsync(
                ctx => _myService.ProcessAsync(ctx),
                onSuccess: ctx => _logger.LogInformation("Step succeeded"),
                onError: ex => _logger.LogError(ex, "Step failed"))
            .ExecuteAsync();
    }
}

🧪 Testing

The pipeline design makes testing straightforward:

[Fact]
public async Task CreateUser_WithValidData_ShouldSucceed()
{
    // Arrange
    var services = new ServiceCollection();
    services.AddSingleton<IUserRepository>(mockRepository);
    services.AddSingleton<IPasswordValidator>(mockPasswordValidator);
    services.AddSingleton<UserValidationService>();
    services.AddSingleton<UserCreationService>();
    services.AddSingleton<UserRegistrationPipeline>();
    services.AddLogging();
    services.AddFlow();

    var serviceProvider = services.BuildServiceProvider();
    var pipeline = serviceProvider.GetRequiredService<UserRegistrationPipeline>();
    var request = new RegisterUserRequest { Email = "test@example.com", Name = "Test User" };

    // Act
    var result = await pipeline.RegisterAsync(request);

    // Assert
    Assert.True(result.IsSuccess);
    // RegisterAsync returns Result<UserResponse>, so assert on the response itself.
    Assert.NotNull(result.Value);
    Assert.Equal(request.Email, result.Value!.Email);
}

[Fact]
public async Task ProcessData_WithValidContext_ShouldExecuteAllSteps()
{
    // Arrange
    var validationService = new Mock<ValidationService>();
    var creationService = new Mock<UserCreationService>();

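    // Moq can only intercept these calls if the methods are virtual
    // (or the services are mocked through interfaces).
    validationService
        .Setup(x => x.ValidateAsync(It.IsAny<UserContext>()))
        .ReturnsAsync(Result<UserContext>.Success(new UserContext()));

    // Without this setup CreateAsync would return a default result,
    // and the IsSuccess assertion below would fail.
    creationService
        .Setup(x => x.CreateAsync(It.IsAny<UserContext>()))
        .ReturnsAsync(Result<UserContext>.Success(new UserContext()));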

    var processor = new UserProcessor(
        validationService.Object,
        creationService.Object);

    var context = new UserContext { Request = validRequest };

    // Act
    var result = await processor.ProcessAsync(context);

    // Assert
    Assert.True(result.IsSuccess);
    validationService.Verify(x => x.ValidateAsync(It.IsAny<UserContext>()), Times.Once);
    creationService.Verify(x => x.CreateAsync(It.IsAny<UserContext>()), Times.Once);
}

📋 Best Practices

  1. Use BuildApp() or BuildWithGlobalProvider(): Initialize the global service provider for seamless DI
  2. Use Result Pattern: Return Result<T> from services for explicit error handling
  3. Configure Dependencies: Register all services in Program.cs/Startup.cs before building
  4. Enable Telemetry: Use WithTelemetry() for production observability and distributed tracing
  5. Configure Retry Policies: Set appropriate retry policies for unreliable operations (APIs, databases)
  6. Separate Concerns: Keep each step focused on a single responsibility
  7. Use Tap for Side Effects: Keep logging, metrics, and events separate from the main flow
  8. Handle Errors Gracefully: Always check IsSuccess before accessing Value
  9. Use CancellationToken: Pass cancellation tokens for long-running operations
  10. Test Pipeline Steps: Test individual services and complete pipelines separately
  11. Use Conditional Execution: Keep conditional logic readable with When()
  12. Configure Exception Filters: Use .UseExceptionFilter<T>() for business exceptions that should propagate
  13. Keep Context Immutable: Avoid mutating context objects; create new instances when needed
  14. Use Transform for Type Changes: Transform context types when crossing architectural boundaries
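
Practice 13 can be illustrated with plain C# and no Myth.Flow types. The following is a minimal sketch, assuming the context is modelled as a record; OrderContext and OrderSteps.ApplyDiscount are hypothetical names chosen for illustration and are not part of the library.

```csharp
using System;

// The original context is never modified; the step hands back a new instance.
var original = new OrderContext("A-1", 100m, false);
var discounted = OrderSteps.ApplyDiscount(original, 0.10m);

Console.WriteLine(original);
Console.WriteLine(discounted);

// Hypothetical context type. Positional record properties are init-only,
// so a step cannot change them after construction.
public sealed record OrderContext(string OrderId, decimal Total, bool DiscountApplied);

public static class OrderSteps
{
    // Uses a `with` expression to copy the context and change only two properties.
    public static OrderContext ApplyDiscount(OrderContext ctx, decimal rate) =>
        ctx with { Total = ctx.Total * (1 - rate), DiscountApplied = true };
}
```

Because each step returns a fresh value, a retried or re-run step starts from the same input every time, which fits well with the retry policies mentioned in practice 5.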

📊 Response Information

Every pipeline execution returns a result object that reports whether the run succeeded, the error message if it failed, and the resulting value:

var result = await Pipeline.Start(context)...ExecuteAsync();

Console.WriteLine($"Is Success: {result.IsSuccess}");
Console.WriteLine($"Is Failure: {result.IsFailure}");
Console.WriteLine($"Error Message: {result.ErrorMessage}");

if (result.IsSuccess)
{
    var value = result.Value;
    // Process successful result
}
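
The check-before-access pattern above can be wrapped in a small helper so that callers never read Value on a failure. This is only a sketch: SketchResult<T> is a stand-in written for this example, not Myth.Flow's Result<T>, and its choice to throw when Value is read on a failure is an assumption. ResultHandling.Describe is likewise a hypothetical helper.

```csharp
#nullable enable
using System;

var ok = SketchResult<int>.Success(42);
var failed = SketchResult<int>.Failure("validation failed");

Console.WriteLine(ResultHandling.Describe(ok));
Console.WriteLine(ResultHandling.Describe(failed));

// Stand-in for illustration only; NOT the library's Result<T>.
public sealed class SketchResult<T>
{
    private readonly T? _value;

    private SketchResult(bool isSuccess, T? value, string? errorMessage)
    {
        IsSuccess = isSuccess;
        _value = value;
        ErrorMessage = errorMessage;
    }

    public bool IsSuccess { get; }
    public bool IsFailure => !IsSuccess;
    public string? ErrorMessage { get; }

    // Assumption of this sketch: reading Value on a failure throws,
    // which forces callers to branch on IsSuccess first.
    public T Value => IsSuccess
        ? _value!
        : throw new InvalidOperationException("No value on a failed result: " + ErrorMessage);

    public static SketchResult<T> Success(T value) => new(true, value, null);
    public static SketchResult<T> Failure(string errorMessage) => new(false, default, errorMessage);
}

public static class ResultHandling
{
    // Branches on IsSuccess and touches Value only on the success path.
    public static string Describe<T>(SketchResult<T> result) =>
        result.IsSuccess
            ? $"ok: {result.Value}"
            : $"failed: {result.ErrorMessage}";
}
```

Keeping the branch in one place means the rest of the calling code works with either a value or an error description, never a half-checked result.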

🌐 Global Service Provider

Myth.Flow uses the Myth.Commons centralized service provider for seamless cross-library dependency resolution.

How It Works

When you call builder.BuildApp() (ASP.NET Core) or services.BuildWithGlobalProvider() (console apps), the global service provider is automatically initialized. This allows Pipeline.Start() to access all registered services without manual configuration.

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddFlow();
builder.Services.AddScoped<ValidationService>();
builder.Services.AddScoped<ProcessingService>();

var app = builder.BuildApp();

Pipelines started anywhere in the application can now resolve these services without being handed a service provider:

public async Task<Result<MyContext>> ProcessAsync(MyContext context)
{
    return await Pipeline.Start(context)
        .StepAsync(ctx => ProcessDataAsync(ctx))
        .ExecuteAsync();
}
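
To make the idea concrete, the following is a conceptual sketch of how a globally published provider can let code resolve services without receiving a provider as a parameter. It is not the Myth.Commons implementation, which may differ (for example in scope handling and thread safety). GlobalProviderSketch, DictionaryServiceProvider, IGreeter, and FriendlyGreeter are all hypothetical names; only System.IServiceProvider is a real .NET interface.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Composition root: register services once, then publish the provider.
var provider = new DictionaryServiceProvider()
    .Add<IGreeter>(() => new FriendlyGreeter());
GlobalProviderSketch.Initialize(provider);

// Elsewhere in the app: resolve without being passed a provider.
var greeter = GlobalProviderSketch.GetRequired<IGreeter>();
Console.WriteLine(greeter.Greet("pipeline"));

public interface IGreeter { string Greet(string name); }

public sealed class FriendlyGreeter : IGreeter
{
    public string Greet(string name) => $"Hello, {name}";
}

// Tiny hand-rolled provider so the sketch needs no NuGet packages.
public sealed class DictionaryServiceProvider : IServiceProvider
{
    private readonly Dictionary<Type, Func<object>> _factories = new();

    public DictionaryServiceProvider Add<TService>(Func<TService> factory)
        where TService : class
    {
        _factories[typeof(TService)] = () => factory();
        return this;
    }

    // Returns null for unregistered types, as IServiceProvider specifies.
    public object? GetService(Type serviceType) =>
        _factories.TryGetValue(serviceType, out var factory) ? factory() : null;
}

// Holds the single published provider (not thread-safe; sketch only).
public static class GlobalProviderSketch
{
    private static IServiceProvider? _current;

    public static void Initialize(IServiceProvider provider) =>
        _current = provider ?? throw new ArgumentNullException(nameof(provider));

    public static T GetRequired<T>() where T : class
    {
        if (_current is null)
            throw new InvalidOperationException("Call Initialize first.");

        return (_current.GetService(typeof(T)) as T)
            ?? throw new InvalidOperationException($"No service registered for {typeof(T).Name}.");
    }
}
```

The trade-off of any global provider is that dependencies become implicit, so failing fast when a service is missing, as GetRequired does here, helps surface registration mistakes early.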

Benefits

  • Zero Configuration: No need to pass service providers around
  • Cross-Library Integration: Works seamlessly with Myth.Guard, Myth.Flow.Actions, etc.
  • Clean Code: No service locator anti-pattern in your business logic
  • Type Safety: Services are registered by type at startup and resolved through the typed DI container

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📧 Support

For issues, questions, or contributions, please visit the GitHub repository.

Target frameworks: net10.0 is compatible. The platform-specific targets net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows were computed.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Myth.Flow:

Myth.Flow.Actions: CQRS and event-driven architecture extension for Myth.Flow with dispatcher, event bus, message brokers (Kafka, RabbitMQ), caching, circuit breaker, and dead letter queue for resilient distributed systems.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
4.2.1-preview.1 145 12/2/2025
4.2.0 208 11/30/2025
4.2.0-preview.1 58 11/29/2025
4.1.0 169 11/27/2025
4.1.0-preview.3 125 11/27/2025
4.1.0-preview.2 126 11/27/2025
4.1.0-preview.1 127 11/26/2025
4.0.1 159 11/22/2025
4.0.1-preview.8 142 11/22/2025
4.0.1-preview.7 152 11/22/2025
4.0.1-preview.6 136 11/22/2025
4.0.1-preview.5 199 11/21/2025
4.0.1-preview.4 202 11/21/2025
4.0.1-preview.3 208 11/21/2025
4.0.1-preview.2 231 11/21/2025
4.0.1-preview.1 241 11/21/2025
4.0.0 386 11/20/2025
4.0.0-preview.3 337 11/19/2025
4.0.0-preview.2 86 11/15/2025
4.0.0-preview.1 105 11/15/2025
3.10.0 168 11/15/2025
3.0.5-preview.15 148 11/14/2025
3.0.5-preview.14 221 11/12/2025
3.0.5-preview.13 222 11/12/2025
3.0.5-preview.12 222 11/11/2025
3.0.5-preview.11 224 11/11/2025
3.0.5-preview.10 219 11/11/2025
3.0.5-preview.9 215 11/10/2025
3.0.5-preview.8 87 11/8/2025
3.0.5-preview.7 87 11/8/2025
3.0.5-preview.6 86 11/8/2025
3.0.5-preview.5 88 11/8/2025
3.0.5-preview.4 66 11/7/2025
3.0.5-preview.3 130 11/4/2025
3.0.5-preview.2 132 11/4/2025
3.0.5-preview.1 135 11/4/2025
3.0.4 195 11/3/2025
3.0.4-preview.19 74 11/2/2025
3.0.4-preview.18 73 11/1/2025
3.0.4-preview.17 71 11/1/2025
3.0.4-preview.16 74 11/1/2025
3.0.4-preview.15 67 10/31/2025
3.0.4-preview.14 132 10/31/2025
3.0.4-preview.13 133 10/30/2025
3.0.4-preview.12 125 10/23/2025
3.0.4-preview.11 122 10/23/2025
3.0.4-preview.10 120 10/23/2025
3.0.4-preview.9 121 10/23/2025
3.0.4-preview.8 124 10/22/2025
3.0.4-preview.6 116 10/21/2025
3.0.4-preview.5 116 10/21/2025
3.0.4-preview.4 123 10/20/2025
3.0.4-preview.3 118 10/20/2025
3.0.4-preview.2 50 10/18/2025
3.0.4-preview.1 121 10/7/2025