Snail.Toolkit.Async.Pipeline
1.0.0
dotnet add package Snail.Toolkit.Async.Pipeline --version 1.0.0
AsyncPipeline<T> is a lightweight, fluent wrapper around IAsyncEnumerable<T>. It simplifies asynchronous stream processing in C# by providing LINQ-like operators, parallel execution capabilities, and seamless integration with standard collection types.
The SelectMany Operator
The SelectMany operator is used to flatten "nested" sequences. It projects each element of the source stream into a sub-sequence and flattens those sub-sequences into a single, continuous asynchronous stream.
1. Synchronous Flattening
Use this when each item in your async stream contains a standard in-memory collection (such as a List<T> or an array) that you want to expand into individual elements.
// Example: Extracting all tags from a stream of Posts
IAsyncEnumerable<string> tagStream = postPipeline
    .SelectMany(post => post.Tags); // post.Tags is a List<string>
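To make the flattening behaviour concrete, here is a minimal sketch of the same idea written against plain IAsyncEnumerable<T>, without the library. The helper names Flatten and ToAsync and the sample post data are hypothetical and only illustrate the concept; they are not part of the package's API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sample data standing in for a stream of posts.
var posts = new[]
{
    (Title: "Intro", Tags: new List<string> { "csharp", "async" }),
    (Title: "Deep dive", Tags: new List<string> { "linq" }),
};

var tags = new List<string>();
await foreach (var tag in Flatten(ToAsync(posts), p => p.Tags))
{
    tags.Add(tag);
}
Console.WriteLine(string.Join(", ", tags)); // prints "csharp, async, linq"

// Sketch of synchronous flattening: each source item is projected to an
// in-memory collection, whose elements are yielded one by one.
static async IAsyncEnumerable<TResult> Flatten<TSource, TResult>(
    IAsyncEnumerable<TSource> source,
    Func<TSource, IEnumerable<TResult>> selector)
{
    await foreach (var item in source)
    {
        foreach (var child in selector(item))
        {
            yield return child;
        }
    }
}

// Helper (assumption): exposes an in-memory sequence as an async stream.
static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> items)
{
    foreach (var item in items)
    {
        await Task.Yield();
        yield return item;
    }
}
```

The output stream has the element type of the inner collection (string here), not of the source items.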
2. Asynchronous Flattening
Use this when the transformation itself returns another IAsyncEnumerable<T>. This is common when querying child records from a database or an API for every item in the source.
// Example: Fetching comments for every user in the pipeline
var commentsPipeline = userPipeline
    .SelectMany(user => commentService.GetCommentsAsync(user.Id));
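As an illustration of what asynchronous flattening amounts to, the sketch below nests two await foreach loops over plain IAsyncEnumerable<T> values. FlattenAsync, ToAsync, and the fake GetCommentsAsync source are hypothetical stand-ins, and the sequential draining shown here is an assumption about the non-parallel overload rather than a statement about the library's internals.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var userIds = new[] { 1, 2 };
var comments = new List<string>();

await foreach (var comment in FlattenAsync(ToAsync(userIds), id => GetCommentsAsync(id)))
{
    comments.Add(comment);
}
Console.WriteLine(string.Join(" | ", comments));
// prints "user 1 comment A | user 1 comment B | user 2 comment A | user 2 comment B"

// Sketch of asynchronous flattening: each inner async stream is drained
// completely before the next source item is requested, so order is preserved.
static async IAsyncEnumerable<TResult> FlattenAsync<TSource, TResult>(
    IAsyncEnumerable<TSource> source,
    Func<TSource, IAsyncEnumerable<TResult>> selector)
{
    await foreach (var item in source)
    {
        await foreach (var child in selector(item))
        {
            yield return child;
        }
    }
}

// Hypothetical stand-in for a service call such as commentService.GetCommentsAsync.
static async IAsyncEnumerable<string> GetCommentsAsync(int userId)
{
    foreach (var suffix in new[] { "A", "B" })
    {
        await Task.Delay(10); // simulated I/O latency
        yield return $"user {userId} comment {suffix}";
    }
}

// Helper (assumption): exposes an in-memory sequence as an async stream.
static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> items)
{
    foreach (var item in items)
    {
        await Task.Yield();
        yield return item;
    }
}
```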
3. Parallel Flattening
This overload processes multiple source items concurrently. It runs several sub-sequences at once, limited by maxDegreeOfParallelism, and interleaves their results into the main pipeline as they arrive, so results from different source items may be mixed together.
// Example: Scanning multiple directories for files in parallel
var filePipeline = directories
    .ToPipeline()
    .SelectMany(async (path, ct) => fileService.SearchFilesAsync(path, ct),
        maxDegreeOfParallelism: 4);
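For readers curious how bounded parallel flattening can be built by hand, here is a rough sketch using SemaphoreSlim and System.Threading.Channels. It is one possible approach under stated assumptions, not the library's actual implementation; FlattenParallel and the fake SearchAsync source are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var dirs = new[] { "a", "b", "c" };
var found = new List<string>();

await foreach (var file in FlattenParallel(dirs, (dir, token) => SearchAsync(dir, token), maxDegreeOfParallelism: 2))
{
    found.Add(file);
}

// Arrival order is not deterministic, so sort before printing.
found.Sort(StringComparer.Ordinal);
Console.WriteLine(string.Join(", ", found));
// prints "a/file1.txt, a/file2.txt, b/file1.txt, b/file2.txt, c/file1.txt, c/file2.txt"

// Sketch: at most maxDegreeOfParallelism inner streams run at once; every
// result is funnelled through a channel and yielded as soon as it arrives.
static async IAsyncEnumerable<TResult> FlattenParallel<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, CancellationToken, IAsyncEnumerable<TResult>> selector,
    int maxDegreeOfParallelism,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var channel = Channel.CreateUnbounded<TResult>();
    var gate = new SemaphoreSlim(maxDegreeOfParallelism);

    var producer = Task.Run(async () =>
    {
        var workers = new List<Task>();
        try
        {
            foreach (var item in source)
            {
                await gate.WaitAsync(ct); // wait for a free slot
                workers.Add(Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var child in selector(item, ct))
                        {
                            await channel.Writer.WriteAsync(child, ct);
                        }
                    }
                    finally
                    {
                        gate.Release(); // free the slot for the next item
                    }
                }));
            }
            await Task.WhenAll(workers);
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.Complete(ex); // surface failures to the reader
        }
    });

    await foreach (var result in channel.Reader.ReadAllAsync(ct))
    {
        yield return result;
    }
    await producer;
}

// Hypothetical stand-in for a call such as fileService.SearchFilesAsync.
static async IAsyncEnumerable<string> SearchAsync(
    string dir, [EnumeratorCancellation] CancellationToken ct = default)
{
    for (var i = 1; i <= 2; i++)
    {
        await Task.Delay(20, ct); // simulated directory scan
        yield return $"{dir}/file{i}.txt";
    }
}
```

The sketch uses an unbounded channel for brevity; a bounded channel would add backpressure when the consumer is slower than the producers.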
Basic Usage & Entry Points
Creating a Pipeline
You can initialize a pipeline using the AsyncPipelineBuilder or the provided extension methods:
// From an IAsyncEnumerable
var pipe1 = myAsyncSource.ToPipeline();
// From a synchronous IEnumerable
var pipe2 = myList.ToPipeline();
// From a Task/ValueTask
var pipe3 = AsyncPipelineBuilder.From(GetSingleItemAsync());
Common Operations
The pipeline supports standard LINQ operations adapted for asynchrony:
| Method | Description |
|---|---|
| Where | Filters items based on a sync or async predicate. |
| Select | Transforms items (supports ordered parallel execution). |
| SelectFast | Transforms items in parallel, yielding results as fast as possible (unordered). |
| Take / Skip | Limits or bypasses a specific number of elements. |
| Distinct | Removes duplicate elements from the stream. |
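The difference between ordered parallel execution (Select) and unordered execution (SelectFast) can be illustrated with plain tasks. The sketch below does not call the library; Work and the simulated latencies are hypothetical, and it only shows the two ordering strategies the table describes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Each input doubles as a simulated latency in milliseconds.
var inputs = new[] { 150, 50, 100 };

static async Task<int> Work(int ms)
{
    await Task.Delay(ms);
    return ms;
}

// Ordered: start everything at once, then await in source order.
var orderedTasks = inputs.Select(ms => Work(ms)).ToList();
var ordered = new List<int>();
foreach (var task in orderedTasks)
{
    ordered.Add(await task);
}
Console.WriteLine(string.Join(", ", ordered)); // prints "150, 50, 100"

// Unordered: start everything at once, then take results as they complete.
var pending = inputs.Select(ms => Work(ms)).ToList();
var unordered = new List<int>();
while (pending.Count > 0)
{
    var done = await Task.WhenAny(pending);
    pending.Remove(done);
    unordered.Add(await done);
}
// Typically the fastest item comes first, but exact order depends on timing.
Console.WriteLine(string.Join(", ", unordered));
```

Ordered execution keeps results aligned with the source at the cost of waiting for slow items at the head of the stream; unordered execution avoids that head-of-line wait.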
Terminal Methods
These methods execute the pipeline and produce a result:
// Collect into a List
List<User> users = await pipeline.ToListAsync();
// Get the first item
User? first = await pipeline.FirstOrDefaultAsync();
// Aggregate values (assuming the pipeline holds int values here)
int total = await pipeline.AggregateAsync(0, (sum, val) => sum + val);
Why use AsyncPipeline?
- Readable Syntax: Avoids deep nesting of await foreach loops.
- Performance: Built with ValueTask and System.Threading.Channels to ensure high throughput with low allocation overhead.
- Parallelism Made Easy: Simplifies complex SemaphoreSlim logic into a single maxDegreeOfParallelism parameter.
- Cancellation Support: First-class support for CancellationToken across all operators.
License
Snail.Toolkit.Async.Pipeline is a free and open source project, released under the permissive MIT license.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies
- net9.0
  - System.Linq.Async (>= 7.0.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 65 | 4/13/2026 |