Reading and writing HBase with Spark 2.3: a complete example
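The code below assumes Spark 2.3 and an HBase 1.x client, with the TableInputFormat/TableOutputFormat classes on the classpath. A minimal build.sbt sketch (the versions are illustrative and should match your cluster):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "2.3.0" % "provided",
  "org.apache.hbase"  % "hbase-client" % "1.2.6",
  "org.apache.hbase"  % "hbase-common" % "1.2.6",
  "org.apache.hbase"  % "hbase-server" % "1.2.6" // provides TableInputFormat / TableOutputFormat
)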
package com.yss.spark.hbase
import com.yss.utils.BasicPropertites
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.filter.{BinaryComparator, CompareFilter, RowFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
 * @author
 * @version 2019-03-25 17:57
 *
 * Description:
 *
 * 1. The traditional approach
 *
 * This is the common way of reading from and writing to HBase, using TableInputFormat and TableOutputFormat.
 */
object SparkHbase {
def main(args: Array[String]): Unit = {
// Initialize Spark
val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// Initialize the HBase configuration and point it at ZooKeeper
val config: Configuration = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", BasicPropertites.zookeeperQuorum)
config.set("hbase.zookeeper.property.clientPort", BasicPropertites.zookeeperClientPort)
config.set("zookeeper.znode.parent", BasicPropertites.zookeeperParent)
// Read the full HBase table with Spark
sparkReadAllHbase(spark, config).show()
// Read HBase data with Spark using a filter condition
sparkReadFilterHbase(spark, config).show()
// Write data to HBase with Spark
sparkWriteToHbase(spark, config)
spark.stop()
}
/**
 * Read the full HBase table with Spark
 * @param spark
 * @param config
 */
def sparkReadAllHbase(spark: SparkSession, config: Configuration) = {
val sc: SparkContext = spark.sparkContext
// Set the table to read from. This only affects reads, not writes, and can be omitted if no data is read.
config.set(TableInputFormat.INPUT_TABLE,"zhs")
// Fetch all rows of the table from HBase as an RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(config,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
// Build an RDD of Row objects
val rowRDD = hbaseRDD.map(p => {
val rowkey = Bytes.toString(p._2.getRow)
val name = Bytes.toString(p._2.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("name")))
val age = Bytes.toString(p._2.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("age")))
val id = Bytes.toString(p._2.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("id")))
Row(name,age.toInt,id)
})
// Define the DataFrame schema
val schema = StructType(List(
StructField("name",StringType,true),
StructField("age",IntegerType,true),
StructField("id",StringType,true)
))
// Build the DataFrame
val dataFrame = spark.createDataFrame(rowRDD,schema)
// Register a temporary view for SQL queries
dataFrame.createTempView("alltable")
val result: DataFrame = spark.sql("select * from alltable")
result
}
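  /**
   * Not in the original post: a small helper that reads a cell as a String with an explicit
   * default for missing cells. In the map functions above and below, a missing cf1:age cell
   * would make age.toInt fail at runtime; a sketch like this keeps the null handling in one place.
   */
  def getCellAsString(result: Result, family: String, qualifier: String, default: String = ""): String = {
    val bytes = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier))
    if (bytes == null) default else Bytes.toString(bytes)
  }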
/**
 * Read HBase data with Spark using a specified filter condition
 * @param spark
 * @param config
 */
def sparkReadFilterHbase(spark: SparkSession, config: Configuration) = {
val sc: SparkContext = spark.sparkContext
// Set the table to read from. This only affects reads, not writes, and can be omitted if no data is read.
config.set(TableInputFormat.INPUT_TABLE,"zhs")
// Create the Scan object
val scan = new Scan()
scan.setCaching(100) // scanner caching (rows fetched per RPC); setting this too high can cause out-of-memory errors
// Set the HBase filter condition for the read
val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("20180325#1001")))
scan.setFilter(filter)
// Serialize the Scan object into a Base64 string
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
config.set(TableInputFormat.SCAN, scanToString)
// Read the HBase data
val hBaseRDD = sc.newAPIHadoopRDD(config, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// Build an RDD of Row objects
val rowRDD = hBaseRDD.map(p => {
val rowkey = Bytes.toString(p._2.getRow)
val name = Bytes.toString(p._2.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("name")))
val age = Bytes.toString(p._2.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("age")))
val id = Bytes.toString(p._2.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("id")))
Row(name,age.toInt,id)
})
// Define the DataFrame schema
val schema = StructType(List(
StructField("name",StringType,true),
StructField("age",IntegerType,true),
StructField("id",StringType,true)
))
// Build the DataFrame
val dataFrame = spark.createDataFrame(rowRDD,schema)
// Register a temporary view for SQL queries
dataFrame.createTempView("filtertable")
val result: DataFrame = spark.sql("select * from filtertable")
result
}
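  /**
   * Not in the original post: a sketch of reading a rowkey range instead of an exact match.
   * It assumes the same "zhs" table and cf1 layout used above; with the HBase 1.x client,
   * Scan.setStartRow/setStopRow restrict the scan to [startRow, stopRow).
   */
  def sparkReadRangeHbase(spark: SparkSession, config: Configuration): RDD[(ImmutableBytesWritable, Result)] = {
    val sc: SparkContext = spark.sparkContext
    config.set(TableInputFormat.INPUT_TABLE, "zhs")
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes("20180325")) // inclusive
    scan.setStopRow(Bytes.toBytes("20180326"))  // exclusive
    val scanToString = Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
    config.set(TableInputFormat.SCAN, scanToString)
    sc.newAPIHadoopRDD(config, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
  }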
/**
 * Write data to HBase with Spark
 * @param spark
 * @param config
 */
def sparkWriteToHbase(spark: SparkSession, config: Configuration) = {
val sc: SparkContext = spark.sparkContext
// Initialize the JobConf. Note that TableOutputFormat must come from the org.apache.hadoop.hbase.mapred package!
val jobConf = new JobConf(config)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "zhs")
// Create the source RDD
val indataRDD = sc.makeRDD(Array("1003,jack,15","1004,Lily,16","1005,mike,16"))
val rdd = indataRDD.map(_.split(',')).map{arr=>{
/* One Put object represents one row; the row key is passed to the constructor.
 * All inserted values must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
 * Put.addColumn takes three arguments: column family, column qualifier, value.
 */
val put = new Put(Bytes.toBytes("20180327"+"#"+arr(0)))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("id"),Bytes.toBytes(arr(0)))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
// The RDD must be of type RDD[(ImmutableBytesWritable, Put)] before saveAsHadoopDataset can be called
(new ImmutableBytesWritable, put) }
}
rdd.saveAsHadoopDataset(jobConf)
println("成功写入Hbase")
}
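  /**
   * Not in the original post: the same write expressed with the newer mapreduce API
   * (org.apache.hadoop.hbase.mapreduce.TableOutputFormat plus saveAsNewAPIHadoopDataset),
   * sketched under the same "zhs" table and cf1 assumptions as above.
   */
  def sparkWriteToHbaseNewAPI(spark: SparkSession, config: Configuration): Unit = {
    val sc: SparkContext = spark.sparkContext
    config.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, "zhs")
    val job = org.apache.hadoop.mapreduce.Job.getInstance(config)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    val rdd = sc.makeRDD(Array("1006,tom,17")).map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes("20180327" + "#" + arr(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("id"), Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(arr(2)))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }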
}
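The BasicPropertites object imported at the top is not shown in the original post. For completeness, a minimal sketch of what it might look like (the ZooKeeper values are placeholders and must be replaced with your own):

package com.yss.utils

object BasicPropertites {
  // ZooKeeper quorum of the HBase cluster, e.g. "node1,node2,node3"
  val zookeeperQuorum: String = "node1,node2,node3"
  // ZooKeeper client port, usually "2181"
  val zookeeperClientPort: String = "2181"
  // Parent znode under which HBase stores its state, e.g. "/hbase"
  val zookeeperParent: String = "/hbase"
}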
Source: https://blog.csdn.net/weixin_42728895/article/details/88842688