其他分享
首页 > 其他分享> > 基于EMR OLAP的开源实时数仓解决方案之ClickHouse事务实现

基于EMR OLAP的开源实时数仓解决方案之ClickHouse事务实现

作者:互联网

简介:Flink 和 ClickHouse 分别是实时流式计算和 OLAP 领域的翘楚,很多互联网、广告、游戏等客户都将两者联合使用于构建用户画像、实时 BI 报表、应用监控指标查询、监控等业务,形成了实时数仓解决方案

image.png

作者 | 扬流、枢木、辰繁
来源 | 阿里技术公众号

一 背景

Flink 和 ClickHouse 分别是实时流式计算和 OLAP 领域的翘楚,很多互联网、广告、游戏等客户都将两者联合使用于构建用户画像、实时 BI 报表、应用监控指标查询、监控等业务,形成了实时数仓解决方案(如图-1)。这些业务对数据的准确性要求都十分严格,所以实时数仓整个链路需要保证端到端的 Exactly-Once。通常来说 Flink 的上游是可以重复读取或者消费的 pull-based 持久化存储(例如Kafka),要实现 Source 端的 Exactly-Once 只需要回溯 Source 端的读取进度即可。Sink 端的 Exactly-Once 则比较复杂,因为 Sink 是 push-based 的,需要依赖目标输出系统的事务保证,但社区 ClickHouse 对事务并不支持,所以针对此情况阿里云 EMR ClickHouse 与 Flink 团队一起深度研发,支持了 Flink 到 ClickHouse 的 Exactly-Once写入来保证整个实时数仓数据的准确性。本文将分别介绍下现有机制以及实现方案。

image.png

图-1 实时数仓架构

二 机制梳理

1 ClickHouse 写入机制

ClickHouse 是一个 MPP 架构的列式 OLAP 系统(如图-2),各个节点是对等的,通过 Zookeeper 协同数据,可以通过并发对各个节点写本地表的方式进行大批量的数据导入。

ClickHouse 的 data part 是数据存储的最小单元,ClickHouse 接收到的数据 Block 在写入时,会按照 partition 粒度进行拆分,形成一个或多个 data part。data part 在写入磁盘后,会通过后台merge线程不断的合并,将小块的 data part 合并成大块的 data part,以此降低存储和读取的开销。

在向本地表写入数据时,ClickHouse 首先会写入一个临时的 data part,这个临时 data part 的数据对客户端不可见,之后会直接进行 rename 操作,使这个临时 data part 成为正式 data part,此时数据对客户端可见。几乎所有的临时 data part 都会快速地成功被 rename 成正式 data part,没有被 rename 成功的临时 data part 最终将被 ClickHouse 清理策略从磁盘上删除。

通过上述分析,可以看出 ClickHouse 的数据写入有一个从临时 data part 转为正式 data part 的机制,加以修改可以符合两阶段提交协议,这是实现分布式系统中事务提交一致性的重要协议。

image.png

图-2 Flink作业写入ClickHouse

注:多个 Flink Task 可以写入同一个 shard 或 replica

2 Flink 写机制

Flink 作为一个分布式处理引擎,提供了基于事务的 Sink 机制,该机制可以保障写入的 Exactly-Once,相应的数据接收方需要提供遵守 XA 规范的 JDBC 。由于完整的 XA 规范相当复杂,因此,我们先对 Flink 的处理机制进行梳理,结合 ClickHouse 的实际情况,确定需要实现的接口范围。

为了实现分布式写入时的事务提交统一,Flink 借助了 checkpoint 机制。该机制能够周期性地将各个 Operator 中的状态生成快照并进行持久化存储。在 checkpoint 机制中,有一个 Coordinator 角色,用来协调所有 Operator 的行为。从 Operator 的角度来看,一次 checkpoint 有三个阶段,初始化-->生成快照-->完成/废弃 checkpoint。从Coordinator的角度来看,需要定时触发 checkpoint,以及在所有 Operator 完成快照后,触发 complete 通知。(参考附录1)

接下来介绍 Flink 中的 Operator 是如何借助事务和 checkpoint 机制来保障 Exactly-Once,Operator 的完整执行需要经过 initial、writeData、snapshot、commit 和 close 阶段。

initial阶段:

writeData阶段:

snapshot阶段:

complete阶段:

在所有 Operator 的 snapshot 阶段全部正常完成后,Coordinator 会通知所有 Operator 对已经成功的checkpoint 进行 complete 操作,在与 ClickHouse 的交互中,此阶段为 Operator 调用 JDBC 提供的 commit() 接口对事务进行提交。

close阶段:

从上述流程可以总结出,Flink 通过 checkpoint 和事务机制,将上游数据按 checkpoint 周期分割成批,保障每一批数据在全部写入完成后,再由 Coordinator 通知所有 Operator 共同完成 commit 操作。当有 Operator 写入失败时,将会退回到上次成功的 checkpoint 的状态,并根据快照记录的 xid 对这一批 checkpoint 的所有 xid 进行 rollback 操作。在有 commit 操作失败时,将会重试 commit 操作,仍然失败将会交由人工介入处理。

三 技术方案

1 整体方案

根据 Flink 和 ClickHouse 的写入机制,可以描绘出一个Flink 到 ClickHouse 的事务写入的时序图(如图-3)。由于写的是 ClickHouse 的本地表,并且事务的统一提交由 Coordinator 保障,因此 ClickHouse 无需实现 XA 规范中标准的分布式事务,只需实现两阶段提交协议中的少数关键接口,其他接口在 JDBC 侧进行缺省即可。

image.png

图-3 Flink到ClickHouse事务写入的时序图

2 ClickHouse-Server

状态机

为了实现 ClickHouse 的事务,我们首先定义一下所要实现的事务允许的几种操作:

事务状态:

完整的状态机如下图-4所示:

image.png

图-4 ClickHouse Server支持事务的状态机

图中所有操作均是幂等的。其中,Committing 到 Committed 和 Aborting 到 Aborted 是不需要执行任何操作的,在开始执行 Commit 或 Rollback 时,事务的状态即转成 Committing 或 Aborting;在执行完 Commit 或 Rollback 之后,事务的状态会被设置成 Committed 或 Aborted。

事务处理

Client 通过 HTTP Restful API 访问 ClickHouse Server,Client 与 ClickHouse Server 间一次完整事务的交互过程如图-5所示:

image.png

图-5 Clickhouse事务处理的时序图

正常流程:

异常处理:

3 ClickHouse-JDBC

根据 XA 规范,完整的分布式事务机制需要实现大量的标准接口(参考附录2)。在本设计中,实际上只需要实现少量关键接口,因此,采用了基于组合的适配器模式,向 Flink 提供基于标准 XA 接口的 XAResource 实现,同时对 ClickHouse Server 屏蔽了不需要支持的接口。

对于 XADataSource 的实现,采用了基于继承的适配器模式,并针对 Exactly-Once 的特性,修改了部分默认配置,如发送失败的重试次数等参数。

另外,在生产环境中,通常不会通过分布式表,而是通过 SLB 进行数据写入时的负载均衡。在 Exactly-Once 场景中,Flink 侧的 Task 需要保持针对某一 ClickHouse Server 节点的连接,因此不能使用 SLB 的方式进行负载均衡。针对这一问题,我们借鉴了 BalanceClickHouseDataSource 的思路,通过在 URL 中配置多个IP,并在 properties 配置中将 write_mode 设置为 Random ,可以使 XADataSource 在保障 Exactly-Once 的同时,具有负载均衡的能力。

4 Flink-Connector-ClickHouse

Flink 作为一个流式数据处理引擎,支持向多种数据接收端写入的能力,每种接收端都需要实现特定的Connector。针对 Exactly-Once,ClickHouse Connector 增加了对于 XADataSource 的选项配置,根据客户端的配置提供 Exactly-Once 功能。

四 测试结果

1 ClickHouse事务性能测试

由图-6可以看出,无论ClickHouse 是否开启事务, ClickHouse 的吞吐量都与 Client 端并发写的线程数成正比。开启事务时,ClickHouse中临时 data part 不会立刻被转为正式 data part,所以在事务完成前大量临时 data part 不会参与 ClickHouse merge 过程,降低磁盘IO对写性能的影响,所以开启事务写性能较未开启事务写性能更好;但事务内包含的批次变多,临时 data part 在磁盘上的增多导致了合并时 CPU 的压力增大,从而影响了写入的性能,开启事务的写性能也会降低。

image.png

图-6 ClickHouse写入性能压测(一)

由图-7可以看出,无论ClickHouse 是否开启事务, ClickHouse 的吞吐量都与单批次数据量大小成正比。开启事务时,每批次数据越小,ClickHouse 的吞吐量受事务是否开启的影响就越大,这是因为每批次写入的时间在事务处理的占比较小,事务会对此产生一定的影响,因此,一次事务包含的批次数量越多,越能够减少事务对写入性能的影响;当事务包含批次的增大,事务处理时间在写入中的占比逐渐降低,ClickHouse merge 产生的影响越来越大,从而影响了写入的性能,开启事务较不开启事务写性能更好。

image.png

图-7 ClickHouse写入性能压测(二)

2 Flink写入ClickHouse性能比较

image.png

图-8 Flink写入ClickHouse测试

五 未来规划

该版本 EMR ClickHouse 实现的事务还不是很完善,只支持单机事务,不支持分布式事务。分布式系统一般都是通过 Meta Server 来做统一元数据管理来支持分布式事务机制。当前我们也正在规划设计 ClickHouse MetaServer 来支持分布式事务,同时可以移除 ClickHouse 对 ZooKeeper 的依赖。

原文链接
本文为阿里云原创内容,未经允许不得转载。 

标签:数仓,事务,Flink,写入,OLAP,part,EMR,data,ClickHouse
来源: https://www.cnblogs.com/yunqishequ/p/15628799.html