其他分享
首页 > 其他分享> > 基于Hbase 的微博案例

基于Hbase 的微博案例

作者:互联网

需求

1、 发布微博内容    

   a. 在微博内容表中 添加一条数据(发布者)
   b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
       scan 'weibo:receive-content-email',{VERSIONS=>5}

2、添加关注用户

   a. 在微博用户关系表中 添加新的好友关注(attends)

   b. 从被关注用户角度来说, 新增粉丝用户(fans)
   
   c. 微博邮件箱表添加关注用户发布的微博内容

3、移除或者取消关注用户

   a. 在微博用户关系表中 移除新的好友关注(attends)
   b. 从被关注用户角度来说, 删除粉丝用户(fans)
   c. 微博邮件箱表删除关注用户发布的微博内容 

 

4、获取关注用户发布的微博内容

  a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
  b. 根据上面获取到的微博内容的rowkey 获取微博内容
  微博展示的内容信息:
  message: 发布者ID , 时间戳 , content

表的设计

分析微博:
   用户群体: 关注用户 和用户粉丝
   用户行为: 发布微博、添加或者移除关注用户
   数据存储: 分布式 mysql 数据库
              考虑因素:响应时间 妙级 无延迟
              对你的技术团队就是一个很大的考验

              引出hbase 数据库存储  来实现响应时间 妙级 无延迟 、
              hbase 是hadoop 的分布式数据库


 使用数据库的时候  
   拦路虎: 表的设计(合理的来设计 因素多元化)
 

   命名空间(类似于传统关系型数据库中的schema): 区分不同的业务表
    namespace name:weibo
   设计那些表:


    a.微博内容表

          xxx 发布 xx 内容
          table name : weibo:content
          
          rowkey: 被关注用户ID_时间戳
          
          columnfamily: cf
          
          colunmnlabel: (任意变化的)
                       图片
                       内容
                       标题
          
          version:    只需要一个版本


     rowkey:
         a.唯一性  每条数据是唯一的
         b.长度 (<=64 kb 建议 10-100 byte 最佳 8-16 byte)表 rowkey 是hbase中数据产生冗余的因素
         c.散列原则
            举例:
               时间戳_用户ID 作为rowkey 
               大量的用户在同一时刻 发布微博内容
               121_001
               121_002
               121_003
               121_004
               ===> 
               集中到某个region 会造成单独几个region 负载量偏大 而其他 region 完全没有负载

          d. 业务相关的设计规范:
                方便查询 尽可能将查询知道放到 rowkey
     列簇设计:
         Hbase 是面向列簇存储 region start  rowkey 到 stop rowkey 范围内的一个列簇下的数据 对应一个hdfs file  称为StoreFile 也可以称为HFile 所以如果跨列查询 速度相对来说就会慢很多 so 设计hbase 表 列簇的是 一般1-2个,(1个最佳)

    b.用户关系表

         用户id fans attends
         table name : weibo:relations
          
          rowkey: 用户ID(发布者的用户ID)
          
          columnfamily: attends、fans
          
          colunmnlabel: 
                       关注用户ID
                       粉丝用户ID
          colunmnvalue: 关注用户ID
                        粉丝用户ID
          
          version:    只需要一个版本


    c.用户微博内容接收邮件箱表

          table name : weibo:receive-content-email
          
          rowkey: 用户ID(粉丝用户ID)
          
          columnfamily: cf
          
          colunmnlabel: 
                       用户ID(发布者ID 被关注用户ID) 
          colunmnvalue:

                        取微博内容的rowkey
          
          version:     1000
          10001: cf_001_yyyyy
          10001: cf_001_xxxxx

hbase 常用命令:

          1. disable 'weibo:content': 禁用表
          2. drop 'weibo:content': 删除表

          3.truncate  'weibo:relations' :清空表数据
          - Disabling table...
          - Dropping table...
          - Creating table...
          list_namespace: 查看命名空间
          list: 查看表的列表信息

          default:  默认使用的命名空间                                                                                                    
          hbase : 系统默认使用命名空间
          drop_namespace 'weibo': 删除指定的命名空间

代码实现

package com.ibeifeng.hbase.weibo.hbase_weibo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

public class WeiBo {
	private Configuration conf =HBaseConfiguration.create();
	private static final  byte[] CONTENT=Bytes.toBytes("weibo:content");
	private static final  byte[] RELATIONS=Bytes.toBytes("weibo:relations");
	private static final  byte[] RECEIVE_CONTENT_EMAIL=Bytes.toBytes("weibo:receive-content-email");

	//创建 命名空间(库)
	public void initNameSpace(){
		HBaseAdmin  admin=null;

		try {
			admin=new HBaseAdmin(conf);
			/**
			 * 命名空间(类似于传统关系型数据库中的schema): 区分不同的业务表
    			namespace name:weibo
			 */
			NamespaceDescriptor weibo=NamespaceDescriptor.create("weibo")
					.addConfiguration("creator", "beifeng")//
					.addConfiguration("createTime", System.currentTimeMillis()+"")
					.build();
			admin.createNamespace(weibo);

		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=admin){
				try {
					admin.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	// 建表
	public void initTable(){
		createContent();
		createRelations();
		createReceiveContentEmails();
	}
	private void createContent() {
		/**
		 * a.微博内容表:  xxx 发布 xx 内容
    	  table name : weibo:content

    	  rowkey: 用户ID_时间戳

          columnfamily: cf

          colunmnlabel: 
	                       图片
	                       内容
	                       标题

	      version:	只需要一个版本
		 */
		HBaseAdmin admin=null;
		try {
			admin=new HBaseAdmin(conf);
			HTableDescriptor content=new HTableDescriptor(TableName.valueOf(CONTENT));
			HColumnDescriptor c_cf=new HColumnDescriptor(Bytes.toBytes("cf"));
			c_cf.setBlockCacheEnabled(true);
			//推荐是计算后的值
			c_cf.setBlocksize(2097152);
			// 一定事先配置好
			//			c_cf.setCompressionType(Algorithm.SNAPPY);
			c_cf.setMaxVersions(1);
			c_cf.setMinVersions(1);

			content.addFamily(c_cf);
			admin.createTable(content);

		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=admin){
				try {
					admin.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}


	private void createRelations() {
		/**
		 * b.用户关系表:
		              用户id fans attends
		    	 table name : weibo:relations

		    	  rowkey: 用户ID

		          columnfamily: attends、fans

		          colunmnlabel: 
		                       关注用户ID
		                       粉丝用户ID
		       colunmnvalue: 用户ID

		       version:	只需要一个版本
		 */
		HBaseAdmin admin=null;
		try {
			admin=new HBaseAdmin(conf);
			HTableDescriptor relations=new HTableDescriptor(TableName.valueOf(RELATIONS));

			HColumnDescriptor attends=new HColumnDescriptor(Bytes.toBytes("attends"));
			attends.setBlockCacheEnabled(true);
			attends.setBlocksize(2097152);

			attends.setMaxVersions(1);
			attends.setMinVersions(1);

			HColumnDescriptor fans=new HColumnDescriptor(Bytes.toBytes("fans"));
			fans.setBlockCacheEnabled(true);
			fans.setBlocksize(2097152);

			fans.setMaxVersions(1);
			fans.setMinVersions(1);

			relations.addFamily(attends);
			relations.addFamily(fans);

			admin.createTable(relations);
		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=admin){
				try {
					admin.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	private void createReceiveContentEmails() {
		/**
		 * c.用户微博内容接收邮件箱表:
	    	  table name : weibo:receive-content-email

	    	  rowkey: 用户ID

	          columnfamily: cf

	          colunmnlabel: 
	                       	用户ID 
	          colunmnvalue:

	                                                                        取微博内容的rowkey
	           version: 	1000
		 */
		HBaseAdmin admin=null;
		try {
			admin=new HBaseAdmin(conf);
			HTableDescriptor receive_content_email=new HTableDescriptor(TableName.valueOf(RECEIVE_CONTENT_EMAIL));

			HColumnDescriptor rce_cf =new HColumnDescriptor(Bytes.toBytes("cf"));

			rce_cf.setBlockCacheEnabled(true);
			rce_cf.setBlocksize(2097152);

			rce_cf.setMaxVersions(1000);
			rce_cf.setMinVersions(1000);

			receive_content_email.addFamily(rce_cf);

			admin.createTable(receive_content_email);

		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=admin){
				try {
					admin.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	/**
	 * 发布微博内容:
	 *  a. 在微博内容表中 添加一条数据(发布者)
	    b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
	 * @param uid 
	 *          发布者ID
	 * @param content
	 *          发布微博内容
	 */
	public void publishContent(String uid,String content){
		HConnection connection=null;
		try {
			connection=HConnectionManager.createConnection(conf);
			//a. 在微博内容表中 添加一条数据(发布者)
			HTableInterface contentTBL=connection.getTable(TableName.valueOf(CONTENT));
			long timestamp=System.currentTimeMillis();
			String rowkey=uid+"_"+timestamp;
			Put put = new Put(Bytes.toBytes(rowkey));

			//四个值分别代表colfamily,columnlabel,timestamp,value
			put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"),timestamp, Bytes.toBytes(content));

			contentTBL.put(put);

			//b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
			// 1. 查询 用户关系表 获取该用户的粉丝用户 
			HTableInterface relationsTBL=connection.getTable(TableName.valueOf(RELATIONS));

			// get 'tablename','rowkey','cf','cq'

			
			//rowkey
			Get get=new Get(Bytes.toBytes(uid));

			//cf
			get.addFamily(Bytes.toBytes("fans"));

			Result result = relationsTBL.get(get);

			List<byte[]> fans = new ArrayList<byte[]>();
			
			//设置uid用户下所有的粉丝
			for (Cell cell : result.rawCells()) {
				//RELATIONS表里的columnlabel成为RECEIVE_CONTENT_EMAIL的rowkey
				fans.add(CellUtil.cloneQualifier(cell));
			}
			// 数据判断
			if(fans.size() <= 0) return ;
			// 获取微博内容邮件箱表
			HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
			List<Put> puts=new ArrayList<Put>();
 
			for (byte[] fan : fans) {
				Put fanPut=new Put(fan); //设置rowkey
				fanPut.add(Bytes.toBytes("cf"), Bytes.toBytes(uid),timestamp, Bytes.toBytes(rowkey));
				puts.add(fanPut);
			}
			rceTBL.put(puts);
			/**
			 *  cell :
			    primary key					
			   {rowkey       ,   column(family+label),version(timestamp)}:value
			 */
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=connection){
				try {
					connection.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}
	/**
	 * 添加关注用户:
	 *   a. 在微博用户关系表中 添加新的好友关注(attends)

   		 b. 从被关注用户角度来说, 新增粉丝用户(fans)

   		 c. 微博邮件箱表添加关注用户发布的微博内容
	 * @param uid
	 *           粉丝 ID
	 * @param attends
	 * 			 被关注用户ID	
	 */
	public void addAttends(String uid, String ...attends){

		//参数过滤
		if(attends==null||attends.length<=0) return;
		/**
		 * a.在微博用户关系表中 添加新的好友关注(attends)
		 * (0001) ,(0002,0003)
		 * rowkey   column      value
		 * 0001    attends:0002,0002
		 * 0001    attends:0003,0003
		 * 
		 * rowkey  column    value
		 * 0003    fans:0001,0001
		 * 0002    fans:0001,0001
		 * 
		 */
		HConnection connection=null;
		try {
			connection=HConnectionManager.createConnection(conf);

			HTableInterface realtionsTBL=connection.getTable(RELATIONS);
			List<Put> puts=new ArrayList<Put>();
			//a. 在微博用户关系表中 添加新的好友关注(attends)
			Put attendsPut=new Put(Bytes.toBytes(uid));
			for (String attend : attends) {
				attendsPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
				//b. 从被关注用户角度来说, 新增粉丝用户(fans)
				Put fansPut=new Put(Bytes.toBytes(attend));
				fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
				puts.add(fansPut);
			}
			puts.add(attendsPut);
			realtionsTBL.put(puts);

			//c. 微博邮件箱表添加关注用户发布的微博内容的rowkey
			/**
			 * 1. 首先查询被关注用户ID发布的微博内容的rowkey
			 *     单个被关注用户ID,  --查询content ->微博内容的rowkey
			 *     0001_xxx
			 *     0001_aaa
			 *     0002_yyy
			 *     0002_zzz
			 * 2. 将前面获取的rowkey列表 遍历出来在微博内容邮件表中正式添加数据
			 * 
			 */
			HTableInterface contentTBL= connection.getTable(CONTENT);
			Scan scan =new Scan();
			List<byte[]> rowkeys=new ArrayList<byte[]>();
			for(String attend: attends){
				//扫描表的rowkey,含有字符串("被关注用户ID_")
				RowFilter filter=new RowFilter(CompareOp.EQUAL, new SubstringComparator(attend+"_"));
				scan.setFilter(filter);
				ResultScanner result=contentTBL.getScanner(scan);
				// 迭代器遍历
				Iterator<Result> itearor=result.iterator();
				while(itearor.hasNext()){
					Result r=itearor.next();
					for(Cell cell:r.rawCells()){
						rowkeys.add(CellUtil.cloneRow(cell));
					}
				}
			}
			if(rowkeys.size()<= 0) return;
			// 2. 将前面获取的rowkey列表 遍历出来在微博内容邮件表中正式添加数据
			HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
			List<Put> rcePuts=new ArrayList<Put>();
			for (byte[] rk : rowkeys) {
				Put put =new Put(Bytes.toBytes(uid));
				String rowkey=Bytes.toString(rk);
				// substring 包前不包后
				String attend=rowkey.substring(0, rowkey.indexOf("_"));
				long timestamp=Long.parseLong(rowkey.substring(rowkey.indexOf("_")+1));
				put.add(Bytes.toBytes("cf"), Bytes.toBytes(attend),timestamp, rk);
				rcePuts.add(put);
			}	
			rceTBL.put(rcePuts);

		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=connection){
				try {
					connection.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}
	/**
	 *移除或者取消关注用户
	   a. 在微博用户关系表中 移除新的好友关注(attends)
	   b. 从被关注用户角度来说, 删除粉丝用户(fans)
	   c. 微博邮件箱表删除关注用户发布的微博内容
	 * @param uid
	 * 			粉丝用户ID
	 * @param attends
	 *          被关注用户ID
	 */
	public void removeAttends(String uid,String ...attends){
		//参数过滤
		if(attends==null||attends.length<=0) return;
		HConnection connection=null;
		try {
			connection=HConnectionManager.createConnection(conf);
			//a. 在微博用户关系表中 移除新的好友关注(attends)
			HTableInterface relationsTBL=connection.getTable(RELATIONS);
			List<Delete> deletes=new ArrayList<Delete>();
			Delete attendsDelete =new Delete(Bytes.toBytes(uid));

			for (String attend : attends) {
				attendsDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
				//b. 从被关注用户角度来说, 删除粉丝用户(fans)
				Delete fansDelete=new Delete(Bytes.toBytes(attend));
				fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
				deletes.add(fansDelete);
			}

			deletes.add(attendsDelete);
			relationsTBL.delete(deletes);

			//c. 微博邮件箱表删除关注用户发布的微博内容
			HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
			Delete rceDelete=new Delete(Bytes.toBytes(uid));
			for(String attend:attends){
				/**
				 * Delete the latest version of the specified column.
				 */
				// rceDelete.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes(attend));
				// Delete all versions of the specified column with a timestamp less than
				rceDelete.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes(attend), System.currentTimeMillis()+Integer.MAX_VALUE);

			}
			rceTBL.delete(rceDelete);

		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=connection){
				try {
					connection.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}
	/**
	 * 粉丝用户 去获取关注用户发布的微博内容:
	 *  a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
  		b. 根据上面获取到的微博内容的rowkey 获取微博内容
	 * @param uid
	 * 		  粉丝用户ID
	 */
	public List<Message> getAttendsContens(String uid){
		HConnection connection=null;
		List<Message> messages=new ArrayList<Message>();
		try {
			connection=HConnectionManager.createConnection(conf);
			//a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
			HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL);
			Get get=new Get(Bytes.toBytes(uid));
			get.setMaxVersions(5);
			List<byte[]> rowkeys=new ArrayList<byte[]>();
			Result result=rceTBL.get(get);
			for(Cell cell:result.rawCells()){
				
				//CellUtil.cloneValue      获取value
				//CellUtil.cloneRow        获取rowkey
				//CellUtil.cloneQualifier  获取列名
				//CellUtil.cloneFamily     获取到列族名
				rowkeys.add(CellUtil.cloneValue(cell));
			}
			//b. 根据上面获取到的微博内容的rowkey 获取微博内容
			HTableInterface contentTBL =connection.getTable(CONTENT);
			List<Get> gets=new ArrayList<Get>();
			for (byte[] rk : rowkeys) {
				Get g=new Get(rk);
				gets.add(g);
			}
			Result[] results=contentTBL.get(gets);
			for (Result res : results) {
				for(Cell cell:res.rawCells()){
					Message message=new Message();
					String rowkey=Bytes.toString(CellUtil.cloneRow(cell));
					String userid=rowkey.substring(0, rowkey.indexOf("_"));
					String timestamp=rowkey.substring(rowkey.indexOf("_")+1);
					String content=Bytes.toString(CellUtil.cloneValue(cell));
					message.setUid(userid);
					message.setTimestamp(timestamp);
					message.setContent(content);
					messages.add(message);
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			if(null!=connection){
				try {
					connection.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		return messages;


	}
	public static void testInit(WeiBo wb){
		wb.initNameSpace();
		wb.initTable();
	}
	public static  void testPublishContent(WeiBo wb){
		wb.publishContent("0001", "Tomorrow will be better!");
		wb.publishContent("0001", "Tomorrow will be better!");
	}
	public static void testAddAttends(WeiBo wb){
		wb.publishContent("0008", "今天天气真不错!!!");
		wb.publishContent("0009", "今天天气真不错!!!");
		wb.addAttends("0001","0008","0009");
	}
	public static void testRemoveAttends(WeiBo wb){
		wb.removeAttends("0001", "0009");
	}
	public static void testGetAttendsContents(WeiBo wb){
		List<Message> messages=wb.getAttendsContens("0001");
		for (Message message : messages) {
			System.out.println(message);
		}
	}
	public static void main(String[] args) {
		WeiBo wb=new WeiBo();
		//testInit(wb);
		//testPublishContent(wb);
		testAddAttends(wb);
		//testRemoveAttends(wb);

	}
}

 

标签:toBytes,Bytes,用户,案例,微博,rowkey,new,Hbase
来源: https://blog.51cto.com/u_13270164/3035959