HBase实践:HBase2.x协处理器同步数据到数据仓库(那些你不知道的坑)
作者:互联网
目录
前言
HBase是基于Hadoop存储的一种超大型KV数据库,从字面意思可以看出HBase对KV结构支持比较友好,虽然现在还支持Phoenix查询,但是对于很多应用场景中,我们需要HBase廉价的存储和支持超大高并发查询的性能,但是我们不仅仅是想通过rowkey来获取对应的数据,还可能通过其他字段获取,什么我想像普通数据库一样求和,统计数量都难以达到,这种情况又要如何处理呢?
HBase自带的协处理器(Coprocessor)功能,很多公司对HBase进行了深度开发,利用Coprocessor实现了像标准数据库一样的操作,可以为我们提供一些HBase本身无法实现的功能。
一. 什么是协处理器
简单来说,Coprocessor是一个框架,这个框架可以让你很容易地在Region Server运行你的业务逻辑代码。
Coprocessor主要分为两种类型:Observer Coprocessor和EndPoint Coprocessor,如果和RDBMS做类比的话:
- Observer Coprocessor –> RDBMS 中的触发器
- EndPoint Coprocessor –> RDBMS中的存储过程
Observer Coprocessor
类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子, 在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执 行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。
Endpoint Coprocessor
类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处 理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见 的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的 操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行, 势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最 大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客 户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体 的执行效率就会提高很多
二.开发案列
应用场景:
Mysql的维度表Binlog日志实时同步到HBase后,当维度表更新时想要第一时间获知,且想要同步到数仓表中该怎么办,这里利用了HBase Observer Coprocessor获取binlog的Update和Delete行为数据,在即将Put前同步到数仓中。
Maven:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.1.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.9</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.18</version>
</dependency>
开发代码:
这里使用的是HBase2.1.9版本,HBase2.x版本的API会有很大的变化,需要继承
RegionObserver, RegionCoprocessor两个接口,重写getRegionObserver,start,stop,prePut,preDelete方法.
public class GreenplumObserver implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
/**
* 初始化Greenplum连接
* @param env
* @throws IOException
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
try {
Class.forName("org.postgresql.Driver");
connection = DriverManager.getConnection(GREENPLUM_URL, GREENPLUM_USER, GREENPLUM_PASSWORD);
logger.info("初始化连接Greenplum...." + connection);
} catch (SQLException | ClassNotFoundException throwables) {
throwables.printStackTrace();
}
threadPoolExecutor = ThreadPoolUtil.getInstance();
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
Map<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
HashMap<String, String> resultMap = new HashMap<>();
//TODO 获取sql语句中的查询条件和更新条件
for (Map.Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Cell cell : entry.getValue()) {
String key = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
if (arrayList.contains(key)) {
resultMap.put(key, value);
}
}
}
String queryCondition = resultMap.getOrDefault("query_conditions", null);
if (!Strings.isNullOrEmpty(sinkTable)) {
String gpTables = hashMap.getOrDefault(sinkTable, null);
String[] split = gpTables.split("\\|", -1);
for (String table : split) {
if ("3".equals(operationAction)) {
String updatesql = "UPDATE dwd." + table + " SET " + updateInfo + " WHERE " + queryCondition;
logger.info("执行语句: " + updatesql);
GreenplumBulkOperator.addUpdateAndDeleteToDB(connection, updatesql);
} else if ("2".equals(operationAction)) {
String deletesql = "DELETE FROM dwd." + table + " WHERE " + queryCondition;
logger.info("执行语句: " + deletesql);
GreenplumBulkOperator.addUpdateAndDeleteToDB(connection, deletesql);
}
}
}
}
});
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {
logger.info("============= PreDelete ===================");
}
/**
* 关闭Greenplum连接
* @param env
* @throws IOException
*/
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
logger.info("==================== END ==================");
if (connection != null) {
try {
connection.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
总结
总而言之,利用HBase 协处理器的确帮我们解决了很多问题,比如上面的更新删除维度表或者现在很多利用协处理器来做二级索引,开发思路大致一样。
但是同时也面临着以下几点问题,这些都是我在HBase开发使用中遇到的问题:
- 利用HBase存储+二级索引架构适用于超大数据量存储(十亿百亿级别),因为前端请求查询时会通过索引数据库映射得到rowkey再去查询HBase,这种情况势必会影响到查询性能。如果数据量不大,不建议这么使用,可以直接用Phoenix或者换一种数仓OLAP数据库,但是如果数据量达到数十亿上百亿这种规模,这种架构还是很香的。毕竟大部分数据库要在这个数量级使用相同服务器配置的话,HBase的优势立马体现了。
- 因为同步到数据库其实是一种双写的操作,在双写的过程中无法完全保证数据一致性,这里要靠自己的开发手段去弥补。
- 协处理器无法冗余历史数据,如果想重新计算,只能用过触发器来重新触发。
关注我,不迷路,带你们玩引擎,玩数据库,玩源码~
标签:String,HBase2,数据库,数据仓库,Server,Coprocessor,HBase,协处理器 来源: https://blog.csdn.net/BlackArmand/article/details/118500790