使用canal监听mysql数据变化
作者:互联网
### canal介绍
canal是阿里开源的数据库同步框架,采用非侵入式方式,解析mysql的`binary log`,再发送到目的地,目的地可是`mq`,`hbase`,`mysql`,`es`等.
### 本章流程
1. 开启mysql的bin-log日志
2. 创建mysql用户获取bin-log日志
3. canal采集bin-log日志
4. canal-client获取mysql变化信息
#### 开启bin-log日志
只需要在`mysqld.cnf`新增配置
```
server-id=1
log-bin=mysql-bin
```
#### 创建mysql用户
```sql
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
#### 配置canal
```bash
# 配置文件1:canal-server/conf/canal.properties
# 端口
canal.port = 11111
# 配置文件2:canal-server/conf/example/instance.properties
# 数据库连接信息
canal.instance.master.address=192.168.41.128:3307
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 监听的表(正则表达式)
canal.instance.filter.regex=.*\\..*
# 主题
canal.mq.topic=example
```
#### 启动mysql/canal
```bash
# 本地测试采用docker方式启动
docker run -d --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=123456 -v $PWD/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf hub.c.163.com/library/mysql:5.7
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
```
#### 编写canal-client
```java
package com.deri.stream.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @ClassName: Main
* @Description: TODO
* @Author: wuzhiyong
* @Time: 2021/6/11 9:41
* @Version: v1.0
**/
public class Main {
public static void main(String[] args) throws InterruptedException {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.41.128",11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 没有变化,等一秒钟再去拉取数据
Thread.sleep(1000);
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
```
### 参考连接
- https://www.cnblogs.com/moris5013/p/12371549.html
- https://blog.csdn.net/weixin_35852328/article/details/87600833
标签:canal,CanalEntry,connector,mysql,entry,com,监听 来源: https://blog.51cto.com/happywzy/2895785