Flink流处理-Sink之HBase
作者:互联网
TripDriveToHBaseSink
package pers.aishuang.flink.streaming.sink.hbase;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.utils.DateUtil;
import java.io.IOException;
/**
 * Flink sink that writes each {@link TripModel} record to an HBase table as a single
 * {@link Put} in column family {@code cf}, using a {@link BufferedMutator}.
 *
 * <p>The ZooKeeper quorum and client port are read from the job's global
 * {@link ParameterTool} parameters ({@code zookeeper.quorum},
 * {@code zookeeper.clientPort}) when the sink is opened.
 */
public class TripDriveToHBaseSink extends RichSinkFunction<TripModel> {
    private static final Logger logger = LoggerFactory.getLogger(TripDriveToHBaseSink.class);
    /** Column family every cell is written to. */
    private static final byte[] CF = Bytes.toBytes("cf");

    /** Name of the target HBase table. */
    private final String tableName;
    // HBase handles are created in open(); marked transient because they are runtime-only
    // resources and must never be part of the serialized function instance.
    private transient Connection conn = null;
    private transient BufferedMutator mutator = null;

    /**
     * @param _tableName name of the HBase table to write to
     */
    public TripDriveToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    /**
     * Creates the HBase connection and the buffered mutator for {@code tableName}.
     *
     * @param parameters Flink configuration (not read here)
     * @throws Exception if a required global parameter is missing, or the connection or
     *                   mutator cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Global job parameters carry the ZooKeeper settings.
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        String port = globalJobParameters.getRequired("zookeeper.clientPort");

        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);

        conn = ConnectionFactory.createConnection(hbaseConf);

        // Write buffer: 10 MB size threshold, 5 s periodic flush timeout.
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        params.writeBufferSize(10 * 1024 * 1024L);
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);

        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("当前获取bufferedMutator 失败:" + e.getMessage(), e);
            // Rethrow: continuing with a null mutator would make every invoke() fail
            // with a NullPointerException that is only logged, silently dropping all data.
            throw e;
        }
    }

    /**
     * Converts the record to a {@link Put}, hands it to the mutator and flushes.
     * Failures are logged (with stack trace) and the record is skipped.
     *
     * @param value   record to write
     * @param context sink context (unused)
     */
    @Override
    public void invoke(TripModel value, Context context) throws Exception {
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // NOTE(review): flushing after every record bypasses the buffer size / periodic
            // flush configured in open(); kept as-is to preserve the existing write semantics.
            mutator.flush();
        } catch (Exception ex) {
            logger.error("写入到hbase失败:" + ex.getMessage(), ex);
        }
    }

    /**
     * Closes the mutator and then the connection. The connection is closed even when
     * closing the mutator throws, and either handle may be {@code null} if
     * {@link #open(Configuration)} did not complete.
     */
    @Override
    public void close() throws Exception {
        try {
            if (mutator != null) {
                mutator.close();
            }
        } finally {
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        }
    }

    /**
     * Builds the {@link Put} for one trip record.
     *
     * <p>Row key is {@code vin + "_" + <epoch millis of tripStartTime>}, where the millis
     * come from {@code DateUtil.convertStringToDate(...).getTime()}. All cells go to
     * column family {@code cf}; most values are stored as their {@code String.valueOf}
     * text.
     *
     * @param tripModel record to convert
     * @return the populated put
     */
    private Put setDataSourcePut(TripModel tripModel) {
        String rowKey = tripModel.getVin() + "_"
                + DateUtil.convertStringToDate(tripModel.getTripStartTime()).getTime();
        Put put = new Put(Bytes.toBytes(rowKey));

        put.addColumn(CF, Bytes.toBytes("vin"), Bytes.toBytes(tripModel.getVin()));
        addText(put, "lastSoc", String.valueOf(tripModel.getLastSoc()));
        addText(put, "lastMileage", String.valueOf(tripModel.getLastMileage()));
        put.addColumn(CF, Bytes.toBytes("tripStartTime"), Bytes.toBytes(tripModel.getTripStartTime()));
        addText(put, "start_BMS_SOC", String.valueOf(tripModel.getStart_BMS_SOC()));
        addText(put, "start_longitude", String.valueOf(tripModel.getStart_longitude()));
        addText(put, "start_latitude", String.valueOf(tripModel.getStart_latitude()));
        addText(put, "start_mileage", String.valueOf(tripModel.getStart_mileage()));
        addText(put, "end_BMS_SOC", String.valueOf(tripModel.getEnd_BMS_SOC()));
        addText(put, "end_longitude", String.valueOf(tripModel.getEnd_longitude()));
        addText(put, "end_latitude", String.valueOf(tripModel.getEnd_latitude()));
        addText(put, "end_mileage", String.valueOf(tripModel.getEnd_mileage()));
        put.addColumn(CF, Bytes.toBytes("tripEndTime"), Bytes.toBytes(tripModel.getTripEndTime()));
        addText(put, "mileage", String.valueOf(tripModel.getMileage()));
        addText(put, "max_speed", String.valueOf(tripModel.getMax_speed()));
        addText(put, "soc_comsuption", String.valueOf(tripModel.getSoc_comsuption()));
        addText(put, "time_comsuption", String.valueOf(tripModel.getTime_comsuption()));
        addText(put, "total_low_speed_nums", String.valueOf(tripModel.getTotal_low_speed_nums()));
        addText(put, "total_medium_speed_nums", String.valueOf(tripModel.getTotal_medium_speed_nums()));
        addText(put, "total_high_speed_nums", String.valueOf(tripModel.getTotal_high_speed_nums()));
        addText(put, "Low_BMS_SOC", String.valueOf(tripModel.getLow_BMS_SOC()));
        addText(put, "Medium_BMS_SOC", String.valueOf(tripModel.getMedium_BMS_SOC()));
        addText(put, "High_BMS_SOC", String.valueOf(tripModel.getHigh_BMS_SOC()));
        addText(put, "Low_BMS_Mileage", String.valueOf(tripModel.getLow_BMS_Mileage()));
        addText(put, "Medium_BMS_Mileage", String.valueOf(tripModel.getMedium_BMS_Mileage()));
        addText(put, "High_BMS_Mileage", String.valueOf(tripModel.getHigh_BMS_Mileage()));
        addText(put, "tripStatus", String.valueOf(tripModel.getTripStatus()));
        put.addColumn(CF, Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }

    /** Adds one text cell {@code cf:qualifier = value} to the put. */
    private static void addText(Put put, String qualifier, String value) {
        put.addColumn(CF, Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }
}
TripSampleToHBaseSink
package pers.aishuang.flink.streaming.sink.hbase;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.utils.DateUtil;
import pers.aishuang.flink.streaming.utils.StringUtil;
import java.io.IOException;
/**
 * Flink sink that writes trip sample records, delivered as {@code String[]}, to an
 * HBase table as one {@link Put} per record in column family {@code cf}, using a
 * {@link BufferedMutator}.
 *
 * <p>The ZooKeeper quorum and client port are read from the job's global
 * {@link ParameterTool} parameters ({@code zookeeper.quorum},
 * {@code zookeeper.clientPort}) when the sink is opened.
 */
public class TripSampleToHBaseSink extends RichSinkFunction<String[]> {
    private static final Logger logger = LoggerFactory.getLogger(TripSampleToHBaseSink.class);
    /** Column family every cell is written to. */
    private static final byte[] CF = Bytes.toBytes("cf");
    /** Number of leading array elements read by {@link #setDataSourcePut(String[])}. */
    private static final int FIELD_COUNT = 7;

    /** Name of the target HBase table. */
    private final String tableName;
    // HBase handles are created in open(); transient because they are runtime-only
    // resources and must never be part of the serialized function instance.
    transient org.apache.hadoop.hbase.client.Connection conn = null;
    transient BufferedMutator mutator = null;

    /**
     * @param _tableName name of the HBase table to write to
     */
    public TripSampleToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    /**
     * Creates the HBase connection and the buffered mutator for {@code tableName}.
     *
     * @param parameters Flink configuration (not read here)
     * @throws Exception if a required global parameter is missing, or the connection or
     *                   mutator cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Global job parameters carry the ZooKeeper settings.
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        // 2. ZooKeeper quorum hosts and client port.
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        String port = globalJobParameters.getRequired("zookeeper.clientPort");

        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // Use the standard quorum key so it pairs with ZOOKEEPER_CLIENT_PORT below and
        // matches TripDriveToHBaseSink; the client-only quorum key has its own port key.
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);

        // 3. Create the connection.
        conn = ConnectionFactory.createConnection(hbaseConf);

        // Write buffer: 10 MB size threshold, 5 s periodic flush timeout.
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        params.writeBufferSize(10 * 1024 * 1024L);
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);

        // 4. Obtain the mutator for the table.
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("当前获取bufferedMutator 失败:" + e.getMessage(), e);
            // Rethrow: continuing with a null mutator would make every invoke() fail.
            throw e;
        }
    }

    /**
     * Converts the record to a {@link Put}, hands it to the mutator and flushes.
     * Records that are {@code null} or have fewer than {@value #FIELD_COUNT} elements
     * are logged and skipped; {@link IOException}s from HBase are logged with stack trace.
     *
     * @param value   record fields, see {@link #setDataSourcePut(String[])} for the layout
     * @param context sink context (unused)
     */
    @Override
    public void invoke(String[] value, Context context) throws Exception {
        if (value == null || value.length < FIELD_COUNT) {
            logger.error("Skipping malformed trip sample record: expected at least {} fields but got {}",
                    FIELD_COUNT, value == null ? "null" : String.valueOf(value.length));
            return;
        }
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // NOTE(review): flushing after every record bypasses the buffer size / periodic
            // flush configured in open(); kept as-is to preserve the existing write semantics.
            mutator.flush();
        } catch (IOException e) {
            logger.error("写入到HBase失败:" + e.getMessage(), e);
        }
    }

    /**
     * Closes the mutator and then the connection. The connection is closed even when
     * closing the mutator throws, and either handle may be {@code null} if
     * {@link #open(Configuration)} did not complete.
     */
    @Override
    public void close() throws Exception {
        try {
            if (mutator != null) {
                mutator.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * Builds the {@link Put} for one sample record.
     *
     * <p>Row key is element 0 concatenated with {@code StringUtil.reverse(element 1)}
     * (per the original author's note: VIN + reversed timestamp). Elements 0..6 are
     * stored in column family {@code cf} under the qualifiers {@code vin},
     * {@code terminalTimeStamp}, {@code soc}, {@code mileage}, {@code speed},
     * {@code gps}, {@code terminalTime}, plus a {@code processTime} cell taken from
     * {@code DateUtil.getCurrentDateTime()}.
     *
     * @param tripDriveArr record fields; must hold at least 7 elements
     * @return the populated put
     */
    private Put setDataSourcePut(String[] tripDriveArr) {
        String rowkey = tripDriveArr[0] + StringUtil.reverse(tripDriveArr[1]);
        Put put = new Put(Bytes.toBytes(rowkey));

        addText(put, "vin", tripDriveArr[0]);
        addText(put, "terminalTimeStamp", tripDriveArr[1]);
        addText(put, "soc", tripDriveArr[2]);
        addText(put, "mileage", tripDriveArr[3]);
        addText(put, "speed", tripDriveArr[4]);
        addText(put, "gps", tripDriveArr[5]);
        addText(put, "terminalTime", tripDriveArr[6]);
        put.addColumn(CF, Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }

    /** Adds one text cell {@code cf:qualifier = value} to the put. */
    private static void addText(Put put, String qualifier, String value) {
        put.addColumn(CF, Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }
}
标签:String,Flink,Bytes,cf,Sink,toBytes,put,HBase,addColumn 来源: https://www.cnblogs.com/zi-shuo/p/15522500.html