Snail.Toolkit.Async.Pipeline 1.0.0

AsyncPipeline<T> is a lightweight, fluent wrapper around IAsyncEnumerable<T>. It simplifies asynchronous stream processing in C# by providing LINQ-like operators, parallel execution capabilities, and seamless integration with standard collection types.


The SelectMany Operator

The SelectMany operator is used to flatten "nested" sequences. It projects each element of the source stream into a sub-sequence and flattens those sub-sequences into a single, continuous asynchronous stream.

1. Synchronous Flattening

Use this when each item in your async stream contains a standard collection (such as a List<T> or an array) that you want to expand into individual elements.

// Example: Extracting all tags from a stream of Posts
IAsyncEnumerable<string> tagStream = postPipeline
    .SelectMany(post => post.Tags); // post.Tags is a List<string>

2. Asynchronous Flattening

Use this when the transformation itself returns another IAsyncEnumerable<T>. This is common when querying child records from a database or an API for every item in the source.

// Example: Fetching comments for every user in the pipeline
var commentsPipeline = userPipeline
    .SelectMany(user => commentService.GetCommentsAsync(user.Id));
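
To make the flattening behaviour concrete, here is a minimal sketch that uses only the .NET base class library, not the package itself. FlattenAsync, UserIdsAsync, and CommentsForAsync are hypothetical names invented for this illustration. The sketch assumes that sequential flattening drains each sub-sequence fully before moving to the next source item; the library's actual implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source: two user ids arriving asynchronously.
static async IAsyncEnumerable<int> UserIdsAsync()
{
    yield return 1;
    await Task.Yield();
    yield return 2;
}

// Hypothetical child query: two comments per user.
static async IAsyncEnumerable<string> CommentsForAsync(int userId)
{
    for (var n = 1; n <= 2; n++)
    {
        await Task.Yield();
        yield return $"user {userId}, comment {n}";
    }
}

// Sequential flattening written by hand: drain each sub-sequence
// completely before moving on to the next source item.
static async IAsyncEnumerable<TResult> FlattenAsync<TSource, TResult>(
    IAsyncEnumerable<TSource> source,
    Func<TSource, IAsyncEnumerable<TResult>> selector)
{
    await foreach (var item in source)
    {
        await foreach (var inner in selector(item))
        {
            yield return inner;
        }
    }
}

var comments = new List<string>();
await foreach (var c in FlattenAsync(UserIdsAsync(), id => CommentsForAsync(id)))
{
    comments.Add(c);
}
Console.WriteLine(string.Join("; ", comments));
// user 1, comment 1; user 1, comment 2; user 2, comment 1; user 2, comment 2
```

The nested await foreach loops are the code that a fluent SelectMany call is meant to replace.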

3. Parallel Flattening

This overload processes multiple source items concurrently. It starts up to maxDegreeOfParallelism sub-sequences at once and interleaves their results into the main pipeline as they arrive, so items from different source elements may be mixed together in the output.

// Example: Scanning multiple directories for files in parallel
var filePipeline = directories
    .ToPipeline()
    .SelectMany(async (path, ct) => fileService.SearchFilesAsync(path, ct), 
                maxDegreeOfParallelism: 4);
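
The interleaving described above can be sketched with a SemaphoreSlim and a channel from the base class library. This is an illustration of the general technique under stated assumptions, not the package's implementation: FlattenParallelAsync and ListFilesAsync are hypothetical names, and cancellation and early-exit cleanup are left out to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: runs up to maxDegreeOfParallelism inner sequences
// at once and merges their items through an unbounded channel.
static async IAsyncEnumerable<TResult> FlattenParallelAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, IAsyncEnumerable<TResult>> selector,
    int maxDegreeOfParallelism)
{
    var channel = Channel.CreateUnbounded<TResult>();
    var gate = new SemaphoreSlim(maxDegreeOfParallelism);

    // Producer: one worker per source item, throttled by the semaphore.
    var producer = Task.Run(async () =>
    {
        var workers = new List<Task>();
        try
        {
            foreach (var item in source)
            {
                await gate.WaitAsync();
                workers.Add(Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var inner in selector(item))
                        {
                            await channel.Writer.WriteAsync(inner);
                        }
                    }
                    finally
                    {
                        gate.Release(); // free a slot for the next source item
                    }
                }));
            }
            await Task.WhenAll(workers);
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex); // surface failures to the consumer
        }
    });

    // Consumer: yield items in the order they arrive in the channel.
    await foreach (var result in channel.Reader.ReadAllAsync())
    {
        yield return result;
    }
    await producer;
}

// Hypothetical stand-in for a file search: two results per directory.
static async IAsyncEnumerable<string> ListFilesAsync(string dir)
{
    for (var i = 1; i <= 2; i++)
    {
        await Task.Delay(10);
        yield return $"{dir}/file{i}.txt";
    }
}

var files = new List<string>();
await foreach (var file in FlattenParallelAsync(
    new[] { "a", "b", "c" }, dir => ListFilesAsync(dir), maxDegreeOfParallelism: 2))
{
    files.Add(file);
}
Console.WriteLine(files.Count); // 6 (arrival order may vary between runs)
```

All six items arrive, but their relative order depends on timing, which is the trade-off the parallel overload makes for throughput.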

Basic Usage & Entry Points

Creating a Pipeline

You can initialize a pipeline using the AsyncPipelineBuilder or the provided extension methods:

// From an IAsyncEnumerable
var pipe1 = myAsyncSource.ToPipeline();

// From a synchronous IEnumerable
var pipe2 = myList.ToPipeline();

// From a Task/ValueTask
var pipe3 = AsyncPipelineBuilder.From(GetSingleItemAsync());

Common Operations

The pipeline supports standard LINQ operations adapted for asynchrony:

Method       Description
Where        Filters items based on a sync or async predicate.
Select       Transforms items (supports ordered parallel execution).
SelectFast   Transforms items in parallel, yielding results as fast as possible (unordered).
Take / Skip  Limits or bypasses a specific number of elements.
Distinct     Removes duplicate elements from the stream.
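
The difference between an ordered parallel transform (Select) and an unordered one (SelectFast) can be illustrated with plain tasks. The sketch below does not call the package; SquareAsync is a hypothetical workload, and the two loops only approximate what "ordered" and "unordered" mean for parallel projection.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical workload: larger inputs finish sooner.
static async Task<int> SquareAsync(int x)
{
    await Task.Delay((4 - x) * 50);
    return x * x;
}

var inputs = new[] { 1, 2, 3 };

// Ordered: all tasks run concurrently, results are read in source order.
var ordered = new List<int>();
foreach (var task in inputs.Select(x => SquareAsync(x)).ToList())
{
    ordered.Add(await task);
}

// Unordered: results are taken in completion order.
var pending = inputs.Select(x => SquareAsync(x)).ToList();
var unordered = new List<int>();
while (pending.Count > 0)
{
    var finished = await Task.WhenAny(pending);
    pending.Remove(finished);
    unordered.Add(await finished);
}

Console.WriteLine(string.Join(", ", ordered));   // 1, 4, 9
Console.WriteLine(string.Join(", ", unordered)); // completion order; depends on timing
```

Ordered execution may hold back a finished result until earlier items complete, while unordered execution hands results over immediately at the cost of losing source order.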

Terminal Methods

These methods execute the pipeline and produce a result:

// Collect into a List
List<User> users = await pipeline.ToListAsync();

// Get the first item
User? first = await pipeline.FirstOrDefaultAsync();

// Aggregate values (this example assumes a pipeline of int values)
int total = await pipeline.AggregateAsync(0, (sum, val) => sum + val);
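
Because the pipeline wraps IAsyncEnumerable<T>, it is reasonable to expect that no work happens until a terminal method enumerates it. The following sketch shows that deferred behaviour with a plain async iterator. NumbersAsync and CollectAsync are hypothetical names used only here, and the package's own terminal methods are not called.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var started = false;

// Hypothetical source; the flag records when the iterator body first runs.
async IAsyncEnumerable<int> NumbersAsync()
{
    started = true;
    for (var i = 1; i <= 3; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

// Hand-written stand-in for a terminal "collect into a list" step.
static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
{
    var list = new List<T>();
    await foreach (var item in source)
    {
        list.Add(item);
    }
    return list;
}

var stream = NumbersAsync();
Console.WriteLine(started);      // False: creating the stream runs nothing
var items = await CollectAsync(stream);
Console.WriteLine(started);      // True: enumeration ran the source
Console.WriteLine(items.Count);  // 3
```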

Why use AsyncPipeline?

  • Readable Syntax: Avoids deep nesting of await foreach loops.
  • Performance: Built on ValueTask and System.Threading.Channels, with the aim of high throughput and low allocation overhead.
  • Parallelism Made Easy: Replaces hand-written SemaphoreSlim throttling with a single maxDegreeOfParallelism parameter.
  • Cancellation Support: First-class support for CancellationToken across all operators.
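
As background for the cancellation point, this is how a CancellationToken normally flows into an async stream in .NET, using WithCancellation and the EnumeratorCancellation attribute. It is a sketch with a hypothetical TicksAsync source; how the package's operators forward the token internally is not shown in this README.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical endless source that observes the consumer's token.
static async IAsyncEnumerable<int> TicksAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (var i = 0; ; i++)
    {
        ct.ThrowIfCancellationRequested();
        yield return i;
        await Task.Yield();
    }
}

using var cts = new CancellationTokenSource();
var seen = 0;
var cancelled = false;
try
{
    // WithCancellation hands the token to the iterator's ct parameter.
    await foreach (var tick in TicksAsync().WithCancellation(cts.Token))
    {
        seen++;
        if (tick == 4) cts.Cancel(); // request cancellation after five items
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}
Console.WriteLine($"{seen} items, cancelled: {cancelled}"); // 5 items, cancelled: True
```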

License

Snail.Toolkit.Async.Pipeline is a free and open source project, released under the permissive MIT license.

Target Frameworks

Compatible (included in the package): net9.0
Computed as compatible: net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

Version  Downloads  Last Updated
1.0.0    65         4/13/2026