linux(CentOS7)上面搭建kafka环境操作流程,方案为docker 和 docker-compose
作者:互联网
linux(CentOS7)上面搭建kafka环境操作流程,方案为docker 和 docker-compose
1.基础环境
1.1、yum源配置
使用阿里云安装源
cd /etc/yum.repos.d
rename .repo .repo.bak *.repo
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
curl -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
yum clean all
yum makecache
yum repolist
yum update
安装基本工具
############ 安装基础工具 ############
yum -y install wget telnet bind-utils vim net-tools lrzs
1.2、修改系统设置
修改主机名
注意hostname不能是localhost,且不包含下划线、小数点、大写字母,命令语法为:
sudo hostnamectl set-hostname test
修改系统参数
############ limits 相关 ############
# 扩大句柄数
sed -i 's/4096/1000000/g' /etc/security/limits.d/20-nproc.conf
cat <<EOF>> /etc/security/limits.d/20-all-users.conf
* soft nproc 1000000
* hard nproc 1000000
* soft nofile 1000000
* hard nofile 1000000
EOF
############ 内核参数 ############
cat <<'EOF'> /etc/sysctl.d/50-custom.conf
# 禁用IPV6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
# 最大进程数量
kernel.pid_max = 1048576
# 进程可以拥有的VMA(虚拟内存区域)的数量
vm.max_map_count = 262144
#关闭tcp的连接传输的慢启动,即先休止一段时间,再初始化拥塞窗口。
net.ipv4.tcp_slow_start_after_idle = 0
# 哈希表项最大值,解决 'nf_conntrack: table full, dropping packet.' 问题
net.netfilter.nf_conntrack_max = 2097152
# 在指定之间(秒)内,已经建立的连接如果没有活动,则通过iptables进行清除。
net.netfilter.nf_conntrack_tcp_timeout_established = 1200
EOF
############ 关闭selinux ############
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/sysconfig/selinux
setenforce 0
############ 关闭防火墙 ############
systemctl disable firewalld
systemctl stop firewalld
########## 设置时间同步服务器 ##########
查看我的其他文章
systemctl enable chronyd.service
systemctl restart chronyd.service
systemctl status chronyd.service
#查看时间同步源:
chronyc sources -v
1.3、docker安装
1.Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。
通过 uname -r 命令查看你当前的内核版本
$ uname -r
2.使用 root 权限登录 Centos。确保 yum 包更新到最新。
$ sudo yum update
3.检查系统中是否已经安装了docker
$ ps -ef |grep docker
4.如果显示已安装docker的需要先卸载
$ yum remave docker-*
5.安装yum仓库管理工具
$ yum install -y yum-utils (该工具包含yum-config-manager)
6.下载阿里的docker-ce仓库
$ cd /etc/yum.repos.d/
$ yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
7.查看选择docker-ce各版本
$ yum list docker-ce --showduplicates | sort -r
8.安装指定版本的docker-ce、docker-ce-cli、containerd.io
$ yum install docker-ce-19.03.5-3.el7 docker-ce-cli-19.03.5-3.el7 containerd.io-1.2.10-3.2.el7
9.关闭防火墙
$ systemctl status firewalld 查看防火墙状态 systemctl disable firwalld 关闭防火墙
10.启动docker
$ systemctl start docker
11.设置docker开机启动
$ systemctl enable docker
1.4、docker-compose 安装
第一种直接安装
curl -L https://get.daocloud.io/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version
其中版本号可以修改
第二种,离线安装
去github手动下载文件:https://github.com/docker/compose/releases/tag/1.25.0-rc4
将文件上传到/usr/local/bin/ 目录下,重命名为docker-compose,修改文件权限:
$ chmod +x /usr/local/bin/docker-compose
$ docker-compose -v
1.5、docker配置
配置镜像仓库
在/etc/docker/daemon.json文件(没有请自行创建)
#配置阿里云相关的
$ vi etc/docker/daemon.json 添加如下内容
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://39kjotb7.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
2、根据docker-compose创建kafka服务
2.1、直接贴出docker-compose文件
kafka依赖zookeeper,我们不搞集群,直接单机可用即可。akhq是一个界面化的管理工具,稍后搭建完毕截图详解
version: '3.8'
services:
zookeeper:
image: zookeeper:3.4
container_name: zookeeper
restart: always
ports:
- 21181:2181
environment:
TZ: Asia/Shanghai
kafka:
image: wurstmeister/kafka:2.13-2.6.0
container_name: kafka
restart: 'always'
ports:
- 9092:9092
environment:
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
TZ: Asia/Shanghai
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
akhq:
image: tchiotludo/akhq
container_name: akhq
ports:
- "10080:8080"
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:9092"
我们现在把我们自己测试的展示出来
通过
http://ip:10080/ui/docker-kafka-server/topic,通过浏览器就可以访问kafka,如下所示
红框选中部分,多点点试一下,最下方的创建topic按钮,可以创建你自己的topic
我创建一个demo,然后整个环境就搭建完毕
2.3、问题
看我截图。如果你想要通过一个springboot工程在properties配置文件中使用kafka,你会发现填写ip:端口不生效,因为通过docker容器化后的kafka在zookeeper下面,暴露出来的ip就会是一个容器ip,现在我们解决一下这个问题。
我遇到的问题
百度出来结局方案
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
先介绍原理,后给出解决方案。
3、kafka listeners 和 advertised.listeners 的介绍
3.1、介绍区别
在公司内网部署 kafka
集群只需要用到 listeners
,所以一直也不用管 advertised.listeners
是做啥的,刚开始有查过,因为经验不足,始终理解的不够,后来发现在 docker
部署和云服务器上部署,内外网需要作区分时,发挥了它强大的作用。
那么先看看文字类描述:
listeners
: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka
服务。advertised.listeners
:和listeners
相比多了个advertised
。Advertised
的含义表示宣称的、公布的,就是说这组监听器是Broker
用于对外发布的。
比如说:
listeners: INSIDE://172.17.0.10:9092,OUTSIDE://172.17.0.10:9094
advertised_listeners: INSIDE://172.17.0.10:9092,OUTSIDE://<公网 ip>:端口
kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
kafka_inter_broker_listener_name: "INSIDE"
advertised_listeners
监听器会注册在 zookeeper
中;
当我们对 172.17.0.10:9092
请求建立连接,kafka
服务器会通过 zookeeper
中注册的监听器,找到 INSIDE
监听器,然后通过 listeners
中找到对应的 通讯 ip
和 端口;
同理,当我们对 <公网 ip>:端口
请求建立连接,kafka
服务器会通过 zookeeper
中注册的监听器,找到 OUTSIDE
监听器,然后通过 listeners
中找到对应的 通讯 ip
和 端口 172.17.0.10:9094
;
总结:advertised_listeners
是对外暴露的服务端口,真正建立连接用的是 listeners
。
场景
只有内网
比如在公司搭建的 kafka
集群,只有内网中的服务可以用,这种情况下,只需要用 listeners
就行
listeners: <协议名称>://<内网ip>:<端口>
例如:
listeners: SASL_PLAINTEXT://192.168.0.4:9092
内外网
在 docker
中或者 在类似阿里云主机上部署 kafka
集群,这种情况下是 需要用到 advertised_listeners
。
以 docker
为例:
listeners: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
advertised_listeners: INSIDE://localhost:9092,OUTSIDE://<宿主机ip>:<宿主机暴露的端口>
kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
kafka_inter_broker_listener_name: "INSIDE"
4、解决问题
docker-compoes文件保持不变
除了我个人记录的问题,通过搜索其他的问题也和我遇到的问题本质上是一样的。
记录使用kafka遇到的两个问题:
- 1.Caused by java.nio.channels.UnresolvedAddressException null
- 2.org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t2-0: 30042 ms has passed since batch creation plus linger time
以下为别人的解决方案,经过测试可行
两个报错是同一个问题。
4.1、问题重现
java 生产者向kafka集群生产消息,报错如下:
2018-06-03 00:10:02.071 INFO 80 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.2.0
2018-06-03 00:10:02.071 INFO 80 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 576d93a8dc0cf421
2018-06-03 00:10:32.253 ERROR 80 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='test1' and payload='hello122' to topic t2:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t2-0: 30042 ms has passed since batch creation plus linger time
- 明显连接kafka cluster超时,但是实际上并不知道具体是什么原因。
4.2、排除找原因
- 确定kafka集群已经启动,包括zookeeper、kafka集群。可以通过命令
ps -ef | grep java
- 个人排查,kafka集群即zookeeper已经成功启动
- 确定生产者程序一方的kafka配置是否有误。
spring boot的配置如下
spring.kafka.bootstrap-servers=39.108.61.252:9092
spring.kafka.consumer.group-id=springboot-group1
spring.kafka.consumer.auto-offset-reset=earliest
- 自然也没有问题
- 确定kafka集群所在机器防火墙或者说安全组是否已经开放相关端口。
若是在windows上,打开telnet
工具可以查看是否端口开放被监听
cmd进入命令行
telnet 39.108.61.252 9092
- 执行后成功连接,说明问题在程序所在的一方。
- 打开debug日志级别看错误详细信息
spring boot 的设置方式是:
logging.level.root=debug
然后重启应用
发现后台在不停的刷错误
如下:
2018-06-03 00:22:37.703 DEBUG 5972 --- [ t1-0-C-1] org.apache.kafka.clients.NetworkClient : Initialize connection to node 0 for sending metadata request
2018-06-03 00:22:37.703 DEBUG 5972 --- [ t1-0-C-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node 0 at izwz9c79fdwp9sb65vpyk3z:9092.
2018-06-03 00:22:37.703 DEBUG 5972 --- [ t1-0-C-1] org.apache.kafka.clients.NetworkClient : Error connecting to node 0 at izwz9c79fdwp9sb65vpyk3z:9092:
java.io.IOException: Can't resolve address: izwz9c79fdwp9sb65vpyk3z:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:182) ~[kafka-clients-0.10.2.0.jar:na
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558) [spring-kafka-1.2.2.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
at org.apache.kafka.common.network.Selector.connect(Selector.java:179) ~[kafka-clients-0.10.2.0.jar:na]
... 17 common frames omitted
- 可知建立socket时不能解析kafka所在服务器地址
查看日志可知,解析的地址是izwz9c79fdwp9sb65vpyk3z
这个地址是远程服务器的实例名称(阿里云服务器)。自己配置的明明是ip,程序内部却去获取他的别名,那如果生产者所在机器上没有配置这个ip的别名,就不能解析到对应的ip,所以连接失败报错。
4.3、解决
windows则去添加一条host映射
C:\Windows\System32\drivers\etc\hosts
39.108.61.252 izwz9c79fdwp9sb65vpyk3z
127.0.0.1 localhost
- linux则
vi /etc/hosts 修改方式一致
此时重启生产者应用
日志如下部分,成功启动,后台会一直在心跳检测连接和更新offset,所以debug日志一直在刷,此时就可以把日志级别修改为info
2018-06-03 00:29:46.543 DEBUG 12772 --- [ t2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Group springboot-group1 committed offset 10 for partition t2-0
2018-06-03 00:29:46.543 DEBUG 12772 --- [ t2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Completed auto-commit of offsets {t2-0=OffsetAndMetadata{offset=10, metadata=''}} for group springboot-group1
2018-06-03 00:29:46.563 DEBUG 12772 --- [ t1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Group springboot-group1 committed offset 0 for partition t1-0
2018-06-03 00:29:46.563 DEBUG 12772 --- [ t1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Completed auto-commit of offsets {t1-0=OffsetAndMetadata{offset=0, metadata=''}} for group springboot-group1
2018-06-03 00:29:46.672 DEBUG 12772 --- [ t2-0-C-1] o.a.k.c.consumer.internals.Fetcher : Sending fetch for partitions [t2-0] to broker izwz9c79fdwp9sb65vpyk3z:9092 (id: 0 rack: null)
2018-06-03 00:29:46.867 DEBUG 12772 --- [ t1-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-06-03 00:29:46.872 DEBUG 12772 --- [ t2-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
进行生产数据,成功执行!
最后附上还有一种解决方案,不用配置host文件,只需要申请一个外网的域名,通过dns配置,将docker-compose
中
kafka:
image: wurstmeister/kafka:2.13-2.6.0
container_name: kafka
restart: 'always'
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ilovechina.henanjiayou.com:9092
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
TZ: Asia/Shanghai
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
ilovechina.henanjiayou.com是个人申请的一个外网域名
标签:compose,java,clients,9092,kafka,yum,docker 来源: https://blog.csdn.net/weixin_32555599/article/details/119024342