SimpleKafkaLibrary 1.0.3
See the version list below for details.
dotnet add package SimpleKafkaLibrary --version 1.0.3
NuGet\Install-Package SimpleKafkaLibrary -Version 1.0.3
<PackageReference Include="SimpleKafkaLibrary" Version="1.0.3" />
paket add SimpleKafkaLibrary --version 1.0.3
#r "nuget: SimpleKafkaLibrary, 1.0.3"
// Install SimpleKafkaLibrary as a Cake Addin #addin nuget:?package=SimpleKafkaLibrary&version=1.0.3 // Install SimpleKafkaLibrary as a Cake Tool #tool nuget:?package=SimpleKafkaLibrary&version=1.0.3
What is the purpose of this KafkaLibrary?
The purpose of this repository is to provide a sample implementation of a Kafka message broker. The README.md file states that the repository contains "a sample implementation of kakfa message broker".
What are the key features or components of the sample implementation?
The key features/components of the sample implementation include:
KafkaLibrary: This is a library that simplifies the implementation of Kafka. It is built on top of the Confluent.Kafka library. Consumer Implementation: The sample code provides an example of how to implement a consumer that extends a ConsumerBase class. The consumer consumes messages from a topic named "testTopic".
public class Consumer : ConsumerBase
{
public Consumer(KafkaConfig configuration) : base("testKafka", configuration)
{
}
public override async Task Invoke()
{
await ConsumeAsync<string>("testTopic", (value) =>
{
Console.WriteLine(value);
});
await base.Invoke();
}
}
Configuration: The sample implementation requires configuration settings to be provided in the appsettings.json file, including the Kafka bootstrap servers, whether to use Kafka, the flush producer interval, and the consumed interval. Service Registration: The sample code shows how to configure the Kafka services in the application's dependency injection, either for the producer only or for both the producer and consumer.
"Kafka": {
"bootstrapservers": "localhost:9092",
"UseKafka": true,
"FlushProducerInSeconds": 2,
"ConsumedInSeconds": 2
}
How can one get started with using this sample implementation?
To get started with using the sample implementation, the README.md file provides the following steps:
Add the Kafka configuration in the appsettings.json file, including the bootstrap servers, whether to use Kafka, the flush producer interval, and the consumed interval. Create a consumer class that extends the ConsumerBase class, and override the Invoke() method to handle the message consumption. Register the Kafka services in the application's dependency injection, either for the producer only or for both the producer and consumer. The README.md file also provides sample code for the consumer implementation and the service registration.
Configured the Service by add this line of codes.
1. To add producer only use this code
services.AddKafkaServices(configuration.GetSection("producer"));
2. To add both producer and consumer use this code.
services.AddKafkaServices<Consumer>(configuration.GetSection("Kafka"));
Producer Usage
Sample Producer DI
public class CreateUserCommandHandler : IRequestHandler<CreateUserCommand, PayloadResponse<UsersDto>>
{
private readonly IMessageProducers _messageProducers;
public CreateUserCommandHandler(IMessageProducers messageProducers)
{
_messageProducers = messageProducers
}
public async Task<PayloadResponse<UsersDto>> Handle(CreateUserCommand request, CancellationToken cancellationToken)
{
...
await _messageProducers.ProduceAsync<UsersDto>("User", newUser).ConfigureAwait(false);
return ResponseStatus<UsersDto>.Create<PayloadResponse<UsersDto>>(ResponseCodes.SUCCESSFUL, _messageProvider.GetMessage(ResponseCodes.SUCCESSFUL), newUser);
}
}
Consumer Usage
Sample Consumer class
Path: src\Core\SmartCleanArchitecture.Application\kafkaConsumer\Consumer.cs
public class Consumer : ConsumerBase
{
public Consumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin) : base("test1", configuration, messageAdmin)
{
}
public override async Task Invoke()
{
await ConsumeAsync<string>("testTopic", (value) =>
{
// put your action here
Console.WriteLine(value);
});
await base.Invoke();
}
}
Note: - You can create multiple Consumer classes and configure them in the configuration file. - The groupId is test1. - The topic to be consumed is testTopic.
services.AddKafkaServices<Consumer>(kafkaConfig);
OR
services.AddKafkaServices(cfg =>
{
cfg.Configure(configuration.GetSection("Kafka"));
cfg.RegisterConsumer<Consumer>();
cfg.RegisterConsumer<Consumer2>();
});
OR
services.AddKafkaServices(cfg =>
{
cfg.Configure(configuration.GetSection("Kafka"));
cfg.RegisterConsumer(Assembly.GetExecutingAssembly());
});
License
This project is licensed with the MIT license.
Where can I find the sample code for this implementation?
The sample code for this implementation is available in the GitHub repository at the URL provided in the initial query: https://github.com/shegemm5252/MessagaBrokerSample
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net8.0
- Confluent.Kafka (>= 2.4.0)
- Microsoft.Extensions.Configuration (>= 8.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 8.0.1)
- Microsoft.Extensions.DependencyInjection (>= 8.0.0)
- Microsoft.Extensions.Options (>= 8.0.2)
- Newtonsoft.Json (>= 13.0.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.