基于Hbase 的微博案例
作者:互联网
需求
1、 发布微博内容
a. 在微博内容表中 添加一条数据(发布者)
b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
scan 'weibo:receive-content-email',{VERSIONS=>5}
2、添加关注用户
a. 在微博用户关系表中 添加新的好友关注(attends)
b. 从被关注用户角度来说, 新增粉丝用户(fans)
c. 微博邮件箱表添加关注用户发布的微博内容
3、移除或者取消关注用户
a. 在微博用户关系表中 移除新的好友关注(attends)
b. 从被关注用户角度来说, 删除粉丝用户(fans)
c. 微博邮件箱表删除关注用户发布的微博内容
4、获取关注用户发布的微博内容
a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
b. 根据上面获取到的微博内容的rowkey 获取微博内容
微博展示的内容信息:
message: 发布者ID , 时间戳 , content
表的设计
分析微博:
用户群体: 关注用户 和用户粉丝
用户行为: 发布微博、添加或者移除关注用户
数据存储: 分布式 mysql 数据库
考虑因素:响应时间 妙级 无延迟
对你的技术团队就是一个很大的考验
引出hbase 数据库存储 来实现响应时间 妙级 无延迟 、
hbase 是hadoop 的分布式数据库
使用数据库的时候
拦路虎: 表的设计(合理的来设计 因素多元化)
命名空间(类似于传统关系型数据库中的schema): 区分不同的业务表
namespace name:weibo
设计那些表:
a.微博内容表
xxx 发布 xx 内容
table name : weibo:content
rowkey: 被关注用户ID_时间戳
columnfamily: cf
colunmnlabel: (任意变化的)
图片
内容
标题
version: 只需要一个版本
rowkey:
a.唯一性 每条数据是唯一的
b.长度 (<=64 kb 建议 10-100 byte 最佳 8-16 byte)表 rowkey 是hbase中数据产生冗余的因素
c.散列原则
举例:
时间戳_用户ID 作为rowkey
大量的用户在同一时刻 发布微博内容
121_001
121_002
121_003
121_004
===>
集中到某个region 会造成单独几个region 负载量偏大 而其他 region 完全没有负载
d. 业务相关的设计规范:
方便查询 尽可能将查询知道放到 rowkey
列簇设计:
Hbase 是面向列簇存储 region start rowkey 到 stop rowkey 范围内的一个列簇下的数据 对应一个hdfs file 称为StoreFile 也可以称为HFile 所以如果跨列查询 速度相对来说就会慢很多 so 设计hbase 表 列簇的是 一般1-2个,(1个最佳)
b.用户关系表
用户id fans attends
table name : weibo:relations
rowkey: 用户ID(发布者的用户ID)
columnfamily: attends、fans
colunmnlabel:
关注用户ID
粉丝用户ID
colunmnvalue: 关注用户ID
粉丝用户ID
version: 只需要一个版本
c.用户微博内容接收邮件箱表
table name : weibo:receive-content-email
rowkey: 用户ID(粉丝用户ID)
columnfamily: cf
colunmnlabel:
用户ID(发布者ID 被关注用户ID)
colunmnvalue:
取微博内容的rowkey
version: 1000
10001: cf_001_yyyyy
10001: cf_001_xxxxx
hbase 常用命令:
1. disable 'weibo:content': 禁用表
2. drop 'weibo:content': 删除表
3.truncate 'weibo:relations' :清空表数据
- Disabling table...
- Dropping table...
- Creating table...
list_namespace: 查看命名空间
list: 查看表的列表信息
default: 默认使用的命名空间
hbase : 系统默认使用命名空间
drop_namespace 'weibo': 删除指定的命名空间
代码实现
package com.ibeifeng.hbase.weibo.hbase_weibo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
public class WeiBo {
private Configuration conf =HBaseConfiguration.create();
private static final byte[] CONTENT=Bytes.toBytes("weibo:content");
private static final byte[] RELATIONS=Bytes.toBytes("weibo:relations");
private static final byte[] RECEIVE_CONTENT_EMAIL=Bytes.toBytes("weibo:receive-content-email");
//创建 命名空间(库)
public void initNameSpace(){
HBaseAdmin admin=null;
try {
admin=new HBaseAdmin(conf);
/**
* 命名空间(类似于传统关系型数据库中的schema): 区分不同的业务表
namespace name:weibo
*/
NamespaceDescriptor weibo=NamespaceDescriptor.create("weibo")
.addConfiguration("creator", "beifeng")//
.addConfiguration("createTime", System.currentTimeMillis()+"")
.build();
admin.createNamespace(weibo);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// 建表
public void initTable(){
createContent();
createRelations();
createReceiveContentEmails();
}
private void createContent() {
/**
* a.微博内容表: xxx 发布 xx 内容
table name : weibo:content
rowkey: 用户ID_时间戳
columnfamily: cf
colunmnlabel:
图片
内容
标题
version: 只需要一个版本
*/
HBaseAdmin admin=null;
try {
admin=new HBaseAdmin(conf);
HTableDescriptor content=new HTableDescriptor(TableName.valueOf(CONTENT));
HColumnDescriptor c_cf=new HColumnDescriptor(Bytes.toBytes("cf"));
c_cf.setBlockCacheEnabled(true);
//推荐是计算后的值
c_cf.setBlocksize(2097152);
// 一定事先配置好
// c_cf.setCompressionType(Algorithm.SNAPPY);
c_cf.setMaxVersions(1);
c_cf.setMinVersions(1);
content.addFamily(c_cf);
admin.createTable(content);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void createRelations() {
/**
* b.用户关系表:
用户id fans attends
table name : weibo:relations
rowkey: 用户ID
columnfamily: attends、fans
colunmnlabel:
关注用户ID
粉丝用户ID
colunmnvalue: 用户ID
version: 只需要一个版本
*/
HBaseAdmin admin=null;
try {
admin=new HBaseAdmin(conf);
HTableDescriptor relations=new HTableDescriptor(TableName.valueOf(RELATIONS));
HColumnDescriptor attends=new HColumnDescriptor(Bytes.toBytes("attends"));
attends.setBlockCacheEnabled(true);
attends.setBlocksize(2097152);
attends.setMaxVersions(1);
attends.setMinVersions(1);
HColumnDescriptor fans=new HColumnDescriptor(Bytes.toBytes("fans"));
fans.setBlockCacheEnabled(true);
fans.setBlocksize(2097152);
fans.setMaxVersions(1);
fans.setMinVersions(1);
relations.addFamily(attends);
relations.addFamily(fans);
admin.createTable(relations);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void createReceiveContentEmails() {
/**
* c.用户微博内容接收邮件箱表:
table name : weibo:receive-content-email
rowkey: 用户ID
columnfamily: cf
colunmnlabel:
用户ID
colunmnvalue:
取微博内容的rowkey
version: 1000
*/
HBaseAdmin admin=null;
try {
admin=new HBaseAdmin(conf);
HTableDescriptor receive_content_email=new HTableDescriptor(TableName.valueOf(RECEIVE_CONTENT_EMAIL));
HColumnDescriptor rce_cf =new HColumnDescriptor(Bytes.toBytes("cf"));
rce_cf.setBlockCacheEnabled(true);
rce_cf.setBlocksize(2097152);
rce_cf.setMaxVersions(1000);
rce_cf.setMinVersions(1000);
receive_content_email.addFamily(rce_cf);
admin.createTable(receive_content_email);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 发布微博内容:
* a. 在微博内容表中 添加一条数据(发布者)
b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
* @param uid
* 发布者ID
* @param content
* 发布微博内容
*/
public void publishContent(String uid,String content){
HConnection connection=null;
try {
connection=HConnectionManager.createConnection(conf);
//a. 在微博内容表中 添加一条数据(发布者)
HTableInterface contentTBL=connection.getTable(TableName.valueOf(CONTENT));
long timestamp=System.currentTimeMillis();
String rowkey=uid+"_"+timestamp;
Put put = new Put(Bytes.toBytes(rowkey));
//四个值分别代表colfamily,columnlabel,timestamp,value
put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"),timestamp, Bytes.toBytes(content));
contentTBL.put(put);
//b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
// 1. 查询 用户关系表 获取该用户的粉丝用户
HTableInterface relationsTBL=connection.getTable(TableName.valueOf(RELATIONS));
// get 'tablename','rowkey','cf','cq'
//rowkey
Get get=new Get(Bytes.toBytes(uid));
//cf
get.addFamily(Bytes.toBytes("fans"));
Result result = relationsTBL.get(get);
List<byte[]> fans = new ArrayList<byte[]>();
//设置uid用户下所有的粉丝
for (Cell cell : result.rawCells()) {
//RELATIONS表里的columnlabel成为RECEIVE_CONTENT_EMAIL的rowkey
fans.add(CellUtil.cloneQualifier(cell));
}
// 数据判断
if(fans.size() <= 0) return ;
// 获取微博内容邮件箱表
HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
List<Put> puts=new ArrayList<Put>();
for (byte[] fan : fans) {
Put fanPut=new Put(fan); //设置rowkey
fanPut.add(Bytes.toBytes("cf"), Bytes.toBytes(uid),timestamp, Bytes.toBytes(rowkey));
puts.add(fanPut);
}
rceTBL.put(puts);
/**
* cell :
primary key
{rowkey , column(family+label),version(timestamp)}:value
*/
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 添加关注用户:
* a. 在微博用户关系表中 添加新的好友关注(attends)
b. 从被关注用户角度来说, 新增粉丝用户(fans)
c. 微博邮件箱表添加关注用户发布的微博内容
* @param uid
* 粉丝 ID
* @param attends
* 被关注用户ID
*/
public void addAttends(String uid, String ...attends){
//参数过滤
if(attends==null||attends.length<=0) return;
/**
* a.在微博用户关系表中 添加新的好友关注(attends)
* (0001) ,(0002,0003)
* rowkey column value
* 0001 attends:0002,0002
* 0001 attends:0003,0003
*
* rowkey column value
* 0003 fans:0001,0001
* 0002 fans:0001,0001
*
*/
HConnection connection=null;
try {
connection=HConnectionManager.createConnection(conf);
HTableInterface realtionsTBL=connection.getTable(RELATIONS);
List<Put> puts=new ArrayList<Put>();
//a. 在微博用户关系表中 添加新的好友关注(attends)
Put attendsPut=new Put(Bytes.toBytes(uid));
for (String attend : attends) {
attendsPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
//b. 从被关注用户角度来说, 新增粉丝用户(fans)
Put fansPut=new Put(Bytes.toBytes(attend));
fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
puts.add(fansPut);
}
puts.add(attendsPut);
realtionsTBL.put(puts);
//c. 微博邮件箱表添加关注用户发布的微博内容的rowkey
/**
* 1. 首先查询被关注用户ID发布的微博内容的rowkey
* 单个被关注用户ID, --查询content ->微博内容的rowkey
* 0001_xxx
* 0001_aaa
* 0002_yyy
* 0002_zzz
* 2. 将前面获取的rowkey列表 遍历出来在微博内容邮件表中正式添加数据
*
*/
HTableInterface contentTBL= connection.getTable(CONTENT);
Scan scan =new Scan();
List<byte[]> rowkeys=new ArrayList<byte[]>();
for(String attend: attends){
//扫描表的rowkey,含有字符串("被关注用户ID_")
RowFilter filter=new RowFilter(CompareOp.EQUAL, new SubstringComparator(attend+"_"));
scan.setFilter(filter);
ResultScanner result=contentTBL.getScanner(scan);
// 迭代器遍历
Iterator<Result> itearor=result.iterator();
while(itearor.hasNext()){
Result r=itearor.next();
for(Cell cell:r.rawCells()){
rowkeys.add(CellUtil.cloneRow(cell));
}
}
}
if(rowkeys.size()<= 0) return;
// 2. 将前面获取的rowkey列表 遍历出来在微博内容邮件表中正式添加数据
HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
List<Put> rcePuts=new ArrayList<Put>();
for (byte[] rk : rowkeys) {
Put put =new Put(Bytes.toBytes(uid));
String rowkey=Bytes.toString(rk);
// substring 包前不包后
String attend=rowkey.substring(0, rowkey.indexOf("_"));
long timestamp=Long.parseLong(rowkey.substring(rowkey.indexOf("_")+1));
put.add(Bytes.toBytes("cf"), Bytes.toBytes(attend),timestamp, rk);
rcePuts.add(put);
}
rceTBL.put(rcePuts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
*移除或者取消关注用户
a. 在微博用户关系表中 移除新的好友关注(attends)
b. 从被关注用户角度来说, 删除粉丝用户(fans)
c. 微博邮件箱表删除关注用户发布的微博内容
* @param uid
* 粉丝用户ID
* @param attends
* 被关注用户ID
*/
public void removeAttends(String uid,String ...attends){
//参数过滤
if(attends==null||attends.length<=0) return;
HConnection connection=null;
try {
connection=HConnectionManager.createConnection(conf);
//a. 在微博用户关系表中 移除新的好友关注(attends)
HTableInterface relationsTBL=connection.getTable(RELATIONS);
List<Delete> deletes=new ArrayList<Delete>();
Delete attendsDelete =new Delete(Bytes.toBytes(uid));
for (String attend : attends) {
attendsDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
//b. 从被关注用户角度来说, 删除粉丝用户(fans)
Delete fansDelete=new Delete(Bytes.toBytes(attend));
fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deletes.add(fansDelete);
}
deletes.add(attendsDelete);
relationsTBL.delete(deletes);
//c. 微博邮件箱表删除关注用户发布的微博内容
HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
Delete rceDelete=new Delete(Bytes.toBytes(uid));
for(String attend:attends){
/**
* Delete the latest version of the specified column.
*/
// rceDelete.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes(attend));
// Delete all versions of the specified column with a timestamp less than
rceDelete.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes(attend), System.currentTimeMillis()+Integer.MAX_VALUE);
}
rceTBL.delete(rceDelete);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 粉丝用户 去获取关注用户发布的微博内容:
* a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
b. 根据上面获取到的微博内容的rowkey 获取微博内容
* @param uid
* 粉丝用户ID
*/
public List<Message> getAttendsContens(String uid){
HConnection connection=null;
List<Message> messages=new ArrayList<Message>();
try {
connection=HConnectionManager.createConnection(conf);
//a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
Get get=new Get(Bytes.toBytes(uid));
get.setMaxVersions(5);
List<byte[]> rowkeys=new ArrayList<byte[]>();
Result result=rceTBL.get(get);
for(Cell cell:result.rawCells()){
//CellUtil.cloneValue 获取value
//CellUtil.cloneRow 获取rowkey
//CellUtil.cloneQualifier 获取列名
//CellUtil.cloneFamily 获取到列族名
rowkeys.add(CellUtil.cloneValue(cell));
}
//b. 根据上面获取到的微博内容的rowkey 获取微博内容
HTableInterface contentTBL =connection.getTable(CONTENT);
List<Get> gets=new ArrayList<Get>();
for (byte[] rk : rowkeys) {
Get g=new Get(rk);
gets.add(g);
}
Result[] results=contentTBL.get(gets);
for (Result res : results) {
for(Cell cell:res.rawCells()){
Message message=new Message();
String rowkey=Bytes.toString(CellUtil.cloneRow(cell));
String userid=rowkey.substring(0, rowkey.indexOf("_"));
String timestamp=rowkey.substring(rowkey.indexOf("_")+1);
String content=Bytes.toString(CellUtil.cloneValue(cell));
message.setUid(userid);
message.setTimestamp(timestamp);
message.setContent(content);
messages.add(message);
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null!=connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return messages;
}
public static void testInit(WeiBo wb){
wb.initNameSpace();
wb.initTable();
}
public static void testPublishContent(WeiBo wb){
wb.publishContent("0001", "Tomorrow will be better!");
wb.publishContent("0001", "Tomorrow will be better!");
}
public static void testAddAttends(WeiBo wb){
wb.publishContent("0008", "今天天气真不错!!!");
wb.publishContent("0009", "今天天气真不错!!!");
wb.addAttends("0001","0008","0009");
}
public static void testRemoveAttends(WeiBo wb){
wb.removeAttends("0001", "0009");
}
public static void testGetAttendsContents(WeiBo wb){
List<Message> messages=wb.getAttendsContens("0001");
for (Message message : messages) {
System.out.println(message);
}
}
public static void main(String[] args) {
WeiBo wb=new WeiBo();
//testInit(wb);
//testPublishContent(wb);
testAddAttends(wb);
//testRemoveAttends(wb);
}
}
标签:toBytes,Bytes,用户,案例,微博,rowkey,new,Hbase 来源: https://blog.51cto.com/u_13270164/3035959