其他分享
首页 > 其他分享> > Kafka消息队列

Kafka消息队列

作者:互联网

 

 

之前也学习过消息队列,但一直没有使用的场景,今天项目中遇到了 kafka 那便有了应用场景


1. Kafka

Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:



之前 笔者也写过 RabbitMQ 的笔记,传送门







2. 生产消费模型

结合 kafka 的下面这些名词来解释其模型会更加容易理解

名称解释
Broker kafka 的实例,部署多台 kafka 就是有多个 broker
Topic 消息订阅的话题,是这些消息的分类,类似于消息订阅的频道
Producer 生产者,负责往 kafka 发送消息
Consumer 消费者,从 kafka 读取消息来进行消费







3. 安装部署

kafka 和依赖的 zookeeper 是 java 编写的工具,其需要 jdk8 及其以上。笔者这里使用 Docker 安装,偷懒了贪图方便快捷

# 使用 wurstmeister 制作的镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka


# 启动 zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper


# 单机启动 kafka
docker run  -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka






4. Quickstart

kafka 官网也有很好的介绍,quickstart

# 进入kafka容器
docker exec -it kafka /bin/sh


# 进入 bin 目录
cd /opt/kafka_2.13-2.8.1/bin


# partitions 分区
# replication 副本因子
# 创建一个主题(参数不懂可直接填写,后面会讲解说明)
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092


# 查看
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092


# 写入 topic(回车表示一条消息,ctrl + c 结束输入)
# 消息默认存储 7 天,下一步的消费可以验证
./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event


# 读取 topic(运行多次可以读取消息,因为默认存储 7 天)
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092






5. SpringBoot 集成

SpringBoot 集成了 Kafka,添加依赖后可使用内置的 KafkaTemplate 模板方法来操作 kafka 消息队列


5.1 添加依赖

<!--  sprinboot版本管理中有kafka可不写版本号  -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>


5.2 配置文件

server:
  port: 8080

spring:
  # 消息队列
  kafka:
    producer:
      # broker地址,重试次数,确认接收个数,消息的编解码方式
      bootstrap-servers: 101.200.197.22:9092
      retries: 3
      acks: 1
      key-serializer: org.springframework.kafka.support.serializer.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.StringSerializer
    consumer:
      # broker地址,自动提交,分区offset设置
      bootstrap-servers: 101.200.197.22:9092
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


5.3 生产者

@RestController
@RequestMapping("/kafka")
public class Producer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/producer1")
    public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
        SendResult<String, Object> sendResult = future.get();
        return sendResult.toString();
    }

    @GetMapping("/producer2")
    public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("faile");
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("success");
            }
        });
        return "";
    }
}


5.4 消费者

@Component
public class Consumer {

    @KafkaListener(topics = {"topic1"})
    public void onMessage(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}






6. 存储目录结构

kafka
|____kafka-logs
    |____topic1
    |	  |____00000000000000000000.log(存储接收的消息)
    |	  |____consumer_offsets-01(消费者偏移量)
    |	  |____consumer_offsets-02
    |____topic2
    	  |____00000000000000000000.log
    	  |____consumer_offsets-01
    	  |____consumer_offsets-02

每台 broker 实例接收到消息后将之存储到 00000.log 里面,保存的方式是先入先出。消息被消费后不会被删除,相反可以设置 topic 的消息保留时间,重要的是 Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的


消费者会将自己消费偏移量 offset 提交给 topic 在 _consumer_offsets 里面保存,然后通过偏移量来确定消息的位置,默认从上次消费的位置开始,添加参数 --frombeginning 则从头开始消费,可获取之前所有存储的消息。kafka 也会定期清除内部的消息,直到保存最新的一条(文件保存的消息默认保存 7 天)







7. 消费组

这个在笔者配置消费者的时候发现的问题,启动时报错说没有指定消费组








8. 分区和副本

topic 消息保存的文件 0000.log 可以进行物理切分,这就是分区的概念,类似于数据库的分库分表。这样做的好处在于单个保存的文件不会太大从而影响性能,最重要的是分区后不是单个文件串行执行了,而是多区多文件可并行执行提高了并发能力




分区:消费者会消费同一 topic 的不同分区,所以会保存不同分区的偏移量,其格式为:GroupId + topic + 分区号

副本:副本是对分区的备份,集群中不同的分区在不同的 broker 上,但副本会对该分区备份到指定数量的 broker 上,这些副本有 leader 和 follower 的区别,leader负责读写,挂了再重新选举,副本为了保持数据一致性







9. 常见问题


9.1 生产者同步和异步消息

生产者发送消息给 broker,之后 broker 会响应 ack 给生产者,生产者等待接收 ack 信号 3 秒,超时则重试 3 次

生产者 ack 确认配置:



9.2 消费者自动提交和手动提交



9.3 消息丢失和重复消费



9.4 顺序消费方案

标签:队列,分区,9092,kafka,topic,--,消息,Kafka
来源: https://www.cnblogs.com/zeenzhou/p/15782723.html