Bielu.PersistentQueues.Storage.ZoneTree
0.6.0-beta.639117966284424630
Prefix Reserved
dotnet add package Bielu.PersistentQueues.Storage.ZoneTree --version 0.6.0-beta.639117966284424630
NuGet\Install-Package Bielu.PersistentQueues.Storage.ZoneTree -Version 0.6.0-beta.639117966284424630
<PackageReference Include="Bielu.PersistentQueues.Storage.ZoneTree" Version="0.6.0-beta.639117966284424630" />
<PackageVersion Include="Bielu.PersistentQueues.Storage.ZoneTree" Version="0.6.0-beta.639117966284424630" />
<PackageReference Include="Bielu.PersistentQueues.Storage.ZoneTree" />
paket add Bielu.PersistentQueues.Storage.ZoneTree --version 0.6.0-beta.639117966284424630
#r "nuget: Bielu.PersistentQueues.Storage.ZoneTree, 0.6.0-beta.639117966284424630"
#:package Bielu.PersistentQueues.Storage.ZoneTree@0.6.0-beta.639117966284424630
#addin nuget:?package=Bielu.PersistentQueues.Storage.ZoneTree&version=0.6.0-beta.639117966284424630&prerelease
#tool nuget:?package=Bielu.PersistentQueues.Storage.ZoneTree&version=0.6.0-beta.639117966284424630&prerelease
Bielu.PersistentQueues - Fast Persistent Queues for .NET
Bielu.PersistentQueues is a high-performance, lightweight, store-and-forward message queue for .NET applications with pluggable storage backends. It ensures fast and durable persistence for sending and receiving messages, making it an excellent choice for lightweight and cross-platform message queuing needs.
This project is a fork of LightningQueues, originally created by Corey Kaylor. It extends the original with additional features, bug fixes, and alignment with the bielu package ecosystem.
Why Bielu.PersistentQueues?
- Simple API: Easily interact with the message queue through an intuitive API.
- Strongly-Typed Messages: Send, enqueue, and receive messages as C# objects — serialization is handled automatically via DI.
- Pluggable Storage: Choose from multiple storage backends (LMDB included, more coming).
- Microsoft DI Integration: First-class support for
Microsoft.Extensions.DependencyInjectionwith a fluent builder API. - No Administration: Unlike MSMQ or other Server / Brokers, it requires zero administrative setup.
- XCopy Deployable: No complex installation; just copy and run.
- Cross-Platform: Works on Windows, macOS, and Linux.
- Durable Storage: High-performance reliable message storage.
- TLS Encryption: Optionally secure your transport layer. You have full control.
- Batch Receive: Efficiently receive and process messages in batches.
Packages
Installation
Install the core package and the storage provider you want to use:
dotnet add package Bielu.PersistentQueues
dotnet add package Bielu.PersistentQueues.Storage.LMDB
Getting Started
Using Microsoft Dependency Injection (Recommended)
The recommended way to configure Bielu.PersistentQueues is via the fluent DI builder API:
using Bielu.PersistentQueues;
using Bielu.PersistentQueues.Storage.LMDB;
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("C:\\path_to_your_queue_folder")
.ListenOn(new IPEndPoint(IPAddress.Loopback, 5050))
.CreateQueues("my-queue");
});
You can also customize the LMDB storage configuration:
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("C:\\path_to_your_queue_folder", config =>
{
config.EnvironmentConfiguration = new LightningDB.EnvironmentConfiguration
{
MaxDatabases = 10,
MapSize = 1024 * 1024 * 500 // 500 MB
};
config.StorageOptions = LmdbStorageOptions.MaxThroughput();
})
.AutomaticEndpoint()
.CreateQueues("queue-a", "queue-b");
});
Then inject IQueue wherever you need it:
public class MyService(IQueue queue)
{
public void SendMessage()
{
// Strongly-typed: serialization is handled automatically
queue.Send(new OrderMessage("ORD-001", 99.99m, "USD"),
destinationUri: "lq.tcp://localhost:5050",
queueName: "my-queue");
}
public void EnqueueMessage()
{
// Enqueue a typed object for local processing
queue.Enqueue(new OrderMessage("ORD-002", 49.99m, "EUR"),
queueName: "my-queue");
}
}
You can also send raw Message objects if you need full control:
queue.Send(new Message
{
Data = "hello"u8.ToArray(),
Id = MessageId.GenerateRandom(),
Queue = "my-queue",
Destination = new Uri("lq.tcp://localhost:5050")
});
Manual Configuration (Without DI)
You can also configure the queue manually using QueueConfiguration:
using Bielu.PersistentQueues;
var queue = new QueueConfiguration()
.WithDefaults()
.StoreWithLmdb("C:\\path_to_your_queue_folder")
.BuildAndStart("queue-name");
Sending Messages
Send strongly-typed objects — the content serializer (configured via DI or defaulting to JSON) handles serialization automatically:
// Strongly-typed: serialize and send a C# object
queue.Send(new OrderMessage("ORD-001", 99.99m, "USD"),
destinationUri: "lq.tcp://localhost:port",
queueName: "queue-name");
// Strongly-typed: enqueue for local processing
queue.Enqueue(new OrderMessage("ORD-002", 49.99m, "EUR"),
queueName: "queue-name");
Or send raw Message objects when you need full control over serialization:
var message = new Message
{
Data = "hello"u8.ToArray(),
Id = MessageId.GenerateRandom(),
Queue = "queue-name",
Destination = new Uri("lq.tcp://localhost:port")
};
queue.Send(message);
Receiving Messages
await foreach (var msg in queue.Receive("queue-name", token))
{
// Deserialize the message payload to a strongly-typed object
var order = msg.Message.GetContent<OrderMessage>();
// Process the message and respond with one or more of the following:
msg.QueueContext.SuccessfullyReceived(); // Done processing
msg.QueueContext.ReceiveLater(TimeSpan.FromSeconds(1)); // Retry later
msg.QueueContext.Enqueue(msg.Message); // Re-enqueue to same/other queue
msg.QueueContext.Send(msg.Message); // Send to another endpoint
msg.QueueContext.MoveTo("other-queue"); // Move to different queue
// Strongly-typed context operations (serializer resolved from DI):
msg.QueueContext.Enqueue(new FollowUpMessage(...), queueName: "follow-ups");
msg.QueueContext.Send(new ResponseMessage(...), destinationUri: "lq.tcp://...");
msg.QueueContext.CommitChanges(); // Commit all changes atomically
}
Dead Letter Queue (DLQ)
Bielu.PersistentQueues has built-in support for a dead letter queue. When a message cannot be processed after a configurable number of attempts, it is automatically moved to the DLQ — or you can explicitly dead-letter it from your consumer code.
How It Works
- There is a single, shared dead letter queue named
dead-letter. - The DLQ is created automatically when any queue is created (and DLQ is enabled).
- Dead-lettered messages carry an
original-queueheader so you always know where they came from. - A processing-attempts counter is incremented on every
ReceiveLatercall.
Message ID Persistence
Message IDs remain constant throughout the entire message lifecycle, including:
- Multiple
ReceiveLateroperations (retry/defer) - Moving to the dead letter queue (automatic or explicit)
- Moving between queues with
MoveTo - Reading from storage multiple times
Each MessageId consists of two components:
SourceInstanceId: Identifies the queue instance that generated the messageMessageIdentifier: A unique GUID (using COMB algorithm for performance)
Both components are preserved across all operations, enabling reliable message tracking, deduplication, and correlation in distributed systems. This persistence is guaranteed by the immutable Message struct design.
Automatic Dead-Lettering (MaxAttempts)
Set maxAttempts when creating a message. When ReceiveLater is called and the processing attempt count reaches the limit, the message is moved to the DLQ automatically:
// Create a message that will be dead-lettered after 3 failed processing attempts
var message = Message.Create(
data: Encoding.UTF8.GetBytes("order-data"),
queue: "orders",
maxAttempts: 3);
queue.Enqueue(message);
// Consumer — each ReceiveLater call increments the processing attempt counter.
// After 3 attempts, the message is auto-moved to "dead-letter".
await foreach (var ctx in queue.Receive("orders", cancellationToken: token))
{
try
{
ProcessOrder(ctx.Message);
ctx.QueueContext.SuccessfullyReceived();
}
catch
{
ctx.QueueContext.ReceiveLater(TimeSpan.FromSeconds(5));
}
ctx.QueueContext.CommitChanges();
}
Explicit Dead-Lettering
You can also move a message to the DLQ manually:
await foreach (var ctx in queue.Receive("orders", cancellationToken: token))
{
if (IsPoisonMessage(ctx.Message))
{
ctx.QueueContext.MoveToDeadLetter(); // → "dead-letter" queue
ctx.QueueContext.CommitChanges();
continue;
}
// normal processing …
}
Batch consumers can dead-letter individual messages or the entire batch:
await foreach (var batch in queue.ReceiveBatch("orders", maxMessages: 10, cancellationToken: token))
{
var poison = batch.Messages.Where(IsPoisonMessage).ToArray();
var good = batch.Messages.Except(poison).ToArray();
batch.MoveToDeadLetter(poison); // dead-letter the bad ones
batch.SuccessfullyReceived(good); // acknowledge the good ones
batch.CommitChanges();
}
You can also mix subset operations with batch-wide operations. Messages already processed by subset operations are automatically excluded from batch-wide calls:
await foreach (var batch in queue.ReceiveBatch("orders", maxMessages: 10, cancellationToken: token))
{
// Process some messages individually
batch.ReceiveLater(new[] { msg1.Id.MessageIdentifier }, TimeSpan.FromMinutes(5));
batch.MoveToDeadLetter(new[] { msg2.Id.MessageIdentifier });
// Then successfully receive the rest of the batch
// This will only affect messages not already processed above
batch.SuccessfullyReceived();
batch.CommitChanges();
}
Inspecting DLQ Messages
foreach (var msg in queue.Store.PersistedIncoming(DeadLetterConstants.QueueName))
{
Console.WriteLine($"Original queue: {msg.OriginalQueue}");
Console.WriteLine($"Attempts: {msg.ProcessingAttempts}");
Console.WriteLine($"Data: {Encoding.UTF8.GetString(msg.Data.Span)}");
}
Requeuing DLQ Messages
Once you've fixed the underlying issue, you can move all messages from the DLQ back to their original queues in a single call. Processing attempt counters are reset to zero:
int count = queue.RequeueDeadLetterMessages();
Console.WriteLine($"Requeued {count} messages back to their original queues");
Clearing DLQ Messages
If you want to permanently remove all messages from the DLQ without requeuing them:
int count = queue.ClearDeadLetterQueue();
Console.WriteLine($"Permanently deleted {count} messages from the dead letter queue");
Warning: This operation permanently deletes all messages in the DLQ and cannot be undone. Use with caution.
Enabling the DLQ
The DLQ is disabled by default. Enable it via the builder API:
Using DI:
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("./queue_data")
.WithDeadLetterQueue() // ← enable dead letter queue with default settings
.CreateQueues("my-queue");
});
Manual configuration:
var queue = new QueueConfiguration()
.WithDefaults()
.WithDeadLetterQueue()
.StoreWithLmdb("./queue_data")
.BuildAndStart("my-queue");
When the DLQ is not enabled:
ReceiveLaternever auto-moves messages to the DLQ, even whenMaxAttemptsis exceeded — the message is retried indefinitely.- Outgoing messages that fail all send retries are silently discarded.
- Calling
MoveToDeadLetter()throwsInvalidOperationException.
Global Max Attempts
In addition to per-message maxAttempts, you can configure a global maximum that applies to all messages:
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("./queue_data")
.WithDeadLetterQueue(options =>
{
options.GlobalMaxAttemptsForMessage = 5; // Default is 3
})
.CreateQueues("my-queue");
});
How it works:
- If a message has
maxAttemptsset, the lower of the two values takes precedence. - Example:
GlobalMaxAttemptsForMessage = 5, message hasmaxAttempts: 10→ DLQ after 5 attempts. - Example:
GlobalMaxAttemptsForMessage = 5, message hasmaxAttempts: 3→ DLQ after 3 attempts. - Messages without
maxAttemptsare dead-lettered after reaching the global max.
DLQ Metrics (OpenTelemetry)
When OpenTelemetry is enabled, the following DLQ-specific metrics are emitted:
| Metric | Type | Description |
|---|---|---|
bielupersistentqueues.messages.dead_lettered |
Counter | Total dead-lettered messages (tags: queue.name, reason) |
bielupersistentqueues.dead_letter.queue.depth |
Gauge | Current message count in the DLQ |
Reason values: manual, max_processing_attempts, send_failed.
Architecture
Pluggable Storage
The core library defines a storage-agnostic IMessageStore interface with IStoreTransaction for atomic operations.
Storage providers are distributed as separate NuGet packages, each implementing these interfaces.
Currently available:
- LMDB (
Bielu.PersistentQueues.Storage.LMDB) — High-performance embedded key-value store powered by LightningDB. - ZoneTree (
Bielu.PersistentQueues.Storage.ZoneTree) — Pure C# LSM-tree storage with no native dependencies.
Extensibility:
- Additional storage backends can be added by creating a package that references
Bielu.PersistentQueuesand implementsIMessageStoreandIStoreTransaction.
Storage Provider Comparison
Two storage backends are available. Choose based on your requirements:
| Feature | LMDB | ZoneTree |
|---|---|---|
| Implementation | C wrapper (LightningDB) | Pure C# (LSM tree) |
| Native dependencies | Yes (lmdb) | None |
| Transaction model | Full ACID | Buffered commit |
| Memory mapping | Yes (mmap) | No |
| Best for | Low-latency point lookups | Write-heavy workloads |
Benchmark Results
Benchmarked on Linux (Ubuntu 24.04), AMD EPYC 7763, .NET 10.0.5. Times are in microseconds (μs). Lower is better.
Small batches (100 messages)
| Operation | LMDB (64B) | ZoneTree (64B) | LMDB (512B) | ZoneTree (512B) |
|---|---|---|---|---|
| StoreIncoming | 308 μs | 171 μs | 404 μs | 204 μs |
| GetMessage | 655 μs | 506 μs | 717 μs | 486 μs |
| PersistedIncoming | 595 μs | 608 μs | 948 μs | 837 μs |
| DeleteIncoming | 808 μs | 446 μs | 762 μs | 289 μs |
| QueueCycle | 848 μs | 668 μs | 966 μs | 656 μs |
| OutgoingCycle | 24,987 μs | 828 μs | 25,402 μs | 844 μs |
Larger batches (1,000 messages)
| Operation | LMDB (64B) | ZoneTree (64B) | LMDB (512B) | ZoneTree (512B) |
|---|---|---|---|---|
| StoreIncoming | 1,541 μs | 2,041 μs | 2,117 μs | 2,149 μs |
| GetMessage | 4,406 μs | 4,703 μs | 4,863 μs | 5,060 μs |
| PersistedIncoming | 4,254 μs | 5,161 μs | 4,390 μs | 5,172 μs |
| DeleteIncoming | 2,871 μs | 3,575 μs | 3,664 μs | 3,766 μs |
| QueueCycle | 5,268 μs | 6,719 μs | 6,028 μs | 6,746 μs |
| OutgoingCycle | 261,570 μs | 8,298 μs | 335,853 μs | 8,341 μs |
Update date benchmark results: https://bielu.github.io/Bielu.PersistentQueues/benchmarks/
Summary
- ZoneTree excels at small-batch operations (100 messages) where it is consistently faster across all operations, particularly for store and delete.
- LMDB excels at larger batches (1,000+ messages) where its transactional batch commit model amortizes overhead and its B+ tree provides excellent read locality.
- LMDB has lower memory allocations due to zero-copy reads from memory-mapped files.
- In the benchmarked outgoing message workflow using
PersistedOutgoing()with per-messageSuccessfullySent(), ZoneTree dramatically outperforms LMDB at all scales measured (30x faster at 1K messages). - These outgoing-cycle results do not measure the production raw/bulk send path (
PersistedOutgoingRaw+SuccessfullySentByIds), so they should not be treated as a universal conclusion for all send scenarios. - Choose LMDB for read-heavy, large-batch, low-latency workloads with available native dependencies.
- Choose ZoneTree for write-heavy, small-batch workloads or when native dependencies are not available, especially when your usage matches the benchmarked API pattern above.
Custom Storage Providers
To implement your own storage provider:
- Create a new project referencing
Bielu.PersistentQueues - Implement
IMessageStoreandIStoreTransaction - Create an extension method on
PersistentQueuesBuilderto register your storage:
public static PersistentQueuesBuilder AddMyStorage(
this PersistentQueuesBuilder builder,
string connectionString)
{
builder.UseStorage(sp =>
{
// Create and return your IMessageStore implementation
return new MyMessageStore(connectionString);
});
return builder;
}
Strongly-Typed Messages
Bielu.PersistentQueues supports sending, enqueuing, and receiving strongly-typed C# objects as message payloads. Serialization is handled automatically — the content serializer is resolved from DI, so you never need to manually serialize or pass a serializer instance.
Sending and Enqueuing Typed Objects
The IQueue and IQueueContext interfaces include Send<T> and Enqueue<T> methods that accept any object and serialize it using the DI-configured IContentSerializer (JSON by default):
// Send a typed object to a remote endpoint
queue.Send(new OrderMessage("ORD-001", 99.99m, "USD"),
destinationUri: "lq.tcp://localhost:5050",
queueName: "orders");
// Enqueue a typed object for local processing
queue.Enqueue(new OrderMessage("ORD-002", 49.99m, "EUR"),
queueName: "orders");
Inside a message handler, the same methods are available on IQueueContext:
await foreach (var msg in queue.Receive("orders", token))
{
var order = msg.Message.GetContent<OrderMessage>();
// Enqueue a follow-up message (serializer from DI)
msg.QueueContext.Enqueue(new AuditEvent(order.OrderId, "processed"),
queueName: "audit");
// Send a response to another endpoint
msg.QueueContext.Send(new OrderConfirmation(order.OrderId),
destinationUri: "lq.tcp://notifications:5050",
queueName: "confirmations");
msg.QueueContext.SuccessfullyReceived();
msg.QueueContext.CommitChanges();
}
Deserializing Message Content
Use Message.GetContent<T>() to deserialize the Data payload:
var order = message.GetContent<OrderMessage>();
Custom Content Serializer
The default serializer uses System.Text.Json. You can replace it globally by registering your own IContentSerializer before calling AddPersistentQueues:
// Register a custom serializer (e.g., MessagePack, Protobuf)
services.AddSingleton<IContentSerializer, MyMessagePackSerializer>();
// Queue will automatically use it for all Send<T>/Enqueue<T>/GetContent<T> calls
services.AddPersistentQueues(builder => { ... });
For one-off overrides, extension methods accept an explicit serializer:
var msgPackSerializer = new MyMessagePackSerializer();
queue.Send(order, msgPackSerializer, destinationUri: "lq.tcp://...", queueName: "orders");
queue.Enqueue(order, msgPackSerializer, queueName: "orders");
When using manual configuration (without DI), you can configure the serializer via the builder:
var queue = new QueueConfiguration()
.WithDefaults()
.SerializeContentWith(new MyMessagePackSerializer())
.StoreWithLmdb("C:\\queue_path")
.BuildAndStart("queue-name");
Running Tests
To ensure everything is running smoothly, clone the repository and run:
dotnet test src/Bielu.PersistentQueues.slnx
OpenTelemetry Integration
Bielu.PersistentQueues provides built-in OpenTelemetry support for distributed tracing and metrics through the PersistentQueueOtelDecorator. This enables comprehensive observability of your queue operations.
Installation
dotnet add package Bielu.PersistentQueues.OpenTelemetry
Usage
Add instrumentation to your service collection to automatically decorate the queue with OpenTelemetry:
using Bielu.PersistentQueues.OpenTelemetry;
services.AddBieluPersistentQueueInstrumentation();
Metrics Collected
The decorator automatically collects the following metrics:
bielupersistentqueues.messages.sent- Counter: Total number of messages sentbielupersistentqueues.messages.received- Counter: Total number of messages receivedbielupersistentqueues.messages.enqueued- Counter: Total number of messages enqueuedbielupersistentqueues.operations.errors- Counter: Total number of operation errorsbielupersistentqueues.message.processing.duration- Histogram: Duration of message processing in millisecondsbielupersistentqueues.batch.size- Histogram: Number of messages in batchesbielupersistentqueues.queues.active- Gauge: Number of currently active queuesbielupersistentqueues.storage.used_bytes- Gauge: Number of bytes currently used by the storagebielupersistentqueues.storage.total_bytes- Gauge: Total number of bytes allocated for the storagebielupersistentqueues.storage.usage_percent- Gauge: Percentage of storage currently in use (0–100%)
All metrics include relevant dimensions such as queue.name, operation, and batch.size.
Note: Storage usage metrics are automatically registered when the underlying store supports usage reporting (e.g., LMDB). No additional configuration is needed beyond enabling OpenTelemetry instrumentation.
Distributed Tracing
The decorator creates OpenTelemetry activities (spans) for all queue operations:
- Producer operations:
Send,SendBatch,Enqueue - Consumer operations:
Receive,ReceiveBatch,ProcessMessage,ProcessBatch - Internal operations:
CreateQueue,Start,MoveToQueue,ReceiveLater
Each span includes tags such as:
queue.name- The queue namemessage.id- Unique message identifierdestination- Message destination endpointbatch.size- Number of messages in batch operationsdelay.seconds- Delay time for scheduled messages
Error Tracking
All operations are wrapped with exception handling that:
- Records error metrics with operation context
- Sets activity status to
Error - Adds exception details to the span
- Preserves original exception for caller handling
Example: Full Configuration with OpenTelemetry
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;
using Bielu.PersistentQueues.OpenTelemetry;
// Configure OpenTelemetry
services.AddOpenTelemetry()
.WithMetrics(metrics => metrics
.AddBieluPersistentQueuesInstrumentation())
.WithTracing(tracing => tracing
.AddBieluPersistentQueuesInstrumentation());
// Configure queue
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("C:\\queue_path")
.ListenOn(new IPEndPoint(IPAddress.Loopback, 5050))
.CreateQueues("my-queue");
});
// Add instrumentation
services.AddBieluPersistentQueueInstrumentation();
Partitioned Queues (Kafka-like Partitioning)
Bielu.PersistentQueues supports Kafka-like partitioning to enable parallel consumption and message ordering guarantees within a partition key.
Concepts
- A partitioned queue divides a logical queue into N partitions (e.g.,
orders:partition-0,orders:partition-1, ...). - Messages are routed to partitions using a configurable partition strategy.
- Messages with the same partition key are guaranteed to land in the same partition, preserving ordering for related messages.
- Consumers can receive from all partitions, a specific partition, or a subset of partitions.
Built-in Partition Strategies
| Strategy | Description |
|---|---|
HashPartitionStrategy (default) |
Routes messages based on FNV-1a hash of the partition key. Same key → same partition. Messages without a key use the message ID for distribution. |
RoundRobinPartitionStrategy |
Distributes messages evenly across partitions in round-robin order, ignoring partition keys. |
ExplicitPartitionStrategy |
Routes messages to the partition index specified in the partition key (e.g., key "2" → partition 2). |
Manual Configuration
var queue = new QueueConfiguration()
.WithDefaults()
.StoreWithLmdb("C:\\queue_path", StorageSize.MB(100))
.BuildAndStartPartitioned("orders", partitionCount: 4);
// Enqueue with a partition key (routed by strategy)
var message = Message.Create(
data: "order-data"u8.ToArray(),
queue: "orders",
partitionKey: "customer-123"
);
queue.EnqueueToPartition(message, "orders");
// Receive from a specific partition
await foreach (var ctx in queue.ReceiveFromPartition("orders", partition: 0, cancellationToken: token))
{
// Process message
ctx.QueueContext.SuccessfullyReceived();
ctx.QueueContext.CommitChanges();
}
Using Dependency Injection
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("C:\\queue_path", config =>
{
config.MapSize = StorageSize.MB(500); // Use StorageSize helper instead of raw bytes
config.EnvironmentConfiguration.MaxDatabases = 20; // Increase for partitioned queues
})
.AutomaticEndpoint()
.UsePartitioning(
new HashPartitionStrategy(),
("orders", 4), // 4 partitions for "orders"
("events", 8) // 8 partitions for "events"
);
});
// Inject IPartitionedQueue
public class MyService(IPartitionedQueue queue)
{
public void ProcessOrder(string customerId, byte[] data)
{
var message = Message.Create(
data: data,
queue: "orders",
partitionKey: customerId // Same customer → same partition → ordered
);
queue.EnqueueToPartition(message, "orders");
}
public async Task ConsumePartition(int partition, CancellationToken token)
{
await foreach (var ctx in queue.ReceiveFromPartition("orders", partition, cancellationToken: token))
{
// Deserialize to a strongly-typed object
var order = ctx.Message.GetContent<OrderMessage>();
// Process — messages from same partition key arrive in order
ctx.QueueContext.SuccessfullyReceived();
ctx.QueueContext.CommitChanges();
}
}
}
Custom Partition Strategy
Implement IPartitionStrategy to create your own routing logic:
public class GeoPartitionStrategy : IPartitionStrategy
{
public int GetPartition(Message message, int partitionCount)
{
// Route by geographic region from partition key
var region = message.PartitionKeyString;
return region switch
{
"us-east" => 0 % partitionCount,
"us-west" => 1 % partitionCount,
"eu" => 2 % partitionCount,
_ => 0
};
}
}
LMDB Configuration Note
When using partitioned queues with LMDB storage, ensure MaxDatabases is set high enough to accommodate all partitions. Each partition creates a separate LMDB database. For example, 4 partitions + 1 outgoing database requires at least MaxDatabases = 6.
Storage Usage Monitoring
Bielu.PersistentQueues exposes storage usage information through the IMessageStore.GetStorageUsageInfo() API. This is useful for monitoring disk pressure and alerting when storage is nearing capacity.
Programmatic Access
You can query storage usage directly from the store:
var usageInfo = queue.Store.GetStorageUsageInfo();
if (usageInfo != null)
{
Console.WriteLine($"Used: {usageInfo.Value.UsedBytes} bytes");
Console.WriteLine($"Total: {usageInfo.Value.TotalBytes} bytes");
Console.WriteLine($"Usage: {usageInfo.Value.UsagePercentage:F2}%");
}
GetStorageUsageInfo()returnsnullif the storage provider does not support usage reporting. The LMDB provider reports usage based on the environment's page utilization relative to the configuredMapSize.
OpenTelemetry Integration
When OpenTelemetry instrumentation is enabled, storage usage metrics are automatically collected as observable gauges — no extra configuration is needed:
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("./queue_data", config =>
{
config.MapSize = StorageSize.MB(100); // Use StorageSize helper
})
.CreateQueues("my-queue");
});
// Storage gauges are registered automatically
services.AddBieluPersistentQueueInstrumentation();
The three storage gauges (storage.used_bytes, storage.total_bytes, storage.usage_percent) are polled at each metric scrape and always reflect the current state.
Transport Security (TLS Encryption)
Bielu.PersistentQueues supports TLS encryption to secure communication. The library provides hooks to enable custom certificate validation and encryption settings:
services.AddPersistentQueues(builder =>
{
builder
.AddLmdbStorage("C:\\queue_path")
.SecureTransportWith(
new TlsStreamSecurity(async (uri, stream) =>
{
var sslStream = new SslStream(stream, true, (_, _, _, _) => true, null);
await sslStream.AuthenticateAsClientAsync(uri.Host);
return sslStream;
}),
new TlsStreamSecurity(async (_, stream) =>
{
var sslStream = new SslStream(stream, false);
await sslStream.AuthenticateAsServerAsync(certificate, false,
checkCertificateRevocation: false, enabledSslProtocols: SslProtocols.Tls12);
return sslStream;
}));
});
Contributing
We welcome contributions! See our Contributing Guide for details on:
- Setting up your development environment
- Building and testing the project
- Submitting pull requests
Acknowledgments
This project is based on and builds upon the excellent work of:
- LightningQueues - The original fast persistent queues library for .NET, created and maintained by Corey Kaylor and contributors. Licensed under the MIT License.
License
This project is licensed under the MIT License - see the LICENSE file for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Bielu.PersistentQueues (>= 0.6.0-beta.639117966284424630)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.5)
- ZoneTree (>= 1.8.6)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.6.0-beta.639117966284424630 | 33 | 4/14/2026 |
| 0.6.0-beta.639117956712079210 | 36 | 4/14/2026 |
| 0.6.0-beta.639117942337787353 | 39 | 4/14/2026 |
| 0.6.0-beta.639117865410525614 | 33 | 4/14/2026 |
| 0.6.0-beta.639117486656783346 | 35 | 4/14/2026 |