其他分享
首页 > 其他分享> > Kafka 消息列队

Kafka 消息列队

作者:互联网

1. kafka(卡夫卡)

2. 消息传送机制

3. kafka 优点:

4. kafka 集群

实验环境:

1) 首先部署 zookeeper 集群

# 修改 zookeeper 配置文件

# 常见数据目录

# 指定节点标识 (1、2、3、4…………)

# 启动服务

# 查看 zookeeper 状态

2)部署 kafka 集群

# 修改配置文件

# 部署另外两台 kafka

# 修改 另外两台 kafka 配置文件

# 启动 kafka服务

# 创建 topic 测试

1. kafka(卡夫卡)

​ Kafka 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java 编写。

​ kafka 是一个高吞吐的分布式发布订阅消息系统,它可以处理消费者在网络中的所有动作流数据。

kafka server : 消息系统的中间件,接收生产者产生的的消息,接收消费者的订阅,将消费交付给消费者处理。

producer 生产者

consumer 消费者,多个消费者形成一个组

partition 分区,排列数据, 通过offset 偏移量排列, offset 对区分数据进行记录,存在消息的载体 相当于 queue

offset 偏移量 ,对分区的数据进行记录

zookeeper 相当于 exchange

vhost=topic 主题,对小心进行分类,一个类型一个主题

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uimr7l1p-1583405061972)(C:\Users\Administrator\AppData\Local\YNote\data\weixinobU7VjmoDcK6mV43CyWcqdjkLo5g\433ec0fbc6b84d0c8416f47a77543d38}$e0novktaly1cqei]n97f9.png)]

zookeeper:键值对的 用来存放meta信息(原始数据,最底层的数据)还有watch发现机制

​ 1.broken node registry :borken注册节点,会生成znode的临时节点保存信息

​ 2.broken topic registry: 当一个zookeeper启动时会注册自己持有的topic和partition的信息

​ 3.consumer and consumer group : 主要是用来做负载均衡

​ 4.consumer id registry: 每个consumer都有唯一的id号,用来标记消费者的信息

​ 5.consumer offset tracking 用来跟踪每个concumer消费的最大的offset

​ 6.partition owner registry: 用来标记partition被那个consumer所消费

kafka 原理:

https://zhuanlan.zhihu.com/p/68052232

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P4kGxwNx-1583405061977)(C:\Users\Administrator\AppData\Local\YNote\data\weixinobU7VjmoDcK6mV43CyWcqdjkLo5g\2a82daa6f26f4d25865f4ae754e8af1d\clipboard.png)]

2. 消息传送机制

1) at most once 消息最多发送一次,无论成败,不再发送

2) at least once 消息最少发送一次

3. kafka 优点:

1) 保证消息的先来后到,凡是进来的消息,立马打上标签(例如:1、2、3、4……)

2) 当消息被消费,数据丢失

3) 分布式

4) 容量比较大, kafka 容量取决于硬盘的大小

5) 数量的大小不会影响到kafka的性能

4. kafka 集群

leader: 真正用于工作的

follower: 是用于复制 leader 的信息,做备份的

实验环境:

server 1: 1.1.1.101/8

server 2: 1.1.1.102/8

server 3: 1.1.1.103/8

1) 首先部署 zookeeper 集群

kafka  依赖于 zookeeper 来保存集群的数据信息,从而来保证系统的可用性

​ 三台服务器,都需要拥有 java环境

[root@localhost ~]# yum -y install java
[root@localhost ~]# java -version
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
[root@localhost ~]# tar -zxf zookeeper-3.3.6.tar.gz -C /usr/src/
[root@localhost ~]# mv /usr/src/zookeeper-3.3.6/ /usr/local/zookeeper
[root@localhost ~]# cd /usr/local/zookeeper/conf/
# 因程序不识别 zoo_sample.cfg 文件,所以需要改名
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
[root@localhost ~]# vim /usr/local/zookeeper/conf/zoo.cfg 
tickTime=2000				# 节点之间发送心跳包的时间, 单位 毫秒
initLimit=10				# 新加入节点初始化的时间,   单位 个   10*2000
syncLimit=5					# 节点连接的超时等待时间

dataDir=/usr/local/zookeeper/data			# 指定保存数据的目录
dataLogDir=/usr/local/zookeeper/datalog		# 指定数据日志的目录
# the port at which the clients will connect	
clientPort=2181								   # 对外提供服务器的端口
server.1=1.1.1.101:2888:3888					
server.2=1.1.1.102:2888:3888
server.3=1.1.1.103:2888:3888
	# 节点名 = 节点IP地址:节点通信的端口:节点之间选取leader的端口

[root@localhost ~]# mkdir /usr/local/zookeeper/data 
[root@localhost ~]# mkdir /usr/local/zookeeper/datalog
[root@localhost ~]# echo 1 > /usr/local/zookeeper/data/myid			# 指定节点标识 [root@localhost ~]# cat /usr/local/zookeeper/data/myid 		 
1
[root@localhost ~]# scp -r /usr/local/zookeeper/ root@1.1.1.102:/usr/local/ 
[root@localhost ~]# scp -r /usr/local/zookeeper/ root@1.1.1.103:/usr/local/ 

# 主机 2
[root@two ~]# echo 2 > /usr/local/zookeeper/data/myid 
[root@two ~]# cat /usr/local/zookeeper/data/myid 
2

# 主机 3
[root@three ~]# echo 3 > /usr/local/zookeeper/data/myid 
[root@three ~]# cat /usr/local/zookeeper/data/myid 
3
# 创建软连接,使 启动命令可以在全局中使用
[root@localhost ~]# ln -s /usr/local/zookeeper/bin/* /usr/bin/
[root@localhost ~]# zkServer.sh start         
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

zookeeper 的状态

# 只有三台 服务器,全部启动 zookeeper服务,才可以查看状态
[root@localhost zookeeper]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@two ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader


[root@three ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

2) 部署 kafka 集群

[root@localhost ~]# tar -zxf kafka_2.11-1.0.1.tgz -C /usr/src/ [root@localhost ~]# mv /usr/src/kafka_2.11-1.0.1/ /usr/local/kafka
[root@localhost ~]# vim /usr/local/kafka/config/server.properties 
21 broker.id=1			# 本机 节点标识
31 listeners=PLAINTEXT://1.1.1.101:9092			# 监听 本地 ip与 端口
60 log.dirs=/usr/local/kafka/data					# 指定日志文件的目录
103 log.retention.hours=168						
104 message.max.byte=1024000					# 消息最大字节
105 default.replication.factor=2
106 replica.fetch.max.bytes=102400
126 zookeeper.connect=1.1.1.101:2181,1.1.1.102:2181,1.1.1.103:2181	#	 指定 zookeeper群集内的节点服务器
107 num.rtition=1

# 创建 日志文件的目录
[root@localhost ~]# mkdir /usr/local/kafka/data
[root@localhost ~]# scp -r /usr/local/kafka/ root@1.1.1.102:/usr/local/ [root@localhost ~]# scp -r /usr/local/kafka/ root@1.1.1.103:/usr/local/
[root@two ~]# vim /usr/local/kafka/config/server.properties 

broker.id=2
listeners=PLAINTEXT://1.1.1.102:9092

-------------------------------------------------------

[root@three ~]# vim /usr/local/kafka/config/server.properties 
broker.id=2
listeners=PLAINTEXT://1.1.1.102:9092
[root@localhost ~]# cd /usr/local/kafka/bin
    [root@localhost bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@localhost bin]# netstat -anpt | grep 9092


[root@two bin]# cd /usr/local/kafka/bin/
[root@two bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@two bin]# netstat -anpt | grep 9092


[root@three ~]# cd /usr/local/kafka/bin/
[root@three bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@three bin]# netstat -anpt | grep 9092
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 1.1.1.101:2181 --partitions 1 --replication-factor 2 -topic logs
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-11 19:02:30,208] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "logs".

[root@localhost bin]# ./kafka-topics.sh --list --zookeeper 1.1.1.101:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-11 19:02:42,645] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
logs

​ 参数含义:


# create 				创建
# zookeeper  			使用哪一个 zookeeper
# -- partition  		创建分区的个数
# -- replication-factor 2   	指定分区别分的个数
# --toplic 				主题名称
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 1.1.1.102:9092 --topic logs

# --broker-list 1.1.1.102:9092   	推送到那个节点上
[root@two bin]# ./kafka-console-consumer.sh --zookeeper 1.1.1.101:2181 --topic logs --from-beginning

# --zookeeper 1.1.1.101:2181

#  --from-beginning		从哪里读消息,从开始读消息

–broker-list 1.1.1.102:9092 推送到那个节点上




- 模拟消费者

```shell
[root@two bin]# ./kafka-console-consumer.sh --zookeeper 1.1.1.101:2181 --topic logs --from-beginning

# --zookeeper 1.1.1.101:2181

#  --from-beginning		从哪里读消息,从开始读消息

标签:local,zookeeper,kafka,消息,列队,Kafka,root,localhost,usr
来源: https://blog.csdn.net/RunzIyy/article/details/104681054