其他分享
首页 > 其他分享> > Flink实例(126):状态管理(十五)State 过期时间TTL

Flink实例(126):状态管理(十五)State 过期时间TTL

作者:互联网

1 State 过期时间TTL

  使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。

  例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。

  对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。从 Flink 1.6 版本开始引入了State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理,下面我们具体介绍一下。

1.1 State TTL 功能的用法

在 Flink 的官方文档 中给我们展示了State TTL的基本用法,用法示例如下:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
 
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

可以看到,要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。这个 StateTtlConfig 对象可以通过构造器模式(Builder Pattern)来创建,典型地用法是传入一个 Time 对象作为 TTL 时间,然后设置更新类型(Update Type)和状态可见性(State Visibility),这两个功能的含义将在下面的文章中详细描述。当 StateTtlConfig 对象构造完成后,即可在后续声明的状态描述符(State Descriptor)中启用 State TTL 功能了。

从上述的代码也可以看到,State TTL 功能所指定的过期时间并不是全局生效的,而是和某个具体的状态所绑定。换而言之,如果希望对所有状态都生效,那么就需要对所有用到的状态定义都传入 StateTtlConfig 对象。对 Flink 源码感兴趣的同学,可以尝试为 Flink 增加一个默认的 StateTTL 选项,实现起来很简单,这里不再展开说明了。

State TTL 使用的更多案例,可以参见官方的 flink-stream-state-ttl-test 包,它提供了很多测试用例可以参考。

1.2 StateTtlConfig 的参数说明

配置中有下面几个配置项可以选择:StateTtlConfig中的newBuilder这个方法是必须的,它是设置生存周期的值。

策略类型描述
StateTtlConfig.UpdateType.Disabled 禁用TTL,永不过期
StateTtlConfig.UpdateType.OnCreateAndWrite 每次写操作都会更新State的最后访问时间
StateTtlConfig.UpdateType.OnReadAndWrite 每次读写操作都会跟新State的最后访问时间
策略类型描述
StateTtlConfig.StateVisibility.NeverReturnExpired 永不返回过期状态
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 可以返回过期但尚未被清理的状态值

1.3 Notes:

2 State清除策略

2.1 Cleanup in full snapshot

  默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。

  此外,您可以在获取完整状态快照时激活清理操作,这将减少其大小。

  在当前实现下,本地状态不会被清除,但在从前一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig 中配置。(1)下面的配置选项不适用于 RocksDB state backend上的 increamental checkpointing;(2)对于现有作业,此清理策略可以在 StateTtlConfig 中随时激活或停用,例如从保存点重新启动后可以使用。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

2.2 Incremental cleanup

  另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略在某个状态下活跃的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。

  每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。这个特性可以在StateTtlConfig中激活:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlCon fig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally
    .build

上面的策略有两个参数,第一个参数:第是每次清理触发的检查状态的条件。如果启用,则每次状态访问都将触发它。第二个参数:是否为每个记录处理额外触发清理。Notes:

2.3 Cleanup during RocksDB compaction

  如果使用RocksDB进行状态的管理,另一个清理策略就是激活Flink的压缩过滤这个策略。RocksDB会定期使用异步压缩来合并状态的更新和减少储存。Flink压缩过滤器使用TTL检查状态的过期时间戳,并排除过期值。

  默认情况下是关闭该特性的。对于RocksDB进行状态管理首先要做的就是要激活,通过Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled,或者对于一个Flink job来说如果一个自定义的RocksDB 状态管理被创建那么它可以调用 RocksDBStateBackend::enableTtlCompactionFilter。

  然后任何带有TTL的状态都可以配置来去使用过滤器。

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter
    .build

  RocksDB compaction filter将会从Flink每次处理完一定数据量的状态之后,从Flink查询用于检查过期的当前时间戳,这个数字默认是1000。你也可以选择更改它,并将自定义值传递给StateTtlConfig.newBuilder(…)。

  cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。频繁的跟新时间错可以提高清理的数据但是会降低压缩性能,因为它使用了来自本地的JNI的调用。

Notes:

  目前,管理 operator state 仅仅支持使用 List 类型。当前,支持 List 样式的托管运算符状态,彼此之间相互独立,因此可以在重新缩放时可以重新分配。换句话说,这些对象是可以重新分配 non-keyed state 的最佳粒度。根据状态访问方法,定义一下重新分配方案。

 

标签:状态,清理,过期,Flink,State,126,TTL,StateTtlConfig
来源: https://www.cnblogs.com/qiu-hua/p/14471568.html