其他分享
首页 > 其他分享> > Kafka Broker(二)

Kafka Broker(二)

作者:互联网

1.1、副本基本信息

AR=ISR+OSR ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。 OSR,表示Follower与Leader副本同步时,延迟过多的副本。

1.2、Leader 选举流程

Kafka集群中有一个broker的Controller会被选举为ControllerLeader,负责管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作。Controller的信息同步工作是依赖于Zookeeper的。

创建一个 topic 4分区4副本

[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic yilin

查看leader

[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic yilin
Topic: yilin    TopicId: EDnOcK_tToG511JOfmeX8w PartitionCount: 4       ReplicationFactor: 4    Configs: segment.bytes=1073741824
        Topic: yilin    Partition: 0    Leader: 3       Replicas: 3,1,0,2       Isr: 3,1,0,2
        Topic: yilin    Partition: 1    Leader: 1       Replicas: 1,0,2,3       Isr: 1,0,2,3
        Topic: yilin    Partition: 2    Leader: 0       Replicas: 0,2,3,1       Isr: 0,2,3,1
        Topic: yilin    Partition: 3    Leader: 2       Replicas: 2,3,1,0       Isr: 2,3,1,0

停掉102上的kafka再次查看ader

分布情况

查看Leader分布情况

[hui@hadoop102 kafka]$ bin/kafka-server-stop.sh 
[hui@hadoop102 kafka]$ jps
3465 Jps
[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic yilin
Topic: yilin    TopicId: EDnOcK_tToG511JOfmeX8w PartitionCount: 4       ReplicationFactor: 4    Configs: segment.bytes=1073741824
        Topic: yilin    Partition: 0    Leader: 0       Replicas: 3,1,0,2       Isr: 0,1,2
        Topic: yilin    Partition: 1    Leader: 1       Replicas: 1,0,2,3       Isr: 1,0,2
        Topic: yilin    Partition: 2    Leader: 0       Replicas: 0,2,3,1       Isr: 0,2,1
        Topic: yilin    Partition: 3    Leader: 2       Replicas: 2,3,1,0       Isr: 2,1,0

停止掉hadoop105的kafka进程,并查看Leader分区情况

 停掉105上的kafka再次查看ad 
[hui@hadoop105 kafka]$ bin/kafka-server-stop.sh 
[hui@hadoop105 kafka]$ jps
2024 QuorumPeerMain
3355 Jps
[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic yilin
Topic: yilin    TopicId: EDnOcK_tToG511JOfmeX8w PartitionCount: 4       ReplicationFactor: 4    Configs: segment.bytes=1073741824
        Topic: yilin    Partition: 0    Leader: 0       Replicas: 3,1,0,2       Isr: 0,1
        Topic: yilin    Partition: 1    Leader: 1       Replicas: 1,0,2,3       Isr: 1,0
        Topic: yilin    Partition: 2    Leader: 0       Replicas: 0,2,3,1       Isr: 0,1
        Topic: yilin    Partition: 3    Leader: 1       Replicas: 2,3,1,0       Isr: 1,0

启动hadoop102的kafka进程,并查看Leader分区情况 

[hui@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon  ./config/server.properties 
[hui@hadoop102 kafka]$ jps
3879 Jps
3806 Kafka
[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic yilin
Topic: yilin    TopicId: EDnOcK_tToG511JOfmeX8w PartitionCount: 4       ReplicationFactor: 4    Configs: segment.bytes=1073741824
        Topic: yilin    Partition: 0    Leader: 0       Replicas: 3,1,0,2       Isr: 0,1,3
        Topic: yilin    Partition: 1    Leader: 1       Replicas: 1,0,2,3       Isr: 1,0,3
        Topic: yilin    Partition: 2    Leader: 0       Replicas: 0,2,3,1       Isr: 0,1,3
        Topic: yilin    Partition: 3    Leader: 1       Replicas: 2,3,1,0       Isr: 1,0,3

启动hadoop105的kafka进程,并查看Leader分区情况

[hui@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon  ./config/server.properties 
[hui@hadoop105 kafka]$ jps
3716 Kafka
2024 QuorumPeerMain
3739 Jps
[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic yilin
Topic: yilin    TopicId: EDnOcK_tToG511JOfmeX8w PartitionCount: 4       ReplicationFactor: 4    Configs: segment.bytes=1073741824
        Topic: yilin    Partition: 0    Leader: 3       Replicas: 3,1,0,2       Isr: 0,3,1,2
        Topic: yilin    Partition: 1    Leader: 1       Replicas: 1,0,2,3       Isr: 1,0,3,2
        Topic: yilin    Partition: 2    Leader: 0       Replicas: 0,2,3,1       Isr: 0,1,3,2
        Topic: yilin    Partition: 3    Leader: 1       Replicas: 2,3,1,0       Isr: 1,0,3,2

1.3、Leader和Follower故障处理细节

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。

Follower故障

  1. Follower发生故障后会被临时踢出ISR
  2. 这个期间Leader和Follower继续接收数据
  3. 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
  4. 等该Folower的LEO大于等于该Partition的Hw,即Follower追上Leader之后,就可以重新加入ISR了。

Leader故障

  1. Leader发生故障之后,会从ISR中选出一个新的Leader
  2. 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

1.4、分区副本分配

如果kafka服务器只有4个节点,那么设置kafka的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

创建16分区,3个副本

[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --create --partitions 16 --replication-factor 3 --topic ryy
Created topic ryy.
[hui@hadoop103 kafka]$ 
[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic ryy
Topic: ryy      TopicId: lcl-_ytERDa-KreXaEVW9g PartitionCount: 16      ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: ryy      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: ryy      Partition: 1    Leader: 3       Replicas: 3,0,2 Isr: 3,0,2
        Topic: ryy      Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: ryy      Partition: 3    Leader: 0       Replicas: 0,3,1 Isr: 0,3,1
        Topic: ryy      Partition: 4    Leader: 2       Replicas: 2,0,3 Isr: 2,0,3
        Topic: ryy      Partition: 5    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        Topic: ryy      Partition: 6    Leader: 1       Replicas: 1,3,0 Isr: 1,3,0
        Topic: ryy      Partition: 7    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: ryy      Partition: 8    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: ryy      Partition: 9    Leader: 3       Replicas: 3,1,0 Isr: 3,1,0
        Topic: ryy      Partition: 10   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: ryy      Partition: 11   Leader: 0       Replicas: 0,2,3 Isr: 0,2,3
        Topic: ryy      Partition: 12   Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: ryy      Partition: 13   Leader: 3       Replicas: 3,0,2 Isr: 3,0,2
        Topic: ryy      Partition: 14   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: ryy      Partition: 15   Leader: 0       Replicas: 0,3,1 Isr: 0,3,1

1.5、手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。

 手动调整分区副本存储的步骤如下:

创建一个新的topic,查看分区情况

[hui@hadoop103 kafka]$  bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --create --partitions 4 --replication-factor 2 --topic rwx
Created topic rwx.
[hui@hadoop103 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --describe --topic rwx
Topic: rwx      TopicId: D1qdN2rWSS2iDdYuEOwvDg PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: rwx      Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: rwx      Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: rwx      Partition: 2    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: rwx      Partition: 3    Leader: 1       Replicas: 1,0   Isr: 1,0

创建副本存储计划(所有副本都指定存储在broker0、broker1中) 

[hui@hadoop103 kafka]$ vim increase-replication-factor-0401.json
{
        "version": 1,
        "partitions": [{
                "topic": "rwx",
                "partition": 0,
                "replicas": [0, 1]
        }, {
                "topic": "rwx",
                "partition": 1,
                "replicas": [0, 1]
        }, {
                "topic": "rwx",
                "partition": 2,
                "replicas": [1, 0]
        }, {
                "topic": "rwx",
                "partition": 3,
                "replicas": [1, 0]
        }]
}

执行&验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor-0401.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor-0401.json --verify 

1.6、Leader Partition负载平衡

正常情况下,Kafka本身会自动把LeaderPartition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致LeaderPartition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是followerpartition,读写请求很低,造成集群负载不均衡。

  1. auto.leader.rebalance.enable,默认是true。自动Leader Partition 平衡
  2. eader.imbalance.per.broker.percentage,默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
  3. eader.imbalance.check.interval.seconds,默认值300秒。检查leader负载是否平衡的间隔时间。

1.7、增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

bin/kafka-topics.sh --bootstrap-server hadoop103:9092 --create--partitions 3 --replication-factor 1 --topic four

创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中) 

vim increase-replication-factor.json
{
    "version": 1,
    "partitions": [{
        "topic": "four",
        "partition": 0,
        "replicas": [0, 1, 2]
    }, {
        "topic": "four",
        "partition": 1,
        "replicas": [0, 1, 2]
    }, {
        "topic": "four",
        "partition": 2,
        "replicas": [0, 1, 2]
    }]
}

执行&验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor.json --verify 

2、文件存储

2.1 文件存储机制

opic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

 思考:Topic数据到底存储在什么位置?

[hui@hadoop103 first-2]$ pwd
/opt/module/kafka/datas/first-2
[hui@hadoop103 first-2]$ ll
总用量 20
-rw-r--r-- 1 hui wd 10485760 4月   1 06:22 00000000000000000000.index
-rw-r--r-- 1 hui wd      250 4月   1 07:09 00000000000000000000.log
-rw-r--r-- 1 hui wd 10485756 4月   1 06:22 00000000000000000000.timeindex
-rw-r--r-- 1 hui wd       10 4月   1 06:22 00000000000000000005.snapshot
-rw-r--r-- 1 hui wd        9 4月   1 06:29 leader-epoch-checkpoint
-rw-r--r-- 1 hui wd       43 3月  28 21:33 partition.metadata

查看索引文件

[hui@hadoop103 first-2]$ kafka-run-class.sh  kafka.tools.DumpLogSegments --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 0 position: 0
Mismatches in :/opt/module/kafka/datas/first-2/00000000000000000000.index
  Index offset: 0, log offset: 4

查看日志文件

[hui@hadoop103 first-2]$ kafka-run-class.sh  kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping ./00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 4 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 5 isTransactional: false isControl: false position: 0 CreateTime: 1648542373044 size: 171 magic: 2 compresscodec: none crc: 131702150 isvalid: true
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 32 isTransactional: false isControl: false position: 171 CreateTime: 1648768184856 size: 79 magic: 2 compresscodec: none crc: 4054655699 isvalid: true

index文件和log文件详解

说明:日志存储参数配置

log.segment.bytes:Kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小,默认值1G。

log.index.interval.bytes 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。稀疏索引。

2.2 文件清理策略

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

  1. log.retention.hours,最低优先级小时,默认7天。
  2. log.retention.minutes,分钟。
  3. log.retention.ms,最高优先级毫秒。
  4. log.retention.check.interval.ms,负责设置检查周期,默认5分钟。

delete日志删除:将过期数据删除;log.cleanup.policy=delete所有数据启用删除策略
基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。log.retention.bytes,默认等于-1,表示无穷大。

compact日志压缩;compact日志压缩:对于相同key的不同value值,只保留最后一个版本。log.cleanup.policy=compact所有数据启用压缩策略

 

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

3 高效读写数据 

1、Kafka本身是分布式集群,可以采用分区技术,并行度高
2、读数据采用稀疏索引,可以快速定位要消费的数据
3、顺序写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

 

4、页缓存+ 零拷贝技术

标签:--,Isr,Partition,Broker,kafka,Topic,Kafka,Leader
来源: https://www.cnblogs.com/wdh01/p/16085008.html