数据库
首页 > 数据库> > canal同步mysql实战

canal同步mysql实战

作者:互联网

环境

mysql 5.6.41

canal 1.15

1.16测试过后,一直报错canal_config表不存在,更换版本后正常

目的 : 同步一个数据库中的二个表

1、创建表

CREATE TABLE `user01` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

 

CREATE TABLE `user02` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

 

 

 2下载安装包

服务端

https://github.com/alibaba/canal/releases

下载deploy的server包,还有adapter的client包

 

 直接解压,我的配置文件如下

 

最重要的是

canal.instance.master.address=172.20.70.34:3308

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

canal.instance.filter.regex=hx_erp\\.user01

sh bin/startup.sh启动

其中看服务器内存资源,我这里是修改了XMS和xmx,xmn为256,减少内存消耗

JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

 

3客户端

 

[root@master canaladapter]# cat conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null

canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

srcDataSources:
defaultDS:
url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
username: xxx
password: xxxx
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
jdbc.username: xxx
jdbc.password: xxx

 

其中最重要的是src和outerAdapters源和目标数据库的url账号密码

注意点:

  1. 其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
  2. adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件

RDB文件

[root@master canaladapter]# cat conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: hx_erp
table: user01
targetTable: user02
targetPk:
id: id
mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: "where c_time>={}"
commitBatch: 3000 # 批量提交的大小

 

从user01表同步到user02表

 

JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

 

同样修改启动脚本

echo CLASSPATH :$CLASSPATH
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>/dev/null 2>&1 &
echo $! > $base/bin/adapter.pid

echo "cd to $current_path for continue"
cd $current_path
[root@master canaladapter]# ./bin/startup.sh

 

 同事存在客户端和客户端

查看服务端日志

 

 这样就是正常的

客户端日志

 

 

现在来验证下,想user01表插入数据

 

 

查看客户端日志

 

 这样就是成功的。

 

现在我们测试下,如果服务端挂了,期间user01有DDL,服务端启动以后,user02是否会同步

停止服务

插入数据

 

 

 

 

启动服务

[root@master canalserver]# ./bin/startup.sh

结果是没有自动同步过去

客户端还在报错

重启客户端

[root@master canaladapter]# ./bin/restart.sh

 

 

 

 

现在可以了,自动同步OK了

这就表示,即使服务端因为异常挂掉了,重启服务端和客户端也可以自动同步

canal也可以同步restfulapi的方式进行管理

查询所有同步实例状态

[root@master canalserver]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}][root@master canalserver]#

 

curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

查询单个实例同步状态

curl http://127.0.0.1:8081/syncSwitch/example

查询key为mysql1的rdb实例的数据量

[root@master canalserver]# curl http://127.0.0.1:8081/count/rdb/mysql1/mytest_user.yml
{"count":3,"targetTable":"`user02`"}[root@master canalserver]

手动全量同步

[root@master canalserver]# curl http://127.0.0.1:8081/etl/rdb/mysql1/mytest_user.yml -X POST
{"succeeded":true,"resultMessage":"导入RDB 数据:4 条"}[root@master canalserver]#

如果出现上述canalserver服务端挂掉了,也可以手动全量同步来处理一遍

 

标签:canal,实战,同步,kafka,XX,master,mysql,root
来源: https://www.cnblogs.com/whitelittle/p/16598857.html