其他分享
首页 > 其他分享> > Canal学习笔记

Canal学习笔记

作者:互联网

Cancle

1.目的

阿里解决异地机房数据同步(基于cannal的Otter)问题。直接读取数据库进行同步会造成数据库服务器压力过大,所以通过读取binlog增量日志(增删改)来进行增量数据的获取,由此衍生了数据订阅和消费的业务。

2.原理及作用

基于数据库日志解析完成数据库增量数据同步。

3.MySql的binglog

3.1作用

以事件的形式记录了DDL和DML操作,且是事务安全型的。开启binglog会有大概1%的性能损耗。

3.2应用

数据实时收仓,数据恢复,缓存刷新,es同步,Otter组件(原本的目的收到数据后otter同步到不同机房数据库)

3.3binlog的分类

a statement 语句级别

节省空间但是可能造成数据不一致 如now()函数

b row 行级别

保持数据一致性,只记录sql执行后的结果,占用空间。数据分析一般用这个。

c mixed

二者的结合,不常用

4.工作原理

把自己伪装成一个slave,假装从Master中复制数据。

5.环境准备

a Mysql 8

(Mysql8 依赖)https://www.microsoft.com/zh-cn/download/details.aspx?id=42642

b 不能远程连接问题
在这里插入图片描述

use mysql;
update user set host = '%' where user ='root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION; //赋予任何主机访问数据的权限
FLUSH PRIVILEGES;

在这里插入图片描述
b 创建数据库和表

在这里插入图片描述
c 修改配置文件开启Binlog

win需要打开可查看隐藏文件,C:\ProgramData\MySQL\MySQL Server 8.0可以查看到my.ini
在这里插入图片描述
在这里插入图片描述
server-id=服务id
log-bin=二进制日志文件的名称
binlog_format=日志类型
binlog-do-db=数据库
在这里插入图片描述

INSERT INTO fruit VALUES (1,'香蕉',20.22),(2,'苹果',40.22);
INSERT INTO fruit VALUES (3,'菠萝',20.22),(4,'榴莲',40.22);
INSERT INTO fruit VALUES (4,'西瓜',20.22),(5,'葡萄',40.22);

在这里插入图片描述
binlog日志变大

重启服务会新产生一个binlog文件
在这里插入图片描述

6.下载和安装

a 下载链接 https://github.com/alibaba/canal/releases

b 目录结构
在这里插入图片描述
c 修改 canal.properties

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

# canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#这后面不需要分号!!!
canal.instance.tsdb.url = jdbc:mysql://192.168.67.222:13306/goods_mark
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = root

7启动canal
在这里插入图片描述
POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd
http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>cannalDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cannalDemo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

测试代码

package com.example.cannaldemo.config;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws
            InvalidProtocolBufferException {
        //1.获取 canal 连接对象 user pwd 为cannal的密码可以不写
        CanalConnector canalConnector =
                CanalConnectors.newSingleConnector(new
                        InetSocketAddress("localhost", 11111), "example", "", "");
        //2.获取连接 只需要获取一次即可
        canalConnector.connect();
        while (true) {
            //3.指定要监控的数据库 * 表示数据库下面所有表
            canalConnector.subscribe("goods_mark.*");
            //4.获取 Message
            Message message = canalConnector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() <= 0) {
                System.out.println("没有数据,休息一会");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                for (CanalEntry.Entry entry : entries) {
                    //TODO 获取表名
                    String tableName =
                            entry.getHeader().getTableName();
                    // TODO Entry 类型
                    CanalEntry.EntryType entryType =
                            entry.getEntryType();
                    // TODO 判断 entryType 是否为 ROWDATA
                    if
                    (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // TODO 序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        // TODO 反序列化
                        CanalEntry.RowChange rowChange =
                                CanalEntry.RowChange.parseFrom(storeValue);
                        //TODO 获取事件类型
                        CanalEntry.EventType eventType =
                                rowChange.getEventType();
                        //TODO 获取具体的数据
                        List<CanalEntry.RowData> rowDatasList =
                                rowChange.getRowDatasList();
                        //TODO 遍历并打印数据
                        for (CanalEntry.RowData rowData : rowDatasList)
                        {
                            List<CanalEntry.Column> beforeColumnsList =
                                    rowData.getBeforeColumnsList();
                            JSONObject beforeData = new JSONObject();
                            for (CanalEntry.Column column :
                                    beforeColumnsList) {
                                beforeData.put(column.getName(),
                                        column.getValue());
                            }
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList =
                                    rowData.getAfterColumnsList();
                            for (CanalEntry.Column column :
                                    afterColumnsList) {
                                afterData.put(column.getName(),
                                        column.getValue());
                            }
                            System.out.println("TableName:" + tableName
                                    +
                                    ",EventType:" + eventType +
                                    ",After:" + beforeData +
                                    ",After:" + afterData);
                        }
                    }
                }
            }
        }
    } }

在这里插入图片描述

标签:Canal,canal,TODO,CanalEntry,笔记,学习,import,com,tsdb
来源: https://blog.csdn.net/qq_40850266/article/details/122464896