PipeFlowCore 2.1.0-archived

This is a prerelease version of PipeFlowCore.
dotnet add package PipeFlowCore --version 2.1.0-archived
                    
NuGet\Install-Package PipeFlowCore -Version 2.1.0-archived
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="PipeFlowCore" Version="2.1.0-archived" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="PipeFlowCore" Version="2.1.0-archived" />
                    
Directory.Packages.props
<PackageReference Include="PipeFlowCore" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add PipeFlowCore --version 2.1.0-archived
                    
#r "nuget: PipeFlowCore, 2.1.0-archived"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package PipeFlowCore@2.1.0-archived
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=PipeFlowCore&version=2.1.0-archived&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=PipeFlowCore&version=2.1.0-archived&prerelease
                    
Install as a Cake Tool

PipeFlow

NuGet Build Status License: MIT .NET

A modern, high-performance ETL pipeline library for .NET with builder pattern, async/await support, and Entity Framework integration. Process CSV, JSON, Excel, SQL and MongoDB data with minimal memory usage and maximum performance.

Installation

dotnet add package PipeFlowCore

Quick Start

// Modern builder pattern with lazy execution
var pipeline = PipeFlowBuilder
    .FromCsv("input.csv")
    .Filter(row => row["Status"] == "Active")
    .Build();

// Execute with async/await and CancellationToken
var result = await pipeline.ExecuteAsync(cancellationToken);

// Consistent From/To naming
await PipeFlowBuilder
    .FromSql(connectionString, "SELECT * FROM Orders")
    .ToExcelAsync("orders.xlsx", cancellationToken);

Supported Data Sources

PipeFlow can read from and write to:

  • CSV files
  • JSON files
  • Excel files (xlsx)
  • SQL Server databases
  • PostgreSQL databases (NEW)
  • MongoDB collections
  • REST APIs
  • In-memory collections

Key Features

  • Builder Pattern: Build complex pipelines without immediate execution
  • Async/Await: Full async support with CancellationToken
  • Entity Framework Integration: Direct support for IQueryable and EF Core
  • Lazy Execution: Pipelines execute only when explicitly called
  • Consistent API: From/To pattern for all data sources
  • Memory Efficient: Streaming support for large datasets
  • Parallel Processing: Built-in support for parallel execution
  • Type Safe: Strong typing with generics support
  • Extensible: Easy to add custom data sources and destinations

Examples

CSV Processing with Builder Pattern

// Build pipeline (not executed yet)
var pipeline = PipeFlowBuilder
    .FromCsv("sales.csv")
    .Filter(row => row.GetValue<decimal>("Amount") > 1000)
    .Map(row => new {
        Product = row["ProductName"],
        Revenue = row.GetValue<decimal>("Amount") * row.GetValue<int>("Quantity")
    })
    .Build();

// Execute when ready
var result = await pipeline.ExecuteAsync();
if (result.Success)
{
    await PipeFlowBuilder
        .FromCollection(result.Data)
        .ToCsvAsync("high_value_sales.csv");
}

Database Operations with Async Support

// Export from SQL Server with async
await PipeFlowBuilder
    .FromSql(connectionString, "SELECT * FROM Products WHERE InStock = 1")
    .ToJsonAsync("products.json", cancellationToken);

// Import to SQL Server
await PipeFlowBuilder
    .FromExcel("inventory.xlsx")
    .ToSqlAsync(connectionString, "Inventory", options =>
    {
        options.UseBulkInsert = true;
        options.BatchSize = 1000;
    }, cancellationToken);

PostgreSQL Support

// Read from PostgreSQL
var pipeline = PipeFlowBuilder
    .FromPostgreSql(connectionString, "SELECT * FROM products WHERE price > @minPrice", 
        new { minPrice = 100 })
    .Filter(row => row["in_stock"] == true)
    .Build();

// Write to PostgreSQL with upsert
await PipeFlowBuilder
    .FromCsv("products.csv")
    .ToPostgreSqlAsync(connectionString, "products", options =>
    {
        options.CreateTableIfNotExists = true;
        options.OnConflictUpdate("product_id");
        options.UseBulkInsert = true;
    }, cancellationToken);

Entity Framework Integration

// Read from Entity Framework with paging
await PipeFlowBuilder
    .FromQueryable(context.Customers.Where(c => c.IsActive))
    .WithPaging(pageSize: 500)
    .Map(c => new SupplierDto
    {
        Name = c.CompanyName,
        Email = c.Email
    })
    .ToEntityFrameworkAsync(context, options =>
    {
        options.UpsertPredicate = s => x => x.Email == s.Email;
        options.BatchSize = 100;
        options.UseTransaction = true;
    }, cancellationToken);

MongoDB Integration

// Query MongoDB
PipeFlow.From.MongoDB("mongodb://localhost", "store", "products")
    .Where("category", "Electronics")
    .Sort("price", ascending: false)
    .WriteToCsv("electronics.csv");

// Update MongoDB from CSV
PipeFlow.From.Csv("product_updates.csv")
    .ToMongoDB("mongodb://localhost", "store", "products", writer => writer
        .WithUpsert("sku")
        .WithBatchSize(500));

REST API Support

// Fetch from API
PipeFlow.From.Api("https://api.example.com/data")
    .Filter(item => item["active"] == true)
    .WriteToJson("active_items.json");

// Send to API with new builder pattern
await PipeFlowBuilder
    .FromCsv("upload.csv")
    .ToApiAsync("https://api.example.com/import", options =>
    {
        options.AuthToken = "api-key";
        options.BatchSize = 100;
    }, cancellationToken);

Data Transformations

// Complex transformations
PipeFlow.From.Csv("raw_data.csv")
    .RemoveDuplicates("Id")
    .FillMissing("Email", "unknown@example.com")
    .Map(row => {
        row["Email"] = row["Email"].ToString().ToLower();
        row["FullName"] = $"{row["FirstName"]} {row["LastName"]}";
        return row;
    })
    .GroupBy(row => row["Department"])
    .Select(group => new {
        Department = group.Key,
        EmployeeCount = group.Count(),
        AverageSalary = group.Average(r => r.GetValue<decimal>("Salary"))
    })
    .WriteToJson("department_summary.json");

Parallel Processing

For better performance with large datasets:

var pipeline = PipeFlowBuilder
    .FromCsv("large_file.csv")
    .AsParallel(maxDegreeOfParallelism: 8)
    .Map(row => {
        row["Hash"] = ComputeHash(row["Data"]);
        return row;
    })
    .Build();

var result = await pipeline.ExecuteAsync(cancellationToken);
await PipeFlowBuilder
    .FromCollection(result.Data)
    .ToCsvAsync("processed.csv", cancellationToken);

Data Validation

var validator = new DataValidator()
    .Required("Id", "Email")
    .Email("Email")
    .Range("Age", min: 0, max: 120);

PipeFlow.From.Csv("users.csv")
    .Validate(validator)
    .OnInvalid(ErrorStrategy.LogAndSkip)
    .WriteToCsv("valid_users.csv");

Performance

PipeFlow uses streaming to process data efficiently. Memory usage remains constant regardless of file size.

Benchmark results (1M records):

  • CSV read/write: ~3 seconds
  • JSON processing: ~5 seconds
  • SQL operations: ~4 seconds
  • Parallel processing: ~1.5 seconds (8 cores)

Configuration

PipeFlowConfig.Configure(config => {
    config.DefaultCsvDelimiter = ',';
    config.BufferSize = 8192;
    config.ThrowOnMissingColumns = false;
});

Requirements

  • .NET 6.0 or later
  • SQL Server 2012+ (for SQL features)
  • MongoDB 4.0+ (for MongoDB features)

Benchmarks

PipeFlow is optimized for performance:

Operation Records Time Memory
CSV Read 1M ~3s <50MB
Parallel Processing 1M ~1.5s <100MB
Streaming 10M ~30s <50MB

Contributing

We welcome contributions! Please see CONTRIBUTING.md for details.

Changelog

See CHANGELOG.md for version history and release notes.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

Berkant - GitHub

Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.0-archived 110 4/15/2026
1.0.0 372 9/8/2025