其他分享
首页 > 其他分享> > flink-9-算子(Operators)

flink-9-算子(Operators)

作者:互联网

 

 

 1.map:

 

 

 

 

调用:

    val stream = environment.readTextFile("data/access.log")
    println(stream.parallelism)
    val accessStream = stream.map( x => {
      val splits = x.split(",")
      val time = splits(0).trim.toLong
      val domain = splits(1).trim
      val traffices = splits(2).trim.toDouble
      Access(time, domain, traffices)
    })
    accessStream.print()

 

 

 分组求和:

    val stream = environment.readTextFile("data/access.log")
    println(stream.parallelism)
    val accessStream = stream.map( x => {
      val splits = x.split(",")
      val time = splits(0).trim.toLong
      val domain = splits(1).trim
      val traffices = splits(2).trim.toDouble
      Access(time, domain, traffices)
    })
    accessStream.keyBy(1).sum(2).print()  // 过时的
    accessStream.keyBy(_.domain).sum(2).print()  // 不过时的

 

 

 看看map底层源码:

 再看看map(mapper)的map是干啥的:

 

 看这个map方法,大部分算子都用了transform,所以我们可以直接用它:

 

 

 

下面我们来个自定义的map:

 

 

 

 刚才那个跑完就关了,看不到web,下面改改代码

    val stream = environment.socketTextStream("182.92.99.53", 8081)
    stream.map(x => {
      x.toInt * 10
    }).print()

 

 

 我们先观察一下print的写法:(有个addsink方法)

 

 

 下面我们来改map的源码:

注意这个方法,java的多了个类型:

 

 

 我们去找scala的写法:

 

 

 下面我们根据java的map方法写我们自己实现的方法:

 

 

 第一步:

 

 

 根据这个我们也能发现:

 

 

 

具体实现:

 

结果:

 

 

 

 

 

filter的写法:

flatMap的:

 

 

 

 

再看看更底层的:(这个是最底层的)

 

 来实现一下:(接口的都是没实现的,所以我们这里继承一个实现类,再继承这个单操作符的接口)

 

 

 参考其他算子的源码,我们可以知道,map是一进一出的,所以我们这里是要指定的,这里方便演示,类型我们都弄成Int来做:

class ZxStreamMap extends AbstractStreamOperator[Int] with OneInputStreamOperator[Int, Int] {
  override def processElement(element: StreamRecord[Int]): Unit = {
    val result = element.getValue.toInt * 10
    element.replace(result)
    output.collect(element)
  }
}

调用报错:

stream.transform("ZxMap2", new ZxStreamMap)

 

 

 原因是这里进来是一个String类型:

 

 

 所以我们要改一下:

 

 

 结果ok:

stream.transform("ZxMap2", new ZxStreamMap).print()

 

 

 

FlatMap:

 

 

 

看结果:(pk被干掉了不输出)

 

 

 自定义实现:

class ZxStreamFlatMap extends AbstractStreamOperator[String] with OneInputStreamOperator[String, String] {
  override def processElement(element: StreamRecord[String]): Unit = {
    val splits = element.getValue.split(",")
    splits.filter(_ != "pk").map(x => {
      output.collect(element.replace(x))
    })
  }
}

 

KeyBy:

来份数据:(省,市,访问量)

 

 

因为有中间状态的,看最后几行就行了:

 

 Tuple这种方式就不过时了:(建议用case class来做,现在的下标不清晰)

 

 

用KeySelector方式:

进来是一个tuple,key是两个字段所以也是tuple

 

 

实现一下:

用case class的方式:(这种方式更直观,推荐用)

 

 

最后是转成KeyedStream类型的:

 

标签:map,splits,val,Operators,flink,element,算子,print,stream
来源: https://www.cnblogs.com/unknowsthing/p/14941643.html