首页 > TAG信息列表 > spring-kafka

java – 用spring管理Kafka主题

我们计划在我们的应用程序中使用Kafka排队.我在RabbitMQ和Spring方面有一些经验. 使用RabbitMQ和Spring,我们曾经在启动spring服务时管理队列创建. 有了Kafka,我不确定什么是创建主题的最佳方式?有没有办法用Spring管理主题. 或者,我们应该编写一个单独的脚本来帮助创建主题吗?维护一

java – 春天Kafka听正则表达式

我试图用下面的代码听新创建的主题,但是没有用.如果下面的代码是正确的,你能告诉我吗? public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); private final ProcessEventModel eventModel;

Spring Boot Java Kafka配置,覆盖端口

我使用Spring Boot Kafka.这是我目前对Kafka非常简单的配置: @Configuration @EnableKafka public class KafkaConfig { } 此配置非常正常,并且能够在默认Kafka端口上连接到Kafka实例:9092 现在我需要改变端口,让我们说9093. 如何更新此Kafka配置以便能够在9093上连接?解决方法:我

java – 从Kafka多次读同一条消息

我使用Spring Kafka API来实现手动偏移管理的Kafka消费者: @KafkaListener(topics = "some_topic") public void onMessage(@Payload Message message, Acknowledgment acknowledgment) { if (someCondition) { acknowledgment.acknowledge(); } } 在这里,我希望

javascript – Spring Websocket与Kafka集成

我正在尝试通过Spring MVC项目中的Spring-Websockets将消耗的Kafka数据发送到前端(JavaScript). 为了建立服务器和客户端之间的通信,我有以下内容. 客户(app.js) function connect() { var socket = new SockJS('/kafka-data-websocket'); stompClient = Stomp.over(socke

java – Consumer.endOffsets如何在Kafka中运行?

假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出滞后,提交的偏移量和结束偏移量.与Kafka控制台消费者组脚本的工作方式类似,但适用于所有组. 就像是 单个消费者 – 不工作 – 不返回某些提供的主题分区的偏移量(例如,提供10个 – 返

java – Spring Kafka-将KafkaTemplate与Producer Listener配置并使用Listenable Future注册回调之间的区别

所以我正在阅读Spring kafka文档并遇到了Producer Listener.这就是Spring Kafka文档所说的 – “或者,您可以使用ProducerListener配置KafkaTemplate,以获取带有发送结果(成功或失败)的异步回调,而不是等待Future完成.” 他们还指定了界面 – public interface ProducerLis

如何使用Spring Kafka实现有状态消息监听器?

我想使用Spring Kafka API实现有状态监听器. 鉴于以下内容: > ConcurrentKafkaListenerContainerFactory,并发设置为“n” > Spring @Service类上的@KafkaListener注释方法 然后将创建“n”KafkaMessageListenerContainers.其中每一个都有自己的KafkaConsumer,因此会有“n”个消费者

java – Spring Kafka JsonSerializer用法

我想按照这里的说明操作: http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/htmlsingle/#_serialization_deserialization_and_message_conversion 设置一个KafkaTemplate,它可以序列化并发送我拥有的一些简单的Java POJO.但我发现文档含糊不清,特别是这一部分: F

spring – kafka在重新平衡后停止使用来自新分配的分区的消息

我对kafka(也是英语……)很新,我面对这个问题,不能谷歌任何解决方案. 我使用spring-boot,spring-kafka支持,我在本地机器上安装了kafka_2.11-0.10.1.1(只有一个代理0) s1.然后我创建主题 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --parti

java – Kafka使用者异常和偏移提交

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误的最佳实践. 我想知道是否有人能够提供帮助: >分享围绕Kafka消费者应该做的最佳实践 当出现故障时>帮助我了解AckMode Record如何工作,以及在侦听器方法中抛出异常时如何防止提交到Kafka偏移

java – spring kafka thorws设置并发后的InstanceAlreadyExistsException异常> 1

我正在使用spring-kafka,如果我没有设置ConcurrentKafkaListenerContainerFactory的并发性,一切正常,当我将它设置为大于1的数字时,我得到一个异常: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3 我的配置: @Bean public Concu

一个Spring的KafkaConsumer监听器可以监听多个主题吗?

任何人都知道单个监听器是否可以监听下面的多个主题?我知道只有“topic1”有效,如果我想添加其他主题怎么办?你能否在下面展示两个例子?谢谢您的帮助! @KafkaListener(topics = "topic1,topic2") public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) { System.o

java – 将ObjectMapper注入Spring Kafka serialiser / deserialiser

我正在使用Spring Kafka 1.1.2-RELEASE和Spring Boot 1.5.0 RC,我已经配置了一个自定义值serialiser / deserialiser类,扩展了org.springframework.kafka.support.serializer.JsonSerializer / org.springframework.kafka.support. serializer.JsonDeserializer.这些类确实使用了Ja