基于SpringBoot整合Kafaka
作者:互联网
一、安装及配置kafaka (版本:2.11)
1、安装配置过程
//下载解压 wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz 或下载地址:https://kafka.apache.org/downloads.html 解压 //配置 vi server.properties 添加端口port=9092 添加host.name=实际Ip 修改配置项listeners=PLAINTEXT://实际ip:9092,取消原注释 //启动运行 ./zookeeper-server-start.sh /home/likangwen/kafaka/kafka_2.11-1.1.0/config/zookeeper.properties & ./kafka-server-start.sh /home/likangwen/kafaka/kafka_2.11-1.1.0/config/server.properties &
2、成果
(1)启动zookeeper
(2)启动kafaka
二、springboot整合kafaka实现一个简单的发布/订阅消息系统
1、整合过程
//添加kafaka依赖 <!--kafka支持--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.9.RELEASE</version><!--$NO-MVN-MAN-VER$--> </dependency> //在application.properties添加kafka配置 #kafka相关配置 spring.kafka.bootstrap-servers=192.168.245.100:9092 #设置一个默认组 spring.kafka.consumer.group-id=0 #key-value序列化反序列化 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #每次批量发送消息的数量 spring.kafka.producer.batch-size=65536 spring.kafka.producer.buffer-memory=524288 //创建kafaka生产者 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * producer:kafaka生产者(发布消息) */ @Component public class KafkaSender { @Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 发送消息到kafka */ public void sendChannelMess(String channel, String message){ kafkaTemplate.send(channel,message); } } //创建Kafaka消费者 /** * consumer:kafaka消费者(订阅消息) */ @Component public class KafkaConsumer { @KafkaListener(topics = {"seckill"}) public void receiveMessage(String message){ //收到通道的消息之后执行秒杀操作 System.out.println("监听topic:"+"seckill"+",接收到该主题消息message:"+message); } } //创建测试控制器 @RestController public class MessageController { @Autowired private KafkaSender kafkaSender; //@Autowired //private KafkaTemplate<String,Object> kafkaTemplate; // @RequestMapping("/message/send") // public boolean send(@RequestParam String message){ // // System.out.println("receive a request: /message/send"); // kafkaTemplate.send("seckill",message); // return true; // } @RequestMapping("/sendMessageByKafaka") public boolean sendMessageByKafaka(){ String message="这是一条关于kafaka消息订阅发布的测试消息!"; String channel="seckill"; System.out.println("receive a request: /sendMessageByKafaka"); kafkaSender.sendChannelMess(channel,message); return true; } } //启动springboot项目即可 //在浏览器访问指定url,观察日志输出
2、成果
(1)发送消息:"This is kafaka"
(2)接收消息
(3)其它
三、FAQ问题集锦
1、启动kafaka项目出现“Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception during bean creation; ”?
原因:
java.lang.ClassNotFoundException: org.springframework.kafka.transaction.KafkaAwareTransactionManager
解决方法:
更换Kafaka依赖的版本2.x.x
2、启动kafaka项目出现“Consumer clientId=consumer-1, groupId=0] Connection to node -1 co”?
原因:服务器的kafaka配置有误,无法建立连接
解决方法:
修改配置项host.name=实际Ip,例如192.168.245.100(不能为127.0.0.1)
修改配置项listeners=PLAINTEXT://实际ip:9092,取消原注释
建立起连接后,kafaka项目打印日志及服务器打印日志,正确如下所示:
标签:SpringBoot,kafaka,spring,kafka,Kafaka,整合,org,message,public 来源: https://www.cnblogs.com/lkw-cnblogs/p/16126053.html