Spark 写入HBase
作者:互联网
我们将Spark处理完的数据一般吸入外部存储系统中,常见的外部存储系统有HBase,MySQL,ElasticSearch,redis,HDFS等。
现在我们主要介绍Spark写入HBase的方法。废话不多说,先贴代码:
result.foreachPartition(it=>{ //创建HBase连接 val conn: client.Connection = HBaseUtil.getConnection("node2,node3,node4",2181) //创建容量为100的集合,存放批量的待写入HBase的数据 val puts = new util.ArrayList[Put](100) val table = conn.getTable(TableName.valueOf("order")) //遍历迭代器中的数据 it.foreach(bean=>{ //设置数据,包括rk,列族的数据 //设置rowkey val put = new Put(Bytes.toBytes(bean.oid)) //设置列族的数据 put.addColumn(Bytes.toBytes("order_info"),Bytes.toBytes("category_name"),Bytes.toBytes(bean.categoryName)) put.addColumn(Bytes.toBytes("order_info"),Bytes.toBytes("money"),Bytes.toBytes(bean.money)) //将put放入puts这个list中 puts.add(put) if(puts.size()==100){ //将数据写入HBase table.put(puts) puts.clear() } }) //将没有达到100的数据也写入到HBase中 table.put(puts) conn.close() }
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} /** * Created by jihn88 on 2020/11/28. * HBase工具类,用来创建HBase 的connection * zkQuorum zookeeper地址,多个要用逗号分隔 * port zookeeper端口 */ object HBaseUtil { def getConnection(zkQuorum: String,port: Int): Connection = synchronized{ val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum",zkQuorum) conf.set("hbase.zookeeeper.property.clientPort",port.toString) ConnectionFactory.createConnection(conf) } }
我们这里使用HBase 中put API写入HBase。
HBaseUtil工具类用于建立HBase连接。这里使用foreachRDD算子,一个分区建立一个一次连接,如果使用foreach算子,一条数据就会建立一次连接,网络开销太大。
标签:val,puts,写入,Bytes,toBytes,put,Spark,HBase 来源: https://www.cnblogs.com/itachilearner/p/14090471.html