DStream 输出操作:通过 foreachRDD 将结果写入外部 MySQL 数据库
作者:互联网
package org.hnsw

import java.sql.PreparedStatement

import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// ConnectionPool is declared in package org.hnsw.streaming, which is a different
// package from this object, so it has to be imported explicitly.
import org.hnsw.streaming.ConnectionPool

/**
 * Spark Streaming job that counts words received on a text socket over a
 * sliding window and writes every windowed count to MySQL.
 *
 * The stream uses 5-second batches; counts are computed over a 60-second
 * window that slides every 30 seconds. Each result is printed and also
 * inserted into the `searcheKeyWord` table through a pooled JDBC connection.
 */
object SparkLearn {

  /** Parameterized insert: the keyword and its count are bound, never concatenated into the SQL text. */
  private val InsertSql =
    "insert into searcheKeyWord(insert_time,keyword,search_count) values(now(), ?, ?)"

  /**
   * Builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Jxq").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("192.168.3.66", 8888)

    // Count the occurrences of each word inside the window.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordcount =
      pairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(30))

    // Simple console output.
    wordcount.print()

    // Output to the external system. The foreachRDD body runs on the driver,
    // while the foreachPartition body runs on the worker nodes, so the JDBC
    // connection is obtained on the worker rather than serialized from the driver.
    wordcount.foreachRDD { rdd =>
      rdd.foreachPartition(records => writePartition(records))
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Inserts all (keyword, count) pairs of one partition as a single JDBC batch
   * inside one transaction.
   *
   * The connection is always handed back to the pool and the statement is
   * always closed, even when the write fails. On failure the transaction is
   * rolled back and the original exception is rethrown so the task fails
   * visibly.
   *
   * @param records the partition's (keyword, count) pairs
   */
  private def writePartition(records: Iterator[(String, Int)]): Unit = {
    // Do not borrow a connection for an empty partition.
    if (records.hasNext) {
      // 1) Borrow a connection from the pool.
      val connection = ConnectionPool.getConnection()
      try {
        // Auto-commit is on by default; switch it off so the batch commits atomically.
        connection.setAutoCommit(false)
        val statement: PreparedStatement = connection.prepareStatement(InsertSql)
        try {
          // 2) Bind one row per record and queue it in the batch.
          records.foreach { case (keyword, count) =>
            statement.setString(1, keyword)
            statement.setInt(2, count)
            statement.addBatch()
          }
          // 3) Send the batch to the MySQL server and commit.
          statement.executeBatch()
          connection.commit()
        } catch {
          case NonFatal(e) =>
            // Undo the partial batch; keep the original failure as the primary error.
            try connection.rollback()
            catch { case NonFatal(rollbackError) => e.addSuppressed(rollbackError) }
            throw e
        } finally {
          statement.close()
        }
      } finally {
        // 4) Return the connection to the pool.
        ConnectionPool.returnConnection(connection)
      }
    }
  }
}
连接池工具类
package org.hnsw.streaming

import java.sql.{Connection, DriverManager}

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

/**
 * Process-wide pool of MySQL JDBC connections built on Apache Commons Pool 2.
 *
 * Callers borrow a connection with [[getConnection]] and must hand it back
 * with [[returnConnection]] when done.
 */
object ConnectionPool {

  /** Factory that opens the physical connections. */
  // NOTE(review): URL and credentials are hard-coded here; consider moving them to configuration.
  val conFactory: MysqlConnectionFactory = new MysqlConnectionFactory(
    "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8",
    "root",
    "root",
    "com.mysql.jdbc.Driver")

  /** Object-pool configuration: at most 100 connections, each validated when borrowed. */
  val objectPoolConfig: GenericObjectPoolConfig[Connection] = new GenericObjectPoolConfig[Connection]()
  objectPoolConfig.setMaxTotal(100)
  // Without this flag the pool never calls the factory's validateObject, so a
  // connection that died while idle would be handed straight to the caller.
  objectPoolConfig.setTestOnBorrow(true)

  /** The connection pool itself. */
  private val pool = new GenericObjectPool[Connection](conFactory, objectPoolConfig)

  /**
   * Borrows a connection from the pool.
   *
   * @return a pooled connection; return it with [[returnConnection]]
   */
  def getConnection(): Connection = pool.borrowObject()

  /**
   * Returns a previously borrowed connection to the pool.
   *
   * @param conn the connection obtained from [[getConnection]]
   */
  def returnConnection(conn: Connection): Unit = pool.returnObject(conn)
}

/**
 * Commons Pool factory that creates, wraps, validates and destroys JDBC connections.
 *
 * @param url       JDBC URL of the database
 * @param userName  database user
 * @param password  database password
 * @param className fully qualified JDBC driver class name, loaded before connecting
 */
class MysqlConnectionFactory(url: String, userName: String, password: String, className: String)
  extends BasePooledObjectFactory[Connection] {

  /** Seconds to wait for the database to answer a validation check. */
  private val ValidationTimeoutSeconds = 2

  /** Loads the driver class and opens a new physical connection. */
  override def create(): Connection = {
    Class.forName(className)
    DriverManager.getConnection(url, userName, password)
  }

  /**
   * Wraps the connection in a DefaultPooledObject, the wrapper the pool uses to
   * track per-object bookkeeping (state, creation/borrow/return times, and so on).
   */
  override def wrap(conn: Connection): PooledObject[Connection] =
    new DefaultPooledObject[Connection](conn)

  /**
   * Validates a pooled connection.
   *
   * `Connection.isValid` reports false for a closed connection and also for
   * one that can no longer reach the server, which a plain `isClosed` check
   * does not detect.
   */
  override def validateObject(pObj: PooledObject[Connection]): Boolean =
    pObj.getObject.isValid(ValidationTimeoutSeconds)

  /** Destroys a pooled connection by closing it. */
  override def destroyObject(pObj: PooledObject[Connection]): Unit =
    pObj.getObject.close()
}
下载链接
标签:输出,val,insert,org,Connection,mysql,new,DStream,def 来源: https://www.cnblogs.com/857weir09432/p/16362798.html