DM8->KAFKA部署手册
作者:互联网
前段时间在做国产化项目时,写了一篇关于国产数据库达梦实时同步软件的介绍,简单介绍了一下原理和框架。今天正好项目要搭建个DM8到Kafka的同步,自己琢磨了一下,这里把搭建步骤分享出来。
1. DMHS的安装
1.1. 安装
需要在源端和目的端分别安装dmhs软件,如果为HA双机环境,分别在两台服务器上安装dmhs。
1) 将安装包上传至服务器,并授予可执行权限
chmod 777 dmhs_V3.1.3_dm8-kafka_rev82884sp3_rh7_64_veri_20190617.bin |
2) 运行安装程序
./dmhs_V3.1.3_dm8-kafka_rev82884sp3_rh7_64_veri_20190617.bin -i |
3) 输入key文件
[root@localhost dmdba]# ./dmhs_V3.0.2.01_dm7_rock4.2_64\(20151009\). bin -i Extract install files......... Welecom to DMHS Install Program Require Space:100M Whether to input the path of Key File? (Y/y:yes N/n:No)[Y/y]Y Please Input the Path of the Key File [dmhs.key]:/home/dmdba/dmhs.key |
4) 输入安装路径
输入安装路径
Please input the install path [/opt/dmhs]:/home/dmdba/dmhs Available Space :1488M Please Confirm the install path?(Y/y:yes N/n:No)[Y/y]Y |
5) 输入安装参数,[]中的为默认值,直接输入回车选择默认值,该步骤参数也可以随便填写,后期通过配置修改。其中源端和目的端的siteid要设置的不同(如源端使用1,目的端使用2)。
Please input the language (ch/en)[ch]:ch Please input the mgr_port (1000-65535)[5345]: Please input the chk_interval (1-60) second[3]: Please input the ckpt_interval (10-65536) second[60]: Please input the siteid (0-65536):1 Whether to config statistical analysis? (Y/y:Yes N/n:No)[Y/y] Please input the database type (dm7/dm6/oracle)[dm7]: Please input the status collection Interval (0-60)[5]: Please input the database IP:127.0.0.1 Please input the database port (1000-65535)[5236]: Please input the database user name:SYSDBA Please input the database user password:SYSDBA |
1.2. 配置
1.2.1修改源端和目的端dmhs.xml配置文件。
源端配置内容如下:
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base>
<lang>ch</lang>
<mgr_port>5345</mgr_port>
<chk_interval>3</chk_interval>
<ckpt_interval>60</ckpt_interval>
<siteid>1</siteid>
<version>2.0</version>
</base>
<cpt>
<db_type>dm8</db_type>
<db_server>127.0.0.1</db_server>
<db_user>SYSDBA</db_user>
<db_pwd>SYSDBA</db_pwd>
<db_port>5236</db_port>
<dict_dir>/home/dmdba/dm/dmhs_kafka/dmhs_dict</dict_dir>
<char_code>PG_GB18030</char_code>
<ddl_mask></ddl_mask>
<arch>
<clear_interval>600</clear_interval>
<clear_flag>1</clear_flag>
</arch>
<send>
<ip>10.147.232.159</ip>
<mgr_port>5345</mgr_port>
<data_port>5346</data_port>
<net_pack_size>256</net_pack_size>
<net_turns>0</net_turns>
<crc_check>0</crc_check>
<trigger>0</trigger>
<constraint>0</constraint>
<identity>0</identity>
<filter>
<enable>
<item>*.*</item>
</enable>
<disable/>
</filter>
<map>
</map>
</send>
</cpt>
</dmhs>
目的端配置文件内容如下:
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base>
<lang>ch</lang>
<mgr_port>5345</mgr_port>
<chk_interval>3</chk_interval>
<ckpt_interval>60</ckpt_interval>
<siteid>2</siteid>
<version>2.0</version>
</base>
<exec>
<recv>
<data_port>5346</data_port>
</recv>
<db_name></db_name>
<exec_thr>1</exec_thr>
<case_sensitive>0</case_sensitive>
<exec_policy>2</exec_policy>
<toggle_case>0</toggle_case>
<commit_policy>1</commit_policy>
<enable_merge>0</enable_merge>
<is_kafka>1</is_kafka>
</exec>
</dmhs>
如果源端和目的端在同一台服务器可以把配置集合到一起,内容如下:
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base>
<lang>ch</lang>
<mgr_port>5345</mgr_port>
<chk_interval>3</chk_interval>
<ckpt_interval>60</ckpt_interval>
<siteid>1</siteid>
<version>2.0</version>
</base>
<exec>
<recv>
<data_port>5346</data_port>
</recv>
<db_name></db_name>
<exec_thr>1</exec_thr>
<case_sensitive>0</case_sensitive>
<exec_policy>2</exec_policy>
<toggle_case>0</toggle_case>
<commit_policy>1</commit_policy>
<enable_merge>0</enable_merge>
<is_kafka>1</is_kafka>
</exec>
<cpt>
<db_type>dm8</db_type>
<db_server>127.0.0.1</db_server>
<db_user>SYSDBA</db_user>
<db_pwd>SYSDBA</db_pwd>
<db_port>5236</db_port>
<dict_dir>/home/dmdba/dm/dmhs_kafka/dmhs_dict</dict_dir>
<char_code>PG_GB18030</char_code>
<ddl_mask></ddl_mask>
<arch>
<clear_interval>600</clear_interval>
<clear_flag>1</clear_flag>
</arch>
<send>
<ip>10.147.232.159</ip>
<mgr_port>5345</mgr_port>
<data_port>5346</data_port>
<net_pack_size>256</net_pack_size>
<net_turns>0</net_turns>
<crc_check>0</crc_check>
<trigger>0</trigger>
<constraint>0</constraint>
<identity>0</identity>
<filter>
<enable>
<item>*.*</item>
</enable>
<disable/>
</filter>
<map>
</map>
</send>
</cpt>
</dmhs>
1.2.2源端数据库修改配置参数
修改dm.ini中参数RLOG_APPEND_LOGIC=1
开启归档日志ARCH_INI=1,(一般默认未开启)增加归档配置文件dmarch.ini:
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /dbarch/dmarch
ARCH_FILE_SIZE = 512
ARCH_SPACE_LIMIT = 300000
1.2.3在源端数据库建立数据库对象
为了同步DDL操作和维护源端字典信息,使用SYSDBA用户建立下面的数据库对象。该脚本以相应版本安装目录中./scripts/ddl_sql_dm8.sql为准。
1.3. 目的端KAFKA相关设置
- 查看当前有哪些topic
[dmdba@master kafka_2.11-1.1.0]$ bin/kafka-topics.sh --list --zookeeper localhos
t:2181
- 创建一个topic
[dmdba@master kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper local
host:2181 --replication-factor 1 --partitions 1 --topic DMHS
[dmdba@master kafka_2.11-1.1.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
DMHS
- 查看消费者的界面是否有消息推送过来
[dmdba@master kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --bootstrap-ser
ver 10.147.232.159:9092 --topic DMHS --from-beginning
注:以上红色部分为创建的topic话题,需要和后期启动脚本中的话题名称一致,区分大小写。
2. 启动同步服务
如果源端和目的端在同一服务器只需启动一个进程即可,在两台服务器需要分别启动。本篇以在同一台服务器举例。
2.1. 启动DMHS服务
2.1.1. 启动脚本
- 启动之前将kafka目录下的libs目录的文件,全部copy 至dmhs软件bin目录下
创建DMHS服务启动脚本名为start_dmhs_kafka.sh,内容如下:
#!/bin/sh
export.GBK
export LD_LIBRARY_PATH=/home/dmdba/dm/dmdbms/bin:/home/dmdba/dm/dmhs_kafka/bin
java -Djava.ext.dirs="/home/dmdba/dm/dmhs_kafka/bin" com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService /home/dmdba/dm/dmhs_kafka/bin/dmhs.hs 10.147.232.159:9092 DMHS
说明:
① "/home/dmdba/dm/dmhs_kafka/bin"为kafka的libs目录以及DMHS 辅助jar包路径。
② com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService 为DMHS实现kafka同步服务的类名。
③ /home/dmdba/dm/dmhs_kafka/bin/dmhs.hs为DMHS配置文件dmhs.xml。
④ 10.147.232.159:9092为kafka集群节点。
⑤ DMHS 为topic话题。topic可选,如果不填写topic,则每张表会建一个topic,名称即为表名。指定topic的话所有的表都填写在一个topic中,该topic话题必须和KAFKA话题一致。
以上各项路径参数根据实际环境进行修改。建议使用jdk1.7以上版本。
2.1.2. 启动服务
进入启动脚本目录,使用dmdba用户执行./start_dmhs_kafka.sh后,DMHS执行服务自动启动并监听相应数据端口。示例如下:
图中出现的错误:库文件libexec_ins_ora.so未找到,不需要管
启动的时候如果有个报错,库文件 libdmhs_exec_dll.so 未找到,原因是有依赖问题,ldd一下把依赖解决就好。
处理过程中遇到缺少libodbc.so.1的动态库需要安装一下unixodbc。操作如下:
1) 安装 UNIXODBC-2.3.0 的方法。
将压缩包上传到 linux /usr/local 下,然后执行 tar -xzf unixODBC-2.3.0.tar.gz ./configure make && make install 通过 odbc_config --version 确定odbc安装是否成功 |
2) 拷贝odbc动态库。
进入/usr/local/lib/下执行cp libodbc.so.2 到dmhs/bin下libodbc.so.1
然后另起一个终端窗口,进入DMHS目录执行./dmhs_console程序连接DMHS服务,进行字典装载操作后,启动捕获服务:
COPY 0 “SCH.NAME=’DMHS’” DICT
START CPT
至此,完成DMHS同步服务启动。
2.1.3. 验证DMHS同步至kafka
- 开启一个消费者界面,目前新版本一个对自动对应一个topic,这里我观察DMHS的推送情况,所以topic就指定DMHS
./kafka-console-consumer.sh --bootstrap-server 10.147.232.159:9092 --topic DMHS--from-beginning
- 在DM数据库上相关操作
- 查看消费者端,会有类似以下消息推送过来。看到有数据推送过来,说明DMHS可以推送消息至KAFKA端。
标签:dmdba,KAFKA,dmhs,--,kafka,topic,DMHS,DM8,手册 来源: https://blog.51cto.com/15092136/2692663