Writing Data from Spark to Redis
Here is the code for the Redis connection pool:
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Only needed if the commented-out hook below is enabled
import scala.concurrent.Future
import scala.util.{Failure, Success}

/**
 * Redis connection pool
 */
object RedisClient extends Serializable {
  val redisHost = "192.168.115.142"
  val redisPort = 6379
  val redisTimeout = 30000

  // Lazily initialized, so each executor builds its own pool on first use
  lazy val pool = new JedisPool(new JedisPoolConfig, redisHost, redisPort, redisTimeout)

  import scala.concurrent.ExecutionContext.Implicits.global
  /*
  lazy val hook = Future {
    println("Execute hook thread :" + this)
    pool.destroy()
  } onComplete {
    case Success(value) => println("success" + value)
    case Failure(value) => println("failure" + value)
  }
  */
}
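The commented-out hook above uses a Future to destroy the pool. A simpler alternative (a sketch, not from the original post) is Scala's built-in JVM shutdown hook:

// Sketch (not in the original): tear down the pool when the JVM exits,
// replacing the commented-out Future-based hook
sys.addShutdownHook {
  println("Destroying Jedis pool: " + RedisClient.pool)
  RedisClient.pool.destroy()
}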
Here is the Streaming code:
import com.google.gson.Gson
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("Streaming Process Data").setMaster("local")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  // The StreamingContext needs a batch interval
  val ssc = new StreamingContext(sc, Seconds(3))
  // Kafka offsets are checkpointed here
  ssc.checkpoint("spark-receiver")
  // Kafka configuration
  val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "192.168.115.142:9092",
    /* "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer], */
    "group.id" -> "spark_order"
  )
  val topics = Set("example_order")
  // KafkaUtils.createDirectStream receives data via the low-level consumer API
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    topics
  )
  // flatMap: 1. the caller must be an iterable collection; 2. the return type is
  // the caller's type; 3. the function passed in must return a collection
  // Objects used inside RDD operations must be serializable, so declare objects
  // inside the function where possible (hence "new Gson()" in the closure)
  val paymentInfos = kafkaStream.flatMap(record => Some(new Gson().fromJson(record._2, classOf[PaymentInfo])))
  val batchInfos = paymentInfos.map(v => (v.productId, v.productPrice)).groupByKey.map(v => (v._1, v._2.size, v._2.sum[Long]))
  batchInfos.foreachRDD(
    x =>
      x.foreachPartition(
        partition =>
          partition.foreach(
            v => {
              // Get a Redis connection from the pool
              val jedis = RedisClient.pool.getResource
              // Select database 1
              jedis.select(1)
              // Per-product sales: increment field v._1 of the hash by v._3
              jedis.hincrBy("orderTotalKey", v._1, v._3)
              // Overall sales: increment the value of totalKey by v._3
              jedis.incrBy("totalKey", v._3)
              // Return the connection to the pool
              jedis.close()
              println(v._1 + " " + v._3)
            }
          )
      )
  )
  ssc.start()
  ssc.awaitTermination()
}
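The PaymentInfo class is never shown in the post. A hypothetical sketch consistent with its usage above (Gson deserialization, productId and productPrice fields) could be:

// Hypothetical: not included in the original post. Field names must match
// the JSON keys in the Kafka messages, since Gson populates them reflectively.
case class PaymentInfo(productId: String, productPrice: Long)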
Note that Spark ships your code to the executors, so every object captured by a closure must be fully serializable, or the job fails with a serialization exception. For example, having RedisClient load its configuration dynamically from a properties file causes errors, and declaring the Gson instance as a driver-side local variable (instead of constructing it inside the closure) also causes errors.
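To illustrate the Gson case with a sketch (the failing variant is not from the post): if the Gson instance is created on the driver and captured by the closure, Spark must serialize it with the task, and com.google.gson.Gson is not Serializable.

// Fails: gson is a driver-side local variable captured by the closure,
// so task serialization throws NotSerializableException
val gson = new Gson()
val bad = kafkaStream.flatMap(record => Some(gson.fromJson(record._2, classOf[PaymentInfo])))

// Works: construct Gson inside the closure, as in the code above
val good = kafkaStream.flatMap(record => Some(new Gson().fromJson(record._2, classOf[PaymentInfo])))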
Source: https://blog.csdn.net/qq_33115589/article/details/114888484