BorgNet.Kafka
1.0.0
dotnet add package BorgNet.Kafka --version 1.0.0
NuGet\Install-Package BorgNet.Kafka -Version 1.0.0
<PackageReference Include="BorgNet.Kafka" Version="1.0.0" />
paket add BorgNet.Kafka --version 1.0.0
#r "nuget: BorgNet.Kafka, 1.0.0"
// Install BorgNet.Kafka as a Cake Addin
#addin nuget:?package=BorgNet.Kafka&version=1.0.0
// Install BorgNet.Kafka as a Cake Tool
#tool nuget:?package=BorgNet.Kafka&version=1.0.0
Usage

appsettings.json

"KafkaOptions": {
  "BootstrapServers": "aiot-kafka:9092",
  "SaslMechanism": "Plain",
  "SecurityProtocol": "Plaintext",
  "SaslUsername": "",
  "SaslPassword": ""
}

Enabling the service

In applications that are not built on BorgNet.Core, the producer must be initialized manually, as follows:
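public void ConfigureServices(IServiceCollection services)
{
    // Bind the KafkaOptions section onto the Confluent.Kafka producer configuration.
    var conf = new ProducerConfig();
    configuration.GetSection("KafkaOptions").Bind(conf);
    // Register a single shared producer for the whole application.
    var kafkaProducer = new ProducerBuilder<Null, string>(conf).Build();
    services.TryAddSingleton(kafkaProducer);
}

In applications built on BorgNet.Core, it is enough to add the package reference and configure the KafkaOptions node in appsettings.json as shown above.

Usage example

public class KafkaController : ControllerBase {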
    private readonly IProducer<Null, string> _producer;
    private IServiceProvider serviceProvider;

    public KafkaController(IProducer<Null, string> producer, IServiceProvider serviceProvider)
    {
        _producer = producer;
        this.serviceProvider = serviceProvider;
    }

    // Publishes the posted key/value pair to the report-device topic as a JSON array.
    [HttpPost]
    public async Task<IActionResult> reportdevice([FromBody]KeyValueEquipment keyValue)
    {
        await _producer.ProduceAsync("report-device", new Message<Null, string>
        {
            Value = System.Text.Json.JsonSerializer.Serialize(new List<KeyValueEquipment> { keyValue })
        });
        return Ok("ok");
    }

    // Starts the subscriber in the background without waiting for it to finish.
    [HttpGet]
    public IActionResult Subscribe()
    {
        _ = serviceProvider.GetRequiredService<SubscribeWorker>().StartAsync(default);
        return Ok("ok");
    }
}
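The controller above awaits ProduceAsync and returns "ok" without inspecting the outcome. The following is a minimal sketch of how a delivery failure could be surfaced, assuming the Confluent.Kafka behavior that ProduceAsync throws ProduceException<TKey, TValue> when delivery fails. The ProducerExtensions class and the TryProduceAsync method are hypothetical names introduced for illustration and are not part of BorgNet.Kafka.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper for illustration only; not part of BorgNet.Kafka.
public static class ProducerExtensions
{
    // Returns true when the broker acknowledged the message, false when delivery failed.
    public static async Task<bool> TryProduceAsync(
        this IProducer<Null, string> producer, string topic, string value)
    {
        try
        {
            DeliveryResult<Null, string> result = await producer.ProduceAsync(
                topic, new Message<Null, string> { Value = value });
            // TopicPartitionOffset identifies where the message was written.
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
            return true;
        }
        catch (ProduceException<Null, string> e)
        {
            // Error.Reason carries the description reported by the client library.
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            return false;
        }
    }
}
```

With such a helper, the reportdevice action could return an error status code instead of "ok" when the call returns false.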
public class SubscribeWorker : BackgroundService, ISingletonDependency
{
    private readonly IConfiguration _configuration;

    public SubscribeWorker(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    public int Order => 0;

    /// <summary>
    /// Subscribes to the report-device topic and prints each received key/value pair.
    /// </summary>
    /// <returns></returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Yield so the caller of StartAsync is not blocked by the synchronous consume loop below.
        await Task.Yield();
        var consumerConfig = new ConsumerConfig();
        // Consumer group ID. In Kafka, consumers in the same group share the same committed offsets.
        // To receive all messages, use a completely unique GroupId; to balance messages
        // across several consumers, give those consumers the same GroupId.
        consumerConfig.GroupId = Guid.NewGuid().ToString();
        consumerConfig.AutoOffsetReset = AutoOffsetReset.Latest;
        _configuration.GetSection("KafkaOptions").Bind(consumerConfig);
        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        var topic = "report-device";
        consumer.Subscribe(topic);
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var messageText = consumer.Consume(stoppingToken);
                var message = System.Text.Json.JsonSerializer.Deserialize<List<KeyValueEquipment>>(messageText.Message.Value);
                if (message is null) continue;
                foreach (var kv in message)
                {
                    Console.WriteLine($"{kv.Key},{kv.Value}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Consume throws when stoppingToken is cancelled; this is the normal shutdown path.
        }
        finally
        {
            // Leave the group cleanly so that cleanup also runs on cancellation.
            consumer.Unsubscribe();
            consumer.Close();
        }
    }
}
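The GroupId comment in SubscribeWorker describes two consumption modes: a unique group per consumer, so every consumer sees all messages, and a shared group, so messages are balanced across consumers. The following is a minimal sketch of both configurations. The ConsumerConfigFactory class, its method names, and the bootstrapServers and groupId parameters are hypothetical and introduced for illustration only; ConsumerConfig and its properties come from Confluent.Kafka.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper for illustration only; not part of BorgNet.Kafka.
public static class ConsumerConfigFactory
{
    // Unique group per consumer instance: each instance gets its own offsets,
    // so it receives every message produced after it joins (AutoOffsetReset.Latest).
    public static ConsumerConfig Broadcast(string bootstrapServers) => new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = Guid.NewGuid().ToString(),
        AutoOffsetReset = AutoOffsetReset.Latest
    };

    // Shared group: consumers created with the same groupId split the topic's
    // partitions between them, so each message is handled by one of them.
    public static ConsumerConfig Balanced(string bootstrapServers, string groupId) => new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = groupId,
        AutoOffsetReset = AutoOffsetReset.Latest
    };
}
```

For example, ConsumerConfigFactory.Balanced("aiot-kafka:9092", "report-device-workers") could be passed to ConsumerBuilder<Ignore, string> in several worker instances to spread the load, where "report-device-workers" is an assumed group name.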
Documentation is still being updated...
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
Dependencies
- net6.0
  - BorgNet.Startup (>= 1.0.0)
  - Confluent.Kafka (>= 1.9.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
1.0.0 | 184 | 1/29/2023 |