其他分享
首页 > 其他分享> > 基于SpringBoot整合Kafaka

基于SpringBoot整合Kafaka

作者:互联网

一、安装及配置kafaka (版本:2.11)

1、安装配置过程

//下载解压
wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
或下载地址:https://kafka.apache.org/downloads.html
解压

//配置
vi server.properties
添加端口port=9092
添加host.name=实际Ip
修改配置项listeners=PLAINTEXT://实际ip:9092,取消原注释

//启动运行
./zookeeper-server-start.sh /home/likangwen/kafaka/kafka_2.11-1.1.0/config/zookeeper.properties &
./kafka-server-start.sh /home/likangwen/kafaka/kafka_2.11-1.1.0/config/server.properties &

 

2、成果

(1)启动zookeeper

 

 

(2)启动kafaka

 

二、springboot整合kafaka实现一个简单的发布/订阅消息系统

1、整合过程

//添加kafaka依赖
<!--kafka支持-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.9.RELEASE</version><!--$NO-MVN-MAN-VER$-->
    </dependency>
    
//在application.properties添加kafka配置
#kafka相关配置
spring.kafka.bootstrap-servers=192.168.245.100:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

//创建kafaka生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * producer:kafaka生产者(发布消息)
 */
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息到kafka
     */
    public void sendChannelMess(String channel, String message){
        kafkaTemplate.send(channel,message);
    }

}

//创建Kafaka消费者
/**
 * consumer:kafaka消费者(订阅消息)
 */
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"seckill"})
    public void receiveMessage(String message){
        //收到通道的消息之后执行秒杀操作
        System.out.println("监听topic:"+"seckill"+",接收到该主题消息message:"+message);
    }

}

//创建测试控制器
@RestController
public class MessageController {

    @Autowired
    private KafkaSender kafkaSender;

    //@Autowired
    //private KafkaTemplate<String,Object> kafkaTemplate;


//    @RequestMapping("/message/send")
//    public boolean send(@RequestParam String message){
//
//        System.out.println("receive a request: /message/send");
//        kafkaTemplate.send("seckill",message);
//        return true;
//    }

    @RequestMapping("/sendMessageByKafaka")
    public boolean sendMessageByKafaka(){
        String message="这是一条关于kafaka消息订阅发布的测试消息!";
        String channel="seckill";

        System.out.println("receive a request: /sendMessageByKafaka");
        kafkaSender.sendChannelMess(channel,message);

        return true;
    }

}

//启动springboot项目即可

//在浏览器访问指定url,观察日志输出

 

2、成果

(1)发送消息:"This is kafaka"

 

 

(2)接收消息

 

 

(3)其它

 

三、FAQ问题集锦

1、启动kafaka项目出现“Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception during bean creation; ”?

原因:

java.lang.ClassNotFoundException: org.springframework.kafka.transaction.KafkaAwareTransactionManager

解决方法:

更换Kafaka依赖的版本2.x.x

 

2、启动kafaka项目出现“Consumer clientId=consumer-1, groupId=0] Connection to node -1 co”?

原因:服务器的kafaka配置有误,无法建立连接

解决方法:

修改配置项host.name=实际Ip,例如192.168.245.100(不能为127.0.0.1)

修改配置项listeners=PLAINTEXT://实际ip:9092,取消原注释

建立起连接后,kafaka项目打印日志及服务器打印日志,正确如下所示:

 

 

 

 

 

 

 

标签:SpringBoot,kafaka,spring,kafka,Kafaka,整合,org,message,public
来源: https://www.cnblogs.com/lkw-cnblogs/p/16126053.html