Flink 题目
作者:互联网
Flink 题目
从MySql中读取数据,通过Flink处理之后在存储到MySql中
package com.wt.flink.homework
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
object Text1 {
/**
*
* 1、从数据库mysql中读取学生表的数据,
2、统计班级的人数
3、将统计好的结果保存到数据库中,一个班级只保存一条数据
*/
def main(args: Array[String]): Unit = {
//创建flink的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//使用自定义的source读取mysql中的数据
val mysqlDS: DataStream[String] = env.addSource(new MySqlSource())
val countDS: DataStream[(String, Int)] = mysqlDS
.map(stu => (stu.split("\t")(4), 1))
.keyBy(_._1)
.sum(1)
//将统计号的结果保存到MySlq中
countDS.addSink(new MySlqSink)
//触发任务执行
env.execute()
}
/**
* 自定义source ,实现SourceFunction接口
*
*/
class MySqlSource extends SourceFunction[String] {
/**
* run: 用于读取外部数据的方法,只执行一次
*
* @param ctx : 上下文对象,用于将读取到的数据发送到下游
*/
override def run(ctx: SourceFunction.SourceContext[String]):Unit = {
/**
* 使用jdbc读取mysql的数据,将读取到的数据发送到下游
*
*/
//创建连接
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
//编写查询的数据sql
val stat: PreparedStatement = conn.prepareStatement("select * from students")
//执行查询
val result: ResultSet = stat.executeQuery()
//解析数据
while(result.next()){
val id: Long = result.getLong("id")
val name: String = result.getString("name")
val age: Long = result.getLong("age")
val gender: String = result.getString("gender")
val clazz: String = result.getString("clazz")
//将每一条数据发送到下游
ctx.collect(s"$id\t$name\t$age\t$gender\t$clazz")
}
//关闭连接
stat.close()
conn.close()
}
//任务被取消的时候执行,一般用于回收资源
override def cancel(): Unit = {}
}
//写到mysql中
class MySlqSink extends RichSinkFunction[(String,Int)]{
var con: Connection = _
var stat: PreparedStatement = _
/**
* open:在invoke之前执行,每个task中只执行一次
* 一般用于初始化数据库的连接
*
*/
override def open(parameters:Configuration):Unit ={
//1、加载驱动
Class.forName("com.mysql.jdbc.Driver")
//创建链接
con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
//编写插入数据的sql
//replace :如果不存在插入,如果存在就替换,需要在表中设置主键
stat = con.prepareStatement("replace into clazz_num(clazz,num) values(?,?)")
}
/**
* 任务关闭时后执行,一般用于回收资源
*/
override def close():Unit={
//关闭连接
stat.close()
con.close()
}
/**
* 每一条数据会执行一次
* 使用jdbc将数据保存到mysql中
*
* @param kv : 一行数据
* @param context : 上下文对象
*/
override def invoke(kv: (String, Int), context: SinkFunction.Context): Unit = {
//设置参数
stat.setString(1, kv._1)
stat.setInt(2, kv._2)
//执行插入
stat.execute()
}
}
}
标签:stat,flink,题目,String,val,Flink,result,mysql 来源: https://www.cnblogs.com/atao-BigData/p/16513590.html