使用amazonriver同步PG数据到Kafka
作者:互联网
最近生产上,DW有实时同步数据的需求,这里我们研究了下社区的开源组件,最终选择了amazonriver这个哈罗单车开源的小工具来实现。
前提条件:
1、上游的PG主库要开启逻辑复制
2、表必须有主键
部署方案:
项目地址:https://github.com/hellobike/amazonriver
amazonriver的安装部署很简单,根据官方github操作即可。
我这里的部署路径是: /usr/local/amazonriver
root@node1:/usr/local/amazonriver # tree ├── adb-tb1.json ├── amazonriver
cat adb-tb1.json 配置文件如下:
{ "pg_dump_path": "/usr/pgsql-13/bin/pg_dump", "subscribes": [{ "dump": false, "slotName": "dts_adb_tb1", "pgConnConf": { "host": "192.168.2.4", "port": 5432, "database": "adb", "user": "dts", "password": "dts" }, "rules": [ { "table": "tb1", "pks": ["id"], "topic": "rti_p_adb_tb1" } ], "kafkaConf": { "addrs": ["dev-kafka-01:9092"] }, "retry": -1 }], "prometheus_address": ":8212" }
cat /etc/supervisord.d/adb-tb1.ini
[program:amazonriver-adb-tb1] command=/usr/local/amazonriver/amazonriver -config /usr/local/amazonriver/adb-tb1.json directory=/usr/local/amazonriver redirect_stderr=true stdout_logfile=/tmp/supervisord/%(program_name)s.log stdout_logfile_maxbytes=100MB stdout_logfile_backups=2 stderr_logfile=/tmp/supervisord/%(program_name)s.log stderr_logfile_maxbytes=100MB stderr_logfile_backups=2 process_name=%(program_name)s numprocs=1 numprocs_start=0 autostart=true autorestart=unexpected startsecs=10 startretries=999 exitcodes=0,2 stopsignal=TERM stopwaitsecs=10 stopasgroup=false killasgroup=false
我这里使用的是supervisord控制的,直接起来即可。
注意事项:
1、Amazonriver的配置文件中,注意下面这个参数:
"retry": -1
这样可保证在amazonriver人工重启或者异常重启的时候数据不丢失(但是会出现下游数据多得情况),下游数据多,需要自行去重下。
可以看这个ISSUE:https://github.com/hellobike/amazonriver/issues/27
2、PG中的注意事项
如果某个amazonriver进程不在用了,意味着它创建的对应的复制槽也不再用了,记得及时删除掉,不然wallog会越积越多,把磁盘空间打满。
包装成一个api接口,方便使用:
标签:tb1,Kafka,amazonriver,adb,PG,logfile,local,usr 来源: https://blog.51cto.com/lee90/2842865