Flink APIs (where the data comes from, where it goes)
1. Flink APIs
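Every example below follows the same shape: create a StreamExecutionEnvironment, build one or more sources into a DataStream, apply transformations, send the result to a sink, and finally call env.execute() to start the job.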
2. WordCount in Flink
package com.wt.flink.core

import org.apache.flink.streaming.api.scala._

object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the Flink environment
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //BATCH mode only works on bounded streams; it fails on unbounded streams
    //(would also need import org.apache.flink.api.common.RuntimeExecutionMode)
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    //Set the parallelism of the Flink job
    //By default it depends on the number of CPU cores
    env.setParallelism(2)

    //Timeout for flushing buffered records from upstream to downstream
    //The default is 100 milliseconds
    env.setBufferTimeout(200)

    /**
     * 2. Read the data
     */
    val lines: DataStream[String] = env.socketTextStream("master", 8888)

    /**
     * 3. Count the words
     */
    //Split each line into words
    val words: DataStream[String] = lines.flatMap(line => line.split(","))

    //Convert to key-value format
    val kvDS: DataStream[(String, Int)] = words.map(word => (word, 1))

    //Group by word
    val groupByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)

    //Sum the values
    val countDS: DataStream[(String, Int)] = groupByDS.sum(1)

    /**
     * 4. Print the result
     */
    countDS.print()

    /**
     * 5. Start the Flink job
     */
    env.execute()
  }
}
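To try this out, something has to be listening on master:8888 before the job starts; a common choice (an assumption here, any TCP text source works) is netcat: run nc -lk 8888 on the master host and type comma-separated words. print() writes the running counts to the TaskManager's stdout, prefixed with the subtask index (for example 2> (java,3)), since the parallelism is 2.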
3. Transformation
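A hedged sketch of the transformation operators that the examples in this post rely on: flatMap, map, filter, keyBy and reduce, chained on the same socket source as the WordCount example. The object name Demo2Transformation and the filter step are illustrative additions, not part of the original code.

package com.wt.flink.core

import org.apache.flink.streaming.api.scala._

object Demo2Transformation {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //Unbounded source: one line of text per socket message
    val lines: DataStream[String] = env.socketTextStream("master", 8888)

    //flatMap: one line in, many words out
    val words: DataStream[String] = lines.flatMap(_.split(","))

    //filter: drop empty words
    val notEmpty: DataStream[String] = words.filter(_.nonEmpty)

    //map: wrap each word into a (word, 1) pair
    val kvDS: DataStream[(String, Int)] = notEmpty.map((_, 1))

    //keyBy + reduce: running count per word, equivalent to sum(1)
    val countDS: DataStream[(String, Int)] = kvDS
      .keyBy(_._1)
      .reduce((a, b) => (a._1, a._2 + b._2))

    countDS.print()

    env.execute()
  }
}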
4. Source: where the data comes from
- ListSource
package com.wt.flink.scurce

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

object Demo1ListSource {
  def main(args: Array[String]): Unit = {
    //Create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * Flink execution modes
     *
     * RuntimeExecutionMode.BATCH: batch mode, only usable on bounded streams, emits the final result
     *
     * RuntimeExecutionMode.STREAMING: streaming mode, usable on bounded and unbounded streams, emits continuous results
     */
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    /**
     * Source built from a local collection -- bounded stream
     *
     * When the source is bounded, the Flink job finishes once the data has been processed
     */
    val lineDS: DataStream[String] = env.fromCollection(List("java,spark", "java,hadoop", "java"))

    lineDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}
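A closely related way to build a bounded source from a handful of literal values is fromElements, shown here as a one-line variant of the example above:

val lineDS: DataStream[String] = env.fromElements("java,spark", "java,hadoop", "java")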
- FileSource
package com.wt.flink.scurce

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

object Demo2FileSource {
  def main(args: Array[String]): Unit = {
    //Create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //Batch mode
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    /**
     * Source built from a file -- bounded stream
     */
    val studentsDS: DataStream[String] = env.readTextFile("data/students.txt")

    //Count the number of students per class (the class is the 5th comma-separated field)
    val clazzDS: DataStream[(String, Int)] = studentsDS.map(stu => (stu.split(",")(4), 1))

    val clazzNumDS: DataStream[(String, Int)] = clazzDS.keyBy(kv => kv._1).sum(1)

    clazzNumDS.print()

    env.execute()
  }
}
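The code assumes data/students.txt is comma-separated with the class name in the fifth field (index 4); a made-up line for illustration: 1500100001,zhangsan,22,male,class-6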
- SocketSource
package com.wt.flink.scurce

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

object Demo3SocketSource {
  def main(args: Array[String]): Unit = {
    //Create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //Streaming mode for the unbounded stream
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    /**
     * Source built from a socket -- unbounded stream
     *
     * An unbounded stream can only run in streaming mode, not in batch mode
     */
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    linesDS
      .flatMap(_.split(","))   //split each line on ","
      .map((_, 1))             //turn each word into a (word, 1) pair
      .keyBy(_._1)             //group by the word
      .sum(1)                  //sum the counts
      .print()

    env.execute()
  }
}
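Because sum(1) keeps a running total per key, the output grows as data arrives. With a made-up input session, typing java,spark and then java into the socket prints something like (java,1), (spark,1) and then (java,2).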
- MySqlSource
package com.wt.flink.scurce

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object Demo4MySqlSource {
  def main(args: Array[String]): Unit = {
    //Create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //Read the data from MySQL with a custom source
    val mysqlDS: DataStream[String] = env.addSource(new MySqlSource())

    mysqlDS
      .map(stu => (stu.split("\t")(4), 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }

  /**
   * Custom source: implement the SourceFunction interface
   */
  class MySqlSource extends SourceFunction[String] {

    /**
     * run: reads the external data, executed only once
     *
     * @param ctx : context object used to send the data downstream
     */
    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      /**
       * Read MySQL with JDBC and send every row downstream
       */
      //Create the connection
      Class.forName("com.mysql.jdbc.Driver")
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17", "root", "123456")

      //Prepare the query
      val stat: PreparedStatement = conn.prepareStatement("select * from students")

      //Execute the query
      val result: ResultSet = stat.executeQuery()

      //Parse the rows
      while (result.next()) {
        val id: Long = result.getLong("id")
        val name: String = result.getString("name")
        val age: Long = result.getLong("age")
        val gender: String = result.getString("gender")
        val clazz: String = result.getString("clazz")

        //Send each row downstream
        ctx.collect(s"$id\t$name\t$age\t$gender\t$clazz")
      }

      //Close the connection
      stat.close()
      conn.close()
    }

    //Executed when the task is cancelled, normally used to release resources
    override def cancel(): Unit = {}
  }
}
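This example assumes a bigdata17 database on master with a students table whose columns match the code (id, name, age, gender, clazz), and that a MySQL JDBC driver providing com.mysql.jdbc.Driver (the mysql-connector-java artifact) is on the job's classpath; the snippet does not set any of this up itself.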
5. Sink: where the data goes
- FileSink
package com.wt.flink.sink

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object Demo1FileSink {
  def main(args: Array[String]): Unit = {
    //Create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    //Read the data
    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    //Count the number of students per class
    val kvDS: DataStream[(String, Int)] = studentDS.map(stu => (stu.split(",")(4), 1))
    val countDS: DataStream[(String, Int)] = kvDS.keyBy(_._1).sum(1)

    //Save the result to files
    //Old API
    //countDS.writeAsText("data/flink/clazz_num")

    //New API
    val sink: FileSink[(String, Int)] = FileSink
      .forRowFormat(new Path("data/flink/clazz_num"), new SimpleStringEncoder[(String, Int)]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          //Roll once the part file covers at least this much time (requires java.time.Duration if enabled)
          //.withRolloverInterval(Duration.ofSeconds(10))
          //Roll after this long without new data
          //.withInactivityInterval(Duration.ofSeconds(10))
          //Roll once the part file reaches this size
          .withMaxPartSize(MemorySize.ofMebiBytes(1))
          .build())
      .build()

    //Use the file sink
    countDS.sinkTo(sink)

    env.execute()
  }
}
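What to expect on disk: the row-format FileSink writes part files under data/flink/clazz_num. In BATCH mode, as here, the part files are finalized when the job finishes; in STREAMING mode they only move from the in-progress/pending state to finished on checkpoints, so checkpointing would have to be enabled. FileSink itself ships in the file connector module (flink-connector-files), which needs to be on the classpath.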
- Print
package com.wt.flink.sink

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo2Print {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)

    lineDS.print()

    env.execute()
  }
}
- SinkFunction (custom sink function)
package com.wt.flink.sink

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo3SinkFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    //Use a custom sink
    linesDS.addSink(new SinkFunction[String] {
      /**
       * invoke is executed once per record
       *
       * @param value   : one record
       * @param context : context object
       */
      override def invoke(value: String, context: SinkFunction.Context): Unit = {
        println(value)
      }
    })

    env.execute()
  }
}
- MySqlSink
package com.wt.flink.sink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo4MysqlSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)

    //Count the words
    val countDS: DataStream[(String, Int)] = lineDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    //Save the result to MySQL
    countDS.addSink(new MySlqSink)

    env.execute()
  }

  /**
   * Custom sink that saves the data to MySQL
   * SinkFunction: only has invoke
   * RichSinkFunction: additionally has open and close methods
   */
  class MySlqSink extends RichSinkFunction[(String, Int)] {
    var con: Connection = _
    var stat: PreparedStatement = _

    /**
     * open: executed before invoke, once per task
     * Normally used to initialize the database connection
     */
    override def open(parameters: Configuration): Unit = {
      //1. Load the driver
      Class.forName("com.mysql.jdbc.Driver")

      //Create the connection
      con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17", "root", "123456")

      //Prepare the insert statement
      //replace: insert if the row does not exist, otherwise replace it; the table needs a primary key
      stat = con.prepareStatement("replace into word_num(word,num) values(?,?)")
    }

    /**
     * Executed when the task shuts down, normally used to release resources
     */
    override def close(): Unit = {
      //Close the connection
      stat.close()
      con.close()
    }

    /**
     * Executed once per record
     * Saves the record to MySQL with JDBC
     *
     * @param kv      : one record
     * @param context : context object
     */
    override def invoke(kv: (String, Int), context: SinkFunction.Context): Unit = {
      //Set the parameters
      stat.setString(1, kv._1)
      stat.setInt(2, kv._2)

      //Execute the insert
      stat.execute()
    }
  }
}
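For replace into to behave as described, the word_num table needs a primary key on word. A minimal definition, with assumed column types (only the primary key is actually required by the code): create table word_num(word varchar(255) primary key, num int)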