
4.9


Extraction
On the hadoop102 node, create the installation directory /opt/module, then extract the Flink archive into it:
$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
flink-1.13.0/
flink-1.13.0/log/
flink-1.13.0/LICENSE
flink-1.13.0/lib/
3. Startup
Enter the extracted directory, run the startup script, and check the running processes:
$ cd flink-1.13.0/
$ bin/start-cluster.sh
Starting cluster.
Starting taskexecutor daemon on host hadoop102 (hadoop102 is the hostname)
$ jps
10369 StandaloneSessionClusterEntrypoint
10680 TaskManagerRunner
10717 Jps
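The jps check above can also be scripted. A minimal sketch, with the jps output simulated as a string so the check runs standalone (in practice you would pipe the real jps output into the same grep tests):

```shell
# Simulated jps output from the standalone cluster above.
jps_output="10369 StandaloneSessionClusterEntrypoint
10680 TaskManagerRunner
10717 Jps"

# The cluster is healthy only if both the JobManager entrypoint and the
# TaskManager runner are present.
if echo "$jps_output" | grep -q StandaloneSessionClusterEntrypoint &&
   echo "$jps_output" | grep -q TaskManagerRunner; then
    echo "Flink standalone cluster is up"
fi
```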

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");

        // 3. Transform: split each line into (word, 1L) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // Lambdas erase generic types, so declare the output type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by word (the first tuple field)
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream =
                wordAndOneTuple.keyBy(data -> data.f0);

        // 5. Sum the counts (the second tuple field, at position 1)
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. Print
        sum.print();

        // 7. Start execution
        env.execute();
    }
}
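To see what steps 3 through 5 compute without needing a Flink runtime, here is a minimal plain-Java sketch of the same split / group / sum logic using a map (the class and method names here are made up for illustration and are not part of the Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the WordCount pipeline above (no Flink):
// split each line on spaces, then group and sum the 1L counts per word.
public class WordCountSketch {
    static Map<String, Long> wordCount(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {  // step 3: flatMap into words
                counts.merge(word, 1L, Long::sum); // steps 4-5: keyBy + sum
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(new String[]{"hello flink", "hello world"}));
        // {hello=2, flink=1, world=1}
    }
}
```

The difference from the Flink version is that Flink evaluates the same logic as a distributed, possibly unbounded stream, emitting an updated count for each incoming record rather than a single final map.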

Source: https://www.cnblogs.com/18396947681ww/p/16120570.html