flink(十三):flink-CheckPoint和SavePoint作用和区别
作者:互联网
文章目录
分享
说明
- 本博客周五更新一次
- Flink 功能模块CheckPoint(检查点)和SavePoint(保存点)是任务异常后恢复任务的重要功能,当任务异常关闭时,可以从检查点或保存点恢复任务。
CheckcPoint
- CheckPoint是 flink 实现容错机制最核心的功能,它会按照配置周期性从Stream中各个Operator 的状态生成 Snapshot ,并定期持久化这些状态信息,当 Flink 程序意外崩溃,指定合适 Snapshot 重启任务,修正因故障带来的程序数据状态中断。
运行原理
- 为 CheckPoint 设置触发时间间隔后,当需要触发 CheckPoint 时,系统会向 Flink 程序运行时的多个分布式的Stream Source中插入一个 Barrier 标记,Barrier 会根据Stream 中的数据记录一起流向下游的各个 Operator。当一个 Operator 接收到一个Barrier时,它会暂停处理 Steam 中新接收到的数据记录。因为一个 Operator 可能存在多个输入的 Stream,每个 Stream 中都会存在对应的 Barrier,该 Operator 要等到所有的输入 Stream 中的 Barrier 都到达。当所有 Barrier 都到达该Operator,这时所有的 Barrier 在时间上看来是同一个时刻点(表示已经对齐),在等待所有 Barrier 到达的过程中,Operator 的 Buffer 中可能缓存了一些比 Barrier 早到达Operator的数据记录(Outgoing Records),此时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次CheckPoint的结果数据。
开启设置
- 开启CheckPoint功能,有两种方式,个人推荐第二种方式,针对当前任务设置。
-
1、在conf/flink_conf.yaml中做系统设置
#存储CheckPoint的目录,支持jobmanager、filesystem、rocksdb、 state.CheckPoints.dir: hdfs://namenode-host:port/flink-CheckPoints #CheckPoint存储数量,默认1 state.CheckPoints.num-retained: 1
-
2、针对任务再代码里灵活配置。
//初始化环境 StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); // 设置保存点的保存路径,这里是保存在hdfs中 env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-CheckPoints")); CheckPointConfig config = env.getCheckPointConfig(); // 任务流取消和故障应保留检查点 config.enableExternalizedCheckPoints(ExternalizedCheckPointCleanup.RETAIN_ON_CANCELLATION); // 保存点模式:exactly_once config.setCheckPointingMode(CheckPointingMode.EXACTLY_ONCE); // 触发保存点的时间间隔, 每隔1000 ms进行启动一个检查点 config.setCheckPointInterval(60000); // 确保检查点之间有至少500 ms的间隔【CheckPoint最小间隔】 config.setMinPauseBetweenCheckPoints(500); // 检查点必须在一分钟内完成,或者被丢弃【CheckPoint的超时时间】 config.setCheckPointTimeout(60000); // 同一时间只允许进行一个检查点 config.setMaxConcurrentCheckPoints(1);
-
ExternalizedCheckPointCleanup有两个可选项:
- ExternalizedCheckPointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。
- ExternalizedCheckPointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。
-
保存点模式支持三种:
- NONE、
- AT_LEAST_ONCE (默认):消息重试发送,多次接收消息客户端会认为上次接收无效,重复处理。
- EXACTLY_ONCE:消息重试发送,多次接收消息也会保证最多一次地传递给最终consumer
-
保存多个CheckPoint
- 默认情况下,如果设置了 CheckPoint 选项,则Flink只保留最近成功生成的1个CheckPoint,当Flink程序失败时,可以从最近的这个CheckPoint来恢复程序。
- 但如果需要保留多个CheckPoint,可以灵活的根据实际需要选择其中一个进行恢复。比如,发现最近1个小时数据记录处理异常,希望将整个状态还原到1小时前。
- Flink可以支持保留多个CheckPoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定保存的CheckPoint个数:
state.CheckPoints.num-retained: 20
恢复任务
- 如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个CheckPoint点,比如chk-860进行回放,执行如下命令
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-CheckPoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
SavePoint
运行原理
- SavePoint 是用来为整个流处理应用在某个“时间点”(point-in-time)进行快照生成的功能。该快照包含了数据源读取到的偏移量(offset),输入源的位置信息以及整个应用的状态。借助 分布式快照算法(Chandy-Lamport )的变体,我们可以在应用程序运行中得到某个“时间点”一致的快照。
- SavePoint由一个目录以及一个元数据文件构成。其中目录中通常为一个很大的二进制文件,文件中包含了整个流应用在SavePoint或CheckPoint的状态。另外元数据文件通常相对较小,其中包含了指向SavePoint目录中各个文件的指针。
创建SavePoint
- 创建一个SavePoint,需要指定对应SavePoint目录,有两种方式来指定。
- 1、在Flink的配置文件conf/flink-conf.yaml中置SavePoint存储目录,添加如下配置。
state.SavePoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-SavePoints
- 2、手动执行SavePoint命令的时候,指定SavePoint存储目录,命令格式:
bin/flink SavePoint :jobId [:targetDirectory]
- 为id为:40dcc6d2ba90f13930abce295de8d038 任务设置SavePoint,存储目录为默认SavePoint目录:
bin/flink SavePoint 40dcc6d2ba90f13930abce295de8d038
- 指定SavePoint目录:
bin/flink SavePoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/SavePoints
- 为id为:40dcc6d2ba90f13930abce295de8d038 任务设置SavePoint,存储目录为默认SavePoint目录:
恢复任务
- 恢复任务命令:
bin/flink run -s :SavePointPath [:runArgs]
- 实例:
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/SavePoints/SavePoint-40dcc6-a90008f0f82f flink-app-jobs.jar
SavePoint和CheckPoint的区别
- SavePoint和CheckPoint是两种不同的任务状态保存机制,大致区别如下图:
详细区别
- CheckPoint的侧重点是“容错”,当Flink作业意外失败,并重启时能直接从早先打下的CheckPoint恢复运行,且不影响作业逻辑的准确性。而SavePoint侧重点是“维护”,当Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从SavePoint恢复现场。
- SavePoint是“通过CheckPoint机制”创建的,所以SavePoint本质上是特殊的CheckPoint。
- CheckPoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;SavePoint面向用户,完全根据用户的需要触发与清理。
- CheckPoint 的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以CheckPoint的存储格式非常轻量级,但作为 trade-off 牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。 SavePoint 则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证 portability ,如并行度改变或代码升级之后,仍然能正常恢复。
- CheckPoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。SavePoint并不会连续自动触发,所以不支持增量。
总结
- ChekPoint 和 SavePoint 是一个功能,只是在使用上有区别,都是对计算过程状态的存储。
- 做事先做人,修心修身。
标签:flink,SavePoint,Flink,CheckPoint,检查点,Operator 来源: https://blog.csdn.net/qq_22973811/article/details/118603825