Flink Common APIs: HDFS File Source
package source

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * @Author yqq
 * @Date 2021/12/25 13:17
 * @Version 1.0
 */
object HDFSFileSource {
  def main(args: Array[String]): Unit = {
    val ev = StreamExecutionEnvironment.getExecutionEnvironment
    ev.setParallelism(1)
    // Brings in the implicit TypeInformation required by the Scala DataStream API
    import org.apache.flink.streaming.api.scala._
    // Read a text file from HDFS; each element of the stream is one line
    val stream: DataStream[String] = ev.readTextFile("hdfs://mycluster/wc.txt")
    // Word count: split each line into words, key by the word, sum the counts
    stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()
    ev.execute("wordcount")
  }
}
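readTextFile scans the file once and the job ends after all lines have been emitted. If the job should stay running and keep watching an HDFS path, a monitored file source can be used instead. The following is a minimal sketch of the same word count with readFile in PROCESS_CONTINUOUSLY mode, assuming the Flink 1.x Scala DataStream API; the object name, path, and the 10-second scan interval are illustrative assumptions. Note that in this mode Flink re-reads the whole file whenever it is modified, so previously counted lines are counted again.

package source

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object HDFSFileMonitorSource {
  def main(args: Array[String]): Unit = {
    val ev = StreamExecutionEnvironment.getExecutionEnvironment
    ev.setParallelism(1)
    val path = "hdfs://mycluster/wc.txt" // assumed path, same as above
    // Re-scan the path every 10 seconds and re-emit files that have changed
    val stream: DataStream[String] = ev.readFile(
      new TextInputFormat(new Path(path)),
      path,
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      10000L)
    stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()
    ev.execute("wordcount-continuous")
  }
}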
Contents of the file on HDFS:
[root@node1 ~]# hdfs dfs -cat /wc.txt
21/12/25 14:52:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello tom
andy joy
hello rose
hello joy
mark andy
hello tom
andy rose
hello joy
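With parallelism set to 1 and the lines consumed in file order, the keyed sum emits a running count for each word, one update per input tuple, so the print sink would show output along these lines (the last update per key is the final total):

(hello,1)
(tom,1)
(andy,1)
(joy,1)
(hello,2)
(rose,1)
(hello,3)
(joy,2)
(mark,1)
(andy,2)
(hello,4)
(tom,2)
(andy,3)
(rose,2)
(hello,5)
(joy,3)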
Source: https://blog.csdn.net/manba_yqq/article/details/122143548