Momentum.Extensions.Messaging.Kafka 0.0.1-pre.14

This is a prerelease version of Momentum.Extensions.Messaging.Kafka.
dotnet add package Momentum.Extensions.Messaging.Kafka --version 0.0.1-pre.14
                    
NuGet\Install-Package Momentum.Extensions.Messaging.Kafka -Version 0.0.1-pre.14
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Momentum.Extensions.Messaging.Kafka" Version="0.0.1-pre.14" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Momentum.Extensions.Messaging.Kafka" Version="0.0.1-pre.14" />
                    
Directory.Packages.props
<PackageReference Include="Momentum.Extensions.Messaging.Kafka" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Momentum.Extensions.Messaging.Kafka --version 0.0.1-pre.14
                    
#r "nuget: Momentum.Extensions.Messaging.Kafka, 0.0.1-pre.14"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Momentum.Extensions.Messaging.Kafka@0.0.1-pre.14
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Momentum.Extensions.Messaging.Kafka&version=0.0.1-pre.14&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=Momentum.Extensions.Messaging.Kafka&version=0.0.1-pre.14&prerelease
                    
Install as a Cake Tool

Momentum.Extensions.Messaging.Kafka

Kafka messaging integration package for the Momentum platform, providing event-driven architecture capabilities with CloudEvents support and automatic topic management.

Overview

This package extends the Momentum platform with Apache Kafka messaging capabilities, enabling reliable event-driven communication between microservices. It builds on top of Momentum.ServiceDefaults to provide seamless integration with the platform's observability, health checks, and configuration systems.

Installation

Add the package to your project using the .NET CLI:

dotnet add package Momentum.Extensions.Messaging.Kafka

Or using the Package Manager Console:

Install-Package Momentum.Extensions.Messaging.Kafka

Key Features

  • Event-Driven Architecture: Full support for integration and domain events
  • CloudEvents Compliance: Industry-standard event format with automatic serialization
  • Automatic Topic Management: Environment-aware topic naming and auto-provisioning
  • Partition Key Support: Intelligent message partitioning for scalability
  • OpenTelemetry Integration: Built-in observability and distributed tracing
  • Health Checks: Kafka connectivity monitoring
  • WolverineFx Integration: CQRS/MediatR-style message handling

Integrated Dependencies

This package includes the following key dependencies:

Package Purpose
Aspire.Confluent.Kafka .NET Aspire Kafka integration with service discovery
CloudNative.CloudEvents.Kafka CloudEvents specification implementation for Kafka
WolverineFx.Kafka Message bus framework with Kafka transport
WolverineFx Message bus framework with pattern matching

Getting Started

Prerequisites

  • .NET 9.0 or later
  • Apache Kafka 2.8 or later
  • Momentum.ServiceDefaults package

Basic Setup

Add Kafka messaging to your Momentum service:

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Add service defaults first
builder.AddServiceDefaults();

// Add Kafka messaging
builder.AddKafkaMessagingExtensions();

var app = builder.Build();

app.MapDefaultEndpoints();
app.Run();

2. Configuration

Add the Kafka connection string to your configuration:

// appsettings.json
{
    "ConnectionStrings": {
        "Messaging": "localhost:9092"
    }
}

3. Define Integration Events

Create events that will be published across services:

// Events should be in a namespace ending with "IntegrationEvents"
namespace MyService.Contracts.IntegrationEvents;

[EventTopic("customer", Domain = "sales")]
public record CustomerCreated(
    Guid CustomerId,
    string Name,
    string Email,
    DateTime CreatedAt) : IDistributedEvent
{
    public string GetPartitionKey() => CustomerId.ToString();
}

4. Publishing Events

Publish events using Wolverine's message bus:

public class CustomerService(IMessageBus messageBus)
{
    public async Task CreateCustomerAsync(CreateCustomerRequest request)
    {
        // Business logic here...

        var integrationEvent = new CustomerCreated(
            customerId,
            request.Name,
            request.Email,
            DateTime.UtcNow);

        // This will be automatically routed to the appropriate Kafka topic
        await messageBus.PublishAsync(integrationEvent);
    }
}

5. Handling Events

Create handlers for integration events:

// This handler will automatically subscribe to the CustomerCreated topic
public class CustomerCreatedHandler
{
    public async Task Handle(CustomerCreated customerCreated, CancellationToken cancellationToken)
    {
        // Process the integration event
        Console.WriteLine($"Customer {customerCreated.Name} was created with ID {customerCreated.CustomerId}");
    }
}

Advanced Configuration

Topic Naming Convention

Topics are automatically named using the following pattern:

{environment}.{domain}.{scope}.{topic}.{version}

For example:

  • Development: dev.sales.public.customers.v1
  • Production: prod.sales.public.customers.v1
  • Internal events: prod.sales.internal.customer-updates.v1

Event Topic Attributes

Control topic configuration with attributes:

[EventTopic(
    "order-payment",
    Domain = "ecommerce",
    Internal = false,           // Creates public topic
    ShouldPluralizeTopicName = true,  // "payments" instead of "payment"
    Version = "v2")]
public record PaymentProcessed(Guid OrderId, decimal Amount);

Partition Key Strategies

Using IDistributedEvent Interface
public record OrderCreated(Guid OrderId, Guid CustomerId) : IDistributedEvent
{
    // Messages with the same customer ID will go to the same partition
    public string GetPartitionKey() => CustomerId.ToString();
}
Using PartitionKey Attribute
public record ProductUpdated(
    [PartitionKey] Guid ProductId,
    string Name,
    decimal Price);

Environment-Specific Configuration

The package automatically adapts topic names based on the environment:

// Environment mapping
"Development" → "dev"
"Production" → "prod"
"Test" → "test"

Health Checks

Kafka health checks are automatically registered and available at the /health endpoint:

{
    "status": "Healthy",
    "checks": {
        "kafka": {
            "status": "Healthy",
            "description": "Kafka connectivity check",
            "tags": ["messaging", "kafka"]
        }
    }
}

CloudEvents Integration

All messages are automatically wrapped in CloudEvents format, providing:

  • Standardization: Industry-standard event format
  • Metadata: Rich event metadata (source, type, time, etc.)
  • Tracing: Distributed tracing correlation
  • Versioning: Event schema evolution support

Example CloudEvent structure:

{
    "specversion": "1.0",
    "type": "CustomerCreated",
    "source": "urn:momentum:sales-api",
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "time": "2024-01-15T10:30:00Z",
    "datacontenttype": "application/json",
    "data": {
        "customerId": "123e4567-e89b-12d3-a456-426614174000",
        "name": "John Doe",
        "email": "john.doe@example.com"
    }
}

Error Handling

The package provides error handling through WolverineFx:

  • Basic Error Logging: Failed message processing is logged for monitoring

Observability

Built-in observability includes:

Metrics

  • TBD

Tracing

  • TBD

Logging

  • Structured logging with correlation IDs
  • Event processing lifecycle
  • Error diagnostics

Best Practices

Event Design

  • Immutable Events: Events represent facts that have already occurred
  • Rich Context: Include all necessary information in the event
  • Backward Compatibility: Design events for schema evolution

Partition Keys

  • Consistency: Use consistent partition keys for related events
  • Distribution: Ensure good key distribution to avoid hot partitions
  • Stability: Partition keys should be stable over time

Topic Management

  • Environment Separation: Always use environment-specific topics
  • Naming Conventions: Follow the established topic naming pattern
  • Retention: Configure appropriate message retention policies

Troubleshooting

Common Issues

Connection Failures

InvalidOperationException: Kafka connection string 'Messaging' not found in configuration
  • Ensure the Messaging connection string is configured
  • Verify Kafka broker accessibility

Topic Creation Issues

Topic does not exist and auto-creation is disabled
  • Enable auto-provisioning in Kafka configuration
  • Manually create topics if auto-creation is disabled

Serialization Errors

CloudEvent serialization failed
  • Ensure event types are properly decorated with attributes
  • Verify JSON serialization compatibility

Debug Logging

Enable debug logging for detailed troubleshooting:

{
    "Logging": {
        "LogLevel": {
            "Momentum.Extensions.Messaging.Kafka": "Debug",
            "Wolverine.Kafka": "Debug"
        }
    }
}

Requirements

  • .NET 9.0 or later
  • Apache Kafka 2.8 or later
  • Momentum.ServiceDefaults package
  • Momentum.ServiceDefaults - Base service configuration
  • Momentum.Extensions.Abstractions - Shared abstractions and interfaces

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

For more information about the Momentum platform and contribution guidelines, please visit the main repository.

Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.0.1-pre.14 5 8/21/2025
0.0.1-pre.13 10 8/21/2025
0.0.1-pre.12 11 8/20/2025
0.0.1-pre.11 12 8/18/2025
0.0.1-pre.10 11 8/18/2025
0.0.1-pre.9 11 8/18/2025
0.0.1-pre.8 11 8/18/2025
0.0.1-pre.7 10 8/18/2025
0.0.1-pre.6 13 8/18/2025