其他分享
首页 > 其他分享> > 震惊了,原来这才是Kafka的“真面目”!

震惊了,原来这才是Kafka的“真面目”!

作者:互联网

Kafka 是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。


image.png

Kafka 对外使用 Topic 的概念,生产者往 Topic 里写消息,消费者从中读消息。


为了做到水平扩展,一个 Topic 实际是由多个 Partition 组成的,遇到瓶颈时,可以通过增加 Partition 的数量来进行横向扩容。单个 Parition 内是保证消息有序。


每新写一条消息,Kafka 就是在对应的文件 append 写,所以性能非常高。


Kafka 的总体数据流是这样的:

image.png

大概用法就是,Producers 往 Brokers 里面的指定 Topic 中写消息,Consumers 从 Brokers 里面拉取指定 Topic 的消息,然后进行业务处理。


图中有两个 Topic,Topic0 有两个 Partition,Topic1 有一个 Partition,三副本备份。


可以看到 Consumer Gourp1 中的 Consumer2 没有分到 Partition 处理,这是有可能出现的,下面会讲到。


关于 Broker、Topics、Partitions 的一些元信息用 ZK 来存,监控和路由啥的也都会用到 ZK。


生产


基本流程是这样的:

image.png

创建一条记录,记录中一个要指定对应的 Topic 和 Value,Key 和 Partition 可选。 


先序列化,然后按照 Topic 和 Partition,放进对应的发送队列中。Kafka Produce 都是批量请求,会积攒一批,然后一起发送,不是调 send() 就立刻进行网络发包。


如果 Partition 没填,那么情况会是这样的:


这些要发往同一个 Partition 的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。


API


有 High Level API,替我们把很多事情都干了,Offset,路由啥都替我们干了,用起来很简单。


还有 Simple API,Offset 啥的都是要我们自己记录。(注:消息消费的时候,首先要知道去哪消费,这就是路由,消费完之后,要记录消费单哪,就是 Offset)


Partition


当存在多副本的情况下,会尽量把多个副本,分配到不同的 Broker 上。


Kafka 会为 Partition 选出一个 Leader,之后所有该 Partition 的请求,实际操作的都是 Leader,然后再同步到其他的 Follower。


当一个 Broker 歇菜后,所有 Leader 在该 Broker 上的 Partition 都会重新选举,选出一个 Leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)


然后这里就涉及两个细节:


关于 Partition 的分配,还有 Leader 的选举,总得有个执行者。在 Kafka 中,这个执行者就叫 Controller。


Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。


Partition 的分配:


Leader 容灾


Controller 会在 ZK 的 /brokers/ids 节点上注册 Watch,一旦有 Broker 宕机,它就能知道。


当 Broker 宕机后,Controller 就会给受到影响的 Partition 选出新 Leader。


Controller 从 ZK 的 /brokers/topics/[topic]/partitions/[partition]/state 中,读取对应 Partition 的 ISR(in-sync replica 已同步的副本)列表,选一个出来做 Leader。


选出 Leader 后,更新 ZK,然后发送 LeaderAndISRRequest 给受影响的 Broker,让它们知道改变这事。


为什么这里不是使用 ZK 通知,而是直接给 Broker 发送 RPC 请求,我的理解可能是这样做 ZK 有性能问题吧。


如果 ISR 列表是空,那么会根据配置,随便选一个 Replica 做 Leader,或者干脆这个 Partition 就是歇菜。


如果 ISR 列表的有机器,但是也歇菜了,那么还可以等 ISR 的机器活过来。


多副本同步


这里的策略,服务端这边的处理是 Follower 从 Leader 批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。


生产者生产消息的时候,通过 request.required.acks 参数来设置数据的可靠性。

image.png

在 Acks=-1 的时候,如果 ISR 少于 min.insync.replicas 指定的数目,那么就会返回不可用。


这里 ISR 列表中的机器是会变化的,根据配置 replica.lag.time.max.ms,多久没同步,就会从 ISR 列表中剔除。


以前还有根据落后多少条消息就踢出 ISR,在 1.0 版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出 ISR 列表。


从 ISA 中选出 Leader 后,Follower 会把自己日志中上一个高水位后面的记录去掉,然后去和 Leader 拿新的数据。


因为新的 Leader 选出来后,Follower 上面的数据,可能比新 Leader 多,所以要截取。


这里高水位的意思,对于 Partition 和 Leader,就是所有 ISR 中都有的最新一条记录。消费者最多只能读到高水位。


从 Leader 的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一轮的 Fetch 中才能告诉 Leader。


也正是由于这个高水位延迟一轮,在一些情况下,Kafka 会出现丢数据和主备数据不一致的情况,0.11 开始,使用 Leader Epoch 来代替高水位。


思考:当 Acks=-1 时


消费


订阅 Topic 是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个 Partition。


换句话来说,就是一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。


因此,如果消费组内的消费者如果比 Partition 多的话,那么就会有个别消费者一直空闲。

image.png

API


订阅 Topic 时,可以用正则表达式,如果有新 Topic 匹配上,那能自动订阅上。


Offset 的保存


一个消费组消费 Partition,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次。


这里 ZK 的性能严重影响了消费的速度,而且很容易出现重复消费。在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 总剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中。


写进消息的 Key 由 Groupid、Topic、Partition 组成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。总是保留最新的 Key,其余删掉。


一般情况下,每个 Key 的 Offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存,第一次就会遍历 Partition 建立缓存,然后查询返回。


确定 Consumer Group 位移信息写入 consumers_offsets 的哪个 Partition,具体计算公式:

__consumers_offsets partition =
           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。


思考:如果正在跑的服务,修改了 offsets.topic.num.partitions,那么 Offset 的保存是不是就乱套了?


分配 Partition—Reblance


生产过程中 Broker 要分配 Partition,消费过程这里,也要分配 Partition 给消费者。


类似 Broker 中选了一个 Controller 出来,消费也要从 Broker 中选一个 Coordinator,用于分配 Partition。


下面从顶向下,分别阐述一下:


①选 Coordinator:看 Offset 保存在那个 Partition;该 Partition Leader 所在的 Broker 就是被选定的 Coordinator。


这里我们可以看到,Consumer Group 的 Coordinator,和保存 Consumer Group Offset 的 Partition Leader 是同一台机器。


②交互流程:把 Coordinator 选出来之后,就是要分配了。整个流程是这样的:


③Reblance 流程:


当 Partition 或者消费者的数量发生变化时,都得进行 Reblance。


列举一下会 Reblance 的情况:


消息投递语义


Kafka 支持 3 种消息投递语义:


在业务中,常常都是使用 At least once 的模型,如果需要可重入的话,往往是业务自己实现。


At least once


先获取数据,再进行业务处理,业务处理成功后 Commit Offset:


At most once


先获取数据,再 Commit Offset,最后进行业务处理:


Exactly once


思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着 At least once 的原因来搞。


首先想出来的:


由于业务接口是否幂等,不是 Kafka 能保证的,所以 Kafka 这里提供的 Exactly once 是有限制的,消费者的下游也必须是 Kafka。


所以以下讨论的,没特殊说明,消费者的下游系统都是 Kafka(注:使用 Kafka Conector,它对部分系统做了适配,实现了 Exactly once)。生产者幂等性好做,没啥问题。


解决重复消费有两个方法:


本来 Exactly once 实现第 1 点就 OK 了。但是在一些使用场景下,我们的数据源可能是多个 Topic,处理后输出到多个 Topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。


既然要做事务,那么干脆把重复消费的问题从根源上解决,把 Commit Offset 和输出到其他 Topic 绑定成一个事务。


生产幂等性


思路是这样的,为每个 Producer 分配一个 Pid,作为该 Producer 的唯一标识。


Producer 会为每一个维护一个单调递增的 Seq。类似的,Broker 也会为每个记录下最新的 Seq。


当 req_seq == broker_seq+1 时,Broker 才会接受该消息,因为:


事务性/原子性广播


场景是这样的:


其中第 2、3 点作为一个事务,要么全成功,要么全失败。这里得益于 Offset 实际上是用特殊的 Topic 去保存,这两点都归一为写多个 Topic 的事务性处理。


基本思路是这样的:


做事务时,先标记开启事务,写入数据,全部成功就在 Transaction Log 中记录为 Prepare Commit 状态,否则写入 Prepare Abort 的状态。


之后再去给每个相关的 Partition 写入一条 Marker(Commit 或者 Abort)消息,标记这个事务的 Message 可以被读取或已经废弃。成功后在 Transaction Log记录下 Commit/Abort 状态,至此事务结束。

数据流:


这里 Prepare 的状态主要是用于事务恢复,例如给相关的 Partition 发送控制消息,没发完就宕机了,备机起来后,Producer 发送请求获取 Pid 时,会把未完成的事务接着完成。


当 Partition 中写入 Commit 的 Marker 后,相关的消息就可被读取。所以 Kafka 事务在 Prepare Commit 到 Commit 这个时间段内,消息是逐渐可见的,而不是同一时刻可见。


消费事务


前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。


消费时,Partition 中会存在一些消息处于未 Commit 状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,Kafka 选择在消费者进程中进行过来,而不是在 Broker 中过滤,主要考虑的还是性能。


Kafka 高性能的一个关键点是 Zero Copy,如果需要在 Broker 中过滤,那么势必需要读取消息内容到内存,就会失去 Zero Copy 的特性。


文件组织


Kafka 的数据,实际上是以文件的形式存储在文件系统的。Topic 下有 Partition,Partition 下有 Segment,Segment 是实际的一个个文件,Topic 和 Partition 都是抽象概念。


在目录 /partitionid}/ 下,存储着实际的 Log 文件(即 Segment),还有对应的索引文件。


每个 Segment 文件大小相等,文件名以这个 Segment 中最小的 Offset 命名,文件扩展名是 .log。Segment 对应的索引的文件名字一样,扩展名是 .index。


有两个 Index 文件:


总体的组织是这样的:

image.png

为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个 Message 都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 


索引包含两部分:


查找 Offset 对应的记录时,会先用二分法,找出对应的 Offset 在哪个 Segment 中,然后使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍历查找 Message。


常用配置项


Broker 配置


image.png

Topic 配置


image.png

关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。


还有 0.10 之前的版本,时间看的是日志文件的 Mtime,但这个值是不准确的,有可能文件被 Touch 一下,Mtime 就变了。因此从 0.10 版本开始,改为使用该文件最新一条消息的时间来判断。


按大小清理这里也要注意,Kafka 在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。如果超过但是没超过一个日志段,那么就不会删除。



标签:Topic,Partition,Broker,Kafka,真面目,Leader,Offset,震惊
来源: https://blog.51cto.com/14410880/2550957