【kafka】生产者和消费者代码
作者:互联网
Producer
/// <summary>
/// Console entry point for the producer sample: reads lines from standard input
/// and publishes each one to the "test" topic, printing the resulting offset and
/// partition (or the error when delivery fails).
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    Console.WriteLine("请输入消息内容");
    using (var producer = new KafkaProducer())
    {
        while (true)
        {
            string message = Console.ReadLine();
            if (message == null)
            {
                // ReadLine returns null once standard input is exhausted (redirected
                // input, Ctrl+Z/Ctrl+D). Stop instead of looping forever and
                // publishing null-valued messages.
                break;
            }

            try
            {
                // The topic name is "test".
                var result = producer.ProduceAsync(
                        "test",
                        new Confluent.Kafka.Message<string, string>()
                        {
                            Key = Guid.NewGuid().ToString(),
                            Value = message
                        })
                    .GetAwaiter().GetResult();
                Console.WriteLine($"offset:{result.Offset.Value},partition:{result.Partition.Value}");
            }
            catch (ProduceException<string, string> e)
            {
                // Report the failure and keep reading the next line.
                Console.WriteLine($"失败的消息: {e.Message} [{e.Error.Code}]");
            }
        }
    }
}

/// <summary>
/// Thin disposable wrapper around a Confluent.Kafka string/string producer.
/// </summary>
class KafkaProducer : IDisposable
{
    // Fallback broker list used when the caller does not supply one.
    // Must be valid "host:port" pairs; the loopback address is 127.0.0.1.
    private const string DefaultBootstrapServers = "127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003";

    private readonly ProducerConfig _config = new ProducerConfig();
    private readonly IProducer<string, string> _producer;

    /// <summary>
    /// Builds the underlying producer.
    /// </summary>
    /// <param name="server">
    /// Comma-separated bootstrap server list; when null or empty the local
    /// default broker list is used.
    /// </param>
    public KafkaProducer(string server = null)
    {
        if (string.IsNullOrEmpty(server))
        {
            server = DefaultBootstrapServers;
        }

        _config.BootstrapServers = server;
        _producer = new ProducerBuilder<string, string>(_config).Build();
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Target topic name.</param>
    /// <param name="message">Key/value message to send.</param>
    /// <returns>The delivery result returned by the underlying producer.</returns>
    public async Task<DeliveryResult<string, string>> ProduceAsync(string topic, Message<string, string> message)
    {
        return await _producer.ProduceAsync(topic, message);
    }

    /// <summary>
    /// Releases the underlying producer.
    /// </summary>
    public void Dispose()
    {
        _producer?.Dispose();
    }
}
Consumer
/// <summary>
/// Console entry point for the consumer sample: polls the "test" topic in a loop
/// and prints each received message, or a placeholder line when a poll times out.
/// Ctrl+C ends the loop so the consumer is closed and disposed before exit.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    Console.WriteLine("默认只关注test主题的消息)");

    // Lives for the remainder of the process, so it is intentionally not disposed;
    // this also avoids a late Ctrl+C calling Cancel() on a disposed source.
    var shutdown = new System.Threading.CancellationTokenSource();
    Console.CancelKeyPress += (sender, e) =>
    {
        // Keep the process alive so the loop below can finish its current poll
        // and the using block can close the consumer.
        e.Cancel = true;
        shutdown.Cancel();
    };

    using (var consumer = new KafkaConsumer())
    {
        while (!shutdown.IsCancellationRequested)
        {
            try
            {
                consumer.Consume(a =>
                {
                    if (a == null)
                    {
                        // Null result: nothing arrived within the poll timeout.
                        Console.WriteLine("暂无消息");
                    }
                    else
                    {
                        Console.WriteLine($"Key:{a.Message.Key},Value:{a.Message.Value}");
                    }
                });
            }
            catch (ConsumeException e)
            {
                // Report the failure and keep polling, mirroring the producer sample.
                Console.WriteLine($"消费失败: {e.Error.Reason} [{e.Error.Code}]");
            }
        }
    }
}

/// <summary>
/// Thin disposable wrapper around a Confluent.Kafka string/string consumer that is
/// subscribed to the "test" topic on construction.
/// </summary>
class KafkaConsumer : IDisposable
{
    // Fallback broker list used when the caller does not supply one.
    // Must be valid "host:port" pairs; the loopback address is 127.0.0.1.
    private const string DefaultBootstrapServers = "127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003";

    private readonly IConsumer<string, string> _consumer;
    private bool _disposed;

    /// <summary>
    /// Builds the underlying consumer (group "TestGroupone", earliest offset reset)
    /// and subscribes it to the "test" topic.
    /// </summary>
    /// <param name="server">
    /// Comma-separated bootstrap server list; when null or empty the local
    /// default broker list is used.
    /// </param>
    public KafkaConsumer(string server = null)
    {
        if (string.IsNullOrEmpty(server))
        {
            server = DefaultBootstrapServers;
        }

        var config = new ConsumerConfig
        {
            GroupId = "TestGroupone",
            BootstrapServers = server,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();

        // The topic name defaults to "test".
        _consumer.Subscribe("test");
    }

    /// <summary>
    /// Polls once, waiting up to two seconds, and hands the result to
    /// <paramref name="action"/>.
    /// </summary>
    /// <param name="action">
    /// Callback invoked with the poll result; the result is null when no message
    /// arrived within the timeout. May be null, in which case the result is dropped.
    /// </param>
    public void Consume(Action<ConsumeResult<string, string>> action = null)
    {
        var consumerResult = _consumer.Consume(TimeSpan.FromSeconds(2));
        action?.Invoke(consumerResult);
    }

    /// <summary>
    /// Closes the consumer so it leaves its group promptly, then releases it.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            // Close() before Dispose() so the group coordinator is told the
            // consumer is leaving instead of waiting for a session timeout.
            _consumer?.Close();
        }
        catch (KafkaException)
        {
            // Dispose must not throw; the handle is still released below.
        }

        _consumer?.Dispose();
    }
}
标签:Console,string,producer,生产者,代码,server,new,kafka,consumer 来源: https://www.cnblogs.com/Ginease/p/16305225.html