其他分享
首页 > 其他分享> > 360 政企安全集团基于 Flink 的 PB 级数据即席查询实践

360 政企安全集团基于 Flink 的 PB 级数据即席查询实践

作者:互联网

本文整理自 360 政企安全集团的大数据工程师苏军以及刘佳在 Flink Forward Asia 2020 分享的议题《基于 Flink 的 PB 级数据即席查询实践》,文章内容为:
1.Threat Hunting 平台的架构与设计(苏军)
2.以降低 IO 为目标的优化与探索(刘佳)
3.未来规划

首先做一个简单的个人以及团队介绍。我们来自 360 政企安全集团,目前主要从事 360 安全大脑的 “威胁狩猎“ 项目的开发工作。我们团队接触 Flink 的时间比较早,在此期间,我们基于 Flink 开发出了多款产品,并在 2017 年和 2019 年参加了于柏林举办的 Flink Forward 大会,分别介绍了我们的 “UEBA” 以及 “AutoML” 两款产品。

本次分享主要分为两块内容:

一、Threat Hunting 平台的架构与设计 (苏军)

第一部分内容大致分为三个部分,分别是:

1. 平台的演进

我们认为所有技术的演化和革新都需要具体的商业问题来驱动,以下是我们团队近几年基于 Flink 开发的几款产品:

通过调查发现,拥有 PB 级数据规模的客户往往有以下几个商业需求:

2. 架构设计

首先,数据是来自于已经存储在 ES 中的历史数据和 kafka 里的实时数据,其中 ES 里的历史数据我们通过自己开发的同步工具来同步,kafka 里的实时数据我们则通过 Streaming File Sink 写 orc 文件到存储集群。在数据同步的同时,我们会将这批数据的索引信息更新到数据库中。

安全分析人员会从前端页面通过写交互式分析语言 HQL 发起数据检索的请求,此时请求会进入调度系统,一旦开始执行作业,首先会将分析语句解析成算子列表,算子缓存算法会判断该次查询是否可以命中缓存系统中已有的缓存数据。

我们会先提取出查询语言的过滤条件或者是 Join 条件来做谓词下推,进入索引数据库中获得目前符合该查询的文件列表,随后将文件列表交给计算引擎来进行计算。计算引擎我们采用双引擎模式,其中复杂度高的语句我们通过 Flink 引擎来完成,其它较为简单的任务我们交给平台内部的 “蜂鸟引擎”。“蜂鸟引擎” 基于 Apache arrow 做向量化执行,加上 LLVM 编译,查询延迟会非常小。

由于整个系统的存算分离,为了加速数据读取,我们在计算集群节点上增加了 alluxio 来提供数据缓存服务,其中不仅缓存 remote cluster 上的数据,同时会缓存部分历史作业结果,通过算子缓存的算法来加速下次计算任务。

这里还需要强调两点:

3. 深入探索索引结构

数据库为了加速数据检索,我们往往会事先为数据创建索引,再在扫描数据之前通过索引定位到数据的起始位置,从而加速数据检索。而传统数据库常见的是行索引,通过一个或若干字段创建索引,索引结果以树形结构存储,此类索引能够精确到行级别,索引效率最高。

某些大数据项目也支持了行索引,而它所带来的弊端就是大量的索引数据会造成写入和检索的延时。而我们平台处理的是机器数据,例如终端/网络这类数据,它的特点是重复度非常高,而安全分析的结果往往非常少,极少数的威胁行为会隐藏在海量数据里,占比往往会是 1/1000 甚至更少。

所以我们选择性价比更高的块索引方案,已经能够支撑目前的应用场景。目前通过客户数据来看, 索引能够为 85% 的语句提供 90% 以上的裁剪率,基本满足延时要求。

某些大数据平台是将索引数据以文件的形式存储在磁盘上,外加一些 cache 机制来加速数据访问,而我们是将索引数据直接存在了数据库中。主要有以下两个方面的考虑:

上图为块索引的设计。在我们的索引数据库中,我们把这些数据分为不同类别数据源,比如终端数据为一类数据源,网络数据为一类数据源,我们分类数据源的逻辑是他们是否拥有统一的 Schema。就单个数据源来说,它以日期作为 Partition,Partition 内部是大量的 ORC 小文件,具体到索引结构,我们会为每一个字段建 min/max 索引,基数小于 0.001 的字段我们建 Bloom 索引。

上文提到过,安全人员比较喜欢用 like 和全文检索。对于 like 这一块,我们也做了一些优化。全文检索方面,我们会为数据来做分词,来构建倒排索引,同时也会对于单个分词过后的单个 item 来做文件分布层面的位图索引。

上图是一个索引大小的大致的比例假设,JSON 格式的原始日志大有 50PB,转化成 ORC 大概是 1PB 左右。我们的 Index 数据是 508GB, 其中 8GB 为 Min/Max 索引,500GB 为 Bloom。加上上文提到的位图以及倒排,这个索引数据的占比会进一步加大。基于此,我们采用的是分布式的索引方案。

我们知道日志是在不断的进行变化的,对于有的数据员来说,他有时会增加字段或者减少字段,甚至有时字段类型也会发生变化。

那么我们采取这种 Merge Schema 模式方案,在文件增量写入的过程中,也就是在更新这批数据的索引信息的同时来做 Schema Merge 的操作。如图所示,在 block123 中,文件 3 是最后一个写入的。随着文件的不断写入,会组成一个全新的 Merge Schema。可以看到 B 字段和 C 字段其实是历史字段,而 A_V 字段是 A 字段的历史版本字段,我们用这种方式来尽量多的让客户看到比较全的数据。最后基于自己开发的 Input format 加 Merge Schema 来构建一个新的 table source ,从而打通整个流程。

二、以降低 IO 为目标的优化与探索 (刘佳)

上文介绍了为什么要选择块索引,那么接下来将具体介绍如何使用块索引。块索引的核心可以落在两个字上:“裁剪”。裁剪就是在查询语句被真正执行前就将无关的文件给过滤掉,尽可能减少进入计算引擎的数据量,从数据源端进行节流。

这张图展示了整个系统使用 IndexDB 来做裁剪流程:

同时,构建 source 的时候,我们在细节上做了一些优化。比如在将 filter 传给 ORC reader 的时候,清除掉已经 pushdown 了的 filter, 避免在引擎侧进行二次过滤。当然, 这里并不是将所有 filter 都清除掉了,我们保留了 like 表达式,关于 like 的 filter pushdown 会在后文介绍。

接下来着重介绍一下四大优化点:

1. 裁剪率的理论上限及 Hilbert 空间填充曲线

裁剪可以抽象成 N 个球扔进 M 个桶的概率问题,在这里我们直接说结论。假设行在块中随机均匀分布,所有块的总行数固定,查询条件命中的总行数也固定,则块命中率直接与 “命中的总行数 / 总块数” 正相关。

结论有两个:

为什么使用 hilbert 曲线?主要是基于两点:

hilbert 用法,就是实现一个 UDF,输入列值,输出坐标值,然后根据坐标值排序。

我们抽样了客户环境所使用的 1500 条 SQL 语句,过滤掉了其中裁剪率为分之 100% 的相关语句,也就是没有命中文件的无效语句。然后还剩下 1148 条,我们使用这些语句做了裁剪率排序后,对裁剪率进行了对比,裁剪率 95 百分位从之前的 68% 提升到了 87%,提升了 19%。可能大家会觉得 19% 这个数值不是特别高,但如果我们带上一个基数,比如说 10 万个文件,这样看的话就会很可观了。

2. 字典索引上 Like 的优化

之前也有讲到安全行业的特殊性,我们做威胁检测的时候会严重依赖 like 查询。鉴于此,我们也对它做了优化。

例如图上所示,最左边的 SQL 中有三个表达式。前两个在上文中已经提到了,是将 filter 直接 pushdown 到 index db 中完成,我们交给 orc reader 的 filter 只有最后一个 attachment_name like '%投标%',真正需要读取的记录只是 dict 包含 ”投标“ 的 row group,也就是做到了 row group 级别的过滤,进一步减少了需要进入计算引擎的数据量。

3. 基于索引对 join 的优化

威胁情报的匹配中大量使用 join 操作,如果要加速 join 的性能,仅仅是 where 条件的 filter pushdown 是远远不够的。

Flink 中已经内置了许多 join 算法,比如 broadcast join, hash join 和 sort merge join。其中,sort merge join 对预先排好序的表 join 非常友好,而上文有提到我们使用 Hilbert 曲线来对多字段进行联合排序,所以 sort merge join 暂时不在我们的优化范围之内。

另外,我们知道 join 的性能和左右表的大小正相关,而威胁情报 join 的稀疏度非常高,所以事先对左右表做裁剪,能够大幅减少进入 join 阶段的数据。

上文提到过我们已经为常见字段建立了 bloom 索引。那么利用这些已经创建好的 bloom,来进行文件预过滤,就变得顺理成章,并且省掉了构建 bloom 的时间开销。

对于 broadcast join,我们直接扫描小表,将小表记录依次进入大表所属文件的 bloom,判断该数据块是否需要, 对数据量大的表做预裁剪。

对于 hash join,正如我们看到的,我们可以预先对 join key 的文件级 bloom 做 “预 join” 操作,具体就是将左表所属的某个文件的 bloom 依次与右表所属文件的 bloom 做 “与” 操作,只保留左右表能 ”与后结果条数不为 0“ 的文件,再让各表剩余的文件进入引擎做后续计算。

比如说图上的这三张表,分别是 table1、 table2 和 table3 。我们可以从 index DB 中获取到表的统计信息,也就是文件个数或者说是文件表的大小。图上就直接列的是文件个数:table 1 是 1000 个, 然后 table 2 是 5 万个文件, table 3 是 3 万个文件。

我们就是参照上一张图片里面的逻辑进行预 join,然后预估 join 的成本。我们会让成本低的预 join 先进行,这样的话就能够大幅度减少中间结果,提升 join 的效率。

4. Alluxio 作为对象存储的缓存

因为底层文件存储系统的多种多样,所以我们选取了 Alluxio 数据编排系统,Alluxio 的优点是让数据更靠近计算框架,利用内存或者 SSD 多级缓存机制加速文件访问,如果在完全命中 cache 的情况下,能够达到内存级 IO 的文件访问速度,减少直接从底层文件系统读文件的频次,很大程度上缓解了底层文件系统的压力。

对我们系统来说就是它带来了更高的并发,而且对低裁剪率的查询更友好,因为低裁剪率的话就意味着需要读取大量的文件。

如果这些文件在之前的查询中已经被 load 到 cache 里面,就能够大幅度的提升查询速度。

在做完这些优化以后,我们做了性能对比测试。我们选取了一个规模为 249TB 的 es 集群。它使用了 20 台服务器,Flink 使用了两台服务器,为了在图标上看到更直观的对比效果,我们选取了 16 条测试结果。

图表上红橙色的是 es,蓝色的是 HQL 优化前,绿色的是 HQL 优化后。上面的数字标签是与 es 相比,HQL 的性能差值。比如第一个标签就意味着 HQL 的性能五倍于 es,其中 6 号和 7 号比 es 慢,主要是因为 HQL 是块索引,es 是行索引,全在内存里面,所以可以做到超快的检索速度。13 号是因为 HQL 在使用 not equal 的情况下,裁剪率相对较差。

总体说,优化效果是很明显的,大部分语句在与 es 查询速度相比是持平甚至略优的。完全满足客户对长周期数据存储和查询的期望。

三、未来规划

上图是未来规划。因为客户现场经常会涉及到很多的 BI Dashboard 运算和长周期运算报告的需求,所以我们下一步会考虑做 BI 预算,以及苏军提到的容器化和 JVM 预热,当然还有对标 es,以及提升多用户并发查询的能力。

原文链接

本文为阿里云原创内容,未经允许不得转载。

标签:文件,join,Flink,查询,PB,索引,数据,我们,政企
来源: https://blog.csdn.net/weixin_43970890/article/details/119111724