其他分享
首页 > 其他分享> > HbaseAPI

HbaseAPI

作者:互联网

package com.liu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * @Author : ld
 * @Description :
 * @ClassName : DemoTest
 * @Date : 2021/10/11 19:43
 * @Version : 1.0
 */
public class DemoTest {
    public static void main(String[] args)throws Exception {
        //创建配置,指定zk集群地址
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","master,slave1,slave2");
        //创建连接
        Connection conn = ConnectionFactory.createConnection(conf);
        //创建admin对象
        Admin admin=conn.getAdmin();
        //创建test_api表
        HTableDescriptor test_api = new HTableDescriptor(TableName.valueOf("test_api"));
        //创建cf1列族
        HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
        //对列族进行配置
        cf1.setTimeToLive(5);//设置TTL
        cf1.setMaxVersions(3);//最大版本号
        //增加列族
        test_api.addFamily(cf1);
        admin.createTable(test_api);
        conn.close();
    }
}

package com.liu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;

/**
 * @Author : ld
 * @Description :
 * @ClassName : HBaseAPI
 * @Date : 2021/10/11 19:53
 * @Version : 1.0
 */
public class HBaseAPI {
    Connection conn;
    @Before
    public void init() throws IOException{
        //创建配置,指定zk集群地址
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","master,slave1,slave2");
        conn= ConnectionFactory.createConnection(conf);
    }

    //创建表
    @Test
    public void createTable()throws IOException{
        Admin admin = conn.getAdmin();
        if(!admin.tableExists(TableName.valueOf("test_cre"))){
            HTableDescriptor test_cre = new HTableDescriptor(TableName.valueOf("test_cre"));
            HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
            cf1.setMaxVersions(3);//最打版本为3
            test_cre.addFamily(cf1);
            admin.createTable(test_cre);
        }else{
            System.out.println("表已存在");
        }
    }
    //删除表
    @Test
    public void deleteTable() throws IOException {
        Admin admin = conn.getAdmin();
        if(admin.tableExists(TableName.valueOf("test_cre"))){
            admin.disableTable(TableName.valueOf("test_cre"));
            admin.deleteTable(TableName.valueOf("test_cre"));
        }else{
            System.out.println("表不存在");
        }
    }
    //修改表
    @Test
    public void alterTable() throws Exception {
        Admin admin = conn.getAdmin();
        //获取表的原有结构
        HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("test_api"));
        //获取所有列族构成的HColumnDescriptor数组
        HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
        //遍历所有的列族
        for (HColumnDescriptor columnFamily : columnFamilies) {
            //获取列族名称
            String cfName = columnFamily.getNameAsString();
            //对名为cf1的列族进行修改
            if("cf1".equals(cfName)){
//                将TTL重新设置为10000s
                columnFamily.setTimeToLive(10000);
            }
        }
        //修改表结构
        admin.modifyTable(TableName.valueOf("test_api"),tableDescriptor);
    }
    //put
    @Test
    public void Put() throws IOException {
        Table test_api = conn.getTable(TableName.valueOf("test_api"));
        Put put = new Put("001".getBytes());
        put.addColumn("cf1".getBytes(),"name".getBytes(),"张三".getBytes());
        test_api.put(put);
    }
    //get
    @Test
    public void Get() throws IOException {
        Table test_api = conn.getTable(TableName.valueOf("test_api"));
        Get get = new Get("001".getBytes());
        Result rs = test_api.get(get);
        byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
        System.out.println(Bytes.toString(value));
    }
    //scan
    @Test
    public void Scan() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        Scan scan = new Scan();
        scan.setLimit(5);
        scan.withStartRow("00".getBytes());
        scan.withStopRow("1500100010".getBytes());

        ResultScanner scanner = students.getScanner(scan);
        for (Result result : scanner) {
            String id = Bytes.toString(result.getRow());
            String name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()));
            String age = Bytes.toString(result.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(result.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(result.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "," + name + "," + age + "," + gender + "," + clazz);

        }
    }
    // 获取数据的另外一种方式
    // 适用于每条数据结构不唯一的情况下 直接遍历每条数据包含的所有的cell
    @Test
    public void scanWithUtils() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        Scan scan = new Scan();
//        scan.Limit(5);
        scan.setStartRow("00".getBytes());
        scan.setStopRow("1500100010".getBytes());

        ResultScanner scanner = students.getScanner(scan);
        for (Result result : scanner) {

            String rk = Bytes.toString(result.getRow());
            System.out.print(rk + " ");
            for (Cell cell : result.listCells()) {
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                // 列名
                String qua = Bytes.toString(CellUtil.cloneQualifier(cell));
                String cf = Bytes.toString(CellUtil.cloneFamily(cell)); // 列簇名
                if ("age".equals(qua)) {
                    if (Integer.parseInt(value) >= 18) {
                        value = "成年";

                    } else {
                        value = "未成年";
                    }
                }
                System.out.print(value + " ");

            }
            System.out.println();

//            System.out.println(id + "," + name + "," + age + "," + gender + "," + clazz);

        }

    }

    //根据行键的前缀进行查询
    @Test
    public void ScanF() throws IOException {
        Table table = conn.getTable(TableName.valueOf("test_api"));
        Scan scan3 = new Scan();
        PrefixFilter filter1 = new PrefixFilter("001".getBytes());
        scan3.setFilter(filter1);
        ResultScanner scanner = table.getScanner(scan3);

    }

    //putAll 读取students.txt 并将数据写入HBase12
    @Test
    public void PutAll() throws IOException {
        //创建students表info
        Admin admin = conn.getAdmin();
        TableName studentsT = TableName.valueOf("students");
        //判断表是否存在
        if(!admin.tableExists(studentsT)){
            HTableDescriptor students = new HTableDescriptor(studentsT);
            HColumnDescriptor info = new HColumnDescriptor("info");
            students.addFamily(info);
            admin.createTable(students);
        }
        Table stu = conn.getTable(studentsT);

        BufferedReader br = new BufferedReader(new FileReader("F:\\ideaProject\\liubigdata\\data\\students.txt"));
        String line=null;
        ArrayList<Put> puts = new ArrayList<>();
        int batchSize = 10;
        while((line=br.readLine())!=null){
            //读取每行
            String[] split = line.split(",");
            String id=split[0];
            String name=split[1];
            String age=split[2];
            String gender=split[3];
            String clazz=split[4];
            Put put = new Put(id.getBytes());
            put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
            put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
            put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());
            puts.add(put); // 将每条数据构建好的put对象加入puts列表
            if(puts.size()==batchSize){
                stu.put(puts);//批量写入
                puts=new ArrayList<>();
            }
            stu.put(put);//逐条put
        }
        if(puts.size()!=0){
            stu.put(puts);
        }
    }

    @After
    public void closed() throws IOException {
        conn.close();
    }
}

标签:String,HbaseAPI,put,new,test,import,getBytes
来源: https://blog.csdn.net/qq_43278189/article/details/120713016