flink-9-算子(Operators)
作者:互联网
1.map:
调用:
val stream = environment.readTextFile("data/access.log") println(stream.parallelism) val accessStream = stream.map( x => { val splits = x.split(",") val time = splits(0).trim.toLong val domain = splits(1).trim val traffices = splits(2).trim.toDouble Access(time, domain, traffices) }) accessStream.print()
分组求和:
val stream = environment.readTextFile("data/access.log") println(stream.parallelism) val accessStream = stream.map( x => { val splits = x.split(",") val time = splits(0).trim.toLong val domain = splits(1).trim val traffices = splits(2).trim.toDouble Access(time, domain, traffices) }) accessStream.keyBy(1).sum(2).print() // 过时的 accessStream.keyBy(_.domain).sum(2).print() // 不过时的
看看map底层源码:
再看看map(mapper)的map是干啥的:
看这个map方法,大部分算子都用了transform,所以我们可以直接用它:
下面我们来个自定义的map:
刚才那个跑完就关了,看不到web,下面改改代码
val stream = environment.socketTextStream("182.92.99.53", 8081) stream.map(x => { x.toInt * 10 }).print()
我们先观察一下print的写法:(有个addsink方法)
下面我们来改map的源码:
注意这个方法,java的多了个类型:
我们去找scala的写法:
下面我们根据java的map方法写我们自己实现的方法:
第一步:
根据这个我们也能发现:
具体实现:
结果:
filter的写法:
flatMap的:
再看看更底层的:(这个是最底层的)
来实现一下:(接口的都是没实现的,所以我们这里继承一个实现类,再继承这个单操作符的接口)
参考其他算子的源码,我们可以知道,map是一进一出的,所以我们这里是要指定的,这里方便演示,类型我们都弄成Int来做:
class ZxStreamMap extends AbstractStreamOperator[Int] with OneInputStreamOperator[Int, Int] { override def processElement(element: StreamRecord[Int]): Unit = { val result = element.getValue.toInt * 10 element.replace(result) output.collect(element) } }
调用报错:
stream.transform("ZxMap2", new ZxStreamMap)
原因是这里进来是一个String类型:
所以我们要改一下:
结果ok:
stream.transform("ZxMap2", new ZxStreamMap).print()
FlatMap:
看结果:(pk被干掉了不输出)
自定义实现:
class ZxStreamFlatMap extends AbstractStreamOperator[String] with OneInputStreamOperator[String, String] { override def processElement(element: StreamRecord[String]): Unit = { val splits = element.getValue.split(",") splits.filter(_ != "pk").map(x => { output.collect(element.replace(x)) }) } }
KeyBy:
来份数据:(省,市,访问量)
因为有中间状态的,看最后几行就行了:
Tuple这种方式就不过时了:(建议用case class来做,现在的下标不清晰)
用KeySelector方式:
进来是一个tuple,key是两个字段所以也是tuple
实现一下:
用case class的方式:(这种方式更直观,推荐用)
最后是转成KeyedStream类型的:
标签:map,splits,val,Operators,flink,element,算子,print,stream 来源: https://www.cnblogs.com/unknowsthing/p/14941643.html