3.消费者:高级API和低级API(5)
作者:互联网
3.5 消费者提交分区偏移量
消费者提交偏移量是为了保存分区的消费进度。因为Kafka保证同一个分区只会分配给消费组中的唯一消费者,所以即使发生再平衡后,分区和消费者的所有权关系发生变化,新消费者也可以接着上一个消费者记录的偏移盘位置继续消费消息。
但是消费者即使记录了分区的偏移量,仍然无法解决消息被重复消费的问题。例如,消费者l每隔10秒提交一次偏移量,在10秒时提交的偏移量是100,下一次提交的日们可点是20秒。在20秒之前,消费者l又消费了30条消息,然后消费者l突然挂掉了。由于偏移量现在仍然停留在10。这个位置,因此新的消费者2也只会从10。这个位置继续消费,从而会重复处理偏移量为100之后的30条消息。
通常消息被重复处理是可以接受的,至少不会出现消息丢失这种不可接受的问题。定时提交偏移量的周期时间越长,消息被重复消费的数据量就越多。客户端可以将这个周期时间设置得更短,来减少重复消费的消息量。当然也不能太短,否则会导致客户端和保存偏移量的存储系统产生大量的网络请求。
在旧版本中每个分区的偏移量都保存到ZK中,每个分区都要和ZK产生一次交互,况且还要周期性地写人,这对ZK来说是个不小的负担。在新版本中把偏移量像普通消息一样写入Kafka集群的内部主题。而且正像消息会源源不断地写到集群一样,记录偏移量也是周期性的。Kafka支持高吞吐量的消息写入,对于偏移量的记录当然也不在话下。下面我们会分析两个版本的提交偏移量过程。
3.5.1 消费者提交分区偏移量
消费者提交偏移量是为了保存分区的消费进度。因为Kafka保证同一个分区只会分配给消费组中的唯一消费者,所以即使发生再平衡后,分区和消费者的所有权关系发生变化,新消费者也可以接着上一个消费者记录的偏移盘位置继续消费消息。但是消费者即使记录了分区的偏移量,仍然无法解决消息被重复消费的问题。例如,消费者l每隔10秒提交一次偏移量,在10秒时提交的偏移量是100,下一次提交的日们可点是20秒。在20秒之前,消费者l又消费了30条消息,然后消费者l突然挂掉了。由于偏移量现在仍然停留在10。这个位置,因此新的消费者2也只会从10。这个位置继续消费,从而会重复处理偏移量为100之后的30条消息。通常消息被重复处理是可以接受的,至少不会出现消息丢失这种不可接受的问题。定时提交偏移量的周期时间越长,消息被重复消费的数据量就越多。客户端可以将这个周期时间设置得更短,来减少重复消费的消息量。当然也不能太短,否则会导致客户端和保存偏移量的存储系统产生大量的网络请求。在旧版本中每个分区的偏移量都保存到ZK中,每个分区都要和ZK产生一次交互,况且还要周期性地写人,这对ZK来说是个不小的负担。在新版本中把偏移量像普通消息一样写入Kafka集群的内部主题。而且正像消息会源源不断地写到集群一样,记录偏移量也是周期性的。Kafka支持高吞吐量的消息写入,对于偏移量的记录当然也不在话下。下面我们会分析两个版本的提交偏移量过程。
注意:分区信息主要包含3个变量:队列用来存储拉取到的消息,fetchedOffset用来确定拉取的起始位直,consuf!led.Offset表示消费过的位直,用来记录分区的消费进度。
checkpoi.ntedZkOffsets变量用来决定是否需要将偏移量写入ZK。如果分区分配给消费者,但是消费者并没有消费分区的消息,提交偏移量也没有意义,因为当前的消费进度和IZK中保存的偏移量是一样的。比如消费者已经拉取到最新消息了,在这之后一直没有新消息产生,那么即使定时提交的周期时间到了,因为没有消费新的消息,也不需要提交偏移量。相关代码如下:
数据有写人就有读取,从ZK中读取偏移量需要指定要读取哪个分区。注意,虽然是由消费者对分区的偏移量进行读写操作,但是分区对应的ZK节点并没有消费者信息。ZK节点的路径中只有消费者所属的组:/consufllers/[group_id]/offsets/[topic]/[partition_id]。提交分区偏移量以“消费组”为级别,而不是让每个消费者向己维护分区偏移量。目的是:即使某个消费者挂掉,分区偏移量代表的含义也不会改变,再平衡可以将分区调度给“同一个消费组”的其他消费者。相关代码如下:
消费者提交每个分区的偏移iJ:都需要和ZK通信一次,如果集群中的分区数量成千上万,所有消费者和ZK通信会产生大量的网络请求,这对于本身不是作为存储系统的ZK而言,会造成很大的性能问题。但是,记录每个分区的偏移fiJ:又是必须完成的,那么何不考虑重用现有的Kafka集群来存储偏移量呢?
3.5.2 提交偏移量到内部主题
消费者提交偏移量到Kafka的内部主题,首先要确定连接哪个或者哪些服务端节点。回顾一下,生产者发送消息时会根据分区的主副本分组,和多个节点者rs建立连接;消费者分配多个分区,也要根据分区的主副本分组,和多个节点建立连接。而消费者提交所有分区的偏移量时,实际上只和-个服务端节点建立连接。同样要处理多个分区,为什么普通消息需要多个连接,而偏移量只需要一个连接?如图3-24所示,目标节点指的是分区的主副本节点,我们给出了偏移聋的多种连接方案。
(1)如果不同分区的偏移盐写到了不同的节点,消费者分配了多个分区,当要读取不同分区的偏移盘时,就得连接不同的节点才可以获得完整的数据。
(2)如果能让所有分区的偏移草’数据只保存在一个节点,消费者就只需要同一个节点通信。但因为消费者和分区的关系是变化的,即使保证这一次分区在一个节点上,也无法保证下一次仍然在同一个节点。
(3)如果消费组所有消费者所有分区的偏移量都保存在一个节点,就可以解决第二种方式的问题。
(4)实际上,消费者的分区偏移盏’要保存在哪个节点,跟消费者所属的消费组有关系。只要保证消费组级别的偏移i量在一个节点上,即使消费者和分区的关系发生变化,也能够保证消费者访问新分配的分区时,只需要访问一个节点。
同一个消费组的所有消费者,以内部主题形式提交所有分区的偏移盘到一个目标节点,这个内部主题和普通消息的主题一样也会有多个分区。如果只有一个分区,所有消费组都只能提交到唯一的节点,就又退化到和ZK面临的将所有读写请求都压到一个节点的相同问题。而如果有多个分区,并且以悄费组作为分区的分布条件,不同消费组提交到的偏移量有可能是不同的节点,就可以分散偏移盘读写的压力。
实际上,消费者提交偏移量如果存储在ZK中,也是用消费组级别来表示。存储在ZK巾天生就具有共享存储的优势,所有的消费者只需要连接ZK即可。而以主题方式存储偏移革命时,就得考虑是杏需要连接多个服务端节点。每个消费组只连接一个节点是最好的,这个节点负宽管理一个消费组所有消费者所有分区的偏移量,叫作偏移量管理器(OffsetManager)。和采用ZK方式将偏移蓝数据写到ZK不同,消费者将偏移量数据封装成偏移量提交请求(OffsetCommitRequest)发送给偏移量管理器。就像生产者的生产请求、消费者的拉取请求一样,偏移量提交请求和偏移拉获取请求都是发送给Kafka服务端节点的。相关代码如下:
如图3-25所示,总结r下目前为止客户端需要确定服务端节点的几个场景。
- 生产者发送消息时,直接在客户端决定消息要发送给哪个分区,这一步不向服务端发送请求。
- 消费者拉取管理器的LeaderFinderThread线程向服务端发送主题元数据请求,获取包含了主副本等信息的所有分区元数据,消费者拉取线程才能确定要连接哪些服务端节点。
- 提交偏移茸虽然有点像生产者的发送消息,都是写数据,但也需要和l消费者的LeaderFinderThread一样,获取分区的主副本作为偏移ill:管理器,才能确定提交到哪个节点。
注意:消费者提交普通主题“分区”的消费偏移量,和偏移量存储在内部主题的“分区”,这两个分区概念上相同,但数据是不同的,前者是普通主题的分区,后者是内部主题的分区。服务端把客户端提交的分区偏移量当作消息,消息键由消费组编号、主题、分区编号组成,消息值是分区的偏移量。比如消费者l属于消费纽I(GroupI),它订阅了主题I(Topicl),分配给它的分区消费进度分别是:(P0:100,P1:120,P2:130]。服务祸内部主题有3条消息:(Group1-Topic1-P0,100),(Group1-Topic1-P1,120),(Group1-Topic1-P2,130)。
3.5.3 连接偏移量管理器
前面我们分析的拉取偏移#方法和提交偏移量’方法,都需要和偏移主-管理器通信。在这之前,消费者需要通过channelToOffsetManager()方法向服务端任意一个节点发送“消费组的协调者请求”(GroupCoordinatorRequest),来获取消费组对应的协调节点,即偏移量管理器(OffsetManager)节点。服务端处理消费组的协调者请求,实际上也是通过查询主题的元数据来完成的。不过和LeaderFinderThread中返回主题元数据,然后还要在客户端继续处理(比如获取存在主副本的分区)不同,这里在服务端完成“选择消费组对应内部主题的分区的主副本节点”,然后直接返回这个协调节点给客户端。也就是说客户端发送消费组的协调者请求,服务端返回的就是消费组的协调节点。相关代码如下:
如图3-26所示,消费组l中所有消费者提交的偏移量都应该连接到代理节点l,但是消费组中不同消费者连接的任意代理节点可能一开始并不是代理节点l。不过没关系,这一步只是准备t作,目的是确定目标节点,不管连接哪个节点,当前连接的节点都会告诉你应该连接的正确节点;如果你连得不对,根据返回值再去连接正确的节点。比如,消费者0刚好连接的是代理节点l,可以直接把queryChannel作为offsetChannel;而消费者l和消费者2第一次连接的不是代理节点1,所以在得到结果时应该首先关闭queryChannel,然后重新连接代理节点l作为queryChannel。
任何一个服务端节点处理消费者发送的GroupCoordlnator请求,首先要确定消费组在内部主题的分区,然后,从主题元数据的所有分区元数据中找出指定分区的主副本节点,就是消费组对应的协调者节点。在LeaderFlnderThread中直接返回主题的元数据,是因为无法确定具体的分区,而这里根据消费组可以确定分区,所以直接在服务端返回对应分区的主副本信息。相关代码如下:
确定了消费者要连接的消费组协调节点,也就是偏移量管理器后,消费者才会向该目标节点发送偏移茸的读写请求。在以ZK为存储系统时,消费者针对偏移量的读写都是每个分区单独发起一个请求。在以内部主题形式存储分区的偏移量时,消费者会把它负责的所有分区一次性发送给协调节点。现在我们来看一下服务端对偏移业请求的处理过程。
3.5.4 服务端处理提交偏移量的请求
协调节点会将消费者的偏移量提交请交给GroupCoordlnator类的handleCommitOffsets()方法处理,其中参数offsetMetadata表示分配给消费者的所有分区消费进度。相关代码如下:
写入偏移消息会调用RepllcaManager.appendMessages()方法,将消息集追加到本地日志文件,并且会把分区和对应的偏移量保存在协调节点的缓存中。目的是:再平衡后如果其他消费者需要读取分区的偏移毡,在连接上协调节点后,可以直接读取缓存,而不需要从日志文件中读取。在prepareStoreOffsets()方法内部的putCacheCallback()方法会更新缓存,回调函数的调用只有在主流程即“追加消息到日志文件完成”后才会发生;如果主流程没有完成,回调函数就不会调用。DelayedStore对象包含了需要追加到日志文件的消息集,以及更新缓存的回调方法。相关代码如下:
如图3-27所示,消费者发送提交偏移量和获取偏移虽有1会被j服务端的KafkaAp"i.s处理,服务端处理这两个请求的具体步骤如下。
(1)KafkaApi.s将提交偏移量请求的处理交给消费组的协调者(GroupCoordi.nator)。
(2)消费组的协调者再交给消费组的元数据管理类(GroupMetadataManager)去处理。
(3)延迟的存储对象(DelayedStore)会调用副本管理器的appendMessages()存储消息。
(4)副本管理器将消息追加到底层文件系统的日志文件中,这样分区的偏移量就抒储到服务端了。
(5)分区和l对应的偏移量会在消息存储成功后,被缓存至服务端的消费组元数据管理类。
(6)服务端处理客户端的获取分区偏移盐请求,会首先从缓存中获取。
(7)如果缓存中没有分区的偏移量,就从日志文件中读取。
如图3-28所示,我们用一个示例说明消费者提交偏移量的过程,具体步骤如下。
(1)消费者分配到分区,比如消费者l(Cl)分配到主题(test!)的分区PO和分区Pl。
(2)分区PO的主副本是消息代理节点1(Broker!),分区Pl的主副本是消息代理节点2(Broker2),
消费者创建拉取线程拉取分区消息。
(3)消费者拉取到每个分区的消息后,客户端迭代每条消息,会更新分区信息对象的消费进度。
(4)消费者定时提交分区偏移盏,连接消费组的协调节点,消费组l对应内部主题的P1~nBroker2。
(5)消费者l将向己负责的分区(~PPO和Pl)偏移革A提交到协调节点Broker2上。
内部主题(_consuMer_offsets)和普通主题一样也有多个分区,内部主题的分区方式是消费组编号,即相同消费组编号的分区是一样的。所以如果消费者属于同一个消费组,它们提交和读取分区偏移量都是被同一个协调节点处理的。
在前面的再平衡操作中,消费者分配到分区后,会从ZK中读取偏移盘作为分区信息对象(Pat1tionTop1cinfo)的拉取偏移量(fetchedOffset)和消费偏移量(consuMedOffset)。如果偏移量保存在Kafka中,获取偏移量就不是从ZK中读取了,而是从Kafka的内部主题中读取。但读取Kafka需要读取日志文件,为了加快数据的读取,服务端会将内部主题的分区偏移盘缓存起来。
3.5.5 缓存分区的偏移量
消费者提交自己负责分区的偏移量,除了写入服务端(协调节点)内部主题某个分区的日志文件中,还要把这部分数据保存一份到当前服务端的内存中,这样分区的偏移量保存在了磁盘和内存两个地方。偏移量消息的键由消费组、主题、分区组成(GoupTopicPat1tion),消息的值是分区的偏移盘。查询分区的偏移量时给定GoupTop1cPat1hon,会返回分区对应的偏移盘,即分区当前的消费进度。
由于消费者会周期性地提交偏移量,同一个分区在每次提交时都会产生新的偏移盐。比如分区p。在第一次提交时偏移量为10,在第二次提交时偏移量为20。每次提交偏移盘写入日志文件都采用追加消息的方式。对于写入锺存而言,因为使用Map结构,所以相同分区的偏移量会被覆盖更新。相关代码如下:
缓存的作用是为了方便查询,而且会被重复查询,如果没有重复查询,就没有必要放入缓存。比如,不能把普通的消息内容作为缓存,因为普通消息量很大,而且消费者读取过一次之后一般不会再次读取。
如表3-3所示,服务端有两种作用域类型的缓存:“所有节点共享”“每个节点独享”。如果是共享数据,贝I]向任意一个服务端节点发送请求,都可以获取到一致的状态(比如主题的元数据),它的特点是和业务逻辑的任何组件都无关。如果是节点独享的数据,节点之间数据不一致,要保证读写请求连接的是同一个节点,才能读取到一致的数据。它的特点是和业务逻辑的某个组件有关,比如消费者提交的分区偏移量和消费组有关。
如图3-29所示,偏移盘请求和消费组有关,客户端只能连接指定的节点,所以是协调节点独享的缓存。而主题元数据(TopicMetadata)和消费组的协调者(GroupCoordinator)因为在每个服务端节点保存的数据都一样,可以请求任何一个节点,所以是所有节点共享的缓存。
我们来讨论一个问题:为什么分区偏移量消息的键由“消费组、主题、分区”组成,而分区方式
却只由消费组决定?下面我们来循序渐进地回答这个问题。
首先,要回答消息的键为什么有消费组,而没有消费者。虽然分区是由消费者提交的,但是偏移量消息的任tt不能存在消费者。假设键是GroupConsuJ11erTopicPart1hon,每个消费者提交的偏移量都有向己的标识。比如消费者1提交的偏移盘是G1-C1-T1P0:10,消费者2提交的偏移量是G1-C2-T1P1:20,保存到缓存的数据是[(G1C1T1P0,10),(G1αT1P1,20)]。再平衡后,TlPO分配笋消费者2,在缓存中就不会查询至l]G1CZT1P0的记录;如果TlPl分配给消费者l,也无法查到G1ζ1T1P1的记录。而以消费组存储时缓存的内容是[(G1T1问,10),(G1T1P1,20汀,这样不管是消费者l还是消费者2分配到TIPO,都可以从缓存中读取州TIPO的偏移盘。只要消费组所有消费者都提交了分区的消费进度,再平衡时无论怎么重新分配分区,任何一个消费者都可以查询到任意一个分区的最新消费进度。
另外,必须要有消费组的原因是,不同的消费组可能会订阅同一个主题。如果只有“主题、分区”作为分区偏移£l消息的键,就无法区分不同的消费组。而实际上,不同消费组,即使主题分区相同,它们的分区偏移盐也可能不同,所以偏移量消息的键需要有“消费组”。
其次,因为服务端要保存分区的偏移盐,所以消息值是偏移量,其他信息比如主题、分区都放在消息的健中。所以偏移量消息的键由“消费组、主题、分区”3部分组成。
最后,再来看看为什么分区方式只Fl=I消费组决定的,而不是偏移量消息的键?因为同一个消费组的分区偏移量消息都在同一个协调节点上,为消息进行分区的方式只能是消费组。如果分区方式也是“消费组、主题、分区”,那么只有这3个数据都相同时,内部主题的分区才相同。比如G1T1P0和G1T1P1因为分区不同,内部主题的分区也不同,提交偏移量时就不在同一个协调节点了。而这和前面的“相同消费组的消费者提交偏移量是在同一个协调节点”就发生了矛盾。
标签:消费,消费者,分区,高级,偏移量,低级,API,偏移,节点 来源: https://blog.csdn.net/WANTAWAY314/article/details/115232944