其他分享
首页 > 其他分享> > Spark 写入HBase

Spark 写入HBase

作者:互联网

我们将Spark处理完的数据一般吸入外部存储系统中,常见的外部存储系统有HBase,MySQL,ElasticSearch,redis,HDFS等。

现在我们主要介绍Spark写入HBase的方法。废话不多说,先贴代码:

result.foreachPartition(it=>{

      //创建HBase连接
      val conn: client.Connection = HBaseUtil.getConnection("node2,node3,node4",2181)

      //创建容量为100的集合,存放批量的待写入HBase的数据
      val puts = new util.ArrayList[Put](100)
      val table = conn.getTable(TableName.valueOf("order"))
      //遍历迭代器中的数据
      it.foreach(bean=>{

        //设置数据,包括rk,列族的数据
        //设置rowkey
        val put = new Put(Bytes.toBytes(bean.oid))

        //设置列族的数据
        put.addColumn(Bytes.toBytes("order_info"),Bytes.toBytes("category_name"),Bytes.toBytes(bean.categoryName))
        put.addColumn(Bytes.toBytes("order_info"),Bytes.toBytes("money"),Bytes.toBytes(bean.money))

        //将put放入puts这个list中
        puts.add(put)

        if(puts.size()==100){
          //将数据写入HBase
          table.put(puts)
          puts.clear()
        }

      })
      //将没有达到100的数据也写入到HBase中
      table.put(puts)
      conn.close()
    }
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

/**
  * Created by jihn88 on 2020/11/28.
  * HBase工具类,用来创建HBase 的connection
  * zkQuorum zookeeper地址,多个要用逗号分隔
  * port  zookeeper端口
  */
object HBaseUtil {
  def getConnection(zkQuorum: String,port: Int): Connection = synchronized{
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum",zkQuorum)
    conf.set("hbase.zookeeeper.property.clientPort",port.toString)
    ConnectionFactory.createConnection(conf)
  }

} 

我们这里使用HBase 中put API写入HBase。

HBaseUtil工具类用于建立HBase连接。这里使用foreachRDD算子,一个分区建立一个一次连接,如果使用foreach算子,一条数据就会建立一次连接,网络开销太大。

标签:val,puts,写入,Bytes,toBytes,put,Spark,HBase
来源: https://www.cnblogs.com/itachilearner/p/14090471.html