PiBox.Plugins.Messaging.Kafka.Flow
1.0.85
dotnet add package PiBox.Plugins.Messaging.Kafka.Flow --version 1.0.85
NuGet\Install-Package PiBox.Plugins.Messaging.Kafka.Flow -Version 1.0.85
<PackageReference Include="PiBox.Plugins.Messaging.Kafka.Flow" Version="1.0.85" />
paket add PiBox.Plugins.Messaging.Kafka.Flow --version 1.0.85
#r "nuget: PiBox.Plugins.Messaging.Kafka.Flow, 1.0.85"
// Install PiBox.Plugins.Messaging.Kafka.Flow as a Cake Addin #addin nuget:?package=PiBox.Plugins.Messaging.Kafka.Flow&version=1.0.85 // Install PiBox.Plugins.Messaging.Kafka.Flow as a Cake Tool #tool nuget:?package=PiBox.Plugins.Messaging.Kafka.Flow&version=1.0.85
Kafka Flow Plugin
This plugin provides the nuget packages from KafkaFlow as Pibox plugin.
The containers that you need are also provided in the docker-compose.yaml file. You just need to configure the consumer(
- and the producer(s) and use them accordingly.
Installation
Install the Plugin via Nuget
dotnet add package PiBox.Plugins.Messaging.Kafka.Flow
or add as package reference to your .csproj
<PackageReference Include="PiBox.Plugins.Messaging.Kafka.Flow" Version=""/>
Appsettings.yml
Configure your appsettings.yml
accordingly.
kafka:
client:
bootstrapServers: "localhost:9092,localhost:9093"
# securityProtocol: "SaslPlaintext"
# saslPassword: "asdf"
# saslUsername: "asdf"
# saslMechanism: "Plain"
# sslCaLocation: ""
# enableSslCertificateVerification: "false"
schemaRegistry:
url: "localhost:8081"
# basicAuthUserInfo: "developer:SECRET"
# enableSslCertificateVerification: "false"
Containers
The docker-compose.yaml will run the following containers:
- zookeeper
- broker
- schema-registry
- control-center
To run all containers
docker-compose up
To stop and remove the currently running containers
docker-compose down
Usage
Plugin configuration
public class KafkaFlowExamplePlugin : IPluginServiceConfiguration
{
private readonly IConfiguration _configuration;
private readonly ILogger? _logger;
public KafkaFlowExamplePlugin(IConfiguration configuration, ILogger<KafkaFlowExamplePlugin>? logger)
{
_configuration = configuration;
_logger = logger;
}
//Configure your consumers & producers
public void ConfigureServices(IServiceCollection serviceCollection)
{
serviceCollection.ConfigureKafka(_configuration, _logger, kafkaFlowBuilder => kafkaFlowBuilder
//possible configurations:
// producerConfig is none
// producer is added to a Dictionary<Type, Action<IProducerConfigurationBuilder>>
// type is typeof(TMessage)
.AddTypedProducer<TMessage>("protobuf-topic")
// producer is added to a Dictionary<Type, Action<IProducerConfigurationBuilder>>
// type is typeof(TMessage)
.AddTypedProducer<TMessage>("protobuf-topic", producerConfig)
// producerConfig is none
// producer is added to a List<(string, Action<IProducerConfigurationBuilder>)
// the string added is typeof(TProducer).Name
.AddProducer<TProducer>("protobuf-topic")
// producer is added to a List<(string, Action<IProducerConfigurationBuilder>)
// the string added is typeof(TProducer).Name
.AddProducer<TProducer>("protobuf-topic", producerConfig)
// consumer is added to a List<Action<IConsumerConfigurationBuilder>>
.AddConsumer<TMessageHandler>("protobuf-topic", "mygroup"));
// consumer is added to a List<Action<IConsumerConfigurationBuilder>>
// dead letter message is produced on dead letter topic in case of unsuccessful processing of the message
.AddConsumerWithDeadLetter<TMessageHandler, TMessage, TDeadLetterMessage>("protobuf-topic", "dead-letter-topic", "mygroup")
}
}
Click here to read more about the IServiceCollection interface.
Protobuf message format example
syntax = "proto3";
message TMessage {
string Message = 1;
int32 Code = 2;
}
Click here to read more about protobuf
Consumer usage
public class ProtobufMessageHandler : IMessageHandler<ProtobufLogMessage>
{
public Task Handle(IMessageContext context, ProtobufLogMessage message)
{
// Do something
}
}
Consumer with dead letter message producer usage
// The DltMessageHandler inherits from IMessageHandler<TMessage> (used to create a message handler)
// It also produces a dead letter message if there was an exception in ProcessMessageAsync
public class ProtobufDltMessageHandler : DltMessageHandler<TMessage, TDeadLetterMessage>
{
protected override async Task ProcessMessageAsync(IMessageContext context, TMessage message)
{
// Do something
}
protected override TDeadLetterMessage HandleError(IMessageContext context, TMessage message, Error error)
{
// Do something
}
}
Producer usage (optional)
public class SampleProducer
{
private readonly IMessageProducer _producer;
public SampleProducer(IProducerAccessor producerAccessor)
{
_producer = producerAccessor.GetProducer("TProducer");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_producer.ProduceAsync("protobuf-topic", messageKey, messageValue);
}
}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net8.0
- KafkaFlow (>= 2.5.0)
- KafkaFlow.Extensions.Hosting (>= 2.5.0)
- KafkaFlow.LogHandler.Microsoft (>= 2.5.0)
- KafkaFlow.Microsoft.DependencyInjection (>= 2.5.0)
- KafkaFlow.SchemaRegistry (>= 2.5.0)
- KafkaFlow.Serializer (>= 2.5.0)
- KafkaFlow.Serializer.ProtobufNet (>= 2.5.0)
- KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf (>= 2.5.0)
- KafkaFlow.TypedHandler (>= 2.5.0)
- PiBox.Hosting.Abstractions (>= 1.0.85)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
1.0.85 | 66 | 1/15/2025 |
1.0.79 | 92 | 12/20/2024 |
1.0.73 | 93 | 10/22/2024 |
1.0.66 | 83 | 10/15/2024 |
1.0.64 | 95 | 10/14/2024 |
1.0.61 | 91 | 10/1/2024 |
1.0.60 | 100 | 9/27/2024 |
1.0.54 | 140 | 4/30/2024 |
1.0.51 | 126 | 2/27/2024 |
1.0.49 | 95 | 2/27/2024 |
1.0.47 | 128 | 2/21/2024 |
1.0.45 | 119 | 2/20/2024 |
1.0.43 | 116 | 2/13/2024 |
1.0.41 | 120 | 2/13/2024 |
1.0.39 | 129 | 2/8/2024 |
1.0.38 | 125 | 2/8/2024 |
1.0.37 | 122 | 2/8/2024 |
1.0.35 | 116 | 2/2/2024 |
1.0.32 | 115 | 1/30/2024 |
1.0.25 | 196 | 12/27/2023 |
1.0.23 | 133 | 12/19/2023 |
1.0.22 | 127 | 12/19/2023 |
1.0.21 | 127 | 12/19/2023 |
1.0.19 | 149 | 12/11/2023 |
1.0.17 | 190 | 11/23/2023 |
1.0.7 | 148 | 11/23/2023 |
1.0.5 | 161 | 11/23/2023 |
1.0.3 | 155 | 11/23/2023 |
1.0.0 | 132 | 11/21/2023 |
0.9.9 | 154 | 11/21/2023 |
0.9.7 | 147 | 11/21/2023 |