其他分享
首页 > 其他分享> > 物联网Kafka理论+实战+Zookeeper+深入+错误集合

物联网Kafka理论+实战+Zookeeper+深入+错误集合

作者:互联网

物联网Kafka配置 Kafka理论+Zookeeper 深入 错误集合

前言:用于公司物联网项目,kafka作为消息中间件,kafka作为公司新技术,现行使用单点,不用集群。

物联网Kafka配置

zookeeper:用于注册发现kafka,使用版本apache-zookeeper-3.7.0-bin.tar

kafka:使用版本kafka_2.13-2.8.0

eagle:查看和管理zk,kafka,topic等,使用版本kafka-eagle-bin-1.3.7

mysql:管理监控kafka数据,使用版本5.5

#237服务器 环境变量
#JDK_HOME
export JAVA_HOME=/etc/alternatives/java_sdk_1.8.0
export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/project/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#EAGLE_HOME
export KE_HOME=/project/module/eagle
export PATH=$PATH:$KE_HOME/bin
-----------------------------------------------------------------------------------
# zookeeper配置
# zkData 目录配置id
myid=0
# zoo.cfg配置文件
dirData=/project/module/zookeeper/zkData
-----------------------------------------------------------------------------------
# kafka 配置
# kafka 配置命令
集群配置需要在配置kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
#  修改为
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
 export JMX_PORT="9999"
 #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# kafka config修改端口9849
1、service.properties             		port = 9849    不指定的话,按照默认9092
2、connect-distributed.properties        bootstrap.servers=localhost:9849
3、producer.properties                   bootstrap.servers=localhost:9849
4、connect-standalone.properties         bootstrap.servers=localhost:9849
5、consumer.properties                   bootstrap.servers=localhost:9849
-----------------------------------------------------------------------------------
# eagle配置
# eagle配置的9850端口启动
# zookeeper连接的localhost:2181
# mysql连接的222.85.156.248:9811,使用版本5.5
# 启动http://222.85.156.248:9850/ke
-----------------------------------------------------------------------------------
# mysql配置
# mysql容器启动命令:
# 暴露端口9811,容器卷地址/project/module/kafka/ke-mysql
docker run --name mysql \
    --restart=always \
    -p 9811:3306 \
    -v /project/module/kafka/ke-mysql:/var/lib/mysql \ 
    -e MYSQL_ROOT_PASSWORD=123456 \
    -e TZ=Asia/Shanghai \
    -d mysql:5.5

Kafka错误

Kafka理论

定义

kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。

消息队列

使用消息队列的好处

消息队列的两种模式

点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

消息生产者生产消息发到kafka的队列(Queue)中,然后消息费从Queue消费消息,消费后消息从Queue出队,消息不可重复消费,Queue支持多个消费者,但只有一个可以消费。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0P5JgNqF-1624928667226)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624112849946.png)]

发布/订阅模式(一对多,消费者消费数据后,消息不会清除)

消息生产者发布消息到Topic中,同时会有多个消费者订阅,发布到Topic中的消息会被所有订阅者消费。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-c0K5O7Yd-1624928667228)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624112909394.png)]

Kafka基础架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Kcl9brKi-1624928667229)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624113111396.png)]

Kafka实战

安装部署

集群规划

机器0 192.168.124.135机器1 192.168.124.136机器2 192.168.124.137
zookeeperzookeeperzookeeper
kafkakafkakafk

zookeeper:用于注册发现kafka,使用版本zookeeper-3.4.10

kafka:使用版本kafka_2.11-0.11.0.0

eagle:查看和管理zk,kafka,topic等,使用版本kafka-eagle-bin-1.3.7

集群部署

Zookeeper基本介绍+部署

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WNx8lJdu-1624928667230)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624154348735.png)]

## Zookeeper特点
# 1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。 
# 2)集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。 
# 3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。 
# 4)更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行。 
# 5)数据更新原子性,一次数据更新要么成功,要么失败。 6)实时性,在一定时间范围内,Client能读到最新数据。


## Zookeeper集群安装
#(1)解压 Zookeeper 安装包到/opt/module/目录下
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

#(2)同步/opt/module/zookeeper-3.4.10 目录内容到其他服务器

#(3)在/opt/module/zookeeper-3.4.10/这个目录下创建 zkData
mkdir -p zkData

#(4)在/opt/module/zookeeper-3.4.10/zkData 目录下创建一个 myid 的文件,添加 myid 文件,注意一定要在 linux 里面创建,在 notepad++里面很可能乱码
touch myid

#(5)编辑 myid 文件
vi myid
0

#(6)拷贝配置好的 zookeeper 到其他机器上,并分别在 其他俩台服务器上修改 myid 文件中内容为 1、2

#(7)重命名/opt/module/zookeeper-3.4.10/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg
mv zoo_sample.cfg zoo.cfg

#(8)打开 zoo.cfg 文件
vi zoo.cfg
---------------------------------------------------------------------------------------
	#修改数据存储路径配置
	dataDir=/opt/module/zkData 
    #增加如下配置 换成相应服务器的IP
    #######################cluster##########################
    server.2=192.168.124.135:2888:3888
    server.3=192.168.124.136:2888:3888
    server.4=192.168.124.137:2888:3888
 ---------------------------------------------------------------------------------------
    
#(9)同步 zoo.cfg 配置文件 到其他服务器

#(10)cluster配置参数解读
# A 是一个数字,表示这个是第几号服务器,集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。 
# B 是这个服务器的地址;
# C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
# D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
server.A=B:C:D

#(11)分别启动 Zookeeper
bin/zkServer.sh start

#(12)查看状态
bin/zkServer.sh status

#(13)启动客户端
bin/zkCli.sh
Kafka集群部署
# 1)kafka解压安装包
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

# 2)修改解压后的文件名称
mv kafka_2.11-0.11.0.0/ kafka

# 3)在/opt/module/kafka 目录下创建 logs 文件夹
mkdir logs

# 4)修改配置文件
vi config/server.properties
--------------------------------------------------------
#broker 的全局唯一编号,不能重复 例如broker.id=1,broker.id=2
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/project/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=192.168.124.135:2181,192.168.124.136:2181,192.168.124.137:2181
--------------------------------------------------------

# 5)配置环境变量
--------------------------------------------------------
vi /etc/profile

    #KAFKA_HOME
    export KAFKA_HOME=/project/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
--------------------------------------------------------

# 6)其他服务器依次做完1-6步,**broker.id 不得重复**

# 7)依次在三台服务器节点上启动 kafka
bin/kafka-server-start.sh -daemon config/server.properties #-daemon 后台启动
bin/kafka-server-start.sh  config/server.properties
bin/kafka-server-stop.sh  config/server.properties

Kafka Eagle 服务监控部署

# (1)修改 kafka 启动命令 修改 kafka-server-start.sh 命令中
--------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
--------------------------------------------------------
#  修改为
--------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
 export JMX_PORT="9999"
 #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
--------------------------------------------------------

# (2)修改之后在启动 Kafka 之前,其他服务器也要修改

# (3)上传压缩包 kafka-eagle-bin-1.3.7.tar.gz 到集群/opt/module 目录,解压到本地
tar -zxvf kafka-eagle-bin-1.3.7.tar.gz

# (3)进入刚才解压的目录,将 kafka-eagle-web-1.3.7-bin.tar.gz 解压至/opt/module
tar -zxvf kafka-eagle-web-1.3.7-bin.tar.gz
# (4)修改名称

mv kafka-eagle-web-1.3.7/ eagle
# (5)给启动文件执行权限
chmod 777 ke.sh

# (6)修改配置文件
--------------------------------------------------------
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.124.135:2181,192.168.124.136:2181,192.168.124.137:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver #注意驱动版本,目前连接的是5.5mysql,是用的是5版本驱动
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
--------------------------------------------------------

# (7)添加环境变量
--------------------------------------------------------
vi /etc/profile
    export KE_HOME=/project/module/eagle
    export PATH=$PATH:$KE_HOME/bin
source /etc/profile
--------------------------------------------------------

# (8)启动 注意:启动之前需要先启动 ZK 以及 KAFKA
bin/ke.sh start

Kafka 命令行操作

# 1)查看当前服务器中的所有 topic
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --list

# 2)创建 topic 注释:--topic 定义 topic 名 --replication-factor 定义副本数 --partitions 定义分区数
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --create --replication-factor 3 --partitions 1 --topic first

# 3)删除 topic
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --delete --topic first

# 4)发送消息
bin/kafka-console-producer.sh --broker-list 192.168.124.135:9092 --topic first

# 5)消费消息 注释:--from-beginning:会把主题中以往所有的数据都读取出来,--bootstrap-server:新版本写法
	bin/kafka-console-consumer.sh --zookeeper 192.168.124.135:2181 --topic first
	bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.135:9092 --from-beginning --topic first
	
# 6)查看某个 Topic 的详情
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --describe --topic first

# 7)修改分区数
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --alter --topic first --partitions 6

Kafka架构深入

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4Zk6ejrt-1624928667231)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624150108083.png)]

Topic:是逻辑上的概念,消息是以Topic进行分类的,生产者生产消息,消费者消费消息,都是面向Topic的。

Partition:是物理上的概念,每个Partition对应一个log文件,该log文件中存储的就是Produce生产的数据,数据会追加到log的末尾,每条数据有一个自己的offset

Consumer Group(CG):消费组中的消费者会自己实时记录消费到了哪个offset,用于出错时恢复上次位置继续消费。

Segment:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SMWpxZfG-1624928667231)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624150729555.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vm6BDDF0-1624928667232)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210624152314643.png)]

Kafka 生产者

分区策略

分区的原因
分区的原则

我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。

数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iS3K7h9N-1624928667232)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210626113937150.png)]

副本数据同步策略
方案优点缺点
半数以上完成同步,就发acks延迟低选举新的 leader 时,容忍 n 台节点的故障,需要 2n+1 个副
全部完成同步,才发送acks选举新的 leader 时,容忍 n 台节点的故障,需要 n+1 个副本延迟高

Kafka 选择了第二种方案,原因如下:

ISR

用于解决某些follower同步比较慢的问题,leader维护了一个动态的in-sync-replica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据同步后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则将follower提出ISR,改时间阈值由replica.lag.time.ms参数决定,如果leader挂了,会从ISR中选举leader。

acks 应答机制

acks参数配置:

故障处理细节

LEO(log end offset):每个副本的最后一个offset。

HW(High Watermark):指的是消费者能见到的最大的offset,ISR 队列中最小的 LEO。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eJ8EK8H4-1624928667233)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210625104832309.png)]

Exactly Once 语义

Kafka消费者

消费方式

分区分配策略

分配时机

在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:

offset的维护

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X1E9Hs6k-1624928667233)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210625140310334.png)]

同一个消费者组中的消费者,同一时刻只能有一个消费者消费,consumer.properties 配置文件中的 group.id分组。

Kafka 高效读写数据

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

零复制技术

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kFTjqOPl-1624928667233)(C:\Users\HIAPAD\AppData\Roaming\Typora\typora-user-images\image-20210625141831997.png)]

Zookeeper Kafka 中的作用

zookeeper会选举一个broker作为conroller,负责管理集群broker的上下线,所有topic的分区副本分配leader选举等工作。

Kafka 事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基

础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

Producer 事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer 事务

分配leader选举**等工作。

Kafka 事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基

础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

Producer 事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer 事务

上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

标签:实战,bin,zookeeper,eagle,--,Zookeeper,kafka,Kafka
来源: https://blog.csdn.net/mr_zhu_wenxing/article/details/118325001