canal同步mysql实战
作者:互联网
环境
mysql 5.6.41
canal 1.15
1.16测试过后,一直报错canal_config表不存在,更换版本后正常
目的 : 同步一个数据库中的二个表
1、创建表
CREATE TABLE `user01` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
CREATE TABLE `user02` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
2下载安装包
服务端
https://github.com/alibaba/canal/releases
下载deploy的server包,还有adapter的client包
直接解压,我的配置文件如下
最重要的是
canal.instance.master.address=172.20.70.34:3308
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=hx_erp\\.user01
sh bin/startup.sh启动
其中看服务器内存资源,我这里是修改了XMS和xmx,xmn为256,减少内存消耗
JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
3客户端
[root@master canaladapter]# cat conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
username: xxx
password: xxxx
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
jdbc.username: xxx
jdbc.password: xxx
其中最重要的是src和outerAdapters源和目标数据库的url账号密码
注意点:
- 其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
- adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件
RDB文件
[root@master canaladapter]# cat conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: hx_erp
table: user01
targetTable: user02
targetPk:
id: id
mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: "where c_time>={}"
commitBatch: 3000 # 批量提交的大小
从user01表同步到user02表
JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
同样修改启动脚本
echo CLASSPATH :$CLASSPATH
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>/dev/null 2>&1 &
echo $! > $base/bin/adapter.pid
echo "cd to $current_path for continue"
cd $current_path
[root@master canaladapter]# ./bin/startup.sh
同事存在客户端和客户端
查看服务端日志
这样就是正常的
客户端日志
现在来验证下,想user01表插入数据
查看客户端日志
这样就是成功的。
现在我们测试下,如果服务端挂了,期间user01有DDL,服务端启动以后,user02是否会同步
停止服务
插入数据
启动服务
[root@master canalserver]# ./bin/startup.sh
结果是没有自动同步过去
客户端还在报错
重启客户端
[root@master canaladapter]# ./bin/restart.sh
现在可以了,自动同步OK了
这就表示,即使服务端因为异常挂掉了,重启服务端和客户端也可以自动同步
canal也可以同步restfulapi的方式进行管理
查询所有同步实例状态
[root@master canalserver]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}][root@master canalserver]#
curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启
注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关
查询单个实例同步状态
curl http://127.0.0.1:8081/syncSwitch/example
查询key为mysql1的rdb实例的数据量
[root@master canalserver]# curl http://127.0.0.1:8081/count/rdb/mysql1/mytest_user.yml
{"count":3,"targetTable":"`user02`"}[root@master canalserver]
手动全量同步
[root@master canalserver]# curl http://127.0.0.1:8081/etl/rdb/mysql1/mytest_user.yml -X POST
{"succeeded":true,"resultMessage":"导入RDB 数据:4 条"}[root@master canalserver]#
如果出现上述canalserver服务端挂掉了,也可以手动全量同步来处理一遍
标签:canal,实战,同步,kafka,XX,master,mysql,root 来源: https://www.cnblogs.com/whitelittle/p/16598857.html