
Flink Common APIs: the HDFS File Source

Author: 互联网

package source

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * @Author yqq
 * @Date 2021/12/25 13:17
 * @Version 1.0
 */
object HDFSFileSource {
  def main(args: Array[String]): Unit = {
    val ev = StreamExecutionEnvironment.getExecutionEnvironment
    ev.setParallelism(1)
    // Implicit TypeInformation required by the Scala DataStream API
    import org.apache.flink.streaming.api.scala._
    // Read a text file from HDFS
    val stream: DataStream[String] = ev.readTextFile("hdfs://mycluster/wc.txt")
    // Word count: split each line into words, key by the word (field 0),
    // and keep a running sum of the count (field 1)
    stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    ev.execute("wordcount")
  }
}

Sample data on HDFS (the WARN line is only Hadoop's native-library notice and can be ignored):

[root@node1 ~]# hdfs dfs -cat /wc.txt
21/12/25 14:52:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello tom 
andy joy 
hello rose 
hello joy 
mark andy 
hello tom 
andy rose 
hello joy
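Because the job applies a rolling `sum`, `print()` emits an updated running count after every record rather than one final total per word. As a sanity check, the final per-word totals for the sample file can be reproduced with plain Scala collections; this is a minimal sketch with no Flink dependency, and the object name `WordCountCheck` is ours:

```scala
// Plain-Scala check of the final word counts for the sample wc.txt data.
// Mirrors the Flink pipeline: flatMap(split) -> map((_, 1)) -> keyBy -> sum.
object WordCountCheck {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.trim.split("\\s+"))        // flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)                    // keyBy the word
      .map { case (w, ws) => (w, ws.size) } // sum the per-word counts

  // The eight lines of /wc.txt shown above
  val sample = Seq(
    "hello tom", "andy joy", "hello rose", "hello joy",
    "mark andy", "hello tom", "andy rose", "hello joy"
  )

  def main(args: Array[String]): Unit =
    wordCount(sample).toSeq.sortBy(-_._2).foreach(println)
}
```

The final totals are hello → 5, andy → 3, joy → 3, tom → 2, rose → 2, mark → 1; the Flink job's output ends with these same pairs after printing the intermediate running counts.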


Source: https://blog.csdn.net/manba_yqq/article/details/122143548