rio-command-pipeline
1.0.3
dotnet add package rio-command-pipeline --version 1.0.3
NuGet\Install-Package rio-command-pipeline -Version 1.0.3
<PackageReference Include="rio-command-pipeline" Version="1.0.3" />
paket add rio-command-pipeline --version 1.0.3
#r "nuget: rio-command-pipeline, 1.0.3"
// Install rio-command-pipeline as a Cake Addin #addin nuget:?package=rio-command-pipeline&version=1.0.3 // Install rio-command-pipeline as a Cake Tool #tool nuget:?package=rio-command-pipeline&version=1.0.3
Command Pipeline
An simple asynchronous EventHandler stream.
Overview
My goal for this project was to create a simple, efficient, and small API for handling events in .NET ecosystems asynchronously. This project does not utilize RX or .NET's built in Observer system. Instead, the goal was to create an EventHandler with the following characteristics:
- Returns a Task
- Is asynchronous (awaitable)
- Allows for various callbacks in the pipeline
- Catches and propagates Exceptions
- Is agnostic to the project type
- Keep it lightweight
- Does not use RX or TPL Dataflow
Features
- Subscribe to various processes within a pipeline: OnStart, OnStartAsync, OnSignalAsync, OnEnd, OnEndAsync, OnFinally, OnFinallyAsync, OnErrorCaught and OnErrorThrown
- API returns an awaitable Task when 'SignalAsync' is invoked
- You can choose to skip running the entire pipeline process and simply invoke specific pipeline processes (such as a specific callback)
- The auxiliary callbacks are completley optional, but allow the consumer to prep data, ensure an asynchronous method is in an appropriate state before firing, and exception handling.
- Is application agnostic (.NET)
Built With
- JetBrains Rider
- Tested with WPF & Blazor WASM
Getting Started
Clone/fork this repository and add a reference to the Command Pipeline project. OR, add this as a Nuget package to your project.
You can instantiate CommandPipelines as needed:
var pipeline = new CommandPipeline();
This project supports logging via Microsoft.Extensions.Logging.ILogger. The base constructor for CommandPipeline accepts an ILogger:
ILogger logger = new Logger<CommandPipeline>(new LoggerFactory());
var pipeline = new CommandPipeline(logger);
Once a pipeline instance is created, access to the fluent API becomes available:
Prerequisites and Dependencies
- .NET 6
- C# 10
- Microsoft.Extensions.Logging.7.0.0
Please feel free to contact me with any issues or concerns in regards to the dependencies defined above. We can work around the majority of them if needed.
Installation
- Clone or fork this repository. Once done, add a reference to this library in your project
- Download the latest dll and create a reference to it in your project
- Install via NPM
Usage
Standalone Instance(s)
using Rio.CommandPipeline;
public class Test {
public async Task MyTestMethod() {
// Create a new command pipeline
var pipeline = new CommandPipeline();
// Register a command with the pipeline
// The method signature must be CommandPipeline.CommandPipelineDelegate:
// delegate Task PipelineDelegate(object? o, PipelineObject? pObj, CancellationToken token)
pipeline.RegisterWork(MyWorkMethodAsync);
// Register optional callbacks
// Read through the available fluent methods to see what you can do
pipeline.RegisterOnStart(OnStart)
.RegisterOnEnd(OnEnd);
// The fluent API allows you to chain method calls
// We can also register asynchronous callbacks
// Register asynchronous callbacks requires the method signature to match CommandPipeline.CommandPipelineDelegate
pipeline.RegisterOnStartAsync(OnStartAsync)
.RegisterOnEndAsync(OnEndAsync);
// Start the pipeline
// PipelineObject are analogous to EventArgs
// You should create a derived class from PipelineObject to allow for transmission through the pipeline
// This class should contain any required information or state for the work registered to the pipeline;
// you decide.
await pipeline.SignalAsync(new PipelineObject());
// await pipeline.SignalAsync(PipelineObject.Empty);
// You can rerun this pipeline as needed.
// If needed, unregister any work or callbacks
pipeline.UnregisterOnStartAsync(OnStartAsync);
pipeline.UnregisterOnEnd(OnEnd);
// All fluent API methods allow for multiple registrations
pipeline.RegisterWork(MyWorkMethodAsync, MyWorkMethodAsync, MyWorkMethodAsync);
pipeline.RegisterOnStart(OnStart, OnStart, OnStart, OnStart);
// Errors will be caught and thrown
// ErrorCaught is invoked first, followed by ErrorThrown
// This allows you to handle errors as they are caught, followed by thrown
pipeline.RegisterOnErrorCaught(OnErrorCaught);
pipeline.RegisterOnErrorThrown(OnErrorThrown);
}
async Task MyWorkMethodAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {
// Do some work
await Task.Delay(5000, token);
}
void OnStart(PipelineObject? pipelineObject) {
// Do something when the pipeline starts
}
void OnEnd(PipelineObject? pipelineObject) {
// Do something when the pipeline ends
}
void OnErrorCaught(PipelineObject? pipelineObject, Exception exception) {
// Do something when an exception is caught
}
void OnErrorThrown(PipelineObject? pipelineObject, Exception exception) {
// Do something when an exception is thrown
}
Task OnStartAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {
// Do something when the pipeline starts asynchronously
return Task.CompletedTask;
}
Task OnEndAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {
// Do something when the pipeline ends asynchronously
return Task.CompletedTask;
}
}
Using the CommandPipelineBroker
using Rio.CommandPipeline;
public class Test {
public async Task MyTestMethod() {
// Instantiate a new CommandPipelineBroker
// The default takes params string[] ids
// These ids are used to register new instances of ICommandPipeline to an internal container.
// The container key is the id.
// You can register as many ids as you want.
// With an instance of CommandPipelineBroker, you can access the ICommandPipeline instances via the indexer.
var pipelineBroker1 = new CommandPipelineBroker("myIdOne", "myIdTwo");
// You also do not need to provide any ids
var pipelineBroker2 = new CommandPipelineBroker();
var pipelineOne = pipelineBroker1["myIdOne"];
var pipelineTwo = pipelineBroker1["myIdTwo"];
// You can also add new instances of ICommandPipeline to the broker.
// These pipeline instances functional identically to the "StandAlone" example.
pipelineOne.RegisterWork(MyWorkMethodAsync);
pipelineTwo.RegisterOnFinally(OnEnd);
// You can signal the pipeline to start by calling SignalAsync after retrieving an instance from the broker.
await pipelineOne.SignalAsync();
//await pipelineOne.SignalAsync(PipelineObject.Empty, new CancellationToken());
// Or let the broker do it for you.
await pipelineBroker1.SignalAsync("myIdOne");
// await pipelineBroker1.SignalAsync("myIdOne", PipelineObject.Empty, new CancellationToken());
// You can also register event handlers to the pipeline.
pipelineBroker1.Register("myNewPipeline", new CommandPipeline());
}
async Task MyWorkMethodAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {
// Do some work
await Task.Delay(5000, token);
}
void OnEnd(PipelineObject? pipelineObject) {
// Do something when the pipeline ends
}
}
Roadmap
There is currently no future features planned.
Contributing
Contributions are absolutely welcome. This is an open source project.
- Fork the repository
- Create a feature branch
git checkout -b feature/your-feature-branch
- Commit changes on your feature branch
git commit -m 'Summary feature'
- Push your changes to your branch
git push origin feature/your-feature-branch
- Open a pull request to merge/incorporate your feature
License
Distributed under the MIT License.
Contact
Acknowledgments and Credit
- Stephen Cleary's Blog
- In particular, his blog on Async Events in OOP provided me with inspiration for this.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net7.0
- Microsoft.Extensions.Logging (>= 7.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Official version 1 release