数据库
首页 > 数据库> > nifi的去重方案设计(二)-外部存储mysql全局去重

nifi的去重方案设计(二)-外部存储mysql全局去重

作者:互联网

这里只抛砖引玉列一个方案

nifi内本身支持存储插件,但对不同业务类型不够通用,不够灵活

外部存储可以用支持 唯一索引的db,kv类的存储等

可以组合nifi的官方组件,或自定义开发组件

processor1 检查外部存储是否存在key,不存在的则进入下级flowfile

processor2 把flowfile写入外部存储

因为有中间队列处理的时间差,所以无法绝对去重,会漏掉一些重复数据

基于官方组件+mysql的去重方案

介绍下个人选择的mysql去重方案,之所以用mysql,flowfile的attr信息,实际也是mysql业务信息,可以统一管理查看任务的处理进度

该方案最大的优点是不需要定制开发,组合nifi的官方组件即可完成方案的应用,且方便结合业务场景,做任务的状态管理

需要做的是配置相关nifi组件的参数

mysql相关表配置唯一索引

应用nifi的 PUTSQL组件,使用INSERT IGNORE 语句往mysql写入数据

并设置组件的属性 Obtain Generated Keys:true 这个配置是关键,描述record写入mysql是否保留mysql记录自生成的id(例如自增id)

实现原理为 如果唯一索引不存在冲突,则数据写入成功,mysql会为该记录生成id,并返回给PUTSQL processor,这个id会维持在 flowfile的 attr sql.generated.key下,例如sql.generated.key=1

如果唯一索引存在冲突,则数据定入失败,也不就不存在sql.generated.keysql.generated.key=null

在PUTSQL Processor的下级再添加一项RouteOnAttribute Processor

配置规则 ${sql.generated.key:notNull()}

nifi官方不提供filter processor 也应该是因为RouteOnAttribute完全可以作为一个filter使用,配置有效规则,终止unmatched的项,将未匹配的数据直接丢弃,实际也就是提取出了,原先数据库不冲突的项

通过sql.generated.key 是否为null 来判断,不为null,表示写入成功,若sql配置了自增主键则该值为写入成功的生成键,若为null,表示因唯一索引冲突写入失败

写入成功的项,为数据库中原先没有的项,也即去重后要处理的项 下列示例以url_md5为唯一索引

INSERT IGNORE `distinct_db`.`download_url`(`url_md5`,`domain`,`url`) VALUES('${task.md5}', '${task.domain}','${task.url}');

nifi示例和图片有空再补

通过insert ignore 判断,此种需要配置好数据库的唯一索引

关键配置是Obtain Generated Keys 是否唯持写入后的key

实现原理为 如果唯一索引不冲突,则写入成功,下流的该条数据会新增一项attribute ${sql.generated.key:notNull()}

通过sql.generated.key 是否为null 来判断,不为null,表示写入成功,若sql配置了自增主键则该值为写入成功的生成键,若为null,表示因唯一索引冲突写入失败

写入成功的项,为数据库中原先没有的项,也通常为需要继续处理的项

通过RouteOnAttribute 1.11.4 做路由分枝,终止unmatched的项,等于丢弃未写入的数据,实际过滤出了,原先数据库不冲突的项

方法已经适用99%以上的场景,但特点是依赖唯一索引,若需重新处理某条数据,则需要物理删除相应的sql记录

部分研发规范,不建议使用物理删除

改进的方式是两项,1删除前将数据写入至另一张同构表后(或任何其他外部存储,保证物理删除后,可追踪),再物理删除该项数据

同样的思路可以尝试下其他的外部存储的方案

如果表数据量过大,可以采用定制Bloom filter的方案来实现

有时间会会结合上篇的局部去重,整合一个示例demo,分享补充一个方案的应用示例

标签:nifi,方案设计,mysql,写入,索引,key,sql
来源: https://www.cnblogs.com/zihunqingxin/p/14460169.html