spark streaming整合kafka中非聚合类运算如何和kafka保持exactly once一致性语义(幂等性方式)
作者:互联网
object KafkaToHbase {
def main(args: Array[String]): Unit = {
//true a1 g1 ta,tb
val Array(isLocal, appName, groupId, allTopics) = args
val conf = new SparkConf()
.setAppName(appName)
if (isLocal.toBoolean) {
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(5000))
val topics = allTopics.split(",")
//SparkSteaming跟kafka整合的参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
"key.deserializer" -> classOf[StringDeserializer].getName,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupId,
"auto.offset.reset" -> "earliest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
"enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量
)
//查询历史偏移量【上一次成功写入到数据库的偏移量】
val historyOffsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromHbase("myorder", groupId)
//跟Kafka进行整合,需要引入跟Kafka整合的依赖
//createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费
//直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffsets) //指定订阅Topic的规则, 从历史偏移量接着读取数据
)
kafkaDStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
//获取KakfaRDD的偏移量
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//获取KafkaRDD中的数据
val lines: RDD[String] = rdd.map(_.value())
val orderRDD: RDD[Order] = lines.map(line => {
var order: Order = null
try {
order = JSON.parseObject(line, classOf[Order])
} catch {
case e: JSONException => {
//TODO
}
}
order
})
//过滤问题数据
val filtered: RDD[Order] = orderRDD.filter(_ != null)
filtered.foreachPartition(iter => {
if (iter.nonEmpty) {
//先获取当前Task的分区编号,然后根据Task分区编号再获取当前分区的偏移量
val offsetRange = offsetRanges(TaskContext.get.partitionId)
//获取一个Hbase的Connection【在Executor端获取的】
val connection: Connection = HBaseUtil.getConnection("node-1.51doit.cn,node-2.51doit.cn,node-3.51doit.cn", 2181)
val t_orders: Table = connection.getTable(TableName.valueOf("myorder"))
//定义一个集合,将数据先缓存到集合中
val puts = new util.ArrayList[Put]()
//迭代分区中的每一条数据
iter.foreach(o => {
// new 了一个put,就是hbase一行数据
val put = new Put(Bytes.toBytes(o.oid))
//put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("order_id"), Bytes.toBytes(o.oid))
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("total_money"), Bytes.toBytes(o.totalMoney))
//如果是一个批次中的最后一条数据,将偏移量和数据同时写入Hbase的同一行中
if (!iter.hasNext) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val untilOffset = offsetRange.untilOffset
put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("groupid"), Bytes.toBytes(groupId))
put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("topic_partition"), Bytes.toBytes(topic + "_" + partition))
put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("offset"), Bytes.toBytes(untilOffset))
}
puts.add(put)
// if (puts.size() % 5 == 0) {
// t_orders.put(puts)
// puts.clear()
// }
})
//批量写入
t_orders.put(puts)
//关闭Hbase的table
t_orders.close()
//关闭Hbase连接
connection.close()
}
})
}
})
ssc.start()
ssc.awaitTermination()
}
}
标签:String,val,exactly,Bytes,偏移量,kafka,streaming,toBytes,put 来源: https://www.cnblogs.com/xstCoding/p/16111676.html