首页 > TAG信息列表 > kafka-consumer-api

c#-Kafka Confluent库中轮询和消耗之间的区别

Confluent Kafka库的github示例page列出了两种方法,即轮询和消耗.两者有什么区别. 我确实在Confluent Kafka库here中查看了Consumer的实现,觉得它们在功能上是相同的,仅在返回值方面有所不同. Poll()调用consumer()来查看是否有准备接收的消息,如果是,则调用OnMessage事件.而使用,

卡夫卡消费者启动延迟汇合的dotnet

启动融合的dotnet使用者时,在调用订阅和随后的轮询之后,似乎需要很长时间才能从服务器接收到“分配的分区”事件,并因此收到消息(大约10-15秒). 起初我以为会有自动创建主题的开销,但是无论消费者的主题/消费者组是否已经存在,时间都是相同的. 我从此配置开始使用我的使用者,其余代

Java-Kafka崩溃时,Kafka使用者挂起轮询

我一直在研究Zookeeper和Kafka的基本设置,以学习如何使用它,但是我在与消费者打交道时遇到了麻烦.当Kafka不可用时,对poll()方法的调用将挂起,直到它重新联机. 卡夫卡版本:0.10.1.0 我的代码如下所示: KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consum

kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException

我正在为kafka运行一个简单的消费者,例如: int timeout = 80000; int bufferSize = 64*1024; consumer = new SimpleConsumer(host, port,timeout, bufferSize, clientName); 这可以正常运行几个小时,但出现异常 稍后的kafka.consumer.SimpleConsumer:由于套接字错误而重新连接: j

Kafka无法从头开始阅读–Java

我是kafka的新手,并尝试使用kafka构建一个生产者 – 消费者应用程序.在这里,我能够向kalka发送消息,但是当我尝试使用消费者消费它时,它返回0条记录. 我检查了我的消费者组的偏移量,我可以看到偏移量等于日志长度是相同的(在我的情况下为1M – 与记录数相同). 如果我在创建我的消

java – 为什么Apache Kafka使用者不使用Log4j2根记录器?

我有这个配置: 的pom.xml <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.11.1</version> </dependency> <dependency> <groupId>org.apache.logg

java – Kafka Consumer将过多的DEBUG语句输出到控制台(ecilpse)

我正在运行一些来自http://www.javaworld.com/article/3060078/big-data/big-data-messaging-with-kafka-part-1.html?page=2的示例代码,并且kafkaconsumer正在根据需要从主题中消耗,但每次轮询都会导致许多调试日志的打印(到标准输出),这是我不想要的. 我已经尝试在/config/log4j.

java – 从Kafka多次读同一条消息

我使用Spring Kafka API来实现手动偏移管理的Kafka消费者: @KafkaListener(topics = "some_topic") public void onMessage(@Payload Message message, Acknowledgment acknowledgment) { if (someCondition) { acknowledgment.acknowledge(); } } 在这里,我希望

java – Consumer.endOffsets如何在Kafka中运行?

假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出滞后,提交的偏移量和结束偏移量.与Kafka控制台消费者组脚本的工作方式类似,但适用于所有组. 就像是 单个消费者 – 不工作 – 不返回某些提供的主题分区的偏移量(例如,提供10个 – 返

java – Kafka突然重置消费者Offset

我正在和Kafka 0.8& zookeeper 3.3.5.实际上,我们有十几个主题,我们正在消费没有任何问题. 最近,我们开始提供并消费一个有奇怪行为的新主题.消耗的偏移突然重置.它尊重我们设置的auto.offset.reset策略(实际上是最小的),但我无法理解为什么该主题突然重置其偏移量. 我正在使用高级

java – 如何获取消费者组的最后消耗偏移量?

我在消费者组中有两个使用相同kafka主题分区的消费者.我想从消费者B内部获得消费者A的最后读取偏移量.任何想法,如何实现这一点?解决方法:永远不会将单个分区分配给同一组中的两个使用者实例. 您可以使用以下脚本来了解上次消耗的偏移量 sh kafka-consumer-groups.sh --bootstrap-s

java – 无法在kafka consumer下设置’max.poll.records’,其中cons.poll仍然返回分区下的所有记录

我创建了多线程消费者应用程序来处理各种分区. 查看各种博客,我开始了解’max.poll.records’属性,以便控制来自给定主题,分区的记录集.(因此它可以很快从记录循环中出来,因此调用cons.poll ()保持活力) 问题是我的处理逻辑需要时间来处理每条记录.在启动Cons-2时,两者都开始在相同

java – Kafka使用者异常和偏移提交

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误的最佳实践. 我想知道是否有人能够提供帮助: >分享围绕Kafka消费者应该做的最佳实践 当出现故障时>帮助我了解AckMode Record如何工作,以及在侦听器方法中抛出异常时如何防止提交到Kafka偏移

java – Kafka Streams:如何更改记录时间戳(0.11.0)?

我正在使用FluentD(第12版稳定版)向Kafka发送消息.但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1. 因此,当消息到达kafka时,我必须使用WallclockTimestampExtractor将记录的时间戳设置为时间点. 我真正感兴趣的时间戳是由流利的信息发送的: “timestamp”:”15078

如何找回Kafka生产者和消费者配置(Java API)?

用例如下. 我在Java代码中通过许多对象实例传递生产者或消费者引用.在其中一些我想对Kafka配置进行一些检查.这意味着我想回来,Kafka Producer / Consumer中存储了哪些有效的配置(包括默认值).我没有看到java docs中的anthing: > KafkaProducer> KafkaConsumer 那么,如何找回Kafka生

java – 将字节数组发送到storm kafka bolt

我写了一个风暴拓扑.我基本上想要以字节数组的形式将avro架构中的元组发送到kafka主题. 这就是我设置螺栓的方法: builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>()) .fieldsGrouping(BOLT1, new Fields("key")); 这就是我转换为字节数组的

如何在kafka中找到分配给哪个主题分区的消费者?

我正在构建一个kafka管理器工具,我需要检查哪个主题分区分配给一个使用者组中的哪个使用者. 假设存在具有n个分区的消费者群组-A消费主题-A,因此在不同VM中托管的群组A中可以有多个消费者.那么如何找到哪个分区被分配给哪个消费者主机?在kafka 0.9.1中有可能吗? 提前致谢.解决方法:您

java – Kafka模式订阅.新主题没有触发重新平衡

根据kafka javadocs的文件,如果我: >订阅模式 >创建与模式匹配的主题 应该发生重新平衡,这使消费者从该新主题中读取.但那并没有发生. 如果我停止并启动消费者,它确实会选择新主题.所以我知道新主题与模式匹配.这个问题可能在https://stackoverflow.com/questions/37120537/whitelis