首页 > TAG信息列表 > apache-kafka-streams

java-Kafka KStream-衡量消费者滞后

由于我的基于KStream的应用程序未遵循传统的Kafka消费者路线,我应如何跟踪消费者的滞后时间?通常我会使用ConsumerOffsetChecker(或类似的值),但是它需要使用者组名称. 我应该怎么用呢? (我想对此进行跟踪,以便我可以判断是否/何时启动新消费者)解决方法:Kafka Streams内部使用KafkaC

java-Kafka-如何同时使用filter和filternot?

我有一个Kafka流,它从一个主题获取数据,并且需要将该信息过滤到两个不同的主题. KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic"); stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.Stri

java-如何将两个Kafka流结合在一起,并在具有Avro值的主题中产生结果

我有两个Kafka Streams,它们具有String键和我使用KSQL创建的Avro格式的值. 这是第一个: DESCRIBE EXTENDED STREAM_1; Type : STREAM Key field : IDUSER Timestamp field : Not set - using <ROWTIME> Key format : STRING Value form

java-Kafka流:从应用程序的每个实例中的所有分区读取

当使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取.我尝试使用GlobalKTable来实现这一点,但问题是数据将被覆盖,并且聚合也无法应用于其上. 假设我有一个名为“ data_in”的主题,具有3个分区(P1,P2,P3).当我运行Kafka流应用程序的3个实例(I

java – 无法在IDE中删除Kafka Stream Application的状态目录

我正在开发一个简单的Kafka Stream应用程序,它从主题中提取消息并在转换后将其放入另一个主题.我正在使用Intelij进行开发. 当我调试/运行这个应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,它将完美运行 (i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and

java – Kafka Streams错误 – 分区上的偏移提交失败,请求超时

我们使用Kafka Streams来消费,处理和生成消息,而在PROD环境中,我们遇到了多个主题的错误: ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app] Offset commit failed on partition xxx-

java – 为什么kafka流会重新处理代理重启后生成的消息

我有一个单节点kafka代理和简单的流应用程序.我创建了2个主题(topic1和topic2). 在topic1上生成 – 处理过的消息 – 写入topic2 注意:对于生成的每条消息,只有一条消息写入目标主题 我制作了一条消息.在写入topic2之后,我停止了kafka经纪人.一段时间后,我重新启动了代理并在topic1

java – 从主题读取时写入磁盘的KafkaStreams

我一直在研究Kafka Streams应用程序上的磁盘写入,我将拓扑结构减少到最低限度,即: KStream<String, JsonElement> stream = builder.stream("input-topic"); 然而,在docker统计数据上,我可以观察到我的应用程序一直在向磁盘写入内容.我检查了容器,我看不到任何可疑的文件句柄. 如

java – kafkastreams – 增加更多处理能力

我正在研究POC转换现有的Flink应用程序/拓扑以使用KafkaStreams.我的问题是关于部署. 具体来说 – 在Flink中,将“工作节点”添加到flink安装中,然后为拓扑添加更多并行化以跟上不断增加的数据速率. 随着数据速率的增加,如何增加KStream容量? KStreams会自动处理吗?我是否会启动更多

java – Kafka Streams本地国营商店

我有一个简单的流应用程序将一个主题作为输入流并将KeyValues转换为另一个,如: StoreBuilder<KeyValueStore<Long, CategoryDto>> builder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),

java – KafkaStreams serde异常

我正在玩Kafka和溪流技术;我已经为KStream创建了一个自定义序列化器和反序列化器,我将用它来接收来自给定主题的消息. 现在,问题是我正在以这种方式创建一个serde: JsonSerializer<EventMessage> serializer = new JsonSerializer<>(); JsonDeserializer<EventMessage> deserialize

java – Kafka Streams:如何更改记录时间戳(0.11.0)?

我正在使用FluentD(第12版稳定版)向Kafka发送消息.但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1. 因此,当消息到达kafka时,我必须使用WallclockTimestampExtractor将记录的时间戳设置为时间点. 我真正感兴趣的时间戳是由流利的信息发送的: “timestamp”:”15078