HbaseAPI
作者:互联网
package com.liu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
/**
* @Author : ld
* @Description :
* @ClassName : DemoTest
* @Date : 2021/10/11 19:43
* @Version : 1.0
*/
public class DemoTest {
public static void main(String[] args)throws Exception {
//创建配置,指定zk集群地址
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","master,slave1,slave2");
//创建连接
Connection conn = ConnectionFactory.createConnection(conf);
//创建admin对象
Admin admin=conn.getAdmin();
//创建test_api表
HTableDescriptor test_api = new HTableDescriptor(TableName.valueOf("test_api"));
//创建cf1列族
HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
//对列族进行配置
cf1.setTimeToLive(5);//设置TTL
cf1.setMaxVersions(3);//最大版本号
//增加列族
test_api.addFamily(cf1);
admin.createTable(test_api);
conn.close();
}
}
package com.liu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
/**
* @Author : ld
* @Description :
* @ClassName : HBaseAPI
* @Date : 2021/10/11 19:53
* @Version : 1.0
*/
public class HBaseAPI {
Connection conn;
@Before
public void init() throws IOException{
//创建配置,指定zk集群地址
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","master,slave1,slave2");
conn= ConnectionFactory.createConnection(conf);
}
//创建表
@Test
public void createTable()throws IOException{
Admin admin = conn.getAdmin();
if(!admin.tableExists(TableName.valueOf("test_cre"))){
HTableDescriptor test_cre = new HTableDescriptor(TableName.valueOf("test_cre"));
HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
cf1.setMaxVersions(3);//最打版本为3
test_cre.addFamily(cf1);
admin.createTable(test_cre);
}else{
System.out.println("表已存在");
}
}
//删除表
@Test
public void deleteTable() throws IOException {
Admin admin = conn.getAdmin();
if(admin.tableExists(TableName.valueOf("test_cre"))){
admin.disableTable(TableName.valueOf("test_cre"));
admin.deleteTable(TableName.valueOf("test_cre"));
}else{
System.out.println("表不存在");
}
}
//修改表
@Test
public void alterTable() throws Exception {
Admin admin = conn.getAdmin();
//获取表的原有结构
HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("test_api"));
//获取所有列族构成的HColumnDescriptor数组
HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
//遍历所有的列族
for (HColumnDescriptor columnFamily : columnFamilies) {
//获取列族名称
String cfName = columnFamily.getNameAsString();
//对名为cf1的列族进行修改
if("cf1".equals(cfName)){
// 将TTL重新设置为10000s
columnFamily.setTimeToLive(10000);
}
}
//修改表结构
admin.modifyTable(TableName.valueOf("test_api"),tableDescriptor);
}
//put
@Test
public void Put() throws IOException {
Table test_api = conn.getTable(TableName.valueOf("test_api"));
Put put = new Put("001".getBytes());
put.addColumn("cf1".getBytes(),"name".getBytes(),"张三".getBytes());
test_api.put(put);
}
//get
@Test
public void Get() throws IOException {
Table test_api = conn.getTable(TableName.valueOf("test_api"));
Get get = new Get("001".getBytes());
Result rs = test_api.get(get);
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
}
//scan
@Test
public void Scan() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
Scan scan = new Scan();
scan.setLimit(5);
scan.withStartRow("00".getBytes());
scan.withStopRow("1500100010".getBytes());
ResultScanner scanner = students.getScanner(scan);
for (Result result : scanner) {
String id = Bytes.toString(result.getRow());
String name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()));
String age = Bytes.toString(result.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(result.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(result.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "," + name + "," + age + "," + gender + "," + clazz);
}
}
// 获取数据的另外一种方式
// 适用于每条数据结构不唯一的情况下 直接遍历每条数据包含的所有的cell
@Test
public void scanWithUtils() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
Scan scan = new Scan();
// scan.Limit(5);
scan.setStartRow("00".getBytes());
scan.setStopRow("1500100010".getBytes());
ResultScanner scanner = students.getScanner(scan);
for (Result result : scanner) {
String rk = Bytes.toString(result.getRow());
System.out.print(rk + " ");
for (Cell cell : result.listCells()) {
String value = Bytes.toString(CellUtil.cloneValue(cell));
// 列名
String qua = Bytes.toString(CellUtil.cloneQualifier(cell));
String cf = Bytes.toString(CellUtil.cloneFamily(cell)); // 列簇名
if ("age".equals(qua)) {
if (Integer.parseInt(value) >= 18) {
value = "成年";
} else {
value = "未成年";
}
}
System.out.print(value + " ");
}
System.out.println();
// System.out.println(id + "," + name + "," + age + "," + gender + "," + clazz);
}
}
//根据行键的前缀进行查询
@Test
public void ScanF() throws IOException {
Table table = conn.getTable(TableName.valueOf("test_api"));
Scan scan3 = new Scan();
PrefixFilter filter1 = new PrefixFilter("001".getBytes());
scan3.setFilter(filter1);
ResultScanner scanner = table.getScanner(scan3);
}
//putAll 读取students.txt 并将数据写入HBase12
@Test
public void PutAll() throws IOException {
//创建students表info
Admin admin = conn.getAdmin();
TableName studentsT = TableName.valueOf("students");
//判断表是否存在
if(!admin.tableExists(studentsT)){
HTableDescriptor students = new HTableDescriptor(studentsT);
HColumnDescriptor info = new HColumnDescriptor("info");
students.addFamily(info);
admin.createTable(students);
}
Table stu = conn.getTable(studentsT);
BufferedReader br = new BufferedReader(new FileReader("F:\\ideaProject\\liubigdata\\data\\students.txt"));
String line=null;
ArrayList<Put> puts = new ArrayList<>();
int batchSize = 10;
while((line=br.readLine())!=null){
//读取每行
String[] split = line.split(",");
String id=split[0];
String name=split[1];
String age=split[2];
String gender=split[3];
String clazz=split[4];
Put put = new Put(id.getBytes());
put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());
puts.add(put); // 将每条数据构建好的put对象加入puts列表
if(puts.size()==batchSize){
stu.put(puts);//批量写入
puts=new ArrayList<>();
}
stu.put(put);//逐条put
}
if(puts.size()!=0){
stu.put(puts);
}
}
@After
public void closed() throws IOException {
conn.close();
}
}
标签:String,HbaseAPI,put,new,test,import,getBytes 来源: https://blog.csdn.net/qq_43278189/article/details/120713016