其他分享
首页 > 其他分享> > Flink中如何实现一个自定义MetricReporter

Flink中如何实现一个自定义MetricReporter

作者:互联网

什么是 Metrics

在 flink 任务运行的过程中,用户通常想知道任务运行的一些基本指标,比如吞吐量、内存和 cpu 使用情况、checkpoint 稳定性等等。而通过 flink metrics 这些指标都可以轻而易举地获取到,避免任务的运行处于黑盒状态,通过分析这些指标,可以更好的调整任务的资源、定位遇到的问题、对任务进行监控。接下来本文将介绍 flink metrics 的一些基本概念与原理以及实践。

Flink 对于指标监测有一套自己的实现,同时 flink 自身系统有一些固定的 metric 数据, 包括系统的一些指标,CPU,内存, IO  或者各个 task 运行的一些指标。指标的统计方式有四种,这些指标都实现了 Metric 这个接口,而 Metric 这个接口只是一个标识,本身并没有定义如何方法接口,部分子类的继承关系如下所示。

从图中可以看出,Metric 这个接口有四个直接子类,分别是:

Metrics 使用

下面以 Counter 为例,说明 Metric 的具体用法,Counters 通常用来计数,可以通过 inc 或 dec 方法来对计数值进行增加或减少。

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}

Metric Reporter

获取 Metrics 有三种方法,首先可以在 WebUI 上看到;其次可以通过 RESTful API 获取,RESTful API 对程序比较友好,比如写自动化脚本或程序,自动化运维和测试,通过 RESTful API 解析返回的 Json 格式对程序比较友好;最后,还可以通过 Metric Reporter 获取,监控主要使用 Metric Reporter 功能。

flink 提供了很多外部监控系统的支持:JMX(java 自带的技术,不严格属于第三方)、Graphite、InfluxDB、Prometheus、StatsD、Datadog、Slf4j(直接打 log 里)等,也可以通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口来编写自己的 Reporter。如果想要定期发送报告,可以实现 Scheduled 接口。

Metric Reporter 是如何配置的?首先 Metrics Reporters 的名字用逗号分隔,然后通过 metrics.reporter.jmx.class 的 classname 反射找 reporter,还需要拿到 metrics.reporter.jmx.port 的配置,比如向第三方系统通过网络发送的比较多,但要知道往哪里发,ip 地址、port 信息是比较常见的。

vim flink/conf/flink-conf.yaml

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
metrics.reporter.my_other_reporter.interval: 60 SECONDS

开发者可以实现自己的 reporter,将 metrics 数据导出到不同的系统。

自定义 Metric Reporter

MetricReporter 是用来向外暴露 Metric 的监测结果的接口。由于 MetricReporter 的子类在实例化时,都是通过反射机制,所以对于其实现子类,需要有一个公共、无参的构造函数,这个接口的定义如下:

public interface MetricReporter {
   void open(MetricConfig config);
   void close();
   void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
   void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

关注 gzh “HEY DATA” 后台回复关键字 MetricReporter 可获得自定义 MetricReporter 实现例子文件。

标签:flink,自定义,reporter,Metric,Flink,metrics,MetricReporter,my
来源: https://blog.csdn.net/weixin_40278610/article/details/121440853