其他分享
首页 > 其他分享> > Flink原理与调优

Flink原理与调优

作者:互联网

Flink提交流程(Yarn-Per-Job)

在这里插入图片描述

1. client运行脚本提交命令。
2. CliFrontend实例化CliFrontendParser进行参数解析。
3. CliFrontend实例化YarnJobClusterExecutor并创建客户端。
4. 在客户端中实例化YarnClusterDescriptor封装YarnClient信息,包含提交参数和命令。
5. 将信息提交给RM。
6. RM向NM的yarnRMClient发送消息,启动APPmaster。
7. NM分配资源生成APPmaster,并启动Dispatcher分发器。
8. Dispatcher启动JobMaster。
9. JobMaster启动executionGraph执行图。
10. JobMaster中的SlotPool线程池向SoltManager注册、请求slot。
11. SoltManager向Resource Manager申请资源requestNewWorker。
12. Resource Manager启动TaskManager。
13. APPmaster实例化工作线程池launcherPool。
14. 工作线程池launcherPool中实例化ExecutorRunnalbe。
15. TaskManager实例化YarnTaskExecutorRunner,并生成TaskExecutor。
16. TaskExecutor向SoltManager注册Solt。
17. SoltManager向TaskExecutor分配Solt。
18. TaskExecutor向SoltPool提供Slot。
19. JobMaster生成对应的执行图并提交给Slot中的Task执行。

Flink组件通讯过程

在这里插入图片描述

1.JobMaster和TaskExecutor的RpcService调用startServer()方法启动RpcServer,创建AkkaRpcActor。
2.RpcServer调用start()方法启动RpcEndPoint。
3.RpcService通过Connect()对方的RpcServer得到一个对方的代理客户端RpcGateWay。
4.通过RpcGateway远程调用对端的方法。
5.TaskExecutor的RpcServer转发给代理InvokeHandler。
6.代理调用incoke()->incokeRpc()
7.1.首先判断发送地址是不是本地,是本地的话不涉及网络传输,就不需要序列化,调用LocalRpcInvocation方法。
7.2.不是本地的话需要进行序列化,调用RemoteRpcInvocation方法。
7.3.判断方法是否有返回值,如果有返回值调用ask方法。
8.1.如果调用方法没有返回值,直接返回void()。
8.2.接着判断返回类型是否是CompletableFuture类型,是的话不阻塞直接返回Future。
8.3.如果返回值不是,则阻塞等待返回值。
最后TaskExecutor按照返回类型进行对应处理。

Flink内存模型(TaskManager)

在这里插入图片描述

Flink内存使用了堆上内存和堆外内存,不计入solt资源
Task执行的内存使用了堆上内存和堆外内存
网络缓冲内存是网络数据交换所使用的堆外内存
框架堆外内存、Task对外内存、网络缓冲内存都在堆外的直接内存中。
管理内存是Flink管理的堆外内存,用于管理排序、哈希表、缓冲中间结果以及RocksDB的本地内存。

JVM特有内存是JVM本身占用的内存、包括源空间和执行开销。
Flink使用内存 = 框架堆内内存、框架堆外内存+Task堆内内存、Task堆外内存+网络缓冲内存+管理内存
进程内存 = Flink使用内存 + JVM内存

Flink资源配置调优

内存设置

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn队列
-Djobmanager.memory.process.size=2048mb \ 指定JM的总进程大小 JM 2-4GB即可
-Dtaskmanager.memory.process.size=6144mb \ 指定每个TM的总进程大小 单个TM 2-8GB即可
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TM的slot数 与容器核数对应,1slot或2slot对应1core
-c com.atguigu.app.dwd.LogBaseApp \
/opt/module/flink-jar/XX.jar

最优并行度计算

  1. 先设置并行度10进行压测。
  2. 公式为:QPS/单并行度处理能力 = 并行度
  3. 最后并行度 = 压测并行度*1.2

Source端并行度配置

如果数据源是Kafka,Source的并行度设置为Kafka对应的Topic分区数。如果消费速度跟不上生产速度,则增大Kafka分区数。

Transform端并行度的配置

Sink端并行度的配置

Sink端是数据流向下游的地方,可以根据Sink端的数据量以及下游的服务抗压能力进行评估。如果Sink端是Kafka,可以设置为Kafka对应的Topic分区数。

CheckPoint设置

Flink反压处理

压测

在Kafka中积压数据,之后开启Flink任务,出现反压。

反压

反压是短时间的负载高峰导致系统接收数据的速率高于它处理数据的速率。

比如垃圾回收停顿可能会导致流入的数据快速堆积,或者临时业务活动导致流量徒增。反压得不到合理的处理会导致资源耗尽甚至崩溃。

反压机制是指系统能够自动检测被阻塞的Operator,然后自适应地降低源头或上游数据发送速率,从而维持整个系统的稳定。

反压现象及定位

监控对正常的任务运行有一定影响,因此只有当web页面切换到Job的BakPressure页面时,JobManager才会对Job触发反压监控。

默认情况下JobManager会触发100次采样,每次间隔50ms来确定反压。Web界面中可以看到的比率是多少个stack trace被卡住,比如0.01就表示100个采样中有一个被卡住。

ok标识没有反压:0<=比例<=0.1

low:0.1<=比例<=0.5

high标识反压:0.5<=比例<=1

利用Metrics定位反压位置

当某个Task吞吐量下降时,基于Credit的反压机制,上游不会给该Task发送数据,所以该Task不会频繁卡在向Buffer Pool去申请buffer。

反压监控实现原理就是监控Task是否在申请Buffer这一步,所以遇到瓶颈的Task必然会显示ok,表示没有受到反压。

如果Task吞吐量下降,造成该Task上游的Task出现反压时,必然会出现Task出现的InputChannel变满,已经申请不到可用的Buffer空间,从这个思路出发,监控Task的InputChannel使用情况进行监控,如果InputChannel使用率达到100%,那么该Task正在发生反压。

Flink数据倾斜

判断是否存在数据倾斜

可以通过WebUI精准地看到每个SubTask处理了多少数据,继而判断Flink任务是否存在数据倾斜。数据倾斜和反压会一起出现。

LocalKeyBy替代KeyBy算子

在KeyBy上游算子数据发送之前,首先在上游算子的本地对数据进行聚合再发送到下游,使下游接收到的数据量大大减少,从而使得KeyBy之后的聚合操作不再是任务的瓶颈。

但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。

//Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
class LocalKeyByFlatMap extends RichFlatMapFunction<String,Tuple2<String, 
 
 private ListState<Tuple2<String, Long>> localPvStatListState;
 
 //本地 buffer,存放 local 端缓存的 app 的 pv 信息
 private HashMap<String, Long> localPvStat;
 
 //缓存的数据量大小,即:缓存多少数据再向下游发送
 private int batchSize;
 
 //计数器,获取当前批次接收的数据量
 private AtomicInteger currentSize;

 //构造器,批次大小传参
 LocalKeyByFlatMap(int batchSize){
 	this.batchSize = batchSize;
 }

 @Override
 public void flatMap(String in, Collector collector) throws Exception {
 	// 将新来的数据添加到 buffer 中
 	Long pv = localPvStat.getOrDefault(in, 0L);
 	localPvStat.put(in, pv + 1);
 	// 如果到达设定的批次,则将 buffer 中的数据发送到下游
 	if(currentSize.incrementAndGet() >= batchSize){
 		// 遍历 Buffer 中数据,发送到下游
 		for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
 			collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
 		}
 		// Buffer 清空,计数器清零
 		localPvStat.clear();
 		currentSize.set(0);
 	}
 }

 @Override
 public void snapshotState(FunctionSnapshotContext functionSnapshotConte
 	// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
 	localPvStatListState.clear();
 	for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
 		localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
 	}
 }

 @Override
 public void initializeState(FunctionInitializationContext context) {
 	// 从状态中恢复 buffer 中的数据
 	localPvStatListState = context.getOperatorStateStore().getListState
 	new ListStateDescriptor<>("localPvStat",
 	TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
 	localPvStat = new HashMap();
 	if(context.isRestored()) {
 		// 从状态中恢复数据到 localPvStat 中
 		for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
 			// 如果出现 pv != 0,说明改变了并行度,
 			// ListState 中的数据会被均匀分发到新的 subtask中
 			// 所以单个 subtask 恢复的状态中可能包含两个相同的 app 的数据
 			localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
 		}
 		// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
 		currentSize = new AtomicInteger(batchSize);
 	} else {
 		currentSize = new AtomicInteger(0);
 	}
 }

}

强制Shuffle

如果KeyBy之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况是因为数据源本身就不均匀。这种情况下可以让Flink任务强制shuffle。使用shuffle、rebalance或rescale算子既可将数据均匀分配,从而解决数据倾斜的问题。

两阶段聚合

如果使用窗口,变成了有界数据的处理,窗口默认是触发时才会发出一条结果到下游,这时就可以使用两阶段聚合的方式。

第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚合。需要注意的是聚合完不再是windowedStream,要获取windowEnd作为窗口标记作为第二阶段分组,避免不同窗口的结果聚合到一起。

第二阶段聚合:去掉随机数前缀或后缀,按照原来的key以及windowsEnd作keyby聚合。

KafkaSource 调优

动态发现分区

使用FlinkKafkaConsumer初始化时,可以通过Properties指定参数开始动态发现partition。

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + ""); 
第一个参数是配置项,第二个参数是多久检测一次分区。

Kafka数据源生成watermark

kafka单分区内有序,多分区间无序。这种情况下可以使用Flink识别kafka分区的watermark生成机制。使用此特性可以将Kafka消费端内部针对每个Kafka分区生成watermark,并且不同分区watermark的合并方式与在数据流shuffle时合并方式相同。

kafkaSourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                       .forBoundedOutOfOrderness(Duration.ofMinutes(2))
);

设置空闲等待

如果某一分区或分片在一段时间未发送事件数据,我们称这类数据源为空闲输入或空闲源。在这种情况下会造成个别partition一直没有新的数据。由于下游算子watermark的计算方式是取所有不同的上游并行数据源的watermark最小值,则其watermark不会发生变化,导致窗口、定时器都不会触发。

我们可以将watermark来检测空闲输入并将其标记为空闲状态。

kafkaSourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMinutes(2))
						.withIdleness(Duration.ofMinutes(5))
);

Kafka的offset消费策略

当checkpoint机制开启时,KafkaConsumer会定期把Kafka的offset信息和其他状态保存起来,job失败后,Flink会从最近一次Checkpoint回复数据,从保存的offset重新消费kafka中的数据。

Flink SQL工作机制

Flink SQL 执行流程

Flink sql 基于Apache Calcite。Calcite执行步骤如下:

f492106e135102729c0d2603dbd88e6c.png

Flink SQL 优化器

Flink SQL调优

开启MiniBatch(提升吞吐)

MinBatch是微批处理,原理是缓存一定的数据后再触发处理,减少对State的访问,从而提升吞吐并减少数据的输入量。MinBatch依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。

// 获取 tableEnv的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 开启miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
configuration.setString("table.exec.mini-batch.size", "20000");

开启LocalGlobal(解决常见数据热点问题)

LocalGlobal优化将原来的aggregate分为Local+Global两阶段聚合,第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批的增量值(Accumulator)。第二阶段再将受到Accumulator合并(Merge),得到最终结果(GlovalAgg)。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aQIP4z5c-1647597128293)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220318110219417.png)]

  1. 如果没有开启LocalGlobal优化,由于流中的数据倾斜,key为红色的聚合算子实例需要处理更多的记录,这就导致了数据热点。

  2. 开启LocalGlobal优化,先进行本地聚合再进行全局聚合,可大大减少GlobalAgg的热点,提升性能。

开启Split Distinct(解决COUNT DISTINCT热点问题)

Count Distinct 在Local聚合时,对于Distinct key的去重率不高,导致在Global节点仍然存在热点。

为了解决这个问题,可以手动改写两层聚合来打散,Flink1.9之后,提供了自动打散功能。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XcJd703e-1647597128294)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220318111838509.png)]

举例:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

手动两阶段聚合:
-- 外层聚合
SELECT day, SUM(cnt)
FROM (
	-- 手动打散 distinct key
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

改写 agg with filter语法(提升大量count distinct场景性能)

在某些场景下,可能需要从不同维度统计,这时可能会使用cash when语法。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

在这种情况下,可以使用FILTER语法,Flink可以只使用一个共享状态示例,而不是三个状态,可减少状态的大小和对状态的访问。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

TopN优化(无排名优化,解决数据膨胀问题)

根据TopN的语法,rownum字段会作为结果表的主键字段之一写入结果表。但是这会导致数据膨胀的问题。比如收到一条原排名9的更新数据,更新后排名上升到1,则从1到9的数据排名都发生变化,需要将这些数据作为更新都写入结果,就会导致数据膨胀。

可以将TopN的输出结果无需要显示rownum值,仅需在最终前端显示时进行一次排序,极大减少结果表的输入。

原SQL:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
    ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]

优化:
SELECT col1, col2, col3
FROM (
 SELECT col1, col2, col3
   ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
 FROM table_name)
WHERE rownum <= N [AND conditions]

对于无rownum的场景,结果表主键的定义需要十分小心,如果定义有误会导致TopN结果不正确。无rownum场景,主键应该为TopN上游GroupBY节点的key列表。

TopN优化(增加TopN的Cache大小)

TopN为了提升性能有一个state Cache层,Cache层能够提升对State的访问效率。TopN的Cache命中率的计算公式为:

cache_hit = cache_size*parallelism/top_n/partition_key_num
命中率 = 配置缓存数*并发/top数/PatitionBy_key数
例如Top100配置缓存10000条,并发50,当PatitionBy的key维度10万级别
5% = 10000*50/100/100000

保留首行的去重策略(Deduplicate Keep FirstRow)

保留key下第一条出现的数据,之后出现的key的数据会被丢弃,因为state中只存储了key数据,所以性能较优。

保留末行的去重策略(Deduplicate Keep LastRow)

保留key下最后一条出现的数据,可以按照业务时间保留最后一条数据。

like操作注意事项

指定时区

为了避免时区错乱的问题可以指定时区

Flink并行度设置

可以从四个不同层面设置并行度:

优先级:算子层面>环境层面>客户端层面>系统层面

并行度设置:一般设置为Kafka分区数,遵循2的N次方。

Flink的KeyBy怎么实现的分区

对指定的key调用自身的hashCode方法,

调用murmruhash算法,进行第二次hash,得到键组ID

通过一个公式,计算当前数据应该去往哪个下游分区:
键组ID * 下游算子并行度 / 最大并行度

算子的一个并行示例可以理解为一个分区,是物理上的资源。

数据按照key进行区分可以理解为一个分组。

一个分区可以有多个分组,同一个分组的数据肯定在同一个分区。

Flink的interval join的实现原理

底层调用的是Keyby+Connect,处理逻辑如下:

interval join不会处理join不上的数据,如果需要没join上的数据,可以用coGroup+connect算子实现,或者直接使用left join或right join语法。

Flink状态机制

算子状态:作用范围是算子,算子的多个并行实例各自维护一个状态。

监控状态:每个分组维护一个状态。

状态后端:

本地状态checkpoint
内存TaskManager的内存JobManager内存
文件TaskManager的内存HDFS
RocksDBRocksDBHDFS

Flink水位线和时间语义

Watermark是一条携带时间戳的数据,用来衡量Event Time进展的机制,可以设定延迟触发,从代码指定生成的位置,插入到流中。

Watermark的生成有两种方式,官方默认提供的是周期性水位线,默认是200ms,除周期性水位线外还有间歇性水位线,来一条数据,更新一次水位线。

时间语义有三种:

Flink的窗口

一致性语义

Source:可重发

Transformation:Checkpoint机制(Chandy-Lamport算法、barrier对齐)

Sink:幂等性、事务性

Flink Checkpoint

https://blog.csdn.net/qq_41106844/article/details/114372717

标签:Task,聚合,Flink,并行度,调优,内存,原理,数据
来源: https://blog.csdn.net/qq_41106844/article/details/123581092