数据库
首页 > 数据库> > Canal-保存mysql篇

Canal-保存mysql篇

作者:互联网

Canal-保存mysql篇

一、java实现

先用java代码手写一遍,方便后续业务逻辑理解

1、maven配置:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.24</version>
        </dependency>

        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

2、application.properties配置

# 服务端口
server.port=10000
# 服务名
spring.application.name=canal-client

# 环境设置:dev、test、prod
spring.profiles.active=dev

# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=603409875

3、代码实现

4、效果

运行sql语句:

INSERT INTO `tb_commodity_info` ( `id`, `commodity_name`, `commodity_price`, `number`, `description` )
VALUES
	( '030acbd3b71011ecb9760242ac110005', '测试0001', '5.88', 11, '描述信息0001' );

执行结果:

库1:

库2:

二、使用canal.adapter来做实现

上面的代码初步可以实现一般情况下的数据迁移,接下来我们来实现一下给予adapter的方式

1、安装canal.adapter

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz

mkdir /tmp/canal_adapter
tar zxvf canal.adapter-1.1.5.tar.gz  -C /tmp/canal_adapter
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    #kafka.bootstrap.servers: 127.0.0.1:9092
    #kafka.enable.auto.commit: false
    #kafka.auto.commit.interval.ms: 1000
    #kafka.auto.offset.reset: latest
    #kafka.request.timeout.ms: 40000
#    kafka.session.timeout.ms: 30000
#    kafka.isolation.level: read_committed
#    kafka.max.poll.records: 1000
    # rocketMQ consumer
#    rocketmq.namespace:
#    rocketmq.namesrv.addr: 127.0.0.1:9876
#    rocketmq.batch.size: 1000
#    rocketmq.enable.message.trace: false
#    rocketmq.customized.trace.topic:
#    rocketmq.access.channel:
#    rocketmq.subscribe.filter:
    # rabbitMQ consumer
#    rabbitmq.host:
#    rabbitmq.virtual.host:
#    rabbitmq.username:
#    rabbitmq.password:
#    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://源数据库ip:端口/cancal_mysql?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
#      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/cancal_mysql?useUnicode=true
          jdbc.username: root
          jdbc.password: 603409875
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
sh bin/startup.sh

不出意外的果然出意外了

由于出现了意外,所以我们吧源码拉下来跑一遍后,更换了上面的配置等问题,终于成功了

三、源码解析

1、拉取源码到本地

git clone https://github.com/alibaba/canal.git

2、打开项目

3、修改配置

adapter项目启动入口是client-adapter包下的launcher模块

修改配置文件:application.ymlrdb目录下的tb_commodity_info.yml

4、启动项目

启动类CanalAdapterApplication

5、记录问题

通过源码追溯

在启动时,依据配置需要加载rdb的spi,需要在之前加载logger,所以需要在配置中加入logger

canalAdapters:
    - instance: example # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
          - name: logger   #解决此bug问题
            - name: rdb
              key: mysql1
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver
                jdbc.url: jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
                jdbc.username: root
                jdbc.password: 603409875

这时候可以将client-adapter的maven进行install一遍,会出现不少缺jar包的方式,我执行时缺了俩个jar

将这俩个jar打包出来后,在install adapter就可以了

注意:若还不行,可以先将launcher package然后在执行

进入ConfigLoader&load(Properties envProperties)方法中,看到加载表映射关系,跟代码后发现,说没有映射文件

为了查这个问题,我又跑到spi加载文件的地方去查,查到加载的路径是:

canal/1.1.5/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin/client-adapter.rdb-1.1.5-jar-with-dependencies.jar

打开jar包,原来是之前没改的时候的jar,此时又去查询install了rdb的包,在重新pack一遍,终于可以运行了

标签:Canal,canal,jdbc,CanalEntry,mysql,保存,sql,entry
来源: https://www.cnblogs.com/java5wanping/p/16124352.html