Momentum.Extensions.Messaging.Kafka
0.0.1-pre.14
Kafka messaging integration package for the Momentum platform, providing event-driven architecture capabilities with CloudEvents support and automatic topic management.
Overview
This package extends the Momentum platform with Apache Kafka messaging capabilities, enabling reliable event-driven communication between microservices. It builds on top of Momentum.ServiceDefaults to provide seamless integration with the platform's observability, health checks, and configuration systems.
Installation
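Add the package to your project using the .NET CLI. The versions listed for this package are all prereleases, so include the prerelease flag (or pin an explicit version):
dotnet add package Momentum.Extensions.Messaging.Kafka --prerelease
Or using the Package Manager Console:
Install-Package Momentum.Extensions.Messaging.Kafka -IncludePrerelease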
Key Features
- Event-Driven Architecture: Full support for integration and domain events
- CloudEvents Compliance: Industry-standard event format with automatic serialization
- Automatic Topic Management: Environment-aware topic naming and auto-provisioning
- Partition Key Support: Intelligent message partitioning for scalability
- OpenTelemetry Integration: Built-in observability and distributed tracing
- Health Checks: Kafka connectivity monitoring
- WolverineFx Integration: CQRS/MediatR-style message handling
Integrated Dependencies
This package includes the following key dependencies:
Package | Purpose |
---|---|
Aspire.Confluent.Kafka | .NET Aspire Kafka integration with service discovery |
CloudNative.CloudEvents.Kafka | CloudEvents specification implementation for Kafka |
WolverineFx.Kafka | Message bus framework with Kafka transport |
WolverineFx | Message bus framework with pattern matching |
Getting Started
Prerequisites
- .NET 9.0 or later
- Apache Kafka 2.8 or later
- Momentum.ServiceDefaults package
1. Basic Setup
Add Kafka messaging to your Momentum service:
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Add service defaults first
builder.AddServiceDefaults();
// Add Kafka messaging
builder.AddKafkaMessagingExtensions();
var app = builder.Build();
app.MapDefaultEndpoints();
app.Run();
2. Configuration
Add the Kafka connection string to your configuration:
// appsettings.json
{
"ConnectionStrings": {
"Messaging": "localhost:9092"
}
}
3. Define Integration Events
Create events that will be published across services:
// Events should be in a namespace ending with "IntegrationEvents"
namespace MyService.Contracts.IntegrationEvents;
[EventTopic("customer", Domain = "sales")]
public record CustomerCreated(
Guid CustomerId,
string Name,
string Email,
DateTime CreatedAt) : IDistributedEvent
{
public string GetPartitionKey() => CustomerId.ToString();
}
4. Publishing Events
Publish events using Wolverine's message bus:
public class CustomerService(IMessageBus messageBus)
{
public async Task CreateCustomerAsync(CreateCustomerRequest request)
{
// Business logic here...
var customerId = Guid.NewGuid(); // Placeholder: use the ID produced by your business logic
var integrationEvent = new CustomerCreated(
customerId,
request.Name,
request.Email,
DateTime.UtcNow);
// This will be automatically routed to the appropriate Kafka topic
await messageBus.PublishAsync(integrationEvent);
}
}
5. Handling Events
Create handlers for integration events:
// This handler will automatically subscribe to the CustomerCreated topic
public class CustomerCreatedHandler
{
public Task Handle(CustomerCreated customerCreated, CancellationToken cancellationToken)
{
// Process the integration event
Console.WriteLine($"Customer {customerCreated.Name} was created with ID {customerCreated.CustomerId}");
// Nothing is awaited here, so return a completed task rather than marking the method async
return Task.CompletedTask;
}
}
Advanced Configuration
Topic Naming Convention
Topics are automatically named using the following pattern:
{environment}.{domain}.{scope}.{topic}.{version}
For example:
- Development: dev.sales.public.customers.v1
- Production: prod.sales.public.customers.v1
- Internal events: prod.sales.internal.customer-updates.v1
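To make the pattern concrete, here is a minimal sketch that assembles a topic name from its five segments. BuildTopicName is a hypothetical helper written for illustration and is not part of the package; the environment short names follow the mapping documented for this package, and the lowercase fallback for unlisted environments is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not the package's API): mirrors the documented
// {environment}.{domain}.{scope}.{topic}.{version} pattern.
static string BuildTopicName(string environment, string domain, bool isInternal, string topic, string version)
{
    // Short names taken from the documented environment mapping.
    var shortNames = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
    {
        ["Development"] = "dev",
        ["Production"] = "prod",
        ["Test"] = "test"
    };

    // Assumption: environments outside the mapping fall back to their lowercase name.
    var env = shortNames.TryGetValue(environment, out var shortName)
        ? shortName
        : environment.ToLowerInvariant();

    var scope = isInternal ? "internal" : "public";
    return $"{env}.{domain}.{scope}.{topic}.{version}";
}

Console.WriteLine(BuildTopicName("Development", "sales", false, "customers", "v1"));
// prints: dev.sales.public.customers.v1
```

The sketch takes an already pluralized topic segment; how the package pluralizes names is not shown here.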
Event Topic Attributes
Control topic configuration with attributes:
[EventTopic(
"order-payment",
Domain = "ecommerce",
Internal = false, // Creates public topic
ShouldPluralizeTopicName = true, // "order-payments" instead of "order-payment"
Version = "v2")]
public record PaymentProcessed(Guid OrderId, decimal Amount);
Partition Key Strategies
Using IDistributedEvent Interface
public record OrderCreated(Guid OrderId, Guid CustomerId) : IDistributedEvent
{
// Messages with the same customer ID will go to the same partition
public string GetPartitionKey() => CustomerId.ToString();
}
Using PartitionKey Attribute
public record ProductUpdated(
[PartitionKey] Guid ProductId,
string Name,
decimal Price);
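Both strategies above produce a string key, and the point of that key is that equal keys land on the same partition. The sketch below illustrates the idea with a simple FNV-1a hash. This is an assumption made for illustration: PickPartition is a hypothetical function, and the Kafka client uses its own partitioner, so actual partition numbers will differ.

```csharp
using System;
using System.Text;

// Illustrative only: FNV-1a stands in for the Kafka client's real partitioner.
static int PickPartition(string partitionKey, int partitionCount)
{
    unchecked
    {
        uint hash = 2166136261;
        foreach (var b in Encoding.UTF8.GetBytes(partitionKey))
        {
            hash ^= b;
            hash *= 16777619;
        }
        return (int)(hash % (uint)partitionCount);
    }
}

var customerId = Guid.NewGuid().ToString();

// Two events for the same customer carry the same key...
var first = PickPartition(customerId, 12);
var second = PickPartition(customerId, 12);

// ...so they map to the same partition, which preserves their relative order.
Console.WriteLine(first == second);
// prints: True
```

Because the mapping depends on the partition count, changing the number of partitions on a topic can move a key to a different partition.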
Environment-Specific Configuration
The package automatically adapts topic names based on the environment:
// Environment mapping
"Development" → "dev"
"Production" → "prod"
"Test" → "test"
Health Checks
Kafka health checks are automatically registered and available at the /health endpoint:
{
"status": "Healthy",
"checks": {
"kafka": {
"status": "Healthy",
"description": "Kafka connectivity check",
"tags": ["messaging", "kafka"]
}
}
}
CloudEvents Integration
All messages are automatically wrapped in CloudEvents format, providing:
- Standardization: Industry-standard event format
- Metadata: Rich event metadata (source, type, time, etc.)
- Tracing: Distributed tracing correlation
- Versioning: Event schema evolution support
Example CloudEvent structure:
{
"specversion": "1.0",
"type": "CustomerCreated",
"source": "urn:momentum:sales-api",
"id": "550e8400-e29b-41d4-a716-446655440000",
"time": "2024-01-15T10:30:00Z",
"datacontenttype": "application/json",
"data": {
"customerId": "123e4567-e89b-12d3-a456-426614174000",
"name": "John Doe",
"email": "john.doe@example.com"
}
}
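For readers who want to see how such an envelope is put together, the following sketch builds a structurally similar document by hand with System.Text.Json. It is an illustration only: the package performs the wrapping itself, and the attribute values here are copied from the example above rather than taken from the package's implementation.

```csharp
using System;
using System.Text.Json;

// Hand-built envelope for illustration; the package produces the real one.
var envelope = new
{
    specversion = "1.0",
    type = "CustomerCreated",
    source = "urn:momentum:sales-api",
    id = Guid.NewGuid().ToString(),
    time = DateTimeOffset.UtcNow,            // serialized as an ISO 8601 timestamp
    datacontenttype = "application/json",
    data = new
    {
        customerId = Guid.NewGuid(),
        name = "John Doe",
        email = "john.doe@example.com"
    }
};

var json = JsonSerializer.Serialize(envelope, new JsonSerializerOptions { WriteIndented = true });
Console.WriteLine(json);
```

The envelope attributes (specversion, type, source, id) are required by the CloudEvents 1.0 specification, while time, datacontenttype, and data are optional.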
Error Handling
The package provides error handling through WolverineFx:
- Basic Error Logging: Failed message processing is logged for monitoring
Observability
Built-in observability includes:
Metrics
- TBD
Tracing
- TBD
Logging
- Structured logging with correlation IDs
- Event processing lifecycle
- Error diagnostics
Best Practices
Event Design
- Immutable Events: Events represent facts that have already occurred
- Rich Context: Include all necessary information in the event
- Backward Compatibility: Design events for schema evolution
Partition Keys
- Consistency: Use consistent partition keys for related events
- Distribution: Ensure good key distribution to avoid hot partitions
- Stability: Partition keys should be stable over time
Topic Management
- Environment Separation: Always use environment-specific topics
- Naming Conventions: Follow the established topic naming pattern
- Retention: Configure appropriate message retention policies
Troubleshooting
Common Issues
Connection Failures
InvalidOperationException: Kafka connection string 'Messaging' not found in configuration
- Ensure the Messaging connection string is configured
- Verify Kafka broker accessibility
Topic Creation Issues
Topic does not exist and auto-creation is disabled
- Enable auto-provisioning in Kafka configuration
- Manually create topics if auto-creation is disabled
Serialization Errors
CloudEvent serialization failed
- Ensure event types are properly decorated with attributes
- Verify JSON serialization compatibility
Debug Logging
Enable debug logging for detailed troubleshooting:
{
"Logging": {
"LogLevel": {
"Momentum.Extensions.Messaging.Kafka": "Debug",
"Wolverine.Kafka": "Debug"
}
}
}
Requirements
- .NET 9.0 or later
- Apache Kafka 2.8 or later
- Momentum.ServiceDefaults package
Related Packages
- Momentum.ServiceDefaults - Base service configuration
- Momentum.Extensions.Abstractions - Shared abstractions and interfaces
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
For more information about the Momentum platform and contribution guidelines, please visit the main repository.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net9.0
  - Aspire.Confluent.Kafka (>= 9.4.1)
  - CloudNative.CloudEvents.Kafka (>= 2.8.0)
  - Microsoft.SourceLink.GitHub (>= 8.0.0)
  - Momentum.Extensions.Abstractions (>= 0.0.1-pre.14)
  - Momentum.ServiceDefaults (>= 0.0.1-pre.14)
  - WolverineFx.Kafka (>= 4.9.3)
Version | Downloads | Last Updated |
---|---|---|
0.0.1-pre.14 | 5 | 8/21/2025 |
0.0.1-pre.13 | 10 | 8/21/2025 |
0.0.1-pre.12 | 11 | 8/20/2025 |
0.0.1-pre.11 | 12 | 8/18/2025 |
0.0.1-pre.10 | 11 | 8/18/2025 |
0.0.1-pre.9 | 11 | 8/18/2025 |
0.0.1-pre.8 | 11 | 8/18/2025 |
0.0.1-pre.7 | 10 | 8/18/2025 |
0.0.1-pre.6 | 13 | 8/18/2025 |