Canal学习笔记
作者:互联网
Cancle
1.目的
阿里解决异地机房数据同步(基于cannal的Otter)问题。直接读取数据库进行同步会造成数据库服务器压力过大,所以通过读取binlog增量日志(增删改)来进行增量数据的获取,由此衍生了数据订阅和消费的业务。
2.原理及作用
基于数据库日志解析完成数据库增量数据同步。
3.MySql的binglog
3.1作用
以事件的形式记录了DDL和DML操作,且是事务安全型的。开启binglog会有大概1%的性能损耗。
3.2应用
数据实时收仓,数据恢复,缓存刷新,es同步,Otter组件(原本的目的收到数据后otter同步到不同机房数据库)
3.3binlog的分类
a statement 语句级别
节省空间但是可能造成数据不一致 如now()函数
b row 行级别
保持数据一致性,只记录sql执行后的结果,占用空间。数据分析一般用这个。
c mixed
二者的结合,不常用
4.工作原理
把自己伪装成一个slave,假装从Master中复制数据。
5.环境准备
a Mysql 8
(Mysql8 依赖)https://www.microsoft.com/zh-cn/download/details.aspx?id=42642
b 不能远程连接问题
use mysql;
update user set host = '%' where user ='root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION; //赋予任何主机访问数据的权限
FLUSH PRIVILEGES;
b 创建数据库和表
c 修改配置文件开启Binlog
win需要打开可查看隐藏文件,C:\ProgramData\MySQL\MySQL Server 8.0可以查看到my.ini
server-id=服务id
log-bin=二进制日志文件的名称
binlog_format=日志类型
binlog-do-db=数据库
INSERT INTO fruit VALUES (1,'香蕉',20.22),(2,'苹果',40.22);
INSERT INTO fruit VALUES (3,'菠萝',20.22),(4,'榴莲',40.22);
INSERT INTO fruit VALUES (4,'西瓜',20.22),(5,'葡萄',40.22);
binlog日志变大
重启服务会新产生一个binlog文件
6.下载和安装
a 下载链接 https://github.com/alibaba/canal/releases
b 目录结构
c 修改 canal.properties
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#这后面不需要分号!!!
canal.instance.tsdb.url = jdbc:mysql://192.168.67.222:13306/goods_mark
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = root
7启动canal
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd
http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cannalDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cannalDemo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
测试代码
package com.example.cannaldemo.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) throws
InvalidProtocolBufferException {
//1.获取 canal 连接对象 user pwd 为cannal的密码可以不写
CanalConnector canalConnector =
CanalConnectors.newSingleConnector(new
InetSocketAddress("localhost", 11111), "example", "", "");
//2.获取连接 只需要获取一次即可
canalConnector.connect();
while (true) {
//3.指定要监控的数据库 * 表示数据库下面所有表
canalConnector.subscribe("goods_mark.*");
//4.获取 Message
Message message = canalConnector.get(100);
List<CanalEntry.Entry> entries = message.getEntries();
if (entries.size() <= 0) {
System.out.println("没有数据,休息一会");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
for (CanalEntry.Entry entry : entries) {
//TODO 获取表名
String tableName =
entry.getHeader().getTableName();
// TODO Entry 类型
CanalEntry.EntryType entryType =
entry.getEntryType();
// TODO 判断 entryType 是否为 ROWDATA
if
(CanalEntry.EntryType.ROWDATA.equals(entryType)) {
// TODO 序列化数据
ByteString storeValue = entry.getStoreValue();
// TODO 反序列化
CanalEntry.RowChange rowChange =
CanalEntry.RowChange.parseFrom(storeValue);
//TODO 获取事件类型
CanalEntry.EventType eventType =
rowChange.getEventType();
//TODO 获取具体的数据
List<CanalEntry.RowData> rowDatasList =
rowChange.getRowDatasList();
//TODO 遍历并打印数据
for (CanalEntry.RowData rowData : rowDatasList)
{
List<CanalEntry.Column> beforeColumnsList =
rowData.getBeforeColumnsList();
JSONObject beforeData = new JSONObject();
for (CanalEntry.Column column :
beforeColumnsList) {
beforeData.put(column.getName(),
column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterColumnsList =
rowData.getAfterColumnsList();
for (CanalEntry.Column column :
afterColumnsList) {
afterData.put(column.getName(),
column.getValue());
}
System.out.println("TableName:" + tableName
+
",EventType:" + eventType +
",After:" + beforeData +
",After:" + afterData);
}
}
}
}
}
} }
标签:Canal,canal,TODO,CanalEntry,笔记,学习,import,com,tsdb 来源: https://blog.csdn.net/qq_40850266/article/details/122464896