brpc中资源池/对象池的源码剖析
作者:互联网
1.背景
在实际项目中,资源池(对象也是资源的一种)是经常使用的技巧,它能够使程序节省资源申请和释放的时间、重复利用池中的资源。
资源池的实现通常是比较类似,代码例如:
template <typename T>
class ResourcePool {
public:
ResourcePool()= default;
virtual ~ResourcePool() {
for (auto it : res) {
if (it) {
delete it;
}
}
}
// 获得一个资源对象
T* Get() {
std::unique_lock<std::mutex> lck(mutex_);
if (res.empty()) {
return new T();
}
T* resouce = res.back();
res.pop_back();
return resouce;
}
// 返还资源对象至池中,以便后续重复利用
void Return(const T* resouce) {
std::unique_lock<std::mutex> lck(mutex_);
res.push_back(resouce);
}
private:
std::vector<T*> res;
std::mutex mutex_;
};
这里的T对象可能包含一个大的缓冲区、可能包含某个io对象等,总而言之,重复利用T比每次都构建一个新的要节省时间。
但上述的比较普遍的资源池的实现有一个重要的问题:锁。在高并发环境中这个锁竞争激烈,可能是影响性能的重要一环。有没有更好的实现?
看到了brpc(https://github.com/apache/incubator-brpc)代码中有一个非常好的工业中使用的资源池的实现,所以下面主要介绍它的实现。
2.用户接口
用户接口:
// 获取资源:返回对象T的指针,并返回唯一标识该T对象的id:ResourceId<T>类型(这个类型里其实就是一个uint64_t的value)
template <typename T> inline T* get_resource(ResourceId<T>* id) {
return ResourcePool<T>::singleton()->get_resource(id);
}
// 同上,但T对象是通过 T(arg1)构造的
template <typename T, typename A1>
inline T* get_resource(ResourceId<T>* id, const A1& arg1) {
return ResourcePool<T>::singleton()->get_resource(id, arg1);
}
// 同上,但T对象是通过 T(arg1, arg2) 构造的
template <typename T, typename A1, typename A2>
inline T* get_resource(ResourceId<T>* id, const A1& arg1, const A2& arg2) {
return ResourcePool<T>::singleton()->get_resource(id, arg1, arg2);
}
// 返还资源。注意这里返还的是id(get_resource得到的),而不是T对象
// Returns 0 when successful, -1 otherwise.
template <typename T> inline int return_resource(ResourceId<T> id) {
return ResourcePool<T>::singleton()->return_resource(id);
}
// 根据id找到并返回实际的T对象指针
template <typename T> inline T* address_resource(ResourceId<T> id) {
return ResourcePool<T>::address_resource(id);
}
正如上面接口看到的,ResourcePool<T>类是单例的,其中的几个重要的数据成员如下:
template <typename T>
class BAIDU_CACHELINE_ALIGNMENT ResourcePool {
private:
// 线程本地数据,每个线程一个LocalPool(LocalPool是ResourcePool 的嵌套类)
static BAIDU_THREAD_LOCAL LocalPool* _local_pool;
// 保存所有实际T对象空间的block的集合:BlockGroup
static butil::static_atomic<BlockGroup*> _block_groups[RP_MAX_BLOCK_NGROUP];
// 当前_block_groups中实际的BlockGroup个数
static butil::static_atomic<size_t> _ngroup;
// 回收各个线程存满的FreeChunk
std::vector<DynamicFreeChunk*> _free_chunks;
};
ResourcePool类中的get_resource函数的实现(其他两个构造函数带参数的重载函数类似,不赘述了):
inline T* get_resource(ResourceId<T>* id) {
// 首先获取线程本地对象LocalPool指针,首次调用时没有则创建一个
LocalPool* lp = get_or_new_local_pool();
if (__builtin_expect(lp != NULL, 1)) {
// 调用LocalPool的get函数
return lp->get(id);
}
return NULL;
}
为什么要创建线程本地对象LocalPool?因为只有利用TLS才能避免多线程竞争,这也是消除锁的核心。
嵌套类LocalPool中的几个重要的数据成员:
class BAIDU_CACHELINE_ALIGNMENT LocalPool {
public:
inline T* get(ResourceId<T>* id) {
BAIDU_RESOURCE_POOL_GET();
}
private:
// 弱引用ResourcePool对象指针,ResourcePool对象全局只有1个,LocalPool 是每个线程一个
ResourcePool* _pool;
// 下面几个后面再讲
Block* _cur_block;
size_t _cur_block_index;
FreeChunk _cur_free;
};
LocalPool的get函数主要就是调用宏BAIDU_RESOURCE_POOL_GET,所以重点看BAIDU_RESOURCE_POOL_GET宏的实现(不用细看,下文会一行一行剖析):
// 这里有个有意思的地方,就是如果T是POD类型,new T()和 new T是不同的,
// 例如,new int()和new int是不同的,前者值是0,后者未初始化
#define BAIDU_RESOURCE_POOL_GET(CTOR_ARGS) \
// 要获取一个空闲的T对象,分4层缓存
// 第一层
if (_cur_free.nfree) { \
const ResourceId<T> free_id = _cur_free.ids[--_cur_free.nfree]; \
*id = free_id; \
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1; \
return unsafe_address_resource(free_id); \
} \
// 第二层
if (_pool->pop_free_chunk(_cur_free)) { \
--_cur_free.nfree; \
const ResourceId<T> free_id = _cur_free.ids[_cur_free.nfree]; \
*id = free_id; \
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1; \
return unsafe_address_resource(free_id); \
} \
// 第三层
if (_cur_block && _cur_block->nitem < BLOCK_NITEM) { \
//yc:这个value很重要,能够定位是哪个block group下的哪个block下的第几个item(因为每一级数组都是固定的长度)
id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ResourcePoolValidator<T>::validate(p)) { \
p->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return p; \
} \
// 第4层
_cur_block = add_block(&_cur_block_index); \
if (_cur_block != NULL) { \
id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ResourcePoolValidator<T>::validate(p)) { \
p->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return p; \
} \
return NULL; \
要获取一个空闲的T对象,是分4层缓存实现的。为了更好的理解,我们从第4层开始往第1层讲。
3.四层缓存
第4层缓存
走到这里说明第1~3级缓存全部穿透了,例如冷启动时。核心代码:
// 第4层
// add_block是ResourcePool类的static函数(嵌套类可以直接使用外部类的静态成员函数)
_cur_block = add_block(&_cur_block_index); \
if (_cur_block != NULL) { \
id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ResourcePoolValidator<T>::validate(p)) { \
p->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return p; \
} \
看一下ResourcePool::add_block的实现:
struct BAIDU_CACHELINE_ALIGNMENT Block {
//yc:申请的是内存,经过in-place new之后就是长度为BLOCK_NITEM的 T 数组
char items[sizeof(T) * BLOCK_NITEM];
size_t nitem;
Block() : nitem(0) {}
};
struct BlockGroup {
butil::atomic<size_t> nblock;
butil::atomic<Block*> blocks[RP_GROUP_NBLOCK];
BlockGroup() : nblock(0) {
memset(blocks, 0, sizeof(butil::atomic<Block*>) * RP_GROUP_NBLOCK);
}
};
// 创建一个Block 对象并插入到BlockGroup的最右边
static Block* add_block(size_t* index) {
// 首先创建一各新的block对象
Block* const new_block = new(std::nothrow) Block;
if (NULL == new_block) {
return NULL;
}
size_t ngroup;
do {
// 获得当前block group个数
ngroup = _ngroup.load(butil::memory_order_acquire);
// 如果当前block group个数大于0,那么看看最后的BlockGroup是不是已经满了,如果没满则把新的block对象插入到这个BlockGroup中,并计算它的全局索引index,如果满了则新建一个BlockGroup,再插入block对象。
if (ngroup >= 1) {
BlockGroup* const g =
_block_groups[ngroup - 1].load(butil::memory_order_consume);
const size_t block_index =
g->nblock.fetch_add(1, butil::memory_order_relaxed);
if (block_index < RP_GROUP_NBLOCK) {
g->blocks[block_index].store(
new_block, butil::memory_order_release);
// 这个index是全局的block index,不是所属的group的block_index
*index = (ngroup - 1) * RP_GROUP_NBLOCK + block_index;
return new_block;
}
g->nblock.fetch_sub(1, butil::memory_order_relaxed);
}
// 如果当前block group个数等于0,或者当前group的block已满,则再创建一个group
} while (add_block_group(ngroup));
// Fail to add_block_group.
delete new_block;
return NULL;
}
这里有几个概念:Block、BlockGroup、索引index,它们之间是什么关系呢,简单的画个图介绍:
所以第4层缓存我们知道它的实现了:创建一个新的block对象,并插入到最右边的BlockGroup的最右边的blocks中,并获得它的唯一标识block_index,新的block对象指针保存到_cur_block;并使用_cur_block->items内存in-place new一个T对象,计算它的ResourceId,返回。
第3层
核心代码:
// 第3层
if (_cur_block && _cur_block->nitem < BLOCK_NITEM) { \
// 这个id能够定位是哪个block group下的哪个block下的第几个item(因为每一级数组都是固定的长度)
id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ResourcePoolValidator<T>::validate(p)) { \
p->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return p; \
} \
这一层缓存实现比较简单:如果当前_cur_block 有值并且空间没有用完,那么直接使用_cur_block->items内存in-place new一个T对象,计算它的ResourceId,返回。
归还实现
要介绍第2层、第1层,要先介绍归还资源的实现。首先要理解资源池中“归还”的概念,这里的归还并不是说把资源返还给操作系统,而是归还资源的id给资源池,对象还是那个对象、他的内存毫无变动,我们只要把free的id收集起来、后面重复利用id即可。
用户接口中的return_resource是怎么实现的:直接调用LocalPool::return_resource。所以看下LocalPool::return_resource的实现:
template <typename T, size_t NITEM>
struct ResourcePoolFreeChunk {
size_t nfree;
ResourceId<T> ids[NITEM];
};
typedef ResourcePoolFreeChunk<T, FREE_CHUNK_NITEM> FreeChunk;
typedef ResourcePoolFreeChunk<T, 0> DynamicFreeChunk;
inline int return_resource(ResourceId<T> id) {
// _cur_free是FreeChunk类型;返还id到本地的free列表中
if (_cur_free.nfree < ResourcePool::free_chunk_nitem()) {
// 把归还的id放到FreeChunk的ids数组中
_cur_free.ids[_cur_free.nfree++] = id;
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_ADD1;
return 0;
}
// 一旦本地free列表满了,则把这个满的FreeChunk归还给全局free列表,然后把新的id插入到_cur_free中,又重新开始收集
if (_pool->push_free_chunk(_cur_free)) {
_cur_free.nfree = 1;
_cur_free.ids[0] = id;
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_ADD1;
return 0;
}
return -1;
}
全局free列表:
bool push_free_chunk(const FreeChunk& c) {
// 为什么搞一个DynamicFreeChunk、ids的长度不是固定的?因为返还的FreeChunk不一定是满的,例如某个线程退出、销毁LocalPool时归还的FreeChunk就不一定是满的。
DynamicFreeChunk* p = (DynamicFreeChunk*)malloc(
offsetof(DynamicFreeChunk, ids) + sizeof(*c.ids) * c.nfree);
if (!p) {
return false;
}
p->nfree = c.nfree;
memcpy(p->ids, c.ids, sizeof(*c.ids) * c.nfree);
pthread_mutex_lock(&_free_chunks_mutex);
// _free_chunks是std::vector<DynamicFreeChunk*>,用于回收各个线程归还的FreeChunk
_free_chunks.push_back(p);
pthread_mutex_unlock(&_free_chunks_mutex);
return true;
}
了解了归还的实现,再来看第2层、第1层就好理解了。
第2层
核心代码:
// 如果本地FreeChunk是空的,则从全局的free列表_free_chunks中copy一个FreeChunk到本地,然后复用其中一个id,定义到唯一执行的T对象地址并返回。
if (_pool->pop_free_chunk(_cur_free)) { \
--_cur_free.nfree; \
const ResourceId<T> free_id = _cur_free.ids[_cur_free.nfree]; \
*id = free_id; \
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1; \
return unsafe_address_resource(free_id); \
} \
第1层
核心代码:
// 如果本地FreeChunk不是空的,则直接复用其中一个id,定义到唯一执行的T对象地址并返回。
if (_cur_free.nfree) { \
const ResourceId<T> free_id = _cur_free.ids[--_cur_free.nfree]; \
*id = free_id; \
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1; \
return unsafe_address_resource(free_id); \
} \
至此,我们再回过头从第1层到第4层顺序着看:
4.总结
这个资源池的实现相比我们最开始的写法有哪些好处:
(1)绝大部分情况是lockless的,避免了锁的竞争,因此延时上是更好的
(2)看Block的实现,是先申请了一块内存,然后in-place new每个T对象,因此减少了内存碎片,空间上是更好的。
当然了,需要说明的是,资源池不能滥用,因为毕竟是独占一块资源、不释放给操作系统,所以要理解它的利与弊。
以上。
标签:brpc,return,cur,free,剖析,源码,resource,id,block 来源: https://blog.csdn.net/yockie/article/details/111425457