其他分享
首页 > 其他分享 > springboot集成Hbase

springboot集成Hbase

作者:互联网

前提:hbase版本是1.2.6,详情看windows安装hbase_Fullmark0608-CSDN博客

1、添加maven

  <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

2、添加配置

# hbase 数据库配置
hbase.config.hbase.zookeeper.quorum=127.0.0.1
hbase.config.hbase.zookeeper.property.clientPort=2181

3、编写代码

/**
 * Holder for the settings declared under the {@code hbase} property prefix.
 *
 * <p>The {@code config} map carries the raw HBase client settings (for example the
 * ZooKeeper quorum and client port) that are later copied into the HBase configuration.
 */
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {

    /** Raw HBase client settings; stays {@code null} until the setter is called. */
    private Map<String, String> config;

    /**
     * Stores the HBase client settings.
     *
     * @param config setting name to setting value
     */
    public void setConfig(final Map<String, String> config) {
        this.config = config;
    }

    /**
     * @return the HBase client settings, or {@code null} when none were set
     */
    public Map<String, String> getConfig() {
        return this.config;
    }
}
/**
 * Spring configuration that exposes the HBase client {@code Configuration} and a
 * {@code Connection} built from it as beans.
 */
@EnableConfigurationProperties(HbaseProperties.class)
@org.springframework.context.annotation.Configuration
public class HbaseConfig {
    private final HbaseProperties prop;

    /**
     * @param properties the bound {@code hbase.*} properties
     */
    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    /**
     * Creates the HBase configuration and overlays every entry of
     * {@link HbaseProperties#getConfig()} on top of the HBase defaults.
     *
     * @return the HBase client configuration
     */
    @Bean
    public Configuration configuration() {
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
        // The map is null when no hbase.config.* property is defined; fall back to the defaults
        // instead of failing with a NullPointerException.
        if (config != null) {
            config.forEach(configuration::set);
        }
        return configuration;
    }

    /**
     * Opens an HBase connection using {@link #configuration()}.
     *
     * @return a new HBase connection
     * @throws IOException if the connection cannot be created
     */
    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}
package com.catt.service;

import com.catt.config.HbaseConfig;
import com.catt.tools.system.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.*;

/**
 * Thin helper around the HBase client for creating/dropping tables and reading, writing and
 * deleting cells.
 *
 * <p>The connection and admin handles are static and created once in {@link #init()}. If that
 * fails the error is only logged, and every later call fails with an
 * {@link IllegalStateException}.
 *
 * <p>NOTE(review): the connection and admin are never closed by this class — confirm whether a
 * shutdown hook is needed.
 *
 * @author fanghuanbiao
 */
@Service
@Slf4j
public class HbaseService {
    @Autowired
    private HbaseConfig config;

    private static Connection connection = null;
    private static Admin admin = null;

    /**
     * Opens the shared connection and admin handle unless a connection already exists.
     */
    @PostConstruct
    private void init() {
        if (connection != null) {
            return;
        }
        try {
            connection = ConnectionFactory.createConnection(config.configuration());
            admin = connection.getAdmin();
        } catch (IOException e) {
            log.error("HBase create connection failed:", e);
        }
    }

    /**
     * Opens a table handle on the shared connection; the caller must close it.
     */
    private Table openTable(String tableName) throws IOException {
        if (connection == null) {
            throw new IllegalStateException("HBase connection is not initialized");
        }
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * Returns the shared admin handle, failing clearly when initialization did not succeed.
     */
    private Admin requireAdmin() {
        if (admin == null) {
            throw new IllegalStateException("HBase admin is not initialized");
        }
        return admin;
    }

    /**
     * Shell equivalent: create 'tableName','[Column Family 1]','[Column Family 2]'
     *
     * @param tableName      table name
     * @param columnFamilies column family names
     * @throws TableExistsException if a table with that name is already listed
     * @throws IOException          on any other HBase failure
     */
    public void createTable(String tableName, String... columnFamilies) throws IOException {
        if (this.tableExists(tableName)) {
            throw new TableExistsException(tableName + " already exists");
        }
        TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        for (String columnFamily : columnFamilies) {
            // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes() (platform charset).
            descriptorBuilder.setColumnFamily(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
        }
        requireAdmin().createTable(descriptorBuilder.build());
    }

    /**
     * Shell equivalent: put 'tableName','rowKey','family:column','value'
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        cell value
     * @throws IOException if the write fails
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String column, String value)
            throws IOException {
        this.insertOrUpdate(tableName, rowKey, columnFamily, new String[]{column}, new String[]{value});
    }

    /**
     * Writes several columns of one row and one column family in a single put;
     * {@code values[i]} is stored under {@code columns[i]}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param columns      column qualifiers
     * @param values       cell values; must provide at least one value per column
     * @throws IllegalArgumentException if fewer values than columns are given
     * @throws IOException              if the write fails
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values)
            throws IOException {
        Objects.requireNonNull(columns, "columns");
        Objects.requireNonNull(values, "values");
        // Validate before touching HBase so a short values array cannot cause a partial write.
        if (values.length < columns.length) {
            throw new IllegalArgumentException(
                    "Expected " + columns.length + " values but got " + values.length);
        }
        if (columns.length == 0) {
            // Nothing to write; an empty Put would be rejected by the client.
            return;
        }
        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
        }
        // Send the row once, after all columns were added, instead of once per column.
        try (Table table = openTable(tableName)) {
            table.put(put);
        }
    }

    /**
     * Deletes an entire row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @throws IOException if the delete fails
     */
    public void deleteRow(String tableName, String rowKey) throws IOException {
        try (Table table = openTable(tableName)) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
    }

    /**
     * Deletes all columns of one column family in a row.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @throws IOException if the delete fails
     */
    public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addFamily(Bytes.toBytes(columnFamily));
        try (Table table = openTable(tableName)) {
            table.delete(delete);
        }
    }

    /**
     * Shell equivalent: delete 'tableName','rowKey','columnFamily:column'
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @throws IOException if the delete fails
     */
    public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = openTable(tableName)) {
            table.delete(delete);
        }
    }

    /**
     * Shell equivalent: disable 'tableName' followed by drop 'tableName'. Does nothing when the
     * table is not listed.
     *
     * @param tableName table name
     * @throws IOException if disabling or deleting fails
     */
    public void deleteTable(String tableName) throws IOException {
        if (!this.tableExists(tableName)) {
            return;
        }
        TableName name = TableName.valueOf(tableName);
        Admin hbaseAdmin = requireAdmin();
        hbaseAdmin.disableTable(name);
        hbaseAdmin.deleteTable(name);
    }

    /**
     * Shell equivalent: get 'tableName','rowkey','family:column'
     *
     * @param tableName table name
     * @param rowkey    row key
     * @param family    column family
     * @param column    column qualifier
     * @return the cell value; an empty string when the cell is missing or the read fails;
     *         {@code null} when any argument is blank
     */
    public String getValue(String tableName, String rowkey, String family, String column) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowkey)
                || StringUtils.isBlank(column)) {
            return null;
        }
        String value = "";
        try (Table table = openTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowkey));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
            List<Cell> cells = table.get(get).listCells();
            if (cells != null && !cells.isEmpty()) {
                // When several cells come back, the last one wins.
                Cell last = cells.get(cells.size() - 1);
                value = Bytes.toString(last.getValueArray(), last.getValueOffset(), last.getValueLength());
            }
        } catch (IOException e) {
            log.error("HBase get failed: table={}, rowkey={}, column={}:{}", tableName, rowkey, family, column, e);
        }
        return value;
    }

    /**
     * Shell equivalent: get 'tableName','rowKey'. Logs every cell of the row.
     *
     * <p>NOTE(review): always returns {@code null}; the row content is only logged.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return always {@code null}
     * @throws IOException if the read fails
     */
    public String selectOneRow(String tableName, String rowKey) throws IOException {
        try (Table table = openTable(tableName)) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            if (result.isEmpty()) {
                return null;
            }
            for (Cell cell : result.rawCells()) {
                // The backing arrays are shared buffers, so offset and length are required to
                // decode just the requested component.
                String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                String columnFamily = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
                        cell.getFamilyLength());
                String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
                        cell.getQualifierLength());
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                // Could be mapped onto an object via reflection (column names matching Java fields).
                log.info("row={}, columnFamily={}, column={}, value={}", row, columnFamily, column, value);
            }
        }
        return null;
    }

    /**
     * Scans a table and logs every row and cell. When {@code rowKeyFilter} is not empty, only
     * rows whose key contains it as a substring are scanned, i.e.
     * scan 'tableName', {FILTER => "RowFilter(=, 'substring:rowKeyFilter')"}
     *
     * <p>NOTE(review): always returns {@code null}; the rows are only logged.
     *
     * @param tableName    table name
     * @param rowKeyFilter substring a row key must contain; empty or {@code null} scans all rows
     * @return always {@code null}
     * @throws IOException if the scan fails
     */
    public String scanTable(String tableName, String rowKeyFilter) throws IOException {
        Scan scan = new Scan();
        if (!StringUtils.isEmpty(rowKeyFilter)) {
            scan.setFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyFilter)));
        }
        try (Table table = openTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                log.info("row={}", Bytes.toString(result.getRow()));
                for (Cell cell : result.rawCells()) {
                    log.info("cell={}", cell);
                }
            }
        }
        return null;
    }


    /**
     * Checks whether a table exists. This is deliberately done indirectly, by comparing the
     * name against the list of table names returned by the admin.
     *
     * @param tableName table name
     * @return {@code true} if a listed table has exactly this name
     * @throws IOException if the table names cannot be listed
     */
    public boolean tableExists(String tableName) throws IOException {
        TableName[] tableNames = requireAdmin().listTableNames();
        if (tableNames == null) {
            return false;
        }
        for (TableName existing : tableNames) {
            if (tableName.equals(existing.getNameAsString())) {
                return true;
            }
        }
        return false;
    }
}

4、测试

示例:Student 数据表
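| 行键 | StuInfo:Name | StuInfo:Age | StuInfo:Sex | StuInfo:Class | Grades:BigData | Grades:Computer | Grades:Math | 时间戳 |
|------|--------------|-------------|-------------|---------------|----------------|-----------------|-------------|--------|
| 0001 | Tom Green    | 18          | Male        |               | 80             | 90              | 85          | T2     |
| 0002 | Amy          | 19          |             | 01            | 95             |                 | 89          | T1     |
| 0003 | Allen        | 19          | Male        | 02            | 90             |                 | 88          | T1     |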
/**
 * Demo entry point: starts the Spring context, reads a few cells and rows from the
 * {@code Student} table and prints them.
 *
 * @param args command-line arguments passed on to Spring
 */
public static void main(String[] args) {
        // try-with-resources closes the context even when the bean lookup below fails.
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(HbaseFzpGxydApplication.class)
                .run(args)) {
            HbaseService hbaseService = context.getBean("hbaseService", HbaseService.class);
            try {
                // One-time setup: uncomment to create the table and insert the sample data.
                // hbaseService.createTable("Student", "StuInfo", "Grades");
                // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "name", "Tom Green");
                // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "Age", "18");
                System.out.println("=================" + hbaseService.getValue("Student", "0001", "StuInfo", "name"));
                System.out.println("=================" + hbaseService.getValue("Student", "0001", "StuInfo", "Age"));
                System.out.println("=================" + hbaseService.selectOneRow("Student", "0001"));
                System.out.println("=================" + hbaseService.selectOneRow("Student", "0002"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

标签:集成,springboot,tableName,param,IOException,rowKey,Hbase,throws,String
来源: https://blog.csdn.net/qq_22165667/article/details/120460203