flink的DataStreamAPI
作者:互联网
一、WordCount流程
1 import org.apache.flink.streaming.api.scala._ 2 3 object StreamWordCount { 4 def main(args:Array[String]):Unit={ 5 //创建流处理的执行环境 6 val env=StreamExecutionEnvironment.getExecutionEnvironment; 7 8 //接受一个socket文本流即创建数据源 9 val dataStream=env.socketTextStream("localhost",7777); 10 11 //对每条数据进行处理 12 val wordCountDataStream=dataStream.flatMap(_.split(" ")) 13 .filter(_.nonEmpty) 14 .map(line=>(line,1)) 15 .keyBy(line=>line._1) 16 .sum(1); 17 //输出结果,可以直接输出也可以将处理的结果存储到外部系统中如kafka 18 wordCountDataStream.print(); 19 //flink的操作是惰性的,需要启动executor。 20 env.execute("stream WC job") 21 } 22 }
二、流程解析:env —> transform —> source
(1)创建环境Environment
1、getExecutionEnvironment:创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
val env= ExecutionEnvironment.getExecutionEnvironment,如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
2、createLocalEnvironment:返回本地执行环境,需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1),返回本地执行环境,需要在调用时指定默认的并行度。
3、createRemoteEnvironment:返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")
(2)transform算子操作
1、map:val streamMap = stream.map { x => x * 2 }
2、flatMap:val streamFlatMap = stream.flatMap(x=>x.split(",")) //花括号与小括号均可以。
3、filter:val streamFilter=stream.filter(x=>x%2==1)
4、keyBy:DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key,DataStream → KeyedStream:逻辑上将一个流拆分成不相交的分区,每个分区包含具有相同hash(key)的元素。keyBy指定key的三种方法
1)根据字段位置keyBy(0),主要对tuple类型,pojo类会出错,注意:这个是相对于最外元素而言。对于tuple类型还可以指定key的位置keyBy(x=>x._1)
2)根据字段名称,主要是pojo类。stream.map(x=>Person(x,2)).keyBy("name"),也可以多字段分区如keyBy("name","age")
3)自定义keyselector
5、Reduce:KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值。
val sumLambdaStream = dataStream.keyBy("name").reduce((s1, s2) => Score(s1.name, "Sum", s1.score + s2.score))
val inputDataSet=env.fromCollection(List(1,2,3,4,5)).reduce((x,y)=>x+y).print()
6、
7、
8、
(3)
getExecutionEnvironment
标签:flink,env,val,keyBy,DataStreamAPI,key,stream 来源: https://www.cnblogs.com/hdc520/p/12988862.html