Kafka 消息列队
作者:互联网
1. kafka(卡夫卡)
Kafka 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java 编写。
kafka 是一个高吞吐的分布式发布订阅消息系统,它可以处理消费者在网络中的所有动作流数据。
- kafka 组件
kafka server : 消息系统的中间件,接收生产者产生的的消息,接收消费者的订阅,将消费交付给消费者处理。
producer 生产者
consumer 消费者,多个消费者形成一个组
partition 分区,排列数据, 通过offset 偏移量排列, offset 对区分数据进行记录,存在消息的载体 相当于 queue
offset 偏移量 ,对分区的数据进行记录
zookeeper 相当于 exchange
vhost=topic 主题,对小心进行分类,一个类型一个主题
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uimr7l1p-1583405061972)(C:\Users\Administrator\AppData\Local\YNote\data\weixinobU7VjmoDcK6mV43CyWcqdjkLo5g\433ec0fbc6b84d0c8416f47a77543d38}$e0novktaly1cqei]n97f9.png)]
zookeeper:键值对的 用来存放meta信息(原始数据,最底层的数据)还有watch发现机制
1.broken node registry :borken注册节点,会生成znode的临时节点保存信息
2.broken topic registry: 当一个zookeeper启动时会注册自己持有的topic和partition的信息
3.consumer and consumer group : 主要是用来做负载均衡
4.consumer id registry: 每个consumer都有唯一的id号,用来标记消费者的信息
5.consumer offset tracking 用来跟踪每个concumer消费的最大的offset
6.partition owner registry: 用来标记partition被那个consumer所消费
kafka 原理:
https://zhuanlan.zhihu.com/p/68052232
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P4kGxwNx-1583405061977)(C:\Users\Administrator\AppData\Local\YNote\data\weixinobU7VjmoDcK6mV43CyWcqdjkLo5g\2a82daa6f26f4d25865f4ae754e8af1d\clipboard.png)]
2. 消息传送机制
1) at most once 消息最多发送一次,无论成败,不再发送
2) at least once 消息最少发送一次
3. kafka 优点:
1) 保证消息的先来后到,凡是进来的消息,立马打上标签(例如:1、2、3、4……)
2) 当消息被消费,数据丢失
3) 分布式
4) 容量比较大, kafka 容量取决于硬盘的大小
5) 数量的大小不会影响到kafka的性能
4. kafka 集群
leader: 真正用于工作的
follower: 是用于复制 leader 的信息,做备份的
实验环境:
server 1: 1.1.1.101/8
server 2: 1.1.1.102/8
server 3: 1.1.1.103/8
1) 首先部署 zookeeper 集群
kafka 依赖于 zookeeper 来保存集群的数据信息,从而来保证系统的可用性
三台服务器,都需要拥有 java环境
- 部署Java 环境,zookeeper 依赖于 java 环境
- 也可以采用 源码安装 java, 详情参考: https://blog.csdn.net/RunzIyy/article/details/104569137
[root@localhost ~]# yum -y install java
[root@localhost ~]# java -version
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
- 安装 zookeeper
[root@localhost ~]# tar -zxf zookeeper-3.3.6.tar.gz -C /usr/src/
[root@localhost ~]# mv /usr/src/zookeeper-3.3.6/ /usr/local/zookeeper
[root@localhost ~]# cd /usr/local/zookeeper/conf/
# 因程序不识别 zoo_sample.cfg 文件,所以需要改名
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
- 修改 zookeeper 配置文件
[root@localhost ~]# vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000 # 节点之间发送心跳包的时间, 单位 毫秒
initLimit=10 # 新加入节点初始化的时间, 单位 个 10*2000
syncLimit=5 # 节点连接的超时等待时间
dataDir=/usr/local/zookeeper/data # 指定保存数据的目录
dataLogDir=/usr/local/zookeeper/datalog # 指定数据日志的目录
# the port at which the clients will connect
clientPort=2181 # 对外提供服务器的端口
server.1=1.1.1.101:2888:3888
server.2=1.1.1.102:2888:3888
server.3=1.1.1.103:2888:3888
# 节点名 = 节点IP地址:节点通信的端口:节点之间选取leader的端口
- 创建数据存储目录
- 因配置文件改动,指定了数据 存放位置以及 日志文件的存放路径,所以需要手动创建
[root@localhost ~]# mkdir /usr/local/zookeeper/data
[root@localhost ~]# mkdir /usr/local/zookeeper/datalog
- 指定节点标识 (1、2、3、4…………)
- zk集群中的节点需要获取myid 文件内容来标识该节点,否则无法启动
- 该标识类似于人的 身份证号
[root@localhost ~]# echo 1 > /usr/local/zookeeper/data/myid # 指定节点标识 [root@localhost ~]# cat /usr/local/zookeeper/data/myid
1
- 为了方便,将本机程序传输过去,但是需要更改 另外 两台的 节点标识
[root@localhost ~]# scp -r /usr/local/zookeeper/ root@1.1.1.102:/usr/local/
[root@localhost ~]# scp -r /usr/local/zookeeper/ root@1.1.1.103:/usr/local/
# 主机 2
[root@two ~]# echo 2 > /usr/local/zookeeper/data/myid
[root@two ~]# cat /usr/local/zookeeper/data/myid
2
# 主机 3
[root@three ~]# echo 3 > /usr/local/zookeeper/data/myid
[root@three ~]# cat /usr/local/zookeeper/data/myid
3
- 启动 zookeeper 服务
- 完成群集后,需要启动服务, 三台服务器要先后启动
- start 启动 restart 重新启动 stop 停止 status 查看状态
# 创建软连接,使 启动命令可以在全局中使用
[root@localhost ~]# ln -s /usr/local/zookeeper/bin/* /usr/bin/
[root@localhost ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
- 查看 zookeeper 状态
- 只有全部,启动后,查看zookeeper状态,才会是成功的
- 当三台服务器,全部启动后,他们之间会自动选举出一个 loader ,其余的为 foolower(生成是随机的)
zookeeper 的状态
- leader 真正用于工作的。
- follower 是用于复制 leader 的信息,做备份的
# 只有三台 服务器,全部启动 zookeeper服务,才可以查看状态
[root@localhost zookeeper]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@two ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[root@three ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
2) 部署 kafka 集群
- 首先解压 kafka 源码包,同样不需要编译安装
[root@localhost ~]# tar -zxf kafka_2.11-1.0.1.tgz -C /usr/src/ [root@localhost ~]# mv /usr/src/kafka_2.11-1.0.1/ /usr/local/kafka
- 修改配置文件
[root@localhost ~]# vim /usr/local/kafka/config/server.properties
21 broker.id=1 # 本机 节点标识
31 listeners=PLAINTEXT://1.1.1.101:9092 # 监听 本地 ip与 端口
60 log.dirs=/usr/local/kafka/data # 指定日志文件的目录
103 log.retention.hours=168
104 message.max.byte=1024000 # 消息最大字节
105 default.replication.factor=2
106 replica.fetch.max.bytes=102400
126 zookeeper.connect=1.1.1.101:2181,1.1.1.102:2181,1.1.1.103:2181 # 指定 zookeeper群集内的节点服务器
107 num.rtition=1
# 创建 日志文件的目录
[root@localhost ~]# mkdir /usr/local/kafka/data
- 部署另外两台 kafka
- 使用 scp 命令,将配置文件传输过去,修改配置文件
[root@localhost ~]# scp -r /usr/local/kafka/ root@1.1.1.102:/usr/local/ [root@localhost ~]# scp -r /usr/local/kafka/ root@1.1.1.103:/usr/local/
- 修改 另外两台 kafka 配置文件
- 只需要修改,本机的 标识与 监听的IP地址
[root@two ~]# vim /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://1.1.1.102:9092
-------------------------------------------------------
[root@three ~]# vim /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://1.1.1.102:9092
- 启动 kafka服务
- 分别启动三台服务器
[root@localhost ~]# cd /usr/local/kafka/bin
[root@localhost bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@localhost bin]# netstat -anpt | grep 9092
[root@two bin]# cd /usr/local/kafka/bin/
[root@two bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@two bin]# netstat -anpt | grep 9092
[root@three ~]# cd /usr/local/kafka/bin/
[root@three bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@three bin]# netstat -anpt | grep 9092
- 创建 topic 测试
- 使用该命令可以创建一个消息队列
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 1.1.1.101:2181 --partitions 1 --replication-factor 2 -topic logs
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-11 19:02:30,208] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "logs".
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 1.1.1.101:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-11 19:02:42,645] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
logs
参数含义:
# create 创建
# zookeeper 使用哪一个 zookeeper
# -- partition 创建分区的个数
# -- replication-factor 2 指定分区别分的个数
# --toplic 主题名称
- 模式生产者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 1.1.1.102:9092 --topic logs
# --broker-list 1.1.1.102:9092 推送到那个节点上
- 模拟消费者
[root@two bin]# ./kafka-console-consumer.sh --zookeeper 1.1.1.101:2181 --topic logs --from-beginning
# --zookeeper 1.1.1.101:2181
# --from-beginning 从哪里读消息,从开始读消息
- 这个时候在 生产者与消费者都会进入阻塞状态
@localhost bin]# ./kafka-console-producer.sh --broker-list 1.1.1.102:9092 --topic logs
–broker-list 1.1.1.102:9092 推送到那个节点上
- 模拟消费者
```shell
[root@two bin]# ./kafka-console-consumer.sh --zookeeper 1.1.1.101:2181 --topic logs --from-beginning
# --zookeeper 1.1.1.101:2181
# --from-beginning 从哪里读消息,从开始读消息
- 这个时候在 生产者与消费者都会进入阻塞状态
- 在 生产者 输入任何内容,都会显示在 消费者屏幕中
标签:local,zookeeper,kafka,消息,列队,Kafka,root,localhost,usr 来源: https://blog.csdn.net/RunzIyy/article/details/104681054