SlugEnt.StreamProcessor
0.4.0
Suggested Alternatives
Additional Details
Renamed and replaced. This package contains very preliminary working library. Do not use
dotnet add package SlugEnt.StreamProcessor --version 0.4.0
NuGet\Install-Package SlugEnt.StreamProcessor -Version 0.4.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SlugEnt.StreamProcessor" Version="0.4.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add SlugEnt.StreamProcessor --version 0.4.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: SlugEnt.StreamProcessor, 0.4.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install SlugEnt.StreamProcessor as a Cake Addin #addin nuget:?package=SlugEnt.StreamProcessor&version=0.4.0 // Install SlugEnt.StreamProcessor as a Cake Tool #tool nuget:?package=SlugEnt.StreamProcessor&version=0.4.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
RabbitMQ.StreamProcessor
High Level library for interacting with RabbitMQ streams using the base library --> rabbitmq-stream-dotnet-client
This library provides a higher level interface for working with RabbitMQ Streams that makes its use in applications quick and very easy.
Features
It supports a number of features out of the box including:
- Ability to automatically perform Offset Storage in the MQ Stream for the specific application after a given number of messages have been received.
- Provide statistics on number of messages received / produced / successful / failures
- Built in Circuit Breaker that stops sending messages after a given number of consecutive failures
- Automatic retry logic for failed publishes
- Auto-retry can be turned off
- Retry logic if successful resets circuit breaker allowing full resumption of publishing messages
- Optional subscription to events:
- Publish Confirmation
- Publish Failure
- CheckPoint Saved (Offset set in stream for the given application)
- Ability to read last offsets for the given stream / application
- Ability to call CircuitBreak and ask it to recheck its status
- Can Send simple Messages (Text) only
- Can Send complex Messages
- Creation Time automatically set
- The application that generated the message is stored in message
- Can add any number of additional ApplicationProperties to the message with having to store them in the body
- Ability to define stream on start if it does not exist yet.
- Ability to configure new stream, per:
- Maximum Size
- Maximum Segment Size
- Max Age
- Ability to set stream size / age limits in sizes such as MB or KB instead of bytes*
- Built in Logging
- Multi-Threaded
Sample Use
string _streamName = "Sample.B";
ISampleB_Producer _producerB;
StreamSystemConfig config = new StreamSystemConfig();
_producerB = _serviceProvider.GetService<ISampleB_Producer>();
// Initialize the Stream. We will access _streamName and the Application name is "producerB"
_producerB.Initialize(_streamName,"producerB",config);
// Set the method that generates Messages
_producerB.SetProducerMethod(ProduceMessages);
// Create a stream that has max size of 1000 bytes, max segment size of 100 bytes and max age of 900 seconds
_producerB.SetStreamLimitsRaw(1000, 100, 900);
// Define Event handlers. This is optional
_producerB.MessageConfirmationError += MessageConfirmationError;
_producerB.MessageConfirmationSuccess += MessageConfirmationSuccess;
// Start producing messages
await _producerB.Start();
// Build a consumer
_consumerB_slow = _serviceProvider.GetService<ISampleB_Consumer>();
_consumerB_slow.Initialize(_streamName, "consumer_slow",config);
_consumerB_slow.SetConsumptionHandler(ConsumeSlow);
// Optional if you want to know when a Checkpoint has occurred
_consumerB_slow.EventCheckPointSaved += OnEventCheckPointSavedSlow;
// Start Consuming messages
await _consumerB_slow.Start();
// Produce Messages Method
protected async Task ProduceMessages(SampleB_Producer producer)
{
Message message = producer.CreateMessage("hello");
await producer.SendMessage(message);
}
// Consume Messages method
private async Task<bool> ConsumeSlow(Message message)
{
// Do something with Message
string msg = Encoding.Default.GetString(message.Data.Contents.ToArray());
Console.WriteLine("msg: ", msg);
// Simulate slow
Thread.Sleep(1500);
return true;
}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net7.0
- ByteSize (>= 2.1.1)
- Microsoft.Extensions.DependencyInjection (>= 7.0.0)
- RabbitMQ.Stream.Client (>= 1.2.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Alpha Development