其他分享
首页 > 其他分享> > Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单

Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单

作者:互联网

Flink的状态与容错是这个框架很核心的知识点。其中一致检查点也就是Checkpoints也是Flink故障恢复机制的核心,这篇文章将详细介绍Flink的状态管理和Checkpoints的概念以及在生产环境中的参数设置。

什么是State状态?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-su4PajFa-1635586788360)(picture/image-20211030165453846.png)]

有状态和无状态介绍

 

 

 

State状态后端:存储在哪里

什么是Checkpoint检查点

 

Flink 捆绑的些检查点存储类型:

 

端到端(end-to-end)状态一致性

数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的

在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)

端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。

 

有关检查点配置的常用参数配置介绍

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置checkpoint的周期, 每隔1000 ms进行启动一个检查点
env.getCheckpointConfig().setCheckpointInterval(1000);
// 设置状态级别模式为exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//超时时间,可能是保存太耗费时间或者是状态后端的问题,任务同步执行不能一直阻塞
env.getCheckpointConfig().setCheckpointTimeout(60000L);  
// 设置取消和故障时是否保留Checkpoint数据,这个设置较为重要,没有正确的选择好可能会导致检查点数据失效
//有两个参数可以设置
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。必须在取消后手动清理检查点状态。
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 

实战部分:

为了模拟生产环境中实时产生的订单数据,这里我们自己定义一个数据源来源源不断的产生模拟订单数据

订单类:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
   private String tradeNo;
   private String title;
   private int money;
   private int userId;
   private Date createTime;

   @Override
   public String toString() {
       return "VideoOrder{" +
               "tradeNo='" + tradeNo + '\'' +
               ", title='" + title + '\'' +
               ", money=" + money +
               ", userId=" + userId +
               ", createTime=" + createTime +
               '}';
  }
}
public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {

  private volatile Boolean flag = true;

  private Random random = new Random();

  private static List<VideoOrder> list = new ArrayList<>();
  static {
      list.add(new VideoOrder("","java",10,0,null));
      list.add(new VideoOrder("","spring boot",15,0,null));
  }


  /**
    * run 方法调用前 用于初始化连接
    * @param parameters
    * @throws Exception
    */
  @Override
  public void open(Configuration parameters) throws Exception {
      System.out.println("-----open-----");
  }

  /**
    * 用于清理之前
    * @throws Exception
    */
  @Override
  public void close() throws Exception {
      System.out.println("-----close-----");
  }


  /**
    * 产生数据的逻辑
    * @param ctx
    * @throws Exception
    */
  @Override
  public void run(SourceContext<VideoOrder> ctx) throws Exception {

      while (flag){
          Thread.sleep(1000);
          String id = UUID.randomUUID().toString().substring(30);
          int userId = random.nextInt(10);
          int videoNum = random.nextInt(list.size());
          VideoOrder videoOrder = list.get(videoNum);
          videoOrder.setUserId(userId);
          videoOrder.setCreateTime(new Date());
          videoOrder.setTradeNo(id);
          System.out.println("产生:"+videoOrder.getTitle()+",价格:"+videoOrder.getMoney()+", 时间:"+ TimeUtil.format(videoOrder.getCreateTime()));
          ctx.collect(videoOrder);
      }
  }

  /**
    * 控制任务取消
    */
  @Override
  public void cancel() {
      flag = false;
  }
}

产生数据的格式如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J8KdoB7o-1635586788363)(picture/image-20211030171800058.png)]

主程序:使用reduce算子对数据进订单价格进行滚动计算,并设置Checkpoint保证数据状态可以存取


public class FlinkKeyByReduceApp {

   /**
    * source
    * transformation
    * sink
    *
    * @param args
    */
   public static void main(String[] args) throws Exception {

       //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
       env.enableCheckpointing(5000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
       //这是我本机的ip地址      
       env.getCheckpointConfig().setCheckpointStorage(new                                           FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint"));

      DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
       KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
           @Override
           public String getKey(VideoOrder value) throws Exception {
               return value.getTitle();
          }
      });

       SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
           @Override
           public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
               VideoOrder videoOrder = new VideoOrder();
               videoOrder.setTitle(value1.getTitle());
               videoOrder.setMoney(value1.getMoney() + value2.getMoney());
               return videoOrder;
          }
      });

       reduce.print();

       env.execute("job");
  }

}

在本地测试运行结果,可以看到数据根据订单分组不断的进行滚动计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D8NDkIa1-1635586788365)(picture/image-20211030172433842.png)]

进入服务器的HDFS查看检查点数据是否存在

在这里插入图片描述

 

之后将应用进行打包,上传到服务器进行测试,可以使用Flink的Web页面进行手动提交jar包运行,也可以使用命令进行提交,之后可以看到程序运行过程中的相关日志输出

./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar 

在这里插入图片描述

 

模拟宕机

运行程序的时候我们可以在Flink看到任务进行的id号,这个时候我们手动的cancel掉或者是直接把服务kill掉,这个时候任务被强制暂停。

进入到HDFS可以看到我们设置的检查点的数据依旧存在,我们使用如下命令,让程序从上次宕机前的订单计算状态继续往下计算。

-s : 指定检查点的元数据的位置,这个位置记录着宕机前程序的计算状态
./bin/flink run -s /checkpoint/id号/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar

在这里插入图片描述

运行命令,进入WEB页面进行查看,是否成功。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DDN7NI7I-1635586788372)(picture/image-20211014174041183.png)]

可以看到出现一次close的时候,代表我们的程序以及停止,服务器已经宕机,这个时候订单的计算结果如上图的红色方框。在我们运行了上面那条命令后再次查看日志的数据,从open开始可以看到这次就不是从订单最初的状态开始进行的了,而是从上一次宕机前计算的结果,继续往下计算,到这里Checkponit的实战应用测试就完成了。

 

 

标签:状态,宕机,Flink,videoOrder,订单,检查点,env,new,电商
来源: https://www.cnblogs.com/soyboke/p/15592888.html