系统相关
首页 > 系统相关> > linux(CentOS7)上面搭建kafka环境操作流程,方案为docker 和 docker-compose

linux(CentOS7)上面搭建kafka环境操作流程,方案为docker 和 docker-compose

作者:互联网

linux(CentOS7)上面搭建kafka环境操作流程,方案为docker 和 docker-compose

1.基础环境

1.1、yum源配置

使用阿里云安装源

cd /etc/yum.repos.d
rename .repo .repo.bak *.repo
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo

curl -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
yum clean all 
yum makecache 
yum repolist
yum update

安装基本工具

############ 安装基础工具 ############
yum -y install wget telnet bind-utils vim net-tools lrzs

1.2、修改系统设置

修改主机名

注意hostname不能是localhost,且不包含下划线、小数点、大写字母,命令语法为:

sudo hostnamectl set-hostname test

修改系统参数

############ limits 相关 ############
# 扩大句柄数
sed -i 's/4096/1000000/g' /etc/security/limits.d/20-nproc.conf
cat <<EOF>> /etc/security/limits.d/20-all-users.conf
*               soft    nproc          1000000
*               hard    nproc          1000000
*               soft    nofile         1000000
*               hard    nofile         1000000
EOF


############ 内核参数 ############
cat <<'EOF'> /etc/sysctl.d/50-custom.conf 
# 禁用IPV6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
# 最大进程数量
kernel.pid_max = 1048576
# 进程可以拥有的VMA(虚拟内存区域)的数量
vm.max_map_count = 262144
#关闭tcp的连接传输的慢启动,即先休止一段时间,再初始化拥塞窗口。
net.ipv4.tcp_slow_start_after_idle = 0
# 哈希表项最大值,解决 'nf_conntrack: table full, dropping packet.' 问题
net.netfilter.nf_conntrack_max = 2097152
# 在指定之间(秒)内,已经建立的连接如果没有活动,则通过iptables进行清除。
net.netfilter.nf_conntrack_tcp_timeout_established = 1200
EOF


############ 关闭selinux ############
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/sysconfig/selinux
setenforce 0


############ 关闭防火墙 ############
systemctl disable firewalld
systemctl stop firewalld


########## 设置时间同步服务器 ##########
查看我的其他文章

systemctl enable chronyd.service
systemctl restart chronyd.service
systemctl status chronyd.service
#查看时间同步源:
chronyc sources -v

1.3、docker安装

1.Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。

通过 uname -r 命令查看你当前的内核版本

$ uname -r

2.使用 root 权限登录 Centos。确保 yum 包更新到最新。

$ sudo yum update

3.检查系统中是否已经安装了docker

$ ps -ef |grep docker

4.如果显示已安装docker的需要先卸载

$ yum remave docker-*

5.安装yum仓库管理工具

$ yum install -y yum-utils  (该工具包含yum-config-manager)

6.下载阿里的docker-ce仓库

$ cd /etc/yum.repos.d/
$ yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

7.查看选择docker-ce各版本

$ yum list docker-ce --showduplicates | sort -r

8.安装指定版本的docker-ce、docker-ce-cli、containerd.io

$ yum install docker-ce-19.03.5-3.el7 docker-ce-cli-19.03.5-3.el7 containerd.io-1.2.10-3.2.el7

9.关闭防火墙
 
$ systemctl status firewalld 查看防火墙状态   systemctl disable firwalld  关闭防火墙

10.启动docker

$ systemctl start docker

11.设置docker开机启动

$ systemctl enable docker

1.4、docker-compose 安装

第一种直接安装

curl -L https://get.daocloud.io/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version
其中版本号可以修改

第二种,离线安装

去github手动下载文件:https://github.com/docker/compose/releases/tag/1.25.0-rc4

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NK6Uyb0M-1627004817264)(http://docs.software.dc/images/markdown/mp-data/mp-data-deploy-doc/https:/img-blog.csdnimg.cn/20200313213255537.png?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FsZW5feGlhb3hpbg==,size_16,color_FFFFFF,t_70)]

将文件上传到/usr/local/bin/ 目录下,重命名为docker-compose,修改文件权限:

$ chmod +x /usr/local/bin/docker-compose
$ docker-compose -v 

1.5、docker配置

配置镜像仓库

在/etc/docker/daemon.json文件(没有请自行创建)

#配置阿里云相关的

$ vi etc/docker/daemon.json  添加如下内容

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://39kjotb7.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

2、根据docker-compose创建kafka服务

2.1、直接贴出docker-compose文件

kafka依赖zookeeper,我们不搞集群,直接单机可用即可。akhq是一个界面化的管理工具,稍后搭建完毕截图详解

version: '3.8'
services:
  zookeeper:
    image: zookeeper:3.4
    container_name: zookeeper
    restart: always
    ports:
      - 21181:2181
    environment:
      TZ: Asia/Shanghai

  kafka:
    image: wurstmeister/kafka:2.13-2.6.0
    container_name: kafka
    restart: 'always'
    ports:
      - 9092:9092
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      TZ: Asia/Shanghai
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
      
  akhq:
    image: tchiotludo/akhq
    container_name: akhq
    ports:
      - "10080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"

我们现在把我们自己测试的展示出来
在这里插入图片描述
通过
http://ip:10080/ui/docker-kafka-server/topic,通过浏览器就可以访问kafka,如下所示
在这里插入图片描述
红框选中部分,多点点试一下,最下方的创建topic按钮,可以创建你自己的topic
在这里插入图片描述
我创建一个demo,然后整个环境就搭建完毕

2.3、问题

在这里插入图片描述
看我截图。如果你想要通过一个springboot工程在properties配置文件中使用kafka,你会发现填写ip:端口不生效,因为通过docker容器化后的kafka在zookeeper下面,暴露出来的ip就会是一个容器ip,现在我们解决一下这个问题。
我遇到的问题
在这里插入图片描述
在这里插入图片描述
百度出来结局方案
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
先介绍原理,后给出解决方案。

3、kafka listeners 和 advertised.listeners 的介绍

3.1、介绍区别

在公司内网部署 kafka 集群只需要用到 listeners,所以一直也不用管 advertised.listeners 是做啥的,刚开始有查过,因为经验不足,始终理解的不够,后来发现在 docker 部署和云服务器上部署,内外网需要作区分时,发挥了它强大的作用。

那么先看看文字类描述:

比如说:

listeners: INSIDE://172.17.0.10:9092,OUTSIDE://172.17.0.10:9094
advertised_listeners: INSIDE://172.17.0.10:9092,OUTSIDE://<公网 ip>:端口
kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
kafka_inter_broker_listener_name: "INSIDE"

advertised_listeners 监听器会注册在 zookeeper 中;

当我们对 172.17.0.10:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INSIDE 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口;

同理,当我们对 <公网 ip>:端口 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 OUTSIDE 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 172.17.0.10:9094

总结:advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners

场景

只有内网

比如在公司搭建的 kafka 集群,只有内网中的服务可以用,这种情况下,只需要用 listeners 就行

listeners: <协议名称>://<内网ip>:<端口>

例如:

listeners: SASL_PLAINTEXT://192.168.0.4:9092

内外网

docker 中或者 在类似阿里云主机上部署 kafka 集群,这种情况下是 需要用到 advertised_listeners

docker 为例:

listeners: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
advertised_listeners: INSIDE://localhost:9092,OUTSIDE://<宿主机ip>:<宿主机暴露的端口>
kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
kafka_inter_broker_listener_name: "INSIDE"

4、解决问题

docker-compoes文件保持不变
除了我个人记录的问题,通过搜索其他的问题也和我遇到的问题本质上是一样的。

记录使用kafka遇到的两个问题:

- 1.Caused by java.nio.channels.UnresolvedAddressException null
- 2.org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t2-0: 30042 ms has passed since batch creation plus linger time

以下为别人的解决方案,经过测试可行

两个报错是同一个问题。

4.1、问题重现

java 生产者向kafka集群生产消息,报错如下:

2018-06-03 00:10:02.071  INFO 80 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.2.0



2018-06-03 00:10:02.071  INFO 80 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 576d93a8dc0cf421



2018-06-03 00:10:32.253 ERROR 80 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='test1' and payload='hello122' to topic t2:



 



org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t2-0: 30042 ms has passed since batch creation plus linger time

4.2、排除找原因

ps -ef | grep java
spring.kafka.bootstrap-servers=39.108.61.252:9092



spring.kafka.consumer.group-id=springboot-group1



spring.kafka.consumer.auto-offset-reset=earliest

cmd进入命令行

telnet 39.108.61.252 9092

spring boot 的设置方式是:
logging.level.root=debug
然后重启应用
发现后台在不停的刷错误
如下:

2018-06-03 00:22:37.703 DEBUG 5972 --- [       t1-0-C-1] org.apache.kafka.clients.NetworkClient   : Initialize connection to node 0 for sending metadata request
2018-06-03 00:22:37.703 DEBUG 5972 --- [       t1-0-C-1] org.apache.kafka.clients.NetworkClient   : Initiating connection to node 0 at izwz9c79fdwp9sb65vpyk3z:9092.

2018-06-03 00:22:37.703 DEBUG 5972 --- [       t1-0-C-1] org.apache.kafka.clients.NetworkClient   : Error connecting to node 0 at izwz9c79fdwp9sb65vpyk3z:9092:

java.io.IOException: Can't resolve address: izwz9c79fdwp9sb65vpyk3z:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:182) ~[kafka-clients-0.10.2.0.jar:na
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558) [spring-kafka-1.2.2.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:179) ~[kafka-clients-0.10.2.0.jar:na]
    ... 17 common frames omitted

查看日志可知,解析的地址是izwz9c79fdwp9sb65vpyk3z
这个地址是远程服务器的实例名称(阿里云服务器)。自己配置的明明是ip,程序内部却去获取他的别名,那如果生产者所在机器上没有配置这个ip的别名,就不能解析到对应的ip,所以连接失败报错。

4.3、解决

windows则去添加一条host映射

C:\Windows\System32\drivers\etc\hosts

39.108.61.252 izwz9c79fdwp9sb65vpyk3z
127.0.0.1 localhost

vi /etc/hosts 修改方式一致

此时重启生产者应用
日志如下部分,成功启动,后台会一直在心跳检测连接和更新offset,所以debug日志一直在刷,此时就可以把日志级别修改为info

2018-06-03 00:29:46.543 DEBUG 12772 --- [       t2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Group springboot-group1 committed offset 10 for partition t2-0
2018-06-03 00:29:46.543 DEBUG 12772 --- [       t2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Completed auto-commit of offsets {t2-0=OffsetAndMetadata{offset=10, metadata=''}} for group springboot-group1
2018-06-03 00:29:46.563 DEBUG 12772 --- [       t1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Group springboot-group1 committed offset 0 for partition t1-0
2018-06-03 00:29:46.563 DEBUG 12772 --- [       t1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Completed auto-commit of offsets {t1-0=OffsetAndMetadata{offset=0, metadata=''}} for group springboot-group1
2018-06-03 00:29:46.672 DEBUG 12772 --- [       t2-0-C-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [t2-0] to broker izwz9c79fdwp9sb65vpyk3z:9092 (id: 0 rack: null)
2018-06-03 00:29:46.867 DEBUG 12772 --- [       t1-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-06-03 00:29:46.872 DEBUG 12772 --- [       t2-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records

进行生产数据,成功执行!

最后附上还有一种解决方案,不用配置host文件,只需要申请一个外网的域名,通过dns配置,将docker-compose

 kafka:
    image: wurstmeister/kafka:2.13-2.6.0
    container_name: kafka
    restart: 'always'
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ilovechina.henanjiayou.com:9092   
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      TZ: Asia/Shanghai
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper

ilovechina.henanjiayou.com是个人申请的一个外网域名

标签:compose,java,clients,9092,kafka,yum,docker
来源: https://blog.csdn.net/weixin_32555599/article/details/119024342