
Writing Data from Spark to Redis


Below is the code for the Redis connection pool:

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

/**
 * Redis connection pool
 */
object RedisClient extends Serializable {

  val redisHost = "192.168.115.142"
  val redisPort = 6379
  val redisTimeout = 30000

  // lazy, so the pool is created on first use on each executor
  // rather than being serialized from the driver
  lazy val pool = new JedisPool(new JedisPoolConfig, redisHost, redisPort, redisTimeout)

  import scala.concurrent.ExecutionContext.Implicits.global

  /*
    lazy val hook = Future {
      println("Execute hook thread: " + this)
      pool.destroy()
    } onComplete {
      case Success(value) => println("success: " + value)
      case Failure(value) => println("failure: " + value)
    }
  */

}
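The commented-out Future-based hook above never worked as written. If the goal is simply to destroy the pool when the JVM exits, a plain shutdown hook is simpler; a minimal sketch, assuming RedisClient as defined above:

// Sketch: destroy the pool on JVM shutdown instead of via a Future.
// sys.addShutdownHook registers a Java shutdown hook under the hood.
sys.addShutdownHook {
  println("Destroying Jedis pool: " + RedisClient.pool)
  RedisClient.pool.destroy()
}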

Below is the Streaming code:

import com.google.gson.Gson
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Streaming Process Data").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // a StreamingContext needs a batch interval
    val ssc = new StreamingContext(sc, Seconds(3))
    // Kafka offsets are checkpointed here
    ssc.checkpoint("spark-receiver")

    // Kafka configuration
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "192.168.115.142:9092",
      // key/value decoders are passed as type parameters to createDirectStream below
      "group.id" -> "spark_order"
    )
    val topics = Set("example_order")
    // KafkaUtils.createDirectStream receives data through the low-level (direct) API
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics
    )
    // flatMap: the receiver must be iterable, the result keeps the receiver's type,
    // and the function passed in must return a collection
    // objects used inside RDD operations must be serializable, so declare them inside the closure
    val paymentInfos = kafkaStream.flatMap(record => Some(new Gson().fromJson(record._2, classOf[PaymentInfo])))
    val batchInfos = paymentInfos.map(v => (v.productId, v.productPrice)).groupByKey.map(v => (v._1, v._2.size, v._2.sum[Long]))
    batchInfos.foreachRDD(
      rdd =>
        rdd.foreachPartition(
          partition => {
            // get one Redis connection per partition, not per record
            val jedis = RedisClient.pool.getResource
            // select database 1
            jedis.select(1)
            partition.foreach(
              v => {
                // per-product sales: increment hash field v._1 (productId) by v._3 (price total)
                jedis.hincrBy("orderTotalKey", v._1, v._3)
                // overall sales: increment the value at totalKey by v._3
                jedis.incrBy("totalKey", v._3)
                println(v._1 + " " + v._3)
              }
            )
            // return the connection to the pool
            jedis.close()
          }
        )
    )

    ssc.start()
    ssc.awaitTermination()
  }
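The stream above deserializes each Kafka message into a PaymentInfo, which the original post never shows. A hypothetical sketch of the shape Gson needs, assuming only the two fields used above (productId as the hash field, productPrice as the amount; productPrice must be a Long for the .sum[Long] aggregation to compile):

// Hypothetical PaymentInfo matching the fields used above. Gson fills the
// fields by reflection, so mutable vars and a no-arg constructor keep it simple.
class PaymentInfo extends Serializable {
  var productId: String = _
  var productPrice: Long = 0L
}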

One caveat: Spark ships your code to the executors, so every object captured by a closure must be fully serializable, or the job fails with an exception. For example, having RedisClient load its settings dynamically from a properties file breaks serialization, and so does holding the Gson instance in a local variable outside the closure.
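To make the pitfall concrete, here is a sketch of the failing versus working Gson usage (the bad/good names are illustrative only):

// Fails: gson is created on the driver, and com.google.gson.Gson is not
// java.io.Serializable, so shipping this closure throws NotSerializableException.
val gson = new Gson()
val bad = kafkaStream.flatMap(record => Some(gson.fromJson(record._2, classOf[PaymentInfo])))

// Works: the Gson instance is created inside the closure, on the executor.
val good = kafkaStream.flatMap(record => Some(new Gson().fromJson(record._2, classOf[PaymentInfo])))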

Source: https://blog.csdn.net/qq_33115589/article/details/114888484