HBase in Detail

Author: Internet

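Introduction to HBase

HBase (Hadoop Database) is a highly reliable, high-performance, column-oriented, scalable distributed database that supports real-time reads and writes.

It uses Hadoop HDFS as its file storage system, can use Hadoop MapReduce to process the massive data sets stored in HBase, and relies on ZooKeeper for distributed coordination. It is mainly used to store loosely structured unstructured and semi-structured data (a column-oriented NoSQL database).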

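HBase Features

  1. Large: a table can have hundreds of millions of rows and millions of columns.

  2. Column-oriented: storage and access control are organized by column (family), and column families are retrieved independently.

  3. Sparse: empty (NULL) columns take up no storage space, so tables can be designed to be very sparse.

  4. Schema-free: every row has a sortable primary key (the row key) and any number of columns. Columns can be added dynamically as needed, and different rows in the same table can have completely different columns.

  5. Multi-versioned data: each cell can hold multiple versions of its data. By default the version number is assigned automatically and is the timestamp at which the cell was written.

  6. Single data type: all data in HBase is stored as untyped byte arrays.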
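
The sparse, multi-versioned, byte-array nature of a cell can be pictured with a small in-memory model. This is only a sketch in plain Java: the class `VersionedRow` and its methods are hypothetical and are not part of the HBase API, and real HBase applies version limits during flushes and compactions rather than on every write.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of one HBase row; not part of the HBase API.
class VersionedRow {
    // column ("family:qualifier") -> timestamp (newest first) -> value bytes
    private final Map<String, NavigableMap<Long, byte[]>> cells = new TreeMap<>();
    private final int maxVersions;

    VersionedRow(int maxVersions) {
        this.maxVersions = maxVersions;
    }

    void put(String column, long timestamp, String value) {
        NavigableMap<Long, byte[]> versions = cells.computeIfAbsent(
                column, c -> new TreeMap<Long, byte[]>(Comparator.<Long>reverseOrder()));
        versions.put(timestamp, value.getBytes(StandardCharsets.UTF_8));
        // Drop the oldest versions once the limit is exceeded.
        while (versions.size() > maxVersions) {
            versions.pollLastEntry();
        }
    }

    // Returns the newest value, or null if the column was never written (sparse row).
    String getLatest(String column) {
        NavigableMap<Long, byte[]> versions = cells.get(column);
        if (versions == null || versions.isEmpty()) {
            return null;
        }
        return new String(versions.firstEntry().getValue(), StandardCharsets.UTF_8);
    }

    int versionCount(String column) {
        NavigableMap<Long, byte[]> versions = cells.get(column);
        return versions == null ? 0 : versions.size();
    }

    public static void main(String[] args) {
        VersionedRow row = new VersionedRow(2);
        row.put("cf1:name", 1L, "zhangSan");
        row.put("cf1:name", 2L, "liSi");
        row.put("cf1:name", 3L, "wangWu");
        System.out.println(row.getLatest("cf1:name"));    // newest version
        System.out.println(row.versionCount("cf1:name")); // capped at maxVersions
        System.out.println(row.getLatest("cf1:age"));     // column never written
    }
}
```

A column that was never written simply has no entry, which is why sparse rows cost nothing in this model.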

HBase Architecture

Master

RegionServer

Region

MemStore & StoreFile

HLog

HBase Data Model

RowKey (row key)

ColumnFamily & Qualifier (column family and column)

TimeStamp (timestamp)

Cell (storage unit)

HBase Read/Write Flow

HBase Shell

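HBase API

Common Java classes

| Java class | HBase data model |
| --- | --- |
| Admin / HBaseAdmin / HBaseConfiguration | Database |
| HTable / HTableDescriptor | Table |
| HColumnDescriptor | Column family |
| Put / Delete / Get / Scan / ResultScanner | |
| CellUtil | Cell (storage unit) |

Example code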

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Demo2API {
    Configuration conf = null;
    Connection conn = null;

    @Before
    public void init() {
        conf = HBaseConfiguration.create();
        // For other settings see http://hbase.apache.org/1.4/book.html#config.files
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

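    // Create a table
    @Test
    public void create_table() throws IOException {
        // Admin is the administrative interface of HBase
        Admin admin = conn.getAdmin();
        // Describe the table, passing the table name via TableName.valueOf()
        HTableDescriptor tableName = new HTableDescriptor(TableName.valueOf("tableName"));
        // Describe a column family, passing its name
        HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
        // Configure the column family
        cf1.setMaxVersions(5); // keep at most 5 versions of each cell
        cf1.setTimeToLive(30); // TTL in seconds
        // Add the column family to the table descriptor
        tableName.addFamily(cf1);
        // Create the table through the Admin object
        admin.createTable(tableName);
        admin.close();
    }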

    // Drop a table (it must be disabled first)
    @Test
    public void drop_table() throws IOException {
        Admin admin = conn.getAdmin();
        String tableName = "tableName";
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }

    // Insert one row
    @Test
    public void put() throws IOException {
        // Use a Table object to insert or query data
        // Use an HTableDescriptor to modify a table or to read its configuration and structure
        Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));

        Put put = new Put("00001".getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), "zhangSan".getBytes());

        testJavaAPI.put(put);


    }

    // Get one row
    @Test
    public void get() throws IOException {
        Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
        Get get = new Get("00001".getBytes());
        Result rs = testJavaAPI.get(get);
        byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
        System.out.println(Bytes.toString(value));
    }

    // Scan a table over a given row key range
    @Test
    public void scan() throws IOException {
        Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
        Scan scan = new Scan();
        // The start row is inclusive, the stop row is exclusive
        scan.withStartRow("001".getBytes());
        scan.withStopRow("007".getBytes());
        ResultScanner scanner = testJavaAPI.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {

            byte[] row = rs.getRow(); // get the row key
            String rk = Bytes.toString(row);
            System.out.println();
            if ("001".equals(rk)) {
                byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
                System.out.println(Bytes.toString(value));
            } else if ("002".equals(rk)) {
                byte[] value = rs.getValue("cf1".getBytes(), "name0".getBytes());
                System.out.println(Bytes.toString(value));
                byte[] value1 = rs.getValue("cf1".getBytes(), "name1".getBytes());
                System.out.println(Bytes.toString(value1));
                byte[] value2 = rs.getValue("cf1".getBytes(), "name100".getBytes());
                System.out.println(Bytes.toString(value2));
                byte[] value3 = rs.getValue("cf1".getBytes(), "name2".getBytes());
                System.out.println(Bytes.toString(value3));
                byte[] value4 = rs.getValue("cf1".getBytes(), "name3".getBytes());
                System.out.println(Bytes.toString(value4));
                byte[] value5 = rs.getValue("cf1".getBytes(), "name4".getBytes());
                System.out.println(Bytes.toString(value5));
                byte[] value6 = rs.getValue("cf1".getBytes(), "name5".getBytes());
                System.out.println(Bytes.toString(value6));
            } else if ("007".equals(rk)) {
                // Not reached by this scan: the stop row "007" is exclusive
                byte[] value6 = rs.getValue("cf1".getBytes(), "name".getBytes());
                System.out.println(Bytes.toString(value6));
                byte[] value7 = rs.getValue("cf1".getBytes(), "age1".getBytes());
                System.out.println(Bytes.toString(value7));
            }

            rs = scanner.next();
        }
    }

    @Test
    public void cellUtil() throws IOException {
        Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
        Scan scan = new Scan();
        // The start row is inclusive, the stop row is exclusive
        scan.withStartRow("001".getBytes());
        scan.withStopRow("007".getBytes());
        ResultScanner scanner = testJavaAPI.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            for (Cell cell : rs.listCells()) {
                byte[] rk = CellUtil.cloneRow(cell);
                byte[] cf = CellUtil.cloneFamily(cell);
                byte[] qualifier = CellUtil.cloneQualifier(cell);
                byte[] value = CellUtil.cloneValue(cell);

                System.out.println("rowkey:" + Bytes.toString(rk) + ",columnsFamily:" + Bytes.toString(cf) + ",qualifier:" + Bytes.toString(qualifier) + ",value:" + Bytes.toString(value));
            }

            rs = scanner.next();
        }
    }

    // Read a file and write its contents to HBase
    @Test
    public void putAll() throws IOException {
        Admin admin = conn.getAdmin();
        // Create the table if it does not exist yet
        if (!admin.tableExists(TableName.valueOf("students"))) {
            HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
            HColumnDescriptor info = new HColumnDescriptor("info");
            students.addFamily(info);
            admin.createTable(students);
        }

        Table students = conn.getTable(TableName.valueOf("students"));
        // Read the file line by line
        BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
        String line = br.readLine();
        while (line != null) {
            String[] splits = line.split(",");
            String rk = splits[0];
            String name = splits[1];
            String age = splits[2];
            String gender = splits[3];
            String clazz = splits[4];

            Put put = new Put(Bytes.toBytes(rk));
            put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
            put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
            put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());

            students.put(put);

            line = br.readLine();
        }
        br.close();

    }

    @After
    public void closeAll() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }
}
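
The scan examples above depend on two properties: row keys are kept in lexicographic order, and a scan range includes the start row but excludes the stop row. A rough stand-alone illustration, using a `TreeMap` as a hypothetical stand-in for a table (for ASCII keys, `String` ordering matches the byte ordering HBase uses):

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for a table: a sorted map keyed by row key strings.
class ScanRangeSketch {
    static SortedMap<String, String> scan(TreeMap<String, String> table, String startRow, String stopRow) {
        // subMap includes the start key and excludes the end key,
        // mirroring withStartRow / withStopRow.
        return table.subMap(startRow, stopRow);
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        for (String rk : new String[] {"001", "002", "006", "007", "01", "1"}) {
            table.put(rk, "row-" + rk);
        }
        // Prints [001, 002, 006]: "007" is excluded, and "01" sorts after "007".
        System.out.println(scan(table, "001", "007").keySet());
    }
}
```

Because the ordering is lexicographic rather than numeric, row keys are usually padded to a fixed width so that the two orderings agree.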

HBase Filters

Purpose

Comparison filters

Comparison operators
The six common comparators
BinaryComparator
BinaryPrefixComparator
NullComparator
BitComparator
RegexStringComparator
SubstringComparator
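Example code
Row key filter: RowFilter

Use RowFilter with BinaryComparator to return all rows whose row key is less than 1500100010

    @Test
    // Use RowFilter to return all rows whose row key is less than 1500100010
    public void BinaryComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        // Row keys are stored as strings, so the bound must be encoded as a string as well
        BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes("1500100010"));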
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS, binaryComparator);
        Scan scan = new Scan();
        scan.setFilter(rowFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes())); // age was written as a string
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }
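
RowFilter with BinaryComparator compares raw bytes, so the bound has to be encoded the same way as the stored row keys. The stand-alone sketch below (plain Java, no HBase classes; `RowKeyBytesSketch` and `compare` are hypothetical names) contrasts a string-encoded bound with a 4-byte big-endian int, which is the layout `Bytes.toBytes(int)` produces:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RowKeyBytesSketch {
    // Unsigned lexicographic byte comparison, the ordering applied to row keys.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] rowKey = "1500100999".getBytes(StandardCharsets.UTF_8);
        byte[] boundAsString = "1500100010".getBytes(StandardCharsets.UTF_8);
        byte[] boundAsInt = ByteBuffer.allocate(4).putInt(1500100010).array();

        // false: as strings, "1500100999" is not less than "1500100010"
        System.out.println(compare(rowKey, boundAsString) < 0);
        // true: the first byte '1' (0x31) is below the int's first byte (0x59),
        // so a LESS filter with an int-encoded bound would wrongly accept this row
        System.out.println(compare(rowKey, boundAsInt) < 0);
    }
}
```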
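Column family filter: FamilyFilter

Use FamilyFilter with SubstringComparator to return the data of every column family whose name contains "in"

    @Test
    // Use FamilyFilter to return the data of every column family whose name contains "in"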
    public void SubstringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SubstringComparator substringComparator = new SubstringComparator("in");
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        Scan scan = new Scan();
        scan.setFilter(familyFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes())); // age was written as a string
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }

Use FamilyFilter with BinaryPrefixComparator to return all data in column families whose name starts with info

    // Use FamilyFilter with BinaryPrefixComparator to return all data in column families whose name starts with info
    @Test
    public void BinaryPrefixComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // Binary prefix comparator
        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("info".getBytes());
        // FamilyFilter is the filter that applies to column families
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);

        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // Attach the filter with setFilter
        scan.setFilter(familyFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }
Column filter: QualifierFilter

Use QualifierFilter with SubstringComparator to return the values of columns whose name contains a given substring. The printRS helper prints every cell of each returned row and is shared by several examples.

    // Print every cell of every row returned by the scanner
    public void printRS(ResultScanner scanner) throws IOException {
        for (Result rs : scanner) {
            String rowkey = Bytes.toString(rs.getRow());
            System.out.println("Current row key: " + rowkey);
            for (Cell cell : rs.listCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                // putAll writes every value, including age, as a string
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println(family + ":" + qualifier + " = " + value);
            }
        }
    }

Return every column whose name contains "am", together with its value

    // Return every column whose name contains "am", together with its value
    @Test
    public void SubstringComparatorQualifierFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        SubstringComparator substringComparator = new SubstringComparator("am");
        // Filter that applies to column names (qualifiers)
        QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // Attach the filter with setFilter
        scan.setFilter(qualifierFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }
Column value filter: ValueFilter

Use ValueFilter with BinaryPrefixComparator to return every cell whose value starts with "张"

    @Test
    // Use ValueFilter with BinaryPrefixComparator to return every cell whose value starts with "张"
    public void BinaryPrefixComparatorValueFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("张".getBytes());
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
        Scan scan = new Scan();
        scan.setFilter(valueFilter);
        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

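Return students in liberal arts (文科) classes. Only the clazz column is returned; cells in other columns do not match the condition and are dropped

    // Return students in liberal arts (文科) classes
    // Only the clazz column is returned; cells in other columns do not match and are dropped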
    @Test
    public void RegexStringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // Use a regular expression comparator
        RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");
        // ValueFilter returns only the matching cells, not the whole row
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, regexStringComparator);

        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // Attach the filter with setFilter
        scan.setFilter(valueFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);
    }

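Dedicated filters

Single column value filter: SingleColumnValueFilter

Use SingleColumnValueFilter to return all information for students in liberal arts (文科) classes

    @Test
    // Use SingleColumnValueFilter to return all information for students in liberal arts (文科) classes
    public void SingleColumnValueRegexFilter() throws IOException {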
        Table students = conn.getTable(TableName.valueOf("students"));
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator("^文科.*")
        );

        Scan scan = new Scan();
        scan.setFilter(singleColumnValueFilter);
        ResultScanner scanner = students.getScanner(scan);

        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes())); // age was written as a string
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }
Column value exclude filter: SingleColumnValueExcludeFilter

Use SingleColumnValueExcludeFilter with BinaryComparator to return all students in class 文科一班, without returning the clazz column

    @Test
    // Use SingleColumnValueExcludeFilter with BinaryComparator to return all students in class 文科一班, without the clazz column
    public void RegexStringComparatorExcludeFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                new BinaryComparator("文科一班".getBytes())
        );

        Scan scan = new Scan();
        scan.setFilter(singleColumnValueExcludeFilter);
        ResultScanner scanner = students.getScanner(scan);

        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes())); // age was written as a string
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            // clazz is null here because the filter excludes the tested column
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }
Row key prefix filter: PrefixFilter

Use PrefixFilter to return all rows whose row key starts with 150010008

    @Test
    // Use PrefixFilter to return all rows whose row key starts with 150010008
    public void PrefixFilterFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
        Scan scan = new Scan();
        scan.setFilter(prefixFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes())); // age was written as a string
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }
    }
Paging filter: PageFilter
    @Test
    // Use PageFilter to fetch page 3, with 10 rows per page
    public void PageFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        int PageNum = 3;
        int PageSize = 10;
        Scan scan = new Scan();
        if (PageNum == 1) {
            scan.withStartRow("".getBytes());
            // Use the paging filter to limit the result to one page
            PageFilter pageFilter = new PageFilter(PageSize);
            scan.setFilter(pageFilter);
            ResultScanner scanner = students.getScanner(scan);
            printRS(scanner);
        } else {
            String current_page_start_rows = "";
            // Scan up to and including the first row of the requested page to find its row key
            int scanDatas = (PageNum - 1) * PageSize + 1;
            PageFilter pageFilter = new PageFilter(scanDatas);
            scan.setFilter(pageFilter);
            ResultScanner scanner = students.getScanner(scan);
            for (Result rs : scanner) {
                current_page_start_rows = Bytes.toString(rs.getRow());
            }
            scan.withStartRow(current_page_start_rows.getBytes());
            PageFilter pageFilter1 = new PageFilter(PageSize);
            scan.setFilter(pageFilter1);
            ResultScanner scanner1 = students.getScanner(scan);
            printRS(scanner1);

        }

    }

Paging through row key design

    @Test
    // Paging by computing the row key range directly, which avoids scanning the preceding pages
    public void PageFilterTest2() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        int PageSize = 10;
        int PageNum = 3;

        int baseId = 1500100000;
        int start_row = baseId + (PageNum - 1) * PageSize + 1;
        int end_row = start_row + PageSize;
        Scan scan = new Scan();
        scan.withStartRow(String.valueOf(start_row).getBytes());
        scan.withStopRow(String.valueOf(end_row).getBytes());

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);


    }
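
The row key arithmetic used for paging can be checked in isolation. The sketch below assumes, as the example above does, that row keys are consecutive integers starting at baseId + 1; the class and method names are hypothetical.

```java
class RowKeyPaging {
    // Returns {startRow (inclusive), stopRow (exclusive)} for a 1-based page number,
    // assuming row keys are consecutive integers starting at baseId + 1.
    static String[] pageRange(int baseId, int pageNum, int pageSize) {
        int start = baseId + (pageNum - 1) * pageSize + 1;
        int stop = start + pageSize;
        return new String[] {String.valueOf(start), String.valueOf(stop)};
    }

    public static void main(String[] args) {
        String[] range = pageRange(1500100000, 3, 10);
        // Page 3 with 10 rows per page covers 1500100021 up to, but not including, 1500100031
        System.out.println(range[0] + " .. " + range[1]);
    }
}
```

The approach only works when every key in the range exists; gaps in the key sequence produce short pages.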

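Combining multiple filters

Return students in liberal arts (文科) classes whose student ID starts with 150010008 and whose age is less than 23

    @Test
    // Return students in liberal arts (文科) classes whose student ID starts with 150010008 and whose age is less than 23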
    public void FilterListFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes()
                , "clazz".getBytes()
                , CompareFilter.CompareOp.EQUAL
                , new RegexStringComparator("^文科.*"));
        PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
        SingleColumnValueFilter singleColumnValueFilter1 = new SingleColumnValueFilter(
                "info".getBytes()
                , "age".getBytes()
                , CompareFilter.CompareOp.LESS
                , new BinaryComparator(Bytes.toBytes("23"))); // age is stored as a string, so this comparison is lexicographic

        FilterList filterList = new FilterList();
        filterList.addFilter(singleColumnValueFilter);
        filterList.addFilter(prefixFilter);
        filterList.addFilter(singleColumnValueFilter1);
        scan.setFilter(filterList);
        ResultScanner scanner = students.getScanner(scan);
        printRS(scanner);

    }
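
A FilterList created with the no-argument constructor uses MUST_PASS_ALL, so a row is kept only if every filter accepts it; MUST_PASS_ONE keeps a row if any filter accepts it. The plain-Java sketch below models the two operators with predicates. The `Student` type, the class labels such as "arts-1", and the method names are hypothetical placeholders, not HBase classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FilterListSketch {
    // Hypothetical stand-in for one row of the students table.
    static final class Student {
        final String id;
        final String clazz;
        final int age;

        Student(String id, String clazz, int age) {
            this.id = id;
            this.clazz = clazz;
            this.age = age;
        }
    }

    // Mirrors MUST_PASS_ALL: the row is kept only if every filter accepts it.
    static boolean mustPassAll(List<Predicate<Student>> filters, Student s) {
        return filters.stream().allMatch(f -> f.test(s));
    }

    // Mirrors MUST_PASS_ONE: the row is kept if at least one filter accepts it.
    static boolean mustPassOne(List<Predicate<Student>> filters, Student s) {
        return filters.stream().anyMatch(f -> f.test(s));
    }

    // The three conditions of the combined query, expressed as predicates.
    static List<Predicate<Student>> filters() {
        Predicate<Student> clazzMatches = s -> s.clazz.startsWith("arts");
        Predicate<Student> idPrefix = s -> s.id.startsWith("150010008");
        Predicate<Student> youngerThan23 = s -> s.age < 23;
        return Arrays.asList(clazzMatches, idPrefix, youngerThan23);
    }

    public static void main(String[] args) {
        Student a = new Student("1500100081", "arts-1", 22);
        Student b = new Student("1500100082", "science-3", 22);
        System.out.println(mustPassAll(filters(), a)); // true: all three conditions hold
        System.out.println(mustPassAll(filters(), b)); // false: the class condition fails
        System.out.println(mustPassOne(filters(), b)); // true: the ID prefix matches
    }
}
```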

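Phoenix

Setting up Phoenix

Versions used: Phoenix 4.15, HBase 1.4.6, Hadoop 2.7.6

Stop the HBase cluster (run on master)
stop-hbase.sh
Upload and extract the package, then configure environment variables

Extract

tar -xvf apache-phoenix-4.15.0-HBase-1.4-bin.tar.gz 

Rename

mv apache-phoenix-4.15.0-HBase-1.4-bin phoenix-4.15.0
Copy phoenix-4.15.0-HBase-1.4-server.jar into the HBase lib directory on every node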
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar master:/usr/local/soft/hbase-1.4.6/lib/

scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node1:/usr/local/soft/hbase-1.4.6/lib/

scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node2:/usr/local/soft/hbase-1.4.6/lib/

Start HBase (run on master)
start-hbase.sh
Configure environment variables
vim /etc/profile
Add:
export PHOENIX_HOME=/usr/local/soft/phoenix-4.15.0
export PATH=$PATH:$PHOENIX_HOME/bin
source /etc/profile

Using Phoenix

Connecting with sqlline
sqlline.py master,node1,node2

# Expected output
163/163 (100%) Done
Done
sqlline version 1.5.0
0: jdbc:phoenix:master,node1,node2> 


Common commands
# 1. Create a table

CREATE TABLE IF NOT EXISTS STUDENT (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
);

# 2. List all tables
 !table

# 3. Insert data
upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');


# 4. Query data; most SQL syntax is supported
select * from STUDENT ;
select * from STUDENT where age=24;
select gender ,count(*) from STUDENT group by gender;
select * from student order by gender;

# 5. Delete data
delete from STUDENT where id='1500100004';


# 6. Drop the table
drop table STUDENT;
 
 
# 7. Exit the command line
!quit

For more syntax, see the official documentation
https://phoenix.apache.org/language/index.html#upsert_select

Phoenix mappings
View mapping

A view created in Phoenix is read-only: it can only be used for queries, and the source data cannot be modified through it.

# Enter the HBase command line
hbase shell 

# Create an HBase table
create 'test','name','company' 

# Insert data
put 'test','001','name:firstname','zhangsan'
put 'test','001','name:lastname','zhangsan'
put 'test','001','company:name','数加'
put 'test','001','company:address','合肥'


# Create a view in Phoenix; the primary key maps to the HBase row key

create view "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname"  varchar,
"company"."name"  varchar,
"company"."address" varchar
);



CREATE view "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR ,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

# Query data in Phoenix; the table name must be wrapped in double quotes
select * from "test";

# Drop the view
drop view "test";

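Table mapping

There are two ways to map an HBase table with Apache Phoenix:

1) If the table already exists in HBase, create the mapped table the same way as a view, replacing create view with create table.

2) If the table does not exist in HBase, create it directly with create table; the statement can describe the HBase table structure explicitly where needed.

For case 1), with the test table created earlier, the mapping statement is: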

create table "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname" varchar,
"company"."name"  varchar,
"company"."address" varchar
)column_encoded_bytes=0;

upsert into  "test"  values('1','2','3','4','5');

CREATE table  "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR ,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

upsert into "students" values('1500110004','葛德曜','24','男','理科三班');

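If a mapped table created with create table is modified, the source data changes too, and dropping the mapped table also drops the source table. A view behaves differently: dropping a view leaves the source data unchanged.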

Phoenix Secondary Indexes

Enabling index support
# Stop the HBase cluster
stop-hbase.sh

# Add the following to /usr/local/soft/hbase-1.4.6/conf/hbase-site.xml

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# Copy the file to all nodes
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`

# Edit hbase-site.xml in the bin directory of the Phoenix installation
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


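# Start HBase
start-hbase.sh
# Reconnect with the Phoenix client
sqlline.py master,node1,node2
Creating indexes
Global index

A global index suits read-heavy, write-light workloads. Reads pay almost no extra cost; the overhead falls on writes, because every insert, delete and update on the data table also updates the index table (deleting data removes the matching index entries, and adding data adds index entries).

Note: by default, if a query references a column that is not in the index table, Phoenix does not use a global index unless a hint is given.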
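
The trade-off described above can be pictured with a simplified in-memory model: the index is a second sorted table that is written on every upsert, and a query that needs non-indexed columns must go back to the data table for each hit. This is a sketch only, not how Phoenix is implemented; the class, field and method names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified in-memory model of a global index; not Phoenix's actual implementation.
class GlobalIndexSketch {
    private final String indexedColumn;
    // Data table: primary key -> row (column name -> value).
    private final Map<String, Map<String, String>> dataTable = new HashMap<>();
    // Index table: "indexedValue|primaryKey" -> primaryKey, kept sorted like row keys.
    private final TreeMap<String, String> indexTable = new TreeMap<>();
    int dataTableReads = 0;
    int indexWrites = 0;

    GlobalIndexSketch(String indexedColumn) {
        this.indexedColumn = indexedColumn;
    }

    // Every write to the data table also writes the index table.
    void upsert(String primaryKey, Map<String, String> row) {
        Map<String, String> old = dataTable.put(primaryKey, row);
        if (old != null) {
            indexTable.remove(old.get(indexedColumn) + "|" + primaryKey);
        }
        indexTable.put(row.get(indexedColumn) + "|" + primaryKey, primaryKey);
        indexWrites++;
    }

    // Query that needs only the indexed column and the primary key:
    // answered from the index alone. '}' is the character right after '|'.
    List<String> primaryKeysWhere(String value) {
        return new ArrayList<>(indexTable.subMap(value + "|", value + "}").values());
    }

    // Query that needs other columns: each index hit costs one read of the data table.
    List<Map<String, String>> fullRowsWhere(String value) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (String pk : primaryKeysWhere(value)) {
            dataTableReads++;
            rows.add(dataTable.get(pk));
        }
        return rows;
    }

    static Map<String, String> row(String endDate, String county) {
        Map<String, String> r = new HashMap<>();
        r.put("end_date", endDate);
        r.put("county", county);
        return r;
    }

    public static void main(String[] args) {
        GlobalIndexSketch table = new GlobalIndexSketch("end_date");
        table.upsert("m1", row("20180503154014", "8340104"));
        table.upsert("m2", row("20180503154014", "8340103"));
        table.upsert("m3", row("20180503160000", "8340104"));
        System.out.println(table.primaryKeysWhere("20180503154014")); // index only
        System.out.println(table.fullRowsWhere("20180503154014").size()
                + " rows, " + table.dataTableReads + " data table reads");
    }
}
```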

# Create DIANXIN.sql
CREATE TABLE IF NOT EXISTS DIANXIN (
     mdn VARCHAR ,
     start_date VARCHAR ,
     end_date VARCHAR ,
     county VARCHAR,
     x DOUBLE ,
     y  DOUBLE,
     bsid VARCHAR,
     grid_id  VARCHAR,
     biz_type VARCHAR, 
     event_type VARCHAR , 
     data_source VARCHAR ,
     CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;

# Upload the data file DIANXIN.csv

# Import the data
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv

# Create a global index
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );

# Query data (index not used)
select * from DIANXIN where end_date = '20180503154014';

# Force the index with a hint (index used)
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014';

select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014'  and start_date = '20180503154614';

# Select only the indexed column (index used)
select end_date from DIANXIN where end_date = '20180503154014';

# Create a multi-column index
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );

# Multi-condition query (index used)
select end_date,MDN,COUNTY from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

# Query all columns (index not used)
select  * from DIANXIN where end_date = '20180503154014'  and COUNTY = '8340104';

# Query all columns with a hint (index used)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

# Single condition on COUNTY only (index not used)
select end_date from DIANXIN where  COUNTY = '8340103';

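Local index

A local index suits write-heavy, read-light workloads, or cases where storage space is limited. As with global indexes, Phoenix decides automatically at query time whether to use a local index. Index data and the original data are stored on the same machine, which avoids network transfer on writes, so local indexes fit write-heavy workloads better. Because the region holding the data cannot be determined in advance, a read has to check every region, which costs some read performance.

Note: a local index is used whether or not a hint is given and whether or not all queried columns are in the index.

# Create a local index
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);

# Index used
select grid_id from dianxin where grid_id='117285031820040';

# Index used
select * from dianxin where grid_id='117285031820040';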

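Differences between global and local indexes
Covered index

A covered index stores the original column data in the index table, so a query can return results directly without going back to the HBase data table.

Note: every column in the select list and in the where clause must be present in the index.

# Create a covered index
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );

# Query all columns (index not used)
select * from dianxin where x=117.288 and y =31.822;

# Force the index with a hint (index used)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from dianxin where x=117.288 and y =31.822;

# Query only columns stored in the index (index used)
select mdn,x,y,county from dianxin where x=117.288 and y =31.822;

Columns used in query conditions must be indexed columns; columns that only appear in the select list can go in INCLUDE (their data is stored in the index)


select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from dianxin group by x,y;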

Phoenix JDBC

# Add the dependency

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.15.0-HBase-1.4</version>
</dependency>


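import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PhoenixJdbcDemo {
    public static void main(String[] args) throws SQLException {
        // try-with-resources closes the result set, statement and connection
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
             Statement stat = conn.createStatement();
             ResultSet rs = stat.executeQuery("select * from dianxin limit 10")) {
            while (rs.next()) {
                System.out.println(rs.getString("mdn"));
            }
        }
    }
}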


HBase SQL and Hive SQL Execution Flow

MapReduce on HBase

Code example

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class Demo7MapReduceReadAndWriteHBase {
    // Read the students table from HBase, count the students in each class, and write the result back to the HBase table clazz_num
    public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            // Skip rows without a clazz value, since a null key cannot be written
            if (clazz == null) {
                return;
            }

            context.write(new Text(clazz), new IntWritable(1));
        }

    }

    public static class WriteHBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            // create 'clazz_num','cf1'
            Put put = new Put(key.toString().getBytes());
            put.addColumn("cf1".getBytes(), "num".getBytes(), Bytes.toBytes(sum));

            context.write(NullWritable.get(), put);

        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        // Same quorum string as in the other examples, with the port given for every host
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");

        Job job = Job.getInstance(conf);
        job.setJobName(Demo7MapReduceReadAndWriteHBase.class.getName());
        job.setJarByClass(Demo7MapReduceReadAndWriteHBase.class);

        // Configure the map task
        // using the TableMapReduceUtil helper provided by HBase
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students")
                , new Scan()
                , ReadHBaseMapper.class
                , Text.class
                , IntWritable.class
                , job
        );

        // Configure the reduce task
        // using the TableMapReduceUtil helper provided by HBase
        TableMapReduceUtil.initTableReducerJob("clazz_num", WriteHBaseReducer.class, job);

        job.waitForCompletion(true);
        /*
        hadoop jar HBaseJavaAPI11-1.0-jar-with-dependencies.jar  Demo7MapReduceReadAndWriteHBase
         */
    }
}
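
The map and reduce steps above boil down to "emit (class, 1) for each row, then sum per class". As a minimal sketch of that logic that runs without a cluster, the following plain-Java version counts class names held in a list. The class name `ClazzCountSketch`, the method `countByClazz`, and the sample values are hypothetical and are not part of the original job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the mapper/reducer pair; no HBase involved.
class ClazzCountSketch {
    // "Map" each value to (clazz, 1) and "reduce" by summing per clazz.
    static Map<String, Integer> countByClazz(List<String> clazzValues) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String clazz : clazzValues) {
            counts.merge(clazz, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample class names are made up for illustration.
        List<String> sample = Arrays.asList("class1", "class2", "class1");
        countByClazz(sample).forEach((clazz, num) -> System.out.println(clazz + " " + num));
    }
}
```

In the real job each resulting pair becomes one Put into clazz_num, with the class name as the rowkey.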

Mapping a Hive table onto an HBase table

-- Integrate Hive with HBase
create external table students_hbase
(
id string,
name string,
age string,
gender string, 
clazz string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = "
:key,
info:name,
info:age,
info:gender,
info:clazz
")
tblproperties("hbase.table.name" = "default:students");

HBase HA

# Start a backup HMaster on node1 and node2
hbase-daemon.sh start master


HBase tuning

Pre-Creating Regions (pre-splitting)

-- In Phoenix
CREATE TABLE IF NOT EXISTS STUDENT (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
)split on('15001006|','15001007|','15001008|') ;
# In the HBase shell (the two statements are alternatives; run only one of them)
create 'split_table_test', 'cf', SPLITS_FILE => 'region_split_info.txt'
create 'split_table_test', 'cf', SPLITS => ['a','e','r']
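
To make the effect of SPLITS => ['a','e','r'] concrete: three split keys produce four regions, and a rowkey lands in the region whose key range contains it. The sketch below reproduces that lookup in plain Java. The class `PreSplitLookup` and method `regionIndex` are hypothetical names, and it assumes ASCII rowkeys so that Java string comparison agrees with HBase's byte-wise ordering.

```java
import java.util.Arrays;

// Hypothetical illustration of how split keys partition the rowkey space.
class PreSplitLookup {
    // Split keys from the shell example; they must be sorted.
    static final String[] SPLITS = {"a", "e", "r"};

    // Region 0 = (-inf, "a"), 1 = ["a", "e"), 2 = ["e", "r"), 3 = ["r", +inf)
    static int regionIndex(String rowkey) {
        int pos = Arrays.binarySearch(SPLITS, rowkey);
        // A rowkey equal to a split key starts the region to the right of it;
        // otherwise binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"Z", "b", "e", "zoo"}) {
            System.out.println(key + " -> region " + regionIndex(key));
        }
    }
}
```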

Rowkey design

Principles:
Common approaches
Reversal

202133 -> 331202
202134 -> 431202
202135 -> 531202
202136 -> 631202
202137 -> 731202
202138 -> 831202
202139 -> 931202
202140 -> 041202
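
The reversal shown above can be sketched in a few lines of Java. The class and method names (`RowkeyReverse`, `reverse`) are illustrative only; the point is that the fastest-changing trailing digits move to the front, so consecutive IDs no longer sort next to each other.

```java
// Hypothetical helper; names are for illustration only.
class RowkeyReverse {
    // Reverse the characters of a rowkey so the low-order digits come first.
    static String reverse(String rowkey) {
        return new StringBuilder(rowkey).reverse().toString();
    }

    public static void main(String[] args) {
        for (String id : new String[]{"202133", "202134", "202140"}) {
            System.out.println(id + " -> " + reverse(id));
        }
    }
}
```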

Hashing
MD5, SHA-1

202133 -> 41DDBBCED55669818B2A40F4FED46F56
202134 -> 19D329403F02E2DA265CFC05D41FD253
202135 -> F6D06AEC4FB72A04F9CD4020BEF5E10F
202136 -> 0B512404B0411E623F64EC8981F8AE21
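
A hashed rowkey of this kind can be produced with the JDK's `MessageDigest`. The sketch below is one way to do it, assuming the key is encoded as UTF-8 and rendered as upper-case hex; the original does not state either choice, and the class name `RowkeyHash` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper that turns a rowkey into its 32-character MD5 hex digest.
class RowkeyHash {
    static String md5Hex(String rowkey) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(rowkey.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to an unsigned value before formatting as two hex digits
                sb.append(String.format("%02X", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required to be present on every Java platform
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("202133 -> " + md5Hex("202133"));
    }
}
```

The same input always yields the same digest, so a point lookup by the original key still works, but range scans over the original key order are lost.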

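Adding a random prefix
Random hashing

First time:  202133 -> 41DDBBCED55669818B2A40F4FED46F56
Second time: 202133 -> D55669818B2A40F4
Third time:  202133 -> 02E2DA265CFC05D4

Requirement: look up the data for a given point in time
Requirement: put the newest data first

Large number: 300000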

202137 -> 300000 - 202137 = 97863
202138 -> 300000 - 202138 = 97862
202139 -> 300000 - 202139 = 97861
202140 -> 300000 - 202140 = 97860
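
The subtraction above can be sketched as follows. One detail is an assumption added here and not stated in the original: the result is zero-padded to a fixed width, because HBase compares rowkeys byte by byte and unpadded numbers with different digit counts would sort incorrectly. The names `ReverseOrderKey` and `toKey` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: newer (larger) ids get smaller keys and therefore sort first.
class ReverseOrderKey {
    static final long BIG = 300000L; // the "large number" from the text

    // Zero-padding to 6 digits is an assumption made for correct lexicographic order.
    static String toKey(long id) {
        return String.format("%06d", BIG - id);
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (long id = 202137; id <= 202140; id++) {
            keys.add(toKey(id));
        }
        Collections.sort(keys); // the order in which HBase would store the rows
        System.out.println(keys);
    }
}
```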

Salting

CREATE TABLE IF NOT EXISTS STUDENT (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
)salt_buckets=6;
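
With salt_buckets=6, Phoenix prepends a salt derived from a hash of the rowkey, so rows are spread across six buckets. Phoenix computes the salt internally with its own hash function and a one-byte prefix; the sketch below only illustrates the idea using `String.hashCode()` and a readable text prefix, both of which are assumptions. `SaltedKey`, `bucketOf`, and `salt` are hypothetical names.

```java
// Hypothetical illustration of salting; this is not Phoenix's actual algorithm.
class SaltedKey {
    static final int BUCKETS = 6; // mirrors salt_buckets=6

    // Deterministic bucket in [0, BUCKETS) derived from the rowkey itself.
    static int bucketOf(String rowkey) {
        return Math.floorMod(rowkey.hashCode(), BUCKETS);
    }

    // Prefix the rowkey with its bucket so neighbouring keys spread over regions.
    static String salt(String rowkey) {
        return bucketOf(rowkey) + "_" + rowkey;
    }

    public static void main(String[] args) {
        for (String id : new String[]{"1500100001", "1500100002", "1500100003"}) {
            System.out.println(id + " -> " + salt(id));
        }
    }
}
```

Because the bucket is derived from the key, a reader can recompute it for a point lookup; a range scan has to query all six buckets.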

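In-memory

When creating a table, calling HColumnDescriptor.setInMemory(true) gives that column family's blocks higher priority in the RegionServer block cache, which makes cache hits on reads more likely (it is a priority hint, not a hard guarantee).

Max versions

When creating a table, HColumnDescriptor.setMaxVersions(int maxVersions) sets the maximum number of versions kept per cell in that column family. If only the latest version is needed, use setMaxVersions(1).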

Index-build timeouts and query timeouts

Edit the configuration file hbase-site.xml

in two locations:
/usr/local/soft/phoenix-4.15.0/bin
/usr/local/soft/hbase-1.4.6/conf/  (on all nodes)

Add the following properties:
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>
HBase must be restarted for the changes to take effect.

Compact & Split

Minor Compaction:
Major Compaction:

RegionSplit

ConstantSizeRegionSplitPolicy
IncreasingToUpperBoundRegionSplitPolicy

The region split threshold is computed as:

For example:

Characteristics

SteppingSplitPolicy
KeyPrefixRegionSplitPolicy

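Regions are split according to a rowkey prefix, where the prefix is the first N bytes of the rowkey. For example, if every rowkey is 16 characters long and the first 5 are configured as the prefix, rowkeys that share the same first 5 characters end up in the same region.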
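
The grouping rule can be sketched in plain Java: two rowkeys stay together exactly when their prefixes are equal. This only illustrates the rule described above and is not the policy's actual implementation; `PrefixGrouping`, `prefix`, and `sameGroup` are hypothetical names, and the sample rowkeys are made up.

```java
// Hypothetical illustration of prefix-based grouping of rowkeys.
class PrefixGrouping {
    // First prefixLength characters of the rowkey (the whole key if it is shorter).
    static String prefix(String rowkey, int prefixLength) {
        return rowkey.substring(0, Math.min(prefixLength, rowkey.length()));
    }

    // Rowkeys with the same prefix are kept in the same region.
    static boolean sameGroup(String a, String b, int prefixLength) {
        return prefix(a, prefixLength).equals(prefix(b, prefixLength));
    }

    public static void main(String[] args) {
        String k1 = "1500100612345678"; // 16-character sample keys
        String k2 = "1500199999999999";
        String k3 = "1500200000000000";
        System.out.println(sameGroup(k1, k2, 5)); // both start with 15001
        System.out.println(sameGroup(k1, k3, 5)); // 15001 vs 15002
    }
}
```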

DelimitedKeyPrefixRegionSplitPolicy
BusyRegionSplitPolicy
DisabledRegionSplitPolicy

Automatic splitting is disabled; regions have to be split manually.

HBase BulkLoading

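Advantages:

  1. Loading a huge volume of data into HBase in one go through the normal write path is slow and also ties up a lot of Region resources. A more efficient and convenient approach is bulk loading, using the HFileOutputFormat class that HBase provides (HFileOutputFormat2 in the code below).

  2. It relies on the fact that HBase stores its data in HDFS in a specific file format: the job generates files in that format directly and then places them in the right location, which completes the load of a very large data set quickly. Run as a MapReduce job it is efficient and convenient, and it neither occupies Region resources nor adds load to them.

Limitations:

  1. It is only suitable for an initial import, i.e. the table is empty, or is empty every time data is loaded.

  2. The HBase cluster and the Hadoop cluster must be the same cluster, i.e. the HDFS that HBase runs on is the one where the MapReduce job writes the HFiles.

Code

  1. Generating the HFiles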
package com.shujia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo10BulkLoading {
    public static class BulkLoadingMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            String mdn = splits[0];
            String start_time = splits[1];
            // Longitude
            String longitude = splits[4];
            // Latitude
            String latitude = splits[5];

            String rowkey = mdn + "_" + start_time;

            KeyValue lg = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lg".getBytes(), longitude.getBytes());
            KeyValue lt = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lt".getBytes(), latitude.getBytes());

            context.write(new ImmutableBytesWritable(rowkey.getBytes()), lg);
            context.write(new ImmutableBytesWritable(rowkey.getBytes()), lt);

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");


        // Create the Job instance
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo10BulkLoading.class);
        job.setJobName("Demo10BulkLoading");

        // Guarantee a global (total) ordering of keys
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(4);
        // Configure the map task
        job.setMapperClass(BulkLoadingMapper.class);

        // Configure the reduce task
        // KeyValueSortReducer keeps the KeyValues sorted within each reducer
        job.setReducerClass(KeyValueSortReducer.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path("/data/DIANXIN/"));
        FileOutputFormat.setOutputPath(job, new Path("/data/hfile"));

        // Create the HBase connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // create 'dianxin_bulk','info'
        // Get the dianxin_bulk table
        Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
        // Get the region locator for the dianxin_bulk table
        RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));
        // Use HFileOutputFormat2 to write the job output in HFile format
        HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);

        // Wait for the MapReduce job to finish
        job.waitForCompletion(true);

        // Load the HFiles into dianxin_bulk
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
        load.doBulkLoad(new Path("/data/hfile"), conn.getAdmin(), dianxin_bulk, regionLocator);

        /**
         *  create 'dianxin_bulk','info'
         *  hadoop jar HBaseJavaAPI10-1.0-jar-with-dependencies.jar com.shujia.Demo10BulkLoading
         */
    }
}

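Notes

  1. Whether the final output comes from the map side or the reduce side, its key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.

  2. In the final output the value type is KeyValue or Put, and the corresponding sorter is KeyValueSortReducer or PutSortReducer respectively.

  3. In the MR example, HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator); configures the job automatically. SimpleTotalOrderPartitioner requires a total ordering of the keys and then assigns them to the reducers so that the [min, max] key ranges of different reducers never overlap. This is necessary because, when the data is loaded into HBase, the keys within a Region as a whole must be strictly ordered.

  4. The MR example writes the HFiles to HDFS, and the subdirectories under the output path correspond to the column families. Loading the HFiles into HBase amounts to moving them into the HBase Regions, after which the column-family subdirectories no longer contain them. They must not be moved with the mv command directly, however, because a plain move does not update HBase's metadata.

  5. The generated HFiles are loaded into HBase with the doBulkLoad method of LoadIncrementalHFiles.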

Mind map


Source: https://blog.csdn.net/weixin_44624060/article/details/118462209