其他分享
首页 > 其他分享> > 【翻译】Apache Flink 1.12.0 Release Announcement

【翻译】Apache Flink 1.12.0 Release Announcement

作者:互联网

本文来自官网: https://flink.apache.org/news/2020/12/10/release-1.12.0.html

2020年12月10日Marta Paes(@morsapaes)和Aljoscha Krettek(@aljoscha

Apache Flink社区很高兴地宣布 Flink 1.12.0 发布了!近 300 个 contributors  在1000个 threads 上工作,对可用性进行了重大改进,并提供了简化(并统一)整个API堆栈的Flink处理的新功能。

发布要点

这篇博客文章描述了所有主要的新功能和改进,需要注意的重要更改以及预期的发展。

Apache Flink 1.12.0发行公告

 

2020年12月10日Marta Paes(@morsapaes)和Aljoscha Krettek(@aljoscha

Apache Flink社区很高兴宣布Flink 1.12.0的发布!近300个贡献者在1000个线程上进行了工作,以显着提高可用性以及新功能,这些功能简化了(并统一了)整个API堆栈的Flink处理。

发布要点

这篇博客文章描述了所有主要的新功能和改进,需要注意的重要更改以及预期的发展。

现在,可以在Flink网站的更新的“下载”页面上找到二进制发布包和源码包,并且可以在 PyPI 上获得最新的 PyFlink 发布包。请仔细阅读 release notes,并查看完整的发行变更日志更新文档以获取更多详细信息。

我们鼓励您下载这个发行版,并通过Flink邮件列表JIRA与社区分享您的反馈。

新功能和改进

DataStream API 中的批处理执行模式

Flink 的核心 API 在项目的整个生命周期中都得到了有机开发,并且最初在设计时就考虑了特定的用例。尽管Table API / SQL已经具有统一的算子,但使用较低级别的抽象仍然需要您在批处理(DataSet API)和流式(DataStream API)的两个语义上不同的API之间进行选择。由于批处理是无限制流的子集,因此将它们合并到单个API中有一些明显的优势:

考虑到这些优点,社区已朝着统一DataStream API迈出了第一步:支持高效的批处理执行(FLIP-134)。从长远来看,这意味着 DataSet API 将被 DataStream API 和 Table API / SQL(FLIP-131)弃用和包含。对于统一工作的概述,请参阅这次最近的 Flink Forward 大会

批处理流

您已经可以使用DataStream API来处理有界的流(例如文件),其限制是运行时不“知道”作业是有界的。为了优化有界输入的 runtime ,新 BATCH 模式执行使用基于排序的 shuffles (具有纯内存的聚合)和改进的调度策略(请参见流水线区域调度)。结果,BATCH DataStream API 中的模式执行已经非常接近 Flink 1.12 中DataSet API 的性能。有关性能基准的更多详细信息,请查看原始提案(FLIP-140)。

 

 

在 Flink 1.12 中,默认执行模式为 STREAMING。要配置作业以 BATCH 模式运行,您可以在提交作业时设置配置:

bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

,或以编程方式执行:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeMode.BATCH);

注意:尽管尚未弃用 DataSet API,但我们建议用户优先使用具有 BATCH 执行模式的 DataStream API 来执行新的批处理作业,并考虑迁移现有的 DataSet 作业。 

新的 Data Sink API(Beta)

以前的版本中,已经确保 Data Source 连接器可以同时在两种执行模式下工作,因此在Flink 1.12中,社区专注于实现统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit protocol 和一个更加模块化的接口,其中各个组件透明地暴露给框架。

一个 Data Sink 实现必须提供 what  和 How:SinkWriter 写入哪些需要提交的数据和输出(即 committables ); 以及封装了如何处理提交表的CommitterGlobalCommitter。该框架负责 when 和 where:在什么时间和在 which  机器或程序提交。

 

这种更加模块化的抽象允许为 BATCH 和 STREAMING 模式执行不同的 runtime  实现,这些 runtime  实现了预期的目的,但是仅使用一个统一的 data sink 实现。在Flink 1.12中,FileSink 连接器是 StreamingFileSink(FLINK-19758)的统一替代品。其余的连接器将在未来的版本中移植到新接口。 

Kubernetes高可用性(HA)服务

Kubernetes 提供了Flink可以用于 JobManager   故障转移的内置功能,而不是依赖 ZooKeeper。为了启用 “ ZooKeeperless” HA 设置,社区在 Flink 1.12(FLIP-144)中实现了 Kubernetes HA 服务。该服务与 ZooKeeper 实现基于相同的基本接口构建,并使用 Kubernetes 的 ConfigMap 对象处理从 JobManager 故障中恢复所需的所有元数据。有关如何配置高可用性 Kubernetes 集群的更多详细信息和示例,请参阅文档

注意:这并不意味着将删除 ZooKeeper 依赖,而只是在 Kubernetes 上为 Flink 用户提供了替代方法。

其他改进

将现有的 connector 迁移到新的 Data Source API

先前版本引入了新的 Data Source API(FLIP-27),允许实现既可以作为有界(批处理)源也可以作为无界(流式)源使用的 connector。在 Flink 1.12 中,社区开始从 FileSystem 连接器(FLINK-19161)开始将现有 source 连接器移植到新接口。

注意:统一的 source 实现将是完全独立的连接器,与旧版本不兼容。

Pipelined Region 调度(FLIP-119

Flink 的调度程序在很大程度上是分别为批处理和流处理工作负载而设计的。此版本引入了统一的调度策略,该策略可识别 blocking 的数据交换,以将执行图分解为 Pipelined Region。这样,仅当有数据可以执行工作时才调度每个区域,并且仅在所有必需的资源都可用时才部署它。以及独立重启失败的区域。特别是对于批处理作业,新策略可提高资源利用率,并消除死锁。

支持 Sort-Merge Shuffles(FLIP-148

为了提高大规模批处理作业的稳定性,性能和资源利用率,社区引入了 sort-merge shuffle,以替代 Flink 已经使用的原始 shuffle 实现。这种方法可以显着减少 shuffle 时间,并使用更少的文件句柄和文件写入缓冲区(这对于大规模作业是有问题的)。后续版本(FLINK-19614)中将实现进一步的优化。

注意:此功能是实验性的,默认情况下未启用。要启用 sort-merge shuffle,您可以在 TaskManager 网络配置选项中配置合理的最小并行度阈值。

对Flink WebUI(FLIP-75)的改进

作为对 Flink WebUI 的一系列改进的延续,该社区致力于在 WebUI(FLIP-104)上公开 JobManager 的内存相关指标和配置参数。TaskManager 的指标页面也已更新,以反映对 Flink 1.10(FLIP-102)中引入的 TaskManager 内存模型更改,并为托管内存,网络内存和元空间添加了新的指标。

Table API / SQL:SQL 连接器中的元数据处理

某些 Source(和 Format)将其他字段公开为元数据,这些字段对于用户与记录数据一起处理是很有价值的。一个常见的例子是 kafka,你可能要如 访问 offset,partition 或 topic 的信息,读/写记录 key 或使用嵌入的元数据的 timestamp 基于时间的操作。在新版本中,Flink SQL支持元数据列以读取和写入表每一行的特定于连接器和格式的字段(FLIP-107)。这些列在CREATE TABLE语句中使用METADATA(保留)关键字声明。

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
  headers MAP<STRING, BYTES> METADATA  -- access Kafka 'headers' metadata
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 
  'format' = 'avro'
);

在 Flink 1.12 中,公开了 Kafka 和 Kinesis 连接器的元数据,并且已经计划在 FileSystem 连接器上支持(FLINK-19903)。由于 Kafka 记录的结构更为复杂,因此还专门为 Kafka 连接器实现了新属性,以控制如何处理键/值对。有关 Flink SQL 中元数据支持的完整概述,请查看每个连接器的文档以及原始提案中的用例。

Table API / SQL:Upsert Kafka 连接器

对于某些用例,例如 interpreting compacted topics 或写出(更新)汇总结果,有必要将 Kafka 记录键作为真正的主键来处理,以确定可以插入,删除或更新的内容。为了实现这一点,社区创建了一个专用的 upsert 连接器upsert-kafka),用于扩展基本实现以在upsert 模式(FLIP-149)中工作。

新的 upsert-kafka 连接器可用于 Source 和 Sink ,并提供与现有 Kafka 连接器相同的基本功能和持久性保证,因为它下重用了大部分代码。要使用 upsert-kafka connector,您必须在创建表时定义一个主键约束,并为键(key.format)和值(value.format)指定(反序列化)格式。

Table API / SQL:在SQL中支持 Temporal Table Joins

现在,您无需创建临时表函数来在某个特定时间点查询表,而只需使用标准 SQL 子句 FOR SYSTEM_TIME AS OF(SQL:2011)来表示临时表联接。此外,现在支持对具有时间属性和主键的任何类型的表进行时态联接,而不仅仅是 append-only table。这可以解锁一组新的用例,例如直接针对 Kafka compacted 的主题或数据库变更日志(例如,来自Debezium)执行临时联接。

-- Table backed by a Kafka topic
CREATE TABLE orders (
    order_id STRING,
    currency STRING,
    amount INT,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    ...
);

-- Table backed by a Kafka compacted topic
CREATE TABLE latest_rates ( 
    currency STRING,
    currency_rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time - INTERVAL '5' SECOND,
    PRIMARY KEY (currency) NOT ENFORCED      
) WITH (
  'connector' = 'upsert-kafka',
  ...
);

-- Event-time temporal table join
SELECT 
    o.order_id,
    o.order_time, 
    o.amount * r.currency_rate AS amount,
    r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;

 

前面的示例还显示了如何在 temporal table joins 的上下文中利用新的 upsert-kafka  连接器。

Hive Tables in Temporal Table Joins

您还可以通过自动读取最新的表分区作为 temporal table(FLINK-19644)或整个表作为在执行时跟踪最新版本的有界流来对 Hive 表执行 temporal table join。有关在临时表联接中使用Hive表的示例,请参考文档


Table API / SQL的其他改进

Kinesis Flink SQL 连接器(FLINK-18858

从 Flink 1.12开始,Table API / SQL 本身也支持将 Amazon Kinesis Data Streams(KDS)作为 source/sink 。新的 Kinesis SQL 连接器随附对增强的扇出(EFO)和 sink 分区的支持。有关支持的功能,配置选项和公开的元数据的完整概述,请查看更新的文档

FileSystem / Hive 连接器中的 streaming sink 压缩(FLINK-19345

当写为大文件时,许多批量格式(例如Parquet)是最有效的。当启用频繁 checkpoint 时,这是一个挑战,因为创建的小文件太多(需要在检查点上滚动)。在 Flink 1.12中,fink sink 支持文件压缩,从而允许作业保留较小的 checkpoint 间隔,而不会生成大量文件。要启用文件压缩,您可以按照文档中的说明设置 FileSystem 连接器的属性 auto-compaction=true。

Kafka 连接器中的水印下推(FLINK-20041

为了确保从 Kafka 消费时的正确性,通常最好在每个分区的基础上生成水印,因为分区中的乱序通常比所有分区中的乱序性要低。Flink 现在将下推水印策略,以从 Kafka 内部发出分区的水印。source 的输出水印将由其读取的分区上的最小水印确定,从而产生更好(即更接近实时)的水印。通过水印下推功能,您还可以配置分区的空闲状态检测,以防止空闲分区阻止整个应用程序的事件时间进度。

新支持的格式

FormatDescriptionSupported Connectors
Avro Schema Registry 读写 Confluent Schema Registry KafkaAvroSerializer 序列化的数据。 Kafka, Upsert Kafka
Debezium Avro 读取和写入使用Confluent Schema Registry KafkaAvroSerializer序列化的Debezium记录。 Kafka
Maxwell (CDC) 读写 Maxwell JSON 记录.

Kafka

FileSystem

Raw 读写 raw (byte-based) 值做为一列值.

Kafka, Upsert Kafka

Kinesis

FileSystem

 

用于联接优化的多输入运算符(FLINK-19621

为了消除不必要的序列化和数据溢出并提高批处理和流表API / SQL作业的性能,default planner 现在利用最新版本(FLIP-92)中引入的 N-ary stream operator 来实现运算符的“链接”通过前边缘连接。

Table API UDAF的类型推断(FLIP-65

此版本结束了 Flink 1.9 中针对 Table API 的新数据类型系统上的工作,并向该新类型系统公开了聚合函数(UDAF)。从 Flink 1.12 开始,UDAF 的行为类似于标量和表函数,并支持所有数据类型。


为了扩展 PyFlink 的可用性,此版本引入了 Python DataStream API(FLIP-130)的第一个版本,该版本支持无状态操作(例如Map,FlatMap,Filter,KeyBy)。

from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment

class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1


env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("datastream job")

Kubernetes 上的 PyFlink作业(FLINK-17480

除了独立部署和 YARN 部署之外,PyFlink 作业现在还可以本地部署在 Kubernetes 上。该部署文档都有详细的关于如何启动一个指令会话应用 Kubernetes 集群。

用户定义的汇总函数(UDAF)

在 Flink 1.12中,您可以在 PyFlink(FLIP-139)中定义和注册 UDAF 。与不处理状态并且一次只处理一行的普通 UDF 相比,UDAF 是有状态的,可用于计算多个输入行上的自定义聚合。要从矢量化中受益,您还可以使用Pandas UDAFFLIP-137)(最快10倍)。

注意:常规UDAF仅在 group aggregations and in streaming mode 下受支持。对于批处理模式或窗口聚合,请使用Pandas UDAF。


重要变化

发行说明

如果您打算将安装程序升级到Flink 1.12(注:官网写的是 1.11, 写错了),请仔细查看发行说明,以获取详细的更改和新功能列表。此版本与使用@Public批注进行批注的API的1.x早期版本兼容。

 

贡献者名单

Apache Flink社区要感谢使该版本成为可能的300位贡献者中的每一位

(重复太多,跳过)

 

 

 

 

 

 

 

标签:Announcement,1.12,Flink,Kafka,API,连接器,SQL,Table
来源: https://www.cnblogs.com/Springmoon-venn/p/14136153.html