其他分享
首页 > 其他分享 > 【kafka】生产者和消费者代码

【kafka】生产者和消费者代码

作者:互联网

 

Producer

/// <summary>
/// Producer entry point: reads lines from standard input and publishes each
/// one to the "test" topic, printing the offset and partition it was written to.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
        {
            Console.WriteLine("请输入消息内容");
            using (var producer = new KafkaProducer())
            {
                while (true)
                {
                    string message = Console.ReadLine();

                    // Console.ReadLine returns null once standard input is closed
                    // (Ctrl+Z / Ctrl+D or end of a redirected file). Without this check
                    // the loop would spin forever publishing null-valued records.
                    if (message == null)
                    {
                        break;
                    }

                    try
                    {
                        // The topic name is "test". A fresh GUID is used as the key of every record.
                        // Main is synchronous, so the send is awaited by blocking on the task.
                        var result = producer.ProduceAsync("test",
                        new Confluent.Kafka.Message<string, string>() { Key = Guid.NewGuid().ToString(), Value = message })
                            .GetAwaiter().GetResult();
                        Console.WriteLine($"offset:{result.Offset.Value},partition:{result.Partition.Value}");
                    }
                    catch (ProduceException<string, string> e)
                    {
                        // Report the failure and keep reading input; one failed send
                        // does not end the session.
                        Console.WriteLine($"失败的消息: {e.Message} [{e.Error.Code}]");
                    }
                }
            }
        }
        /// <summary>
        /// Thin wrapper around a Confluent.Kafka string/string producer that owns
        /// the underlying client and disposes it with the wrapper.
        /// </summary>
        class KafkaProducer : IDisposable
        {
            // Fallback broker list used when the caller supplies no server string.
            // The loopback address is 127.0.0.1 (four octets); the previous value
            // "127.0.0.0.1" was not a valid IPv4 address.
            private const string DefaultBootstrapServers = "127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003";

            private readonly ProducerConfig _config = new ProducerConfig();
            private readonly IProducer<string, string> _producer;

            /// <summary>
            /// Builds the producer.
            /// </summary>
            /// <param name="server">
            /// Comma-separated "host:port" bootstrap server list. When null or empty,
            /// the three local brokers on ports 9001-9003 are used.
            /// </param>
            public KafkaProducer(string server = null)
            {
                if (string.IsNullOrEmpty(server))
                {
                    server = DefaultBootstrapServers;
                }

                _config.BootstrapServers = server;
                _producer = new ProducerBuilder<string, string>(_config).Build();
            }

            /// <summary>
            /// Sends one message to the given topic by forwarding to the wrapped producer.
            /// </summary>
            /// <param name="topic">Name of the topic to publish to.</param>
            /// <param name="message">The key/value message to publish.</param>
            /// <returns>The delivery result returned by the wrapped producer.</returns>
            public async Task<DeliveryResult<string, string>> ProduceAsync(string topic, Message<string, string> message)
            {
                return await _producer.ProduceAsync(topic, message);
            }

            /// <summary>
            /// Disposes the wrapped producer.
            /// </summary>
            public void Dispose()
            {
                _producer?.Dispose();
            }
        }

 Consumer

/// <summary>
/// Consumer entry point: polls the consumer in an endless loop and prints
/// either the key/value of each received record or a "no message" notice.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
        {
            Console.WriteLine("默认只关注test主题的消息)");
            using (var consumer = new KafkaConsumer())
            {
                for (; ; )
                {
                    // One poll per iteration; the callback receives null when
                    // the poll produced no record.
                    consumer.Consume(result =>
                    {
                        string line = result == null
                            ? "暂无消息"
                            : $"Key:{result.Message.Key},Value:{result.Message.Value}";
                        Console.WriteLine(line);
                    });
                }
            }
        }

        /// <summary>
        /// Thin wrapper around a Confluent.Kafka string/string consumer that is
        /// subscribed to the "test" topic on construction.
        /// </summary>
        class KafkaConsumer : IDisposable
        {
            // Fallback broker list used when the caller supplies no server string.
            // The loopback address is 127.0.0.1 (four octets); the previous value
            // "127.0.0.0.1" was not a valid IPv4 address.
            private const string DefaultBootstrapServers = "127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003";

            private readonly IConsumer<string, string> _consumer;

            /// <summary>
            /// Builds the consumer in group "TestGroupone" with
            /// <c>AutoOffsetReset.Earliest</c> and subscribes it to the "test" topic.
            /// </summary>
            /// <param name="server">
            /// Comma-separated "host:port" bootstrap server list. When null or empty,
            /// the three local brokers on ports 9001-9003 are used.
            /// </param>
            public KafkaConsumer(string server = null)
            {
                if (string.IsNullOrEmpty(server))
                {
                    server = DefaultBootstrapServers;
                }

                var config = new ConsumerConfig
                {
                    GroupId = "TestGroupone",
                    BootstrapServers = server,
                    AutoOffsetReset = AutoOffsetReset.Earliest
                };
                _consumer = new ConsumerBuilder<string, string>(config).Build();

                // The topic name defaults to "test".
                _consumer.Subscribe("test");
            }

            /// <summary>
            /// Polls the wrapped consumer once, waiting at most two seconds, and
            /// hands the result to <paramref name="action"/>.
            /// </summary>
            /// <param name="action">
            /// Callback invoked with the poll result; skipped when null. The caller in
            /// this file treats a null result as "no message available".
            /// </param>
            public void Consume(Action<ConsumeResult<string, string>> action = null)
            {
                var consumerResult = _consumer.Consume(TimeSpan.FromSeconds(2));
                action?.Invoke(consumerResult);
            }

            /// <summary>
            /// Disposes the wrapped consumer.
            /// </summary>
            public void Dispose()
            {
                _consumer?.Dispose();
            }
        }

 

标签:Console,string,producer,生产者,代码,server,new,kafka,consumer
来源: https://www.cnblogs.com/Ginease/p/16305225.html