其他分享
首页 > 其他分享> > Flink流处理-Sink之HBase

Flink流处理-Sink之HBase

作者:互联网

TripDriveToHBaseSink

package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.io.IOException;

public class TripDriveToHBaseSink extends RichSinkFunction<TripModel> {
    private final static Logger logger = LoggerFactory.getLogger(TripDriveToHBaseSink.class);


    private String tableName;
    private Connection conn = null;
    private BufferedMutator mutator = null;

    public TripDriveToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //从上下文获取到全局参数
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        //获取HBase Java API相关参数
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set(HConstants.ZOOKEEPER_QUORUM,zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,port);
        conf.set(TableInputFormat.INPUT_TABLE,tableName);

        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        //通过连接工厂创建连接
        conn = ConnectionFactory.createConnection(hbaseConf);
        //设置缓存对象的多大、多长时间刷写到HBase中
        //缓存写入HBaes,与Kafka的缓存写入Kafka有异曲同工之秒
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        //设置缓存达到一定的大小:10M
        params.writeBufferSize(10*1024*1024L);
        //设置缓存达到一定的时间:5s
        params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L);
        //通过连接获取表对象
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("当前获取bufferedMutator 失败:" + e.getMessage());
        }
    }

    //5. 重写 invoke 方法,将读取的数据写入到 hbase
    @Override
    public void invoke(TripModel value, Context context) throws Exception {
        //5.1 setDataSourcePut输入参数value,返回put对象
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            //5.2 指定时间内的数据强制刷写到hbase
            mutator.flush();
        } catch (Exception ex) {
            logger.error("写入到hbase失败:" + ex.getMessage());
        }
    }

    //4.重写close方法
    @Override
    public void close() throws Exception {
        //4.1 关闭hbase 表和连接资源
        if (mutator != null) mutator.close();
        if (!conn.isClosed()) conn.close();
    }

    //6. 实现 setDataSourcePut 方法

    /**
     * 每条对象生成一个 put
     * 1.表名 2.rowkey 3.列簇  4.列名和列值
     *
     * @param tripModel
     * @return
     */
    private Put setDataSourcePut(TripModel tripModel) {
        String rowKey = tripModel.getVin() + "_" + DateUtil.convertStringToDate(tripModel.getTripStartTime()).getTime();
        String cf = "cf";
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vin"), Bytes.toBytes(tripModel.getVin()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastSoc"), Bytes.toBytes(String.valueOf(tripModel.getLastSoc())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastMileage"), Bytes.toBytes(String.valueOf(tripModel.getLastMileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStartTime"), Bytes.toBytes(tripModel.getTripStartTime()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getStart_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_longitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_latitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_mileage"), Bytes.toBytes(String.valueOf(tripModel.getStart_mileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getEnd_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_longitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_latitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_mileage"), Bytes.toBytes(String.valueOf(tripModel.getEnd_mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripEndTime"), Bytes.toBytes(tripModel.getTripEndTime()));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("mileage"), Bytes.toBytes(String.valueOf(tripModel.getMileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("max_speed"), Bytes.toBytes(String.valueOf(tripModel.getMax_speed())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("soc_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getSoc_comsuption())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("time_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getTime_comsuption())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_low_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_low_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_medium_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_medium_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_high_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_high_speed_nums())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_Mileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStatus"), Bytes.toBytes(String.valueOf(tripModel.getTripStatus())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }
}

TripSampleToHBaseSink

package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.utils.DateUtil;
import pers.aishuang.flink.streaming.utils.StringUtil;

import java.io.IOException;

public class TripSampleToHBaseSink extends RichSinkFunction<String[]> {
    //创建日志打印器
    private final static Logger logger = LoggerFactory.getLogger(TripSampleToHBaseSink.class);

    //定义当前类的私有变量
    private String tableName;
    //定义连接
    org.apache.hadoop.hbase.client.Connection conn = null;
    //定义表操作的对象
    BufferedMutator mutator = null;

    //创建一个有参数-表名的构造方法
    public TripSampleToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    //重写open方法
    @Override
    public void open(Configuration parameters) throws Exception {
        //1、从上下文获取到全局的参数
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        //2、获取HBase Java API相关参数
        //-- 指定ZK集群服务端地址(quorum:法定人数)
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        //-- 指定ZK客户端端口号
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        //-- 创建配置
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        //-- 设置配置,加载参数
        conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM,zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,port);
        conf.set(TableInputFormat.INPUT_TABLE,tableName);

        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        //3、通过连接工厂创建连接
        conn = ConnectionFactory.createConnection(hbaseConf);
        //-- 设置缓存对象的多大、多长时间刷新到Hbase中
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        //-- 写缓存大小为10M
        params.writeBufferSize(10*1024*1024L);//10M
        //-- 写缓存刷写时间为5s
        params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L);//5s
        //4、通过连接获取表对象
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("当前获取bufferedMutator 失败:" + e.getMessage());
        }
    }

    //5、重写invoke方法,将读取的数据写入到HBase
    @Override
    public void invoke(String[] value, Context context) throws Exception {
        //-- setDataSourcePut输入参数value,返回put对象
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            //-- 指定时间内的数据强制刷写到HBase
            mutator.flush();
        } catch (IOException e) {
            logger.error("写入到HBase失败:" + e.getMessage());
        }
    }

    //重写close方法
    @Override
    public void close() throws Exception {
        //关闭hbase表和连接资源
        if(mutator != null) mutator.close();
        if( conn != null ) conn.close();
    }


    /**
     * 实现setDataSourcePut方法
     * 每个对象生成一个 put
     * 1、表名 2、rowkey 3、列簇 4、列别和列值
     * @param tripDriveArr
     * @return
     */
    private Put setDataSourcePut(String[] tripDriveArr) {
        //1. 如何设计rowkey VIN+时间戳反转
        String rowkey = tripDriveArr[0] + StringUtil.reverse(tripDriveArr[1]);
        //2. 通过rowkey实例化put
        Put put = new Put(Bytes.toBytes(rowkey));
        //3. 定义列簇的名称
        String cf = "cf";
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("vin"),Bytes.toBytes(tripDriveArr[0]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("terminalTimeStamp"),Bytes.toBytes(tripDriveArr[1]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("soc"),Bytes.toBytes(tripDriveArr[2]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("mileage"),Bytes.toBytes(tripDriveArr[3]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("speed"),Bytes.toBytes(tripDriveArr[4]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("gps"),Bytes.toBytes(tripDriveArr[5]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("terminalTime"),Bytes.toBytes(tripDriveArr[6]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("processTime"),Bytes.toBytes(DateUtil.getCurrentDateTime()));

        return put;
    }
}

标签:String,Flink,Bytes,cf,Sink,toBytes,put,HBase,addColumn
来源: https://www.cnblogs.com/zi-shuo/p/15522500.html