Netty的对象池实现——Recycler源码浅析
作者:互联网
Netty中带有"Pooled"前缀的buffer,创建出来后是可以被循环利用的,以达到一个对象池的效果,避免了频繁地GC。而这个对象池的核心,就是Recycler。
Recycler
Recycler是一个抽象类,在Netty中有一个匿名的实现类:
private static final class RecyclerObjectPool<T> extends ObjectPool<T> { private final Recycler<T> recycler; RecyclerObjectPool(final ObjectCreator<T> creator) { recycler = new Recycler<T>() { @Override protected T newObject(Handle<T> handle) { return creator.newObject(handle); } }; } @Override public T get() { return recycler.get(); } }
可以看到,只有一个抽象方法,即创建对象的方法newObject,Recycler不负责创建对象,只负责管理创建的对象。
Recycler的属性可以分为以下几类:
- 常量型,如"DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD",其含义就是字面意思:默认初始最大容量
- 参数型,如"maxCapacityPerThread",作为构造函数的参数传入,用来控制Recycler的“尺寸”
- 线程共享变量,一共有两个:"DELAYED_RECYCLED"和"threadLocal",这是Recycler的核心属性,通过线程共享变量来实现线程安全的对象池,因为涉及到Recycler的内部类,这里不过多阐述
Recycler的核心方法有两个:
- Recycler#get:从Recycler取出对象
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
- Recycler#recycle(已废弃,转移到handler中):通过Recycler回收对象
@Deprecated public final boolean recycle(T o, Handle<T> handle) { if (handle == NOOP_HANDLE) { return false; } DefaultHandle<T> h = (DefaultHandle<T>) handle; if (h.stack.parent != this) { return false; } h.recycle(o); return true; }
从get的方法中我们看到一个熟悉的对象:Stack(栈)。但此栈非java.util中的Stack,而是Recycler的一个内部类,但还是栈的数据结构。get方法的逻辑很简单:从threadLocal中取一个与当前线程对应的stack,然后从stack中pop出一个类型为DefaultHandler的handler,如果handler为空,就生成一个handler,并且将生成的对象保存到handler.value中。从这短短几行代码中我们可以确定几个事实:
- Recycler的stack是线程共享变量,也就是说每个线程的stack互不影响
- 对象保存在handler的value属性中,handler绑定在一个stack上,代码如下。所以可以通过handler找到对应的stack,完成一些操作(如回收)
private static final class DefaultHandle<T> implements Handle<T> { int lastRecycledId; int recycleId; boolean hasBeenRecycled; Stack<?> stack; Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } stack.push(this); } }
接下来我们看看Stack是怎样实现的,起到了什么作用
Stack
Stack的属性中比较重要的有:
- threadRef,是一个WeakReference<Thread>,保存了绑定线程的引用。用弱引用的原因是:如果线程死去,那么此处如果仍有强引用的话,会造成该线程无法被GC回收
- elements,是一个DefaultHandler的数组,而DefaultHandler在上文提到过,用来保存生成的对象。因为Stack是一个栈的数据结构,我们不难猜到这个数组就是用来保存栈中的元素
- head、cursor、prev,这三个属性类型都是WeakOrderQueue,也是一个内部类,下文会详细说明,此处不再阐述。从类名来看,应该是一个队列;而从这三个属性的名字来看,head似乎是一个链表的头结点,cursor、prev像是遍历中当前结点和前一个结点的引用
既然是栈,那么一定有两个操作:
- Stack#push:
void push(DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); if (threadRef.get() == currentThread) { // The current Thread is the thread that belongs to the Stack, we can try to push the object now. pushNow(item); } else { // The current Thread is not the one that belongs to the Stack // (or the Thread that belonged to the Stack was collected already), we need to signal that the push // happens later. pushLater(item, currentThread); } }
从代码中可以看到,当前线程与stack绑定的线程一致时,调用的是pushNow:
private void pushNow(DefaultHandle<?> item) { if ((item.recycleId | item.lastRecycledId) != 0) { throw new IllegalStateException("recycled already"); } item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; if (size >= maxCapacity || dropHandle(item)) { // Hit the maximum capacity or should drop - drop the possibly youngest object. return; } if (size == elements.length) { elements = Arrays.copyOf(elements, min(size << 1, maxCapacity)); } elements[size] = item; this.size = size + 1; }
Stack#pushNow的逻辑很简单,就是将对象放到数组第一个空位置。接下来看当stack绑定线程与当前线程不一致时,调用的pushLater:
private void pushLater(DefaultHandle<?> item, Thread thread) { if (maxDelayedQueues == 0) { // We don't support recycling across threads and should just drop the item on the floor. return; } // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this); if (queue == null) { if (delayedRecycled.size() >= maxDelayedQueues) { // Add a dummy queue so we know we should drop the object delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // Check if we already reached the maximum number of delayed queues and if we can allocate at all. if ((queue = newWeakOrderQueue(thread)) == null) { // drop object return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { // drop object return; } queue.add(item); }
这里我们看到了"DELAYED_RECYCLED"属性,它也是一个线程共享变量,但保存的元素类型不是Stack,而是一个Map<Stack, WeakOrderQueue>。pushLater会从DELAYED_RECYCLED中取一个与当前线程对应的map,然后将stack本作为键,从Map中取出一个WeakOrderQueue。当取出的WeakOrderQueue为null时,就会生成一个新的queue,并放到map中。当取出的WeakOrderQueue不为null时WeakOrderQueue的add方法将元素添加进去,看来核心还是对WeakOrderQueue的操作。这里我们来看按一下newWeakOrderQueue方法,看看这个WeakOrderQueue是怎么被创建的:
- Stack#newWeakOrderQueue
private WeakOrderQueue newWeakOrderQueue(Thread thread) { return WeakOrderQueue.newQueue(this, thread); }
- WeakOrderQueue#newQueue
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) { // We allocated a Link so reserve the space if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) { return null; } final WeakOrderQueue queue = new WeakOrderQueue(stack, thread); // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so // may be accessed while its still constructed. stack.setHead(queue); return queue; }
这里我们继续跟一下stack的setHead方法
- Stack#setHead
synchronized void setHead(WeakOrderQueue queue) { queue.setNext(head); head = queue; }
这是个同步方法,传入的queue将原来的head串在自己后面,然后head重新设置为传入的queue。因为这一系列方法都是在pushLater中调用的,所以当push元素时,如果当前线程不是stack绑定的线程,会生成(如果之前没有生成过)一个queue串在stack上。而stack绑定的线程,可以通过这个链,找到别的线程push入栈的元素
- Stack#pop:
DefaultHandle<T> pop() { int size = this.size; if (size == 0) { if (!scavenge()) { return null; } size = this.size; if (size <= 0) { // double check, avoid races return null; } } size --; DefaultHandle ret = elements[size]; elements[size] = null; // As we already set the element[size] to null we also need to store the updated size before we do // any validation. Otherwise we may see a null value when later try to pop again without a new element // added before. this.size = size; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } ret.recycleId = 0; ret.lastRecycledId = 0; return ret; }
pop的逻辑看上去也比较简单,当调用scavenge返回false时,就从elements数组中弹出末尾元素。因为scavenge方法涉及到对head、cursor、prev的操作,所以我们不得不开始分析WeakOrderQueue
WeakOrderQueue
WeakOrderQueue有两个内部类:
- Link
static final class Link extends AtomicInteger { final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY]; int readIndex; Link next; }
看上去就是一个链表,保存一个类型为int的读索引。Link有一个名为elements的属性,类型DefaultHandler的数组,所以WeakOrderQueue保存的元素一定是在Link中
- Head
private static final class Head { private final AtomicInteger availableSharedCapacity; Link link; Head(AtomicInteger availableSharedCapacity) { this.availableSharedCapacity = availableSharedCapacity; } ... }
这里省略了一些Head的代码,Head保存了一个Link的引用,看上去是一个链表的头结点
介绍完这两个内部类,来看看WeakOrderQueue的属性:
- head,类型为Head
- tail,类型为Link
- next,类型为WeakOrderQueue
看到这里我想大家都明白了,我们被"Queue"这个名字欺骗了,实际上WeakOrderQueue就是一个链表,通过next指针串起来。至于head和link的作用,我们需要回到上面分析Stack时跳过的两个方法:WeakOrderQueue的add和Stack的scavenge方法
- WeakOrderQueue#add
void add(DefaultHandle<?> handle) { handle.lastRecycledId = id; // While we also enforce the recycling ratio one we transfer objects from the WeakOrderQueue to the Stack // we better should enforce it as well early. Missing to do so may let the WeakOrderQueue grow very fast // without control if the Stack if (handleRecycleCount < interval) { handleRecycleCount++; // Drop the item to prevent recycling to aggressive. return; } handleRecycleCount = 0; Link tail = this.tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { Link link = head.newLink(); if (link == null) { // Drop it. return; } // We allocate a Link so reserve the space this.tail = tail = tail.next = link; writeIndex = tail.get(); } tail.elements[writeIndex] = handle; handle.stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; // this also means we guarantee visibility of an element in the queue if we see the index updated tail.lazySet(writeIndex + 1); }
这里有几个逻辑:
-
当handleRecycleCount达到interval时,方法直接返回了,元素被丢弃了。从注释中也可以看到这一点,目的是为了控制queue的大小
-
添加是从tail开始,tail的类型是Link,继承了AtomicInteger,所以它有一个计数的作用:
-
当tail的计数达到一个上限(LINK_CAPACITY)时,就新生成一个Link,将原tail的next指向这个新的Link(串起来),然后将WeakOrderQueue的tail指向新的Link。从这里我们知道了WeakOrderQueue的tail的作用,就是永远指向这一串Link的尾部
-
没达到上限时,就直接写入到tail的elements中,这正好证实了我们在分析Link时对存入元素位置的结论。最后tail的计数加一,表明当前写到elements的哪个位置了,正好对应了writeIndex这个变量名
-
-
- Stack#scavenge
private boolean scavenge() { // continue an existing scavenge, if any if (scavengeSome()) { return true; } // reset our scavenge cursor prev = null; cursor = head; return false; }
我们发现调用了scavengeSome,继续跟进去
- Stack#scavengeSome
private boolean scavengeSome() { WeakOrderQueue prev; WeakOrderQueue cursor = this.cursor; if (cursor == null) { prev = null; cursor = head; if (cursor == null) { return false; } } else { prev = this.prev; } boolean success = false; do { if (cursor.transfer(this)) { success = true; break; } WeakOrderQueue next = cursor.getNext(); ... } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; }
可以看到核心是调用了transfer方法,继续跟
- WeakOrderQueue#transfer
// transfer as many items as we can from this queue to the stack, returning true if any were transferred boolean transfer(Stack<?> dst) { Link head = this.head.link; if (head == null) { return false; } if (head.readIndex == LINK_CAPACITY) { if (head.next == null) { return false; } head = head.next; this.head.relink(head); } final int srcStart = head.readIndex; int srcEnd = head.get(); final int srcSize = srcEnd - srcStart; if (srcSize == 0) { return false; } final int dstSize = dst.size; final int expectedCapacity = dstSize + srcSize; if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { final DefaultHandle[] srcElems = head.elements; final DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { DefaultHandle<?> element = srcElems[i]; if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } srcElems[i] = null; if (dst.dropHandle(element)) { // Drop the object. continue; } element.stack = dst; dstElems[newDstSize ++] = element; } if (srcEnd == LINK_CAPACITY && head.next != null) { // Add capacity back as the Link is GCed. this.head.relink(head.next); } head.readIndex = srcEnd; if (dst.size == newDstSize) { return false; } dst.size = newDstSize; return true; } else { // The destination stack is full already. return false; } }
从最上面的注释可以看到,这个方法会将尽可能多的元素从queue中转移到stack上。一切是从head.link开始
Link head = this.head.link; if (head == null) { return false; } if (head.readIndex == LINK_CAPACITY) { if (head.next == null) { return false; } head = head.next; this.head.relink(head); }
如果readIndex达到上限(LINK_CAPACITY),那么就跳转到head.next,并且调用queue的head的relink方法,传入参数是新的head(head.next),看看relink方法:
- Head#relink
void relink(Link link) { reclaimSpace(LINK_CAPACITY); this.link = link; }
逻辑很简单,初始化容量,并且将link指向新的head。从这里我们知道了WeakOrderQueue的head的作用:head的link属性,永远指向Link链的头部,与tail正好对应起来。继续分析transfer方法:
final int srcStart = head.readIndex; int srcEnd = head.get(); final int srcSize = srcEnd - srcStart; if (srcSize == 0) { return false; } final int dstSize = dst.size; final int expectedCapacity = dstSize + srcSize; if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { final DefaultHandle[] srcElems = head.elements; final DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { DefaultHandle<?> element = srcElems[i]; if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } srcElems[i] = null; if (dst.dropHandle(element)) { // Drop the object. continue; } element.stack = dst; dstElems[newDstSize ++] = element; } if (srcEnd == LINK_CAPACITY && head.next != null) { // Add capacity back as the Link is GCed. this.head.relink(head.next); } head.readIndex = srcEnd; if (dst.size == newDstSize) { return false; } dst.size = newDstSize; return true; } else { // The destination stack is full already. return false; }
这里的逻辑大致是:从head.link指向的Link的readIndex位置开始,读取其elements数组,然后放到stack的elements数组中,然后更新stack的size。至此,核心代码已经解析完毕
总结
我知道很多人看到这里还是对总体的结构没什么概念,这里我画了一张图,可以帮助理解Recycler即期内部类之间的关系:
Recycler的实现有许多巧妙之处:
- Link这个内部类中保存了一个int型的readIndex来表示读索引,而其写索引并不是保存了另一个int型元素实现,而是通过继承AtomicInteger。一方面,当Link中写入元素时,可能是多个线程写入,所以写索引必须是线程安全的;另一方面读取Link中的元素时,由于读取时stack是从threadLocal取出的,所以只可能当前线程操作这个stack,所以readIndex只需要普通的int即可
- 当多个非当前线程push元素时,各个线程都互不影响,因为其对应的WeakOrderQueue都保存在线程共享变量中。如果在A、B线程中同时调用与线程C绑定的stack的push,那么从DELAYED_RECYCLED取出的map都是线程独有的,所以map保存的WeakOrderQueue并不冲突。而为了把这些WeakOrderQueue联系起来,又通过stack的head属性,将这些queue串起来。真正需要加锁的地方,只有setHead这一步,所以效率很高
标签:Netty,head,return,stack,源码,Link,Stack,WeakOrderQueue,浅析 来源: https://www.cnblogs.com/cedriccheng/p/14397148.html