SimpleKafkaLibrary 1.0.3

There is a newer version of this package available.
See the version list below for details.
.NET CLI

    dotnet add package SimpleKafkaLibrary --version 1.0.3

Package Manager

    NuGet\Install-Package SimpleKafkaLibrary -Version 1.0.3

This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference

    <PackageReference Include="SimpleKafkaLibrary" Version="1.0.3" />

For projects that support PackageReference, copy this XML node into the project file to reference the package.

Paket CLI

    paket add SimpleKafkaLibrary --version 1.0.3

Script & Interactive

    #r "nuget: SimpleKafkaLibrary, 1.0.3"

The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

Cake

    // Install SimpleKafkaLibrary as a Cake Addin
    #addin nuget:?package=SimpleKafkaLibrary&version=1.0.3

    // Install SimpleKafkaLibrary as a Cake Tool
    #tool nuget:?package=SimpleKafkaLibrary&version=1.0.3

What is the purpose of this KafkaLibrary?

The repository provides a sample implementation of a Kafka message broker, as stated in its README.md file.

What are the key features or components of the sample implementation?

The key features/components of the sample implementation include:

- KafkaLibrary: a library that simplifies the implementation of Kafka. It is built on top of the Confluent.Kafka library.
- Consumer Implementation: the sample code shows how to implement a consumer that extends a ConsumerBase class. The consumer consumes messages from a topic named "testTopic".


    public class Consumer : ConsumerBase
    {
        public Consumer(KafkaConfig configuration) : base("testKafka", configuration)
        {
        }

        public override async Task Invoke()
        {
            await ConsumeAsync<string>("testTopic", (value) =>
            {
                Console.WriteLine(value);
            });

            await base.Invoke();
        }
    }

- Configuration: the sample implementation requires configuration settings in the appsettings.json file, including the Kafka bootstrap servers, whether to use Kafka, the flush producer interval, and the consumed interval.
- Service Registration: the sample code shows how to configure the Kafka services in the application's dependency injection, either for the producer only or for both the producer and consumer.

    "Kafka": {
        "bootstrapservers": "localhost:9092",
        "UseKafka": true,
        "FlushProducerInSeconds": 2,
        "ConsumedInSeconds":  2
    }
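To make the four settings above concrete, here is a minimal sketch that reads the "Kafka" section into typed values. KafkaSettings and KafkaSettingsReader are hypothetical names used only for illustration; they are not the library's KafkaConfig, whose shape is not shown in this README. The sketch uses System.Text.Json so that it runs without any extra packages.

```csharp
using System;
using System.Text.Json;

// Hypothetical POCO mirroring the "Kafka" section; NOT the library's KafkaConfig.
public sealed class KafkaSettings
{
    public string BootstrapServers { get; set; } = "";
    public bool UseKafka { get; set; }
    public int FlushProducerInSeconds { get; set; }
    public int ConsumedInSeconds { get; set; }
}

public static class KafkaSettingsReader
{
    // Parses a full appsettings.json document and returns its "Kafka" section.
    public static KafkaSettings Parse(string appSettingsJson)
    {
        using JsonDocument doc = JsonDocument.Parse(appSettingsJson);
        JsonElement section = doc.RootElement.GetProperty("Kafka");

        // Case-insensitive matching lets "bootstrapservers" map to BootstrapServers.
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

        return section.Deserialize<KafkaSettings>(options)
               ?? throw new InvalidOperationException("The Kafka section is empty.");
    }
}
```

In an ASP.NET Core application the section would normally be read through IConfiguration, as in the configuration.GetSection("Kafka") calls used elsewhere in this README, rather than parsed by hand.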

How can one get started with using this sample implementation?

To get started with the sample implementation, follow the steps in the README.md file:

1. Add the Kafka configuration to the appsettings.json file, including the bootstrap servers, whether to use Kafka, the flush producer interval, and the consumed interval.
2. Create a consumer class that extends the ConsumerBase class, and override the Invoke() method to handle the message consumption.
3. Register the Kafka services in the application's dependency injection, either for the producer only or for both the producer and consumer.

The README.md file also provides sample code for the consumer implementation and the service registration.

Configure the services by adding one of the following lines of code.

1. To add the producer only, use this code:

    services.AddKafkaServices(configuration.GetSection("producer"));

2. To add both the producer and the consumer, use this code:

    services.AddKafkaServices<Consumer>(configuration.GetSection("Kafka"));

Producer Usage

Sample Producer DI

    public class CreateUserCommandHandler : IRequestHandler<CreateUserCommand, PayloadResponse<UsersDto>>
    {
        private readonly IMessageProducers _messageProducers;

        // IMessageProducers is supplied by dependency injection.
        public CreateUserCommandHandler(IMessageProducers messageProducers)
        {
            _messageProducers = messageProducers;
        }

        public async Task<PayloadResponse<UsersDto>> Handle(CreateUserCommand request, CancellationToken cancellationToken)
        {
            ...

            // Publish the new user to the "User" topic.
            await _messageProducers.ProduceAsync<UsersDto>("User", newUser).ConfigureAwait(false);

            // _messageProvider is not declared in this excerpt; it is presumably another injected dependency.
            return ResponseStatus<UsersDto>.Create<PayloadResponse<UsersDto>>(ResponseCodes.SUCCESSFUL, _messageProvider.GetMessage(ResponseCodes.SUCCESSFUL), newUser);
        }
    }
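For orientation, the following sketch shows roughly what publishing a typed message looks like with Confluent.Kafka directly, the library that KafkaLibrary is built on. It is not the implementation of IMessageProducers.ProduceAsync, which is not shown in this README. RawProducerSketch, ToPayload, and SendAsync are hypothetical names, and JSON serialization of the payload is an assumption.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper for illustration; not part of SimpleKafkaLibrary.
public static class RawProducerSketch
{
    // Assumption: messages are sent as JSON strings.
    public static string ToPayload<T>(T message) => JsonSerializer.Serialize(message);

    public static async Task SendAsync<T>(string bootstrapServers, string topic, T message)
    {
        var config = new ProducerConfig { BootstrapServers = bootstrapServers };

        // A producer per call keeps the sketch short; real code would reuse one instance.
        using IProducer<Null, string> producer =
            new ProducerBuilder<Null, string>(config).Build();

        // ProduceAsync completes once the broker has acknowledged the message.
        DeliveryResult<Null, string> result = await producer.ProduceAsync(
            topic, new Message<Null, string> { Value = ToPayload(message) });

        Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
    }
}
```

A call such as await RawProducerSketch.SendAsync("localhost:9092", "User", newUser) needs a reachable broker. The wrapper in this package removes the need to build and manage the producer by hand.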

Consumer Usage

Sample Consumer class

Path: src\Core\SmartCleanArchitecture.Application\kafkaConsumer\Consumer.cs

    public class Consumer : ConsumerBase
    {
        public Consumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin) : base("test1", configuration, messageAdmin)
        {
        }

        public override async Task Invoke()
        {
            await ConsumeAsync<string>("testTopic", (value) =>
            {
                // put your action here
                Console.WriteLine(value);
            });

            await base.Invoke();
        }
    }

Note:

- You can create multiple Consumer classes and configure them in the configuration file.
- The groupId is test1.
- The topic to be consumed is testTopic.
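To show what the groupId and topic in the note mean at the Confluent.Kafka level, here is a minimal sketch of a plain consumer loop. Whether ConsumerBase works exactly this way is an assumption, since its source is not shown here. RawConsumerSketch, BuildConfig, and Run are hypothetical names, and the AutoOffsetReset choice is illustrative.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper for illustration; not part of SimpleKafkaLibrary.
public static class RawConsumerSketch
{
    // Builds the consumer settings from the broker address and the group id.
    public static ConsumerConfig BuildConfig(string bootstrapServers, string groupId) =>
        new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            // Assumption: read from the oldest message when the group has no committed offset.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

    public static void Run(string topic, Action<string> handler, CancellationToken ct)
    {
        using IConsumer<Ignore, string> consumer =
            new ConsumerBuilder<Ignore, string>(BuildConfig("localhost:9092", "test1")).Build();

        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                // Blocks until a message arrives or the token is cancelled.
                ConsumeResult<Ignore, string> result = consumer.Consume(ct);
                handler(result.Message.Value);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal shutdown path.
        }
        finally
        {
            // Leave the group cleanly so partitions are reassigned promptly.
            consumer.Close();
        }
    }
}
```

Consumers that share a groupId split the partitions of a topic between them, while consumers with different group ids each receive every message.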

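Register the consumers in one of the following ways.

To register a single consumer:

    services.AddKafkaServices<Consumer>(kafkaConfig);

OR, to register several consumers explicitly:

    services.AddKafkaServices(cfg =>
    {
        cfg.Configure(configuration.GetSection("Kafka"));
        cfg.RegisterConsumer<Consumer>();
        cfg.RegisterConsumer<Consumer2>();
    });

OR, to register the consumers found in an assembly:

    // Assembly comes from the System.Reflection namespace.
    services.AddKafkaServices(cfg =>
    {
        cfg.Configure(configuration.GetSection("Kafka"));
        cfg.RegisterConsumer(Assembly.GetExecutingAssembly());
    });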

License

This project is licensed under the MIT license.

Where can I find the sample code for this implementation?

The sample code for this implementation is available in the GitHub repository: https://github.com/shegemm5252/MessagaBrokerSample

Compatible and additional computed target framework versions

.NET
    Compatible: net8.0
    Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version  Downloads  Last updated
1.0.5    96         1/20/2025
1.0.4    113        12/27/2024
1.0.3    102        12/17/2024
1.0.2    218        11/27/2024
1.0.1    118        6/22/2024
1.0.0    112        6/22/2024