Myth.Flow
4.2.1-preview.1
dotnet add package Myth.Flow --version 4.2.1-preview.1
NuGet\Install-Package Myth.Flow -Version 4.2.1-preview.1
<PackageReference Include="Myth.Flow" Version="4.2.1-preview.1" />
<PackageVersion Include="Myth.Flow" Version="4.2.1-preview.1" />
<PackageReference Include="Myth.Flow" />
paket add Myth.Flow --version 4.2.1-preview.1
#r "nuget: Myth.Flow, 4.2.1-preview.1"
#:package Myth.Flow@4.2.1-preview.1
#addin nuget:?package=Myth.Flow&version=4.2.1-preview.1&prerelease
#tool nuget:?package=Myth.Flow&version=4.2.1-preview.1&prerelease
<img style="float: right;" src="myth-flow-logo.png" alt="drawing" width="250"/>
Myth.Flow
A powerful .NET library for building maintainable and testable data processing pipelines with a fluent, chainable interface. Built with enterprise-grade features including automatic retry policies, OpenTelemetry integration, global service provider integration, and comprehensive error handling.
⭐ Features
- Fluent Interface: Simple, chainable API design for readable code
- Type Safety: Strong typing with context transformation support
- Automatic Retry: Configurable retry policies with exponential backoff
- OpenTelemetry Integration: Built-in distributed tracing and observability
- Global Service Provider: Seamless integration with Myth.Commons centralized DI container
- Result Pattern: Railway-oriented programming with
Result<T> - Error Handling: Comprehensive error handling with exception filtering
- Conditional Execution: Execute steps based on context predicates
- Side Effects: Tap into pipeline for logging, metrics, and events
- Async/Await: First-class async support with CancellationToken
- Zero Boilerplate: No service locator pattern - clean, straightforward code
📦 Installation
dotnet add package Myth.Flow
🚀 Quick Start
Basic Usage
public class OrderService
{
private readonly ValidationService _validationService;
private readonly PaymentService _paymentService;
private readonly InventoryService _inventoryService;
public OrderService(
ValidationService validationService,
PaymentService paymentService,
InventoryService inventoryService)
{
_validationService = validationService;
_paymentService = paymentService;
_inventoryService = inventoryService;
}
public async Task<Result<OrderContext>> ProcessOrderAsync(int orderId)
{
var input = new OrderContext { OrderId = orderId };
var result = await Pipeline.Start(input)
.StepAsync(ctx => _validationService.ValidateOrderAsync(ctx))
.StepAsync(ctx => _paymentService.ProcessPaymentAsync(ctx))
.StepAsync(ctx => _inventoryService.ReserveItemsAsync(ctx))
.Tap(ctx => Console.WriteLine($"Order {ctx.OrderId} completed"))
.ExecuteAsync();
if (result.IsSuccess)
{
Console.WriteLine("Order processed successfully!");
}
return result;
}
}
Setup
ASP.NET Core Applications
For ASP.NET Core applications, use builder.BuildApp() instead of builder.Build() to automatically initialize the global service provider:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddFlow(config => config
.UseTelemetry()
.UseLogging()
.UseRetry(3, 100));
builder.Services.AddScoped<ValidationService>();
builder.Services.AddScoped<PaymentService>();
builder.Services.AddScoped<InventoryService>();
var app = builder.BuildApp();
app.Run();
Console Applications
For console applications or background services, use services.BuildWithGlobalProvider():
var services = new ServiceCollection();
services.AddFlow(config => config
.UseTelemetry()
.UseRetry(3, 100));
services.AddScoped<ValidationService>();
services.AddScoped<ProcessingService>();
var serviceProvider = services.BuildWithGlobalProvider();
Using in Controllers/Services
public class OrderController : ControllerBase
{
private readonly OrderValidationService _validationService;
private readonly OrderCreationService _creationService;
private readonly OrderEventService _eventService;
public OrderController(
OrderValidationService validationService,
OrderCreationService creationService,
OrderEventService eventService)
{
_validationService = validationService;
_creationService = creationService;
_eventService = eventService;
}
[HttpPost("orders")]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
var context = new OrderContext { Request = request };
var result = await Pipeline.Start(context)
.WithTelemetry("CreateOrder")
.WithRetry(maxAttempts: 3, backoffMs: 100)
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.StepResultAsync(ctx => _creationService.CreateAsync(ctx))
.TapAsync(ctx => _eventService.PublishOrderCreatedAsync(ctx))
.Transform<OrderResponse>(ctx => new OrderResponse
{
OrderId = ctx.CreatedOrder!.Id,
Status = ctx.CreatedOrder.Status
})
.ExecuteAsync();
if (result.IsFailure)
return BadRequest(new { error = result.ErrorMessage });
return Ok(result.Value);
}
}
🔧 Configuration
Basic Configuration
builder.Services.AddFlow();
Advanced Configuration with Fluent Builder
builder.Services.AddFlow(config => config
.UseTelemetry()
.UseLogging()
.UseRetry(3, 100)
.UseActivitySource("MyApp.Pipeline")
.UseExceptionFilter<ArgumentException>()
.UseExceptionFilter<InvalidOperationException>());
Configuration Options
- UseTelemetry() / DisableTelemetry(): Enable/disable OpenTelemetry distributed tracing
- UseLogging() / DisableLogging(): Enable/disable Microsoft.Extensions.Logging integration
- UseRetry(attempts, backoffMs) / DisableRetry(): Configure default retry policy with exponential backoff
- UseActivitySource(name, version?): Set custom ActivitySource for telemetry
- UseExceptionFilter<TException>(): Configure exception types to propagate without handling
Per-Pipeline Configuration
Override global settings for specific pipelines:
public class MyService
{
private readonly ProcessingService _processingService;
public MyService(ProcessingService processingService)
{
_processingService = processingService;
}
public async Task<Result<MyContext>> ExecuteAsync(MyContext context)
{
return await Pipeline.Start(context)
.WithTelemetry("OperationName")
.WithRetry(maxAttempts: 5, backoffMs: 200)
.StepAsync(ctx => _processingService.ProcessAsync(ctx))
.ExecuteAsync();
}
}
Pipeline.Start Options
Start with Default Configuration
var result = await Pipeline.Start(context)
.StepAsync(ctx => ProcessAsync(ctx))
.ExecuteAsync();
Start with Custom Configuration
var result = await Pipeline.Start(context, config => {
config.EnableTelemetry = true;
config.DefaultRetryAttempts = 5;
})
.StepAsync(ctx => ProcessAsync(ctx))
.ExecuteAsync();
🔄 Pipeline Steps
Synchronous Steps
public class MyService
{
private readonly TransformService _transformService;
public MyService(TransformService transformService)
{
_transformService = transformService;
}
public MyContext ProcessData(MyContext ctx)
{
var result = Pipeline.Start(ctx)
.Step(context =>
{
// Synchronous processing
context.Data = _transformService.Transform(context.Data);
return context;
})
.Execute();
return result.Value;
}
}
Asynchronous Steps
public class MyService
{
private readonly ProcessingService _processingService;
public MyService(ProcessingService processingService)
{
_processingService = processingService;
}
public async Task<Result<MyContext>> ProcessAsync(MyContext context)
{
return await Pipeline.Start(context)
.StepAsync(ctx => _processingService.ProcessAsync(ctx))
.ExecuteAsync();
}
}
Steps with Result Pattern
Use StepResult and StepResultAsync for operations that can succeed or fail:
public class OrderProcessor
{
private readonly ValidationService _validationService;
public OrderProcessor(ValidationService validationService)
{
_validationService = validationService;
}
public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
{
return await Pipeline.Start(context)
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.ExecuteAsync();
}
}
The Result<T> pattern allows steps to return success or failure. Failed results are automatically converted to exceptions:
public async Task<Result<OrderContext>> ValidateAsync(OrderContext context)
{
if (string.IsNullOrEmpty(context.Request.Email))
return Result<OrderContext>.Failure("Email is required");
if (context.Request.Amount <= 0)
return Result<OrderContext>.Failure("Amount must be positive");
return Result<OrderContext>.Success(context);
}
Steps with CancellationToken Support
Pass cancellation tokens for operations that support cancellation:
public async Task<Result<MyContext>> ProcessAsync(MyContext context, CancellationToken ct)
{
return await Pipeline.Start(context)
.StepAsync((ctx, token) => LongRunningOperationAsync(ctx, token))
.StepResultAsync((ctx, token) => ValidateAsync(ctx, token))
.ExecuteAsync(ct);
}
Context Transformation
Transform the pipeline context to a different type. All previous steps are executed before transformation:
public async Task<Result<OrderResponse>> ProcessOrderAsync(OrderContext context)
{
return await Pipeline.Start(context)
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.StepResultAsync(ctx => _creationService.CreateAsync(ctx))
.Transform<OrderResponse>(ctx => new OrderResponse
{
Id = ctx.Entity.Id,
Name = ctx.Entity.Name
})
.ExecuteAsync();
}
Async Transformation
.TransformAsync<OutputContext>(async ctx =>
{
var data = await _service.GetDataAsync(ctx.Id);
return new OutputContext { Data = data };
})
Side Effects (Tap)
Execute actions without modifying the context. Perfect for logging, metrics, and event publishing:
public class OrderProcessor
{
private readonly ILogger<OrderProcessor> _logger;
private readonly EventPublisher _eventPublisher;
private readonly MetricsService _metricsService;
public OrderProcessor(
ILogger<OrderProcessor> logger,
EventPublisher eventPublisher,
MetricsService metricsService)
{
_logger = logger;
_eventPublisher = eventPublisher;
_metricsService = metricsService;
}
public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
{
return await Pipeline.Start(context)
.Tap(ctx => Console.WriteLine($"Processing: {ctx.Id}"))
.TapAsync(ctx => _eventPublisher.PublishAsync(new OrderCreated(ctx.OrderId)))
.Tap(ctx => _metricsService.IncrementCounter("orders_created"))
.ExecuteAsync();
}
}
Note: Tap steps don't support retry logic and exceptions are propagated immediately.
Conditional Execution
public class OrderProcessor
{
private readonly FraudDetectionService _fraudDetectionService;
private readonly ApprovalService _approvalService;
public OrderProcessor(
FraudDetectionService fraudDetectionService,
ApprovalService approvalService)
{
_fraudDetectionService = fraudDetectionService;
_approvalService = approvalService;
}
public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
{
return await Pipeline.Start(context)
.When(
ctx => ctx.Amount > 1000,
pipeline => pipeline
.StepAsync(ctx => _fraudDetectionService.CheckAsync(ctx))
.StepAsync(ctx => _approvalService.RequestApprovalAsync(ctx)))
.ExecuteAsync();
}
}
🔁 Retry Policies
Configure retry behavior for resilient pipelines with exponential backoff.
Global Retry Configuration
builder.Services.AddFlow(config => config
.UseRetry(3, 100));
Per-Pipeline Retry
Override global retry settings for specific pipelines:
var result = await Pipeline.Start(context)
.WithRetry(maxAttempts: 5, backoffMs: 200)
.StepAsync(ctx => UnreliableOperationAsync(ctx))
.ExecuteAsync();
Retry Behavior
- Exponential backoff:
delay = backoffMs × attemptNumber - Retries only on exceptions:
Result.Failuredoes not trigger retry - Never retries:
OperationCanceledExceptionand configured exception filters - Per-step configuration: Steps added after
.WithRetry()inherit that configuration - Default: No retry (attempts = 0)
Retry Example
public class ApiIntegrationService
{
private readonly ExternalApiService _externalApiService;
public ApiIntegrationService(ExternalApiService externalApiService)
{
_externalApiService = externalApiService;
}
public async Task<Result<ApiContext>> CallApiAsync(ApiContext context)
{
return await Pipeline.Start(context)
.WithRetry(maxAttempts: 3, backoffMs: 100)
.StepAsync(ctx => _externalApiService.CallUnreliableApiAsync(ctx))
.ExecuteAsync();
}
}
Retry attempts: 1st retry after 100ms, 2nd after 200ms, 3rd after 300ms
📊 Observability & Telemetry
OpenTelemetry Integration
Myth.Flow uses the standard System.Diagnostics.ActivitySource for distributed tracing:
builder.Services.AddFlow(config => config
.UseTelemetry()
.UseActivitySource("MyApp.Pipeline", "1.0.0"));
public class UserService
{
private readonly ValidationService _validationService;
private readonly UserCreationService _creationService;
public UserService(
ValidationService validationService,
UserCreationService creationService)
{
_validationService = validationService;
_creationService = creationService;
}
public async Task<Result<UserContext>> CreateUserAsync(UserContext context)
{
return await Pipeline.Start(context)
.WithTelemetry("CreateUser")
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.StepResultAsync(ctx => _creationService.CreateAsync(ctx))
.ExecuteAsync();
}
}
Activity Structure
- Root Activity: Created with the operation name from
.WithTelemetry() - Step Activities: Child activities named
Step_{index}_{stepName} - Tags:
pipeline.input.type: Context type name- Step-specific timing and metadata
- Status:
Okfor success,Errorwith message for failures
Logging Integration
public class UserCreationService
{
private readonly ILogger<UserCreationService> _logger;
public async Task<Result<UserContext>> CreateAsync(UserContext context)
{
_logger.LogInformation("Creating user: {Email}", context.Request.Email);
try
{
// Create user logic
_logger.LogInformation(
"User {Email} created successfully with ID: {UserId}",
context.CreatedUser.Email,
context.CreatedUser.Id);
return Result<UserContext>.Success(context);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to create user: {Email}", context.Request.Email);
return Result<UserContext>.Failure("Failed to create user", ex);
}
}
}
Metrics Example
public interface IUserMetrics
{
void IncrementUserCreated();
void IncrementUserFailed();
}
public class UserMetricsService : IUserMetrics
{
private int _usersCreated;
private int _usersFailed;
public void IncrementUserCreated() => Interlocked.Increment(ref _usersCreated);
public void IncrementUserFailed() => Interlocked.Increment(ref _usersFailed);
}
public class UserProcessor
{
private readonly UserObservabilityService _observabilityService;
public UserProcessor(UserObservabilityService observabilityService)
{
_observabilityService = observabilityService;
}
public async Task<Result<UserContext>> ProcessAsync(UserContext context)
{
return await Pipeline.Start(context)
.TapAsync(ctx => _observabilityService.RecordMetricsAsync(ctx))
.ExecuteAsync();
}
}
🏗️ Advanced Patterns
Repository Pattern with Transactions
public class UserCreationService
{
private readonly IUserRepository _repository;
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<UserCreationService> _logger;
public async Task<Result<UserContext>> CreateUserAsync(UserContext context)
{
try
{
await _unitOfWork.BeginTransactionAsync();
var user = new User
{
Email = context.Request.Email,
PasswordHash = context.PasswordHash,
Role = context.Request.Role
};
context.CreatedUser = await _repository.CreateAsync(user);
await _unitOfWork.CommitAsync();
_logger.LogInformation("User {Email} created", user.Email);
return Result<UserContext>.Success(context);
}
catch (Exception ex)
{
await _unitOfWork.RollbackAsync();
_logger.LogError(ex, "Failed to create user");
return Result<UserContext>.Failure("Failed to create user", ex);
}
}
}
Event-Driven Architecture
public class OrderPipeline
{
private readonly OrderValidationService _validationService;
private readonly OrderCreationService _creationService;
private readonly OrderEventService _eventService;
private readonly NotificationService _notificationService;
private readonly MetricsService _metricsService;
public OrderPipeline(
OrderValidationService validationService,
OrderCreationService creationService,
OrderEventService eventService,
NotificationService notificationService,
MetricsService metricsService)
{
_validationService = validationService;
_creationService = creationService;
_eventService = eventService;
_notificationService = notificationService;
_metricsService = metricsService;
}
public async Task<Result<OrderResponse>> ProcessOrderAsync(OrderContext context)
{
var result = await Pipeline.Start(context)
.WithTelemetry("ProcessOrder")
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.StepResultAsync(ctx => _creationService.CreateAsync(ctx))
.TapAsync(ctx => _eventService.PublishOrderCreatedAsync(ctx))
.TapAsync(ctx => _notificationService.SendConfirmationEmailAsync(ctx))
.TapAsync(ctx => _metricsService.RecordOrderCreatedAsync(ctx))
.Transform<OrderResponse>(ctx => new OrderResponse
{
OrderId = ctx.Order.Id,
Status = ctx.Order.Status,
CreatedAt = ctx.Order.CreatedAt
})
.ExecuteAsync();
return result;
}
}
Multi-Step Validation Pipeline
public class UserRegistrationPipeline
{
private readonly EmailValidationService _emailValidationService;
private readonly PasswordValidationService _passwordValidationService;
private readonly RoleValidationService _roleValidationService;
private readonly UserCreationService _userCreationService;
private readonly EventPublisher _eventPublisher;
private readonly EmailService _emailService;
private readonly MetricsService _metricsService;
public UserRegistrationPipeline(
EmailValidationService emailValidationService,
PasswordValidationService passwordValidationService,
RoleValidationService roleValidationService,
UserCreationService userCreationService,
EventPublisher eventPublisher,
EmailService emailService,
MetricsService metricsService)
{
_emailValidationService = emailValidationService;
_passwordValidationService = passwordValidationService;
_roleValidationService = roleValidationService;
_userCreationService = userCreationService;
_eventPublisher = eventPublisher;
_emailService = emailService;
_metricsService = metricsService;
}
public async Task<Result<UserResponse>> RegisterAsync(RegisterUserRequest request)
{
var context = new UserContext { Request = request };
var result = await Pipeline.Start(context)
.WithTelemetry("RegisterUser")
.WithRetry(maxAttempts: 2)
// Validation steps
.StepResultAsync(ctx => _emailValidationService.ValidateEmailAsync(ctx))
.StepResultAsync(ctx => _passwordValidationService.ValidatePasswordAsync(ctx))
.StepResultAsync(ctx => _roleValidationService.ValidateRoleAsync(ctx))
// Creation step
.StepResultAsync(ctx => _userCreationService.CreateUserAsync(ctx))
// Side effects
.TapAsync(ctx => _eventPublisher.PublishAsync(new UserRegistered(ctx.CreatedUser.Id)))
.TapAsync(ctx => _emailService.SendWelcomeEmailAsync(ctx.CreatedUser.Email))
.TapAsync(ctx => _metricsService.IncrementUserRegistrations())
// Transform to response
.Transform<UserResponse>(ctx => new UserResponse
{
Id = ctx.CreatedUser.Id,
Email = ctx.CreatedUser.Email,
Role = ctx.CreatedUser.Role,
CreatedAt = ctx.CreatedUser.CreatedAt
})
.ExecuteAsync();
return result;
}
}
Conditional Processing
public class OrderProcessor
{
private readonly OrderValidationService _validationService;
private readonly FraudCheckService _fraudCheckService;
private readonly ManagerApprovalService _approvalService;
private readonly ComplianceService _complianceService;
private readonly CurrencyConversionService _currencyService;
private readonly PaymentService _paymentService;
public OrderProcessor(
OrderValidationService validationService,
FraudCheckService fraudCheckService,
ManagerApprovalService approvalService,
ComplianceService complianceService,
CurrencyConversionService currencyService,
PaymentService paymentService)
{
_validationService = validationService;
_fraudCheckService = fraudCheckService;
_approvalService = approvalService;
_complianceService = complianceService;
_currencyService = currencyService;
_paymentService = paymentService;
}
public async Task<Result<OrderContext>> ProcessAsync(OrderContext context)
{
var result = await Pipeline.Start(context)
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.When(
ctx => ctx.Order.Amount > 1000,
pipeline => pipeline
.StepAsync(ctx => _fraudCheckService.CheckAsync(ctx))
.StepAsync(ctx => _approvalService.RequestApprovalAsync(ctx)))
.When(
ctx => ctx.Order.IsInternational,
pipeline => pipeline
.StepAsync(ctx => _complianceService.CheckComplianceAsync(ctx))
.StepAsync(ctx => _currencyService.ConvertCurrencyAsync(ctx)))
.StepResultAsync(ctx => _paymentService.ProcessPaymentAsync(ctx))
.ExecuteAsync();
return result;
}
}
❌ Error Handling
Result Pattern
The library uses the Result pattern for explicit error handling:
public readonly struct Result<T>
{
public bool IsSuccess { get; }
public bool IsFailure { get; }
public T? Value { get; }
public string? ErrorMessage { get; }
public Exception? Exception { get; }
}
Handling Pipeline Results
public class DataProcessor
{
private readonly ValidationService _validationService;
private readonly ProcessingService _processingService;
public DataProcessor(
ValidationService validationService,
ProcessingService processingService)
{
_validationService = validationService;
_processingService = processingService;
}
public async Task<DataContext?> ProcessDataAsync(DataContext context)
{
var result = await Pipeline.Start(context)
.StepResultAsync(ctx => _validationService.ValidateAsync(ctx))
.StepResultAsync(ctx => _processingService.ProcessAsync(ctx))
.ExecuteAsync();
if (result.IsSuccess)
{
var data = result.Value;
// Handle success
return data;
}
else
{
var errorMessage = result.ErrorMessage;
var exception = result.Exception;
// Handle failure
return null;
}
}
}
Exception Types
PipelineException: General pipeline execution errorsPipelineConfigurationException: Configuration errors (missing services, invalid setup)
Configuration exceptions are fail-fast and are always re-thrown to prevent silent failures.
Exception Filtering
By default, all exceptions are handled internally by the pipeline and returned as failure results. However, you can configure specific exception types to be propagated (thrown) instead of being handled:
// Configure during service registration
builder.Services.AddFlow(config => config
.UseExceptionFilter<ArgumentException>() // Propagate ArgumentException
.UseExceptionFilter<InvalidOperationException>() // Propagate InvalidOperationException
.UseExceptionFilter(typeof(UnauthorizedAccessException)) // Propagate using Type
);
// Example usage
public class ValidationService
{
public async Task<Result<UserContext>> ValidateAsync(UserContext context)
{
return await Pipeline.Start(context)
.StepAsync(ctx =>
{
if (string.IsNullOrEmpty(ctx.Email))
throw new ArgumentException("Email is required"); // This will be propagated
if (ctx.Age < 0)
throw new InvalidDataException("Invalid age"); // This will be handled
return Task.FromResult(ctx);
})
.ExecuteAsync(); // ArgumentException will be thrown, InvalidDataException will return failure result
}
}
Key Features:
- Exception inheritance is supported (e.g.,
ArgumentNullExceptioninherits fromArgumentException) - Multiple exception types can be configured
- Configuration applies to all pipelines in the application
PipelineConfigurationExceptionandOperationCanceledExceptionare always propagated regardless of configuration
Success and Error Callbacks
public class MyProcessor
{
private readonly MyService _myService;
private readonly ILogger<MyProcessor> _logger;
public MyProcessor(MyService myService, ILogger<MyProcessor> logger)
{
_myService = myService;
_logger = logger;
}
public async Task<Result<MyContext>> ProcessAsync(MyContext context)
{
return await Pipeline.Start(context)
.StepAsync(
ctx => _myService.ProcessAsync(ctx),
onSuccess: ctx => _logger.LogInformation("Step succeeded"),
onError: ex => _logger.LogError(ex, "Step failed"))
.ExecuteAsync();
}
}
🧪 Testing
The pipeline design makes testing straightforward:
[Fact]
public async Task CreateUser_WithValidData_ShouldSucceed()
{
// Arrange
var services = new ServiceCollection();
services.AddSingleton<IUserRepository>(mockRepository);
services.AddSingleton<IPasswordValidator>(mockPasswordValidator);
services.AddSingleton<UserValidationService>();
services.AddSingleton<UserCreationService>();
services.AddSingleton<UserRegistrationPipeline>();
services.AddLogging();
services.AddFlow();
var serviceProvider = services.BuildServiceProvider();
var pipeline = serviceProvider.GetRequiredService<UserRegistrationPipeline>();
var request = new RegisterUserRequest { Email = "test@example.com", Name = "Test User" };
// Act
var result = await pipeline.RegisterAsync(request);
// Assert
Assert.True(result.IsSuccess);
Assert.NotNull(result.Value?.CreatedUser);
Assert.Equal(request.Email, result.Value.CreatedUser.Email);
}
[Fact]
public async Task ProcessData_WithValidContext_ShouldExecuteAllSteps()
{
// Arrange
var validationService = new Mock<ValidationService>();
var creationService = new Mock<UserCreationService>();
validationService
.Setup(x => x.ValidateAsync(It.IsAny<UserContext>()))
.ReturnsAsync(Result<UserContext>.Success(new UserContext()));
var processor = new UserProcessor(
validationService.Object,
creationService.Object);
var context = new UserContext { Request = validRequest };
// Act
var result = await processor.ProcessAsync(context);
// Assert
Assert.True(result.IsSuccess);
validationService.Verify(x => x.ValidateAsync(It.IsAny<UserContext>()), Times.Once);
creationService.Verify(x => x.CreateAsync(It.IsAny<UserContext>()), Times.Once);
}
📋 Best Practices
- Use BuildApp() or BuildWithGlobalProvider(): Initialize the global service provider for seamless DI
- Use Result Pattern: Return
Result<T>from services for explicit error handling - Configure Dependencies: Register all services in Program.cs/Startup.cs before building
- Enable Telemetry: Use
WithTelemetry()for production observability and distributed tracing - Configure Retry Policies: Set appropriate retry policies for unreliable operations (APIs, databases)
- Separate Concerns: Keep each step focused on a single responsibility
- Use Tap for Side Effects: Keep logging, metrics, and events separate from the main flow
- Handle Errors Gracefully: Always check
IsSuccessbefore accessingValue - Use CancellationToken: Pass cancellation tokens for long-running operations
- Test Pipeline Steps: Test individual services and complete pipelines separately
- Use Conditional Execution: Keep conditional logic readable with
When() - Configure Exception Filters: Use
.UseExceptionFilter<T>()for business exceptions that should propagate - Keep Context Immutable: Avoid mutating context objects; create new instances when needed
- Use Transform for Type Changes: Transform context types when crossing architectural boundaries
📊 Response Information
Every pipeline execution returns comprehensive information:
var result = await Pipeline.Start(context)...ExecuteAsync();
Console.WriteLine($"Is Success: {result.IsSuccess}");
Console.WriteLine($"Is Failure: {result.IsFailure}");
Console.WriteLine($"Error Message: {result.ErrorMessage}");
if (result.IsSuccess)
{
var value = result.Value;
// Process successful result
}
🌐 Global Service Provider
Myth.Flow uses the Myth.Commons centralized service provider for seamless cross-library dependency resolution.
How It Works
When you call builder.BuildApp() (ASP.NET Core) or services.BuildWithGlobalProvider() (console apps), the global service provider is automatically initialized. This allows Pipeline.Start() to access all registered services without manual configuration.
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddFlow();
builder.Services.AddScoped<ValidationService>();
builder.Services.AddScoped<ProcessingService>();
var app = builder.BuildApp();
Now all pipelines can access these services:
public async Task<Result<MyContext>> ProcessAsync(MyContext context)
{
return await Pipeline.Start(context)
.StepAsync(ctx => ProcessDataAsync(ctx))
.ExecuteAsync();
}
Benefits
- Zero Configuration: No need to pass service providers around
- Cross-Library Integration: Works seamlessly with Myth.Guard, Myth.Flow.Actions, etc.
- Clean Code: No service locator anti-pattern in your business logic
- Type Safety: Services are resolved at startup, not runtime
📄 License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
📧 Support
For issues, questions, or contributions, please visit the GitHub repository.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.DependencyInjection (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
- Myth.Commons (>= 4.2.1-preview.1)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Myth.Flow:
| Package | Downloads |
|---|---|
|
Myth.Flow.Actions
CQRS and event-driven architecture extension for Myth.Flow with dispatcher, event bus, message brokers (Kafka, RabbitMQ), caching, circuit breaker, and dead letter queue for resilient distributed systems. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 4.2.1-preview.1 | 145 | 12/2/2025 |
| 4.2.0 | 208 | 11/30/2025 |
| 4.2.0-preview.1 | 58 | 11/29/2025 |
| 4.1.0 | 169 | 11/27/2025 |
| 4.1.0-preview.3 | 125 | 11/27/2025 |
| 4.1.0-preview.2 | 126 | 11/27/2025 |
| 4.1.0-preview.1 | 127 | 11/26/2025 |
| 4.0.1 | 159 | 11/22/2025 |
| 4.0.1-preview.8 | 142 | 11/22/2025 |
| 4.0.1-preview.7 | 152 | 11/22/2025 |
| 4.0.1-preview.6 | 136 | 11/22/2025 |
| 4.0.1-preview.5 | 199 | 11/21/2025 |
| 4.0.1-preview.4 | 202 | 11/21/2025 |
| 4.0.1-preview.3 | 208 | 11/21/2025 |
| 4.0.1-preview.2 | 231 | 11/21/2025 |
| 4.0.1-preview.1 | 241 | 11/21/2025 |
| 4.0.0 | 386 | 11/20/2025 |
| 4.0.0-preview.3 | 337 | 11/19/2025 |
| 4.0.0-preview.2 | 86 | 11/15/2025 |
| 4.0.0-preview.1 | 105 | 11/15/2025 |
| 3.10.0 | 168 | 11/15/2025 |
| 3.0.5-preview.15 | 148 | 11/14/2025 |
| 3.0.5-preview.14 | 221 | 11/12/2025 |
| 3.0.5-preview.13 | 222 | 11/12/2025 |
| 3.0.5-preview.12 | 222 | 11/11/2025 |
| 3.0.5-preview.11 | 224 | 11/11/2025 |
| 3.0.5-preview.10 | 219 | 11/11/2025 |
| 3.0.5-preview.9 | 215 | 11/10/2025 |
| 3.0.5-preview.8 | 87 | 11/8/2025 |
| 3.0.5-preview.7 | 87 | 11/8/2025 |
| 3.0.5-preview.6 | 86 | 11/8/2025 |
| 3.0.5-preview.5 | 88 | 11/8/2025 |
| 3.0.5-preview.4 | 66 | 11/7/2025 |
| 3.0.5-preview.3 | 130 | 11/4/2025 |
| 3.0.5-preview.2 | 132 | 11/4/2025 |
| 3.0.5-preview.1 | 135 | 11/4/2025 |
| 3.0.4 | 195 | 11/3/2025 |
| 3.0.4-preview.19 | 74 | 11/2/2025 |
| 3.0.4-preview.18 | 73 | 11/1/2025 |
| 3.0.4-preview.17 | 71 | 11/1/2025 |
| 3.0.4-preview.16 | 74 | 11/1/2025 |
| 3.0.4-preview.15 | 67 | 10/31/2025 |
| 3.0.4-preview.14 | 132 | 10/31/2025 |
| 3.0.4-preview.13 | 133 | 10/30/2025 |
| 3.0.4-preview.12 | 125 | 10/23/2025 |
| 3.0.4-preview.11 | 122 | 10/23/2025 |
| 3.0.4-preview.10 | 120 | 10/23/2025 |
| 3.0.4-preview.9 | 121 | 10/23/2025 |
| 3.0.4-preview.8 | 124 | 10/22/2025 |
| 3.0.4-preview.6 | 116 | 10/21/2025 |
| 3.0.4-preview.5 | 116 | 10/21/2025 |
| 3.0.4-preview.4 | 123 | 10/20/2025 |
| 3.0.4-preview.3 | 118 | 10/20/2025 |
| 3.0.4-preview.2 | 50 | 10/18/2025 |
| 3.0.4-preview.1 | 121 | 10/7/2025 |