Kafka漫谈(一)
作者:互联网
Kafka跟作家kafka的唯一关系在于,开发者喜欢kafka,实际上它是一种高吞吐量的分布式发布订阅消息系统。同样Rocketmq也是一种消息订阅系统,人们经常拿他们进行对比,Rocketmq可以详见该专栏RocketMQ漫谈:从简介到启动(一)。
文章目录
下文主要围绕以下几点进行展开:
- kafka能解决什么问题
- 与Rocketmq对比
- 常用概念介绍
- 安装实操环境
一、安装
在介绍概念之前,我们先直接上手安装:
- 访问官网进行下载,http://kafka.apache.org/downloads。Tips:若你的机器环境是Windows,则不建议安装最新版本的kafka,会出现各类未知问题,这里使用2.1.1版本进行安装使用。
- 解压下载的tgz压缩文件,目录结构如下:
-rw-r--r-- 1 Vainycos 197121 32216 Feb 9 2019 LICENSE
-rw-r--r-- 1 Vainycos 197121 336 Feb 9 2019 NOTICE
drwxr-xr-x 1 Vainycos 197121 0 Feb 9 2019 bin/
drwxr-xr-x 1 Vainycos 197121 0 Feb 9 2019 config/
-rw-r--r-- 1 Vainycos 197121 67317760 Nov 18 14:34 kafka_2.11-2.1.1.tar
drwxr-xr-x 1 Vainycos 197121 0 Feb 9 2019 libs/
drwxr-xr-x 1 Vainycos 197121 0 Nov 18 15:28 logs/
drwxr-xr-x 1 Vainycos 197121 0 Feb 9 2019 site-docs/
主要关注bin目录和conf目录,其中bin目录下的windows文件夹对应windows下的环境;conf目录对应配置文件,采用默认配置暂时不做任何调整。
- 开始启动环节
以下命令均在kafka根目录下执行。
-
启动zk(由于kafka需要依赖zk,使用内置zk启动):启动完成后需保持黑框窗口不关闭
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
-
启动kafka:新开一个黑框窗口运行,启动完成后需保持黑框窗口不关闭
.\bin\windows\kafka-server-start.bat .\config\server.properties
-
创建一个"Hello-Kafka"主题:新开一个黑框窗口运行,创建完毕后可关闭黑框窗口
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
-
发送消息(生产者):新开一个黑框窗口运行,启动完成后需保持黑框窗口不关闭
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic Hello-Kafka
-
接收消息(消费者):新开一个黑框窗口运行,启动完成后需保持黑框窗口不关闭
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic Hello-Kafka --from-beginning
-
测试消息发送
上面的黑框窗口是发送消息,下面的黑框窗口是接收消息,可以看到消息被成功发送和接收了。
二、QA问答
Q:为什么启动kafka前需要先启动zookeeper?
kafka强依赖于zookeeper,没有zookeeper都没办法启动kafka,因为kafka将元数据的管理都丢给了zookeeper进行管理。(目前也开始在做低依赖zk的版本了,即Zookeeper-less Kafka,但是不建议直接在生产环境中使用,让子弹再飞一会儿,详见参考资料)
Q:创建主题Topic的意义?
在了解Topic之前还应该先接触broker。由于kafka支持集群部署,所以每一个部署的节点就是broker,即上面我们启动的一个kafka就是一个broker。而在每一个broker中还需要区分不同的Topic主题,可类比为发送人和接收人,且不同主题的信息互不干扰。
Q:使用Rocketmq还是kafka?
在之前的专栏中我就已经介绍过Rocketmq,也是一款优秀的消息中间件。
Rocketmq早期由阿里开发主导,使用的是java语言,对java生态支持较好,目前属于apache下的开源项目。
kafka也同样隶属于apache下的开源项目,由Scala和Java编写。
kafka经常被拿来与Rocketmq来作比较,论单机性能来说kafka更胜一筹,但是在数据可靠性上Rocketmq更优。
kafka仅能用作日志传输的说法不敢苟同,后期的kafka项目不仅仅满足于日志传输;而Rocketmq是阿里在实际业务需要的产物,自然有它的优点。
结论,没有银弹,视各自的业务需要做选择。
Q:能解决什么问题?
通过上面的测试收发消息,不就是一个简单的发送和接收吗,能解决什么实际需求呢。
同样类比于Rocketmq,kafka这一类的mq中间件其实能解决我们很多业务上的痛点,例如我们在某个业务中用户点击一个按钮进行某个流程周转,其中某一步操作需要给用户发送一条短信,而这一步操作如果需要等待短信发送是否成功将让用户觉得点一个按钮为什么要等待那么久,而我们引入mq则将等待较慢的业务进行解耦,我们只需要发一个消息给mq之后就可以流转其他流程了,因为mq会把产生的消息给消费掉,即发布订阅模型。消费时长决定于消息队列是否忙碌,一般情况下消费时长也挺短的;
还有一种情况是业务系统的高并发情况下,例如秒杀环境,上游订单流量瞬时增加,引入mq这一层即可以将订单消息先存起来,由消费者通过订阅模式进行处理。
以上总结其实就是一句话,生产者生成数据,将数据发送到一个缓存区域,消费者从缓存区域中消费数据。
三、整合kafka
项目示例地址:https://github.com/Vainycos/my_kafka.git(可以clone该仓库进行调试学习,但是更建议参考以下步骤自己动手实现一个)
-
引入依赖
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
生产者
package com.vainycos.mykafka.mq.producer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import javax.annotation.Resource; /** * @author: Vainycos * @description kafka生产者 * @date: 2021/11/22 11:14 */ @Component public class MyKafkaProducer { private Logger logger = LoggerFactory.getLogger(MyKafkaProducer.class); @Resource private KafkaTemplate<String, Object> kafkaTemplate; /** * 定义MQ主题:Hello-Kafka */ public static final String TOPIC_INVOICE = "Hello-Kafka"; /** * 发送消息 * @param msg * @return */ public ListenableFuture<SendResult<String, Object>> sendMsg(String msg) { logger.info("发送MQ消息 topic:{} message:{}", TOPIC_INVOICE, msg); return kafkaTemplate.send(TOPIC_INVOICE, msg); } }
-
消费者
package com.vainycos.mykafka.mq.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import java.util.Optional; /** * @author: Vainycos * @description kafka消费者 * @date: 2021/11/22 11:17 */ @Component public class KafkaConsumer { private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topics = "Hello-Kafka", groupId = "Hello-Kafka") public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional<?> message = Optional.ofNullable(record.value()); // 1. 判断消息是否存在 if (!message.isPresent()) { return; } // 2. 处理 MQ 消息 try { Assert.isTrue(true, "消费成功!"); // 3. 打印日志 logger.info("消费MQ消息,完成 topic:{} 消费信息:{}", topic, message.get()); // 4. 消息消费完成 ack.acknowledge(); } catch (Exception e) { // 消息重试,需要保证幂等。 logger.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get()); throw e; } } }
-
定时发送测试消息
package com.vainycos.mykafka; import com.vainycos.mykafka.mq.producer.MyKafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.SendResult; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import javax.annotation.Resource; import java.time.LocalDateTime; /** * @author: Vainycos * @description 定时发送mq消息 * @date: 2021/11/22 11:22 */ @Component public class SendKafkaMsgTask { private Logger logger = LoggerFactory.getLogger(SendKafkaMsgTask.class); @Resource private MyKafkaProducer kafkaProducer; /** * 每5s执行一次消息发送 */ @Scheduled(cron = "0/5 * * * * ?") void sendMsg(){ // 发送当前时间 ListenableFuture<SendResult<String, Object>> future = kafkaProducer.sendMsg(LocalDateTime.now().toString()); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { // MQ 消息发送成功 logger.info("发送MQ消息成功 topic:{}", MyKafkaProducer.TOPIC_INVOICE); } @Override public void onFailure(Throwable throwable) { // MQ 消息发送失败 logger.error("发送MQ消息失败 topic:{}", MyKafkaProducer.TOPIC_INVOICE); } }); } }
我们每隔5秒就用当前时间戳作为消息发送,在黑框控制台里就能看到发送成功的消息:
四、总结
以上我们就实现了一个简单的kafka消息中间件集成作为入门,实际业务过程中我们可以发送具体对象,通过json进行序列化发送,或者实现单独的解码编码直接传输对象。
参考资料:
- KAFKA Windows环境下的问题- ERROR Failed to write meta.properties
- Kafka为什么要抛弃ZooKeeper?
- Kafka 不再需要 ZooKeeper
- Kafka学习之路 (一)Kafka的简介
- 技术选型:RocketMQ or Kafka
标签:--,漫谈,springframework,kafka,发送,org,import,Kafka 来源: https://blog.csdn.net/imVainiycos/article/details/121469007