其他分享
首页 > 其他分享> > Seatunnel超高性能分布式数据集成平台使用体会

Seatunnel超高性能分布式数据集成平台使用体会

作者:互联网

@

目录

概述

定义

http://seatunnel.incubator.apache.org/

https://github.com/apache/incubator-seatunnel

SeaTunnel是一个非常易用的超高性能分布式数据集成平台,在企业中由于开发时间或开发部门不通用往有多个异构的、运行在不同的软硬件平台上的信息系统同时运行;而一个有价值的数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从而为企业提供全面的数据共享;SeaTunnel 支持海量数据的实时同步,它每天可以稳定高效地同步数百亿的数据,并已用于近100家公司的生产。最新版本为v2.1.3(作为下一代高性能、分布式、海量数据集成框架)

使用场景

特点

工作流程

SeaTunnel 有丰富的连接器且以 Spark 和 Flink 为引擎,所以可以很好地进行分布式的海量数据同步。一般来说使用SeaTunnel作为出仓入仓的工具,或者用于数据集成,主要流程如下:

Source[Data Source Input] -> Transform[Data Processing] -> Sink[Result Output]

image-20220815101853167

数据处理管道由多个滤波器组成,以满足各种数据处理需求,最简单有效就是通过SQL直接构造数据处理管道。目前,SeaTunnel支持的过滤列表还在扩展中。此外,还可以开发自己的数据处理插件,得益于SeaTunnel系统基于插件化设计思想实现的强可扩展性。

连接器

连接器在实际开发中的使用直接查找官方对应章节即可

image-20220824124951256

转换

用于转换或过滤插件包括如下:添加校验、转换、日期、删除、Grok、Json、KV、大写、小写、删除、重命名、重分区、替换、样本、拆分、Sql、表、截断、Uuid,自主开发的过滤器插件。详细可查阅官网地址

image-20220824124611944

为何选择SeaTunnel

SeaTunnel将尽力解决在海量数据同步过程中可能遇到的问题:

安装

下载

Seatunnel支持多种安装方式,包括locally本地的二进制安装、docker安装、k8s安装,我们先以locally的方式安装和演示。

# 下载
wget https://dlcdn.apache.org/incubator/seatunnel/2.1.3/apache-seatunnel-incubating-2.1.3-bin.tar.gz
# 解压
tar -xvf apache-seatunnel-incubating-2.1.3-bin.tar.gz
# 进入目录
cd apache-seatunnel-incubating-2.1.3

配置文件

在config/seatunnel-env.sh中更改设置;如果使用Spark作为引擎,请更改SPARK_HOME,如果使用Flink,请更改FLINK_HOME默认的话,SPARK_HOME和FLINK_HOME用的都是对应的系统环境变量值,如果没有,使用:-后面的值,按需修改即可。我们本篇示例以Flink为主,在前面关于Flink文章提前设置好FLINK_HOME这个环境变量,这里直接使用即可。

image-20220824122925561

在SeaTunnel中,最重要的是配置文件,可以通过配置文件定制自己的数据同步需求,最大限度地发挥SeaTunnel的潜力。配置文件包含几个部分:env、source、transform、sink;相关模块功能如下描述:

部署模式

在缩放中运行引擎,本节是关于引擎的,不是SeaTunnel本身,不做过多阐述,Spark和Flink都可以运行在不同类型的集群中,并且可以运行在任何规模的集群中。下面只展示建立在Spark或Flink引擎之上的SeaTunnel的基本用法,如果想扩展引擎集群请参阅Spark或Flink官网文档。

./bin/start-seatunnel-spark.sh \
--master local[4] \
--deploy-mode client \
--config ./config/application.conf
bin/start-seatunnel-flink.sh \
--config config-path

# -p 2 specifies that the parallelism of flink job is 2. You can also specify more parameters, use flink run -h to view
bin/start-seatunnel-flink.sh \
-p 2 \
--config config-path
bin/start-seatunnel-flink.sh \
-m yarn-cluster \
--config config-path

# -ynm seatunnel specifies the name displayed in the yarn webUI as seatunnel, you can also specify more parameters, use flink run -h to view
bin/start-seatunnel-flink.sh \
-m yarn-cluster \
-ynm seatunnel \
--config config-path
# cluster mode
./bin/start-seatunnel-spark.sh \
--master mesos://ip:7077 \
--deploy-mode cluster \
--config ./config/application.conf

入门示例

如何部署Flink可以参考前一篇文章《新一代分布式实时流处理引擎Flink入门实战操作篇]》,在前面文章中部署的是最新版本1.15.1的,超过SeaTunnel官网支持Flink版本范围,因此可选择安装flink1.12.7或flink-1.13.6的版本,安装过程步骤参考之前文章

image-20220826143230162

部署好flink1.12.7或flink-1.13.6的Flink,启动Standalone集群

# 环境变量配置flink得Home目录
export FLINK_HOME=/home/commons/flink-1.12.7
# 进入Flink目录
cd /home/commons/flink-1.12.7
# 启动flink的Standalone集群
./bin/start-cluster.sh

image-20220826141113043

在config目录下官网提供flink和spark的多个简易配置模板,上一节说到SeaTunnel的连接器是非常丰富的,入门示例就以flink简单流式处理从Socket数据源读取数据,转换后输出到Console控制台,配置模板使用官网提供的flink.streaming.conf.template,创建flink-streaming-test-01.conf,内容如下:

vi config/flink-streaming-test-01.conf

env {
  # flink 配置
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
# 配置数据源
source {  
  SocketStream {
    host = hadoop1
    result_table_name = "socket_demo_table"
    field_name = "info"    
  }
}
# 配置转换插件
transform {
  Split{
    separator = "#"
    fields = ["name","age"]
  }
  sql {
    sql = "select info,split(info) from socket_demo_table"
  }
}
# 声明输出
sink {
  ConsoleSink {}
}

保存配置内容后,先在hadoop1开一个连接窗口,开启一个netcat服务来发送数据,nc -lk 9999 监听socket端口

# 启动seatunnel
./bin/start-seatunnel-flink.sh \
--config ./config/flink-streaming-test-01.conf

在hadoop1也即是ckserver1监听端口按#分隔输出下面几条数据

image-20220826144324984

访问Flink的控制台UI页面http://hadoop1:8081/ ,找到刚运行的任务的日志,可以看到已经将info字段拆分为name和age两个字段输出

image-20220826144226067

启动脚本

在前面示例使用start-seatunnle-flink.sh可以指定3个参数,分别是

配置文件使用参数示例

使用前面的入门示例,输出年龄大于指定输出参数年龄的数据,修改transform的sql语句

sql = "select * from (select info,split(info) as info_record from socket_demo_table) where age > '"${age}"'"

保存配置内容后,先在hadoop1开一个连接窗口,开启一个netcat服务来发送数据,nc -lk 9999 监听socket端口

# 启动seatunnel
./bin/start-seatunnel-flink.sh \
--config ./config/flink-streaming-test-01.conf -i age=25

image-20220826160036926

在hadoop1也即是ckserver1监听端口按#分隔输出下面几条数据

image-20220826155951605

访问Flink的控制台UI页面http://hadoop1:8081/ ,找到刚运行的任务的日志,可以看到已经将info字段拆分为name和age两个字段输出

image-20220826154452452

Kafka进Kafka出的ETL示例

编写配置文件,vi config/flink-streaming-test-02.conf

env {
  # flink 配置
  execution.parallelism = 1
}
# 配置数据源
source {  
  KafkaTableStream {
    consumer.bootstrap.servers = "192.168.5.120:9092,192.168.5.121:9092,192.168.5.122:9092"
    consumer.group.id = "seatunnel-test"
    topics = seatunnel-kafka-in
    result_table_name = test
    format.type = csv
    schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
    format.field-delimiter = ";"
    format.allow-comments = "true"
    format.ignore-parse-errors = "true"
  }
}
# 配置转换插件
transform {
  sql {
    sql = "select name,age from test where age > '"${age}"'"
  }
}
# 声明输出
sink {
  kafka {
    topics = "seatunnel-kafka-out"
    producer.bootstrap.servers = "192.168.5.120:9092,192.168.5.121:9092,192.168.5.122:9092"
  }
}

创建kafka的测试示例的输入和输出的topic

# 如果先创建过可以执行先删除topic,可选
./kafka-topics.sh --delete --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic seatunnel-kafka-in
./kafka-topics.sh --delete --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic seatunnel-kafka-out

./kafka-topics.sh --create --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --replication-factor 1 --partitions 3 --topic seatunnel-kafka-in
./kafka-topics.sh --create --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --replication-factor 1 --partitions 3 --topic seatunnel-kafka-out

启动seatunnel

./bin/start-seatunnel-flink.sh --config ./config/flink-streaming-test-02.conf -i age=25

image-20220826175846419

查询Flink的控制台UI页面http://hadoop1:8081/ ,找到刚运行的任务查看其概览可以看到使用Kafka的Source和Sink

image-20220826181038097

kafka生产数据

./kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic seatunnel-kafka-out

image-20220826175905174

kafka消费数据,结果是正确的

# 消费输出
./kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic seatunnel-kafka-in

image-20220826175825204

-p 2指定flink job的并行度为2,还也可以指定更多的参数,使用flink运行-h查看,

# 将配置文件中env中execution.parallelism注释掉,改为参数传递的方式
# execution.parallelism = 1
# 启动seatunnel
./bin/start-seatunnel-flink.sh --config ./config/flink-streaming-test-02.conf -p 2 -i age=25

访问Flink的控制台UI页面http://hadoop1:8081/ 查看当前job有两个,3个slot使用了2个,可用只剩1个

image-20220826182543019

**本人博客网站 **IT小神 www.itxiaoshen.com

标签:seatunnel,Flink,--,flink,Seatunnel,高性能,SeaTunnel,config,分布式
来源: https://www.cnblogs.com/itxiaoshen/p/16629671.html