其他分享
首页 > 其他分享> > 队列总结(五)LinkedBlockingQueue

队列总结(五)LinkedBlockingQueue

作者:互联网

LinkedBlockingQueue

阻塞队列BlockingQueue接口的常用实现之一,基于链表的可选(可手动指定队列长度)有界阻塞队列

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

构造方法,通过设置属性capacity来限制队列长度,默认容量为Integer.MAX_VALUE。并用属性count 来记录队列当前元素个数。


    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     * 头结点的item(实际入队元素)始终为null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     * 尾结点的next (后继节点)始终为null
     */
    private transient Node<E> last;
    
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 注意这里,构造了链表的头尾节点,都指向一个属性item为null的伪节点
        last = head = new Node<E>(null);
    }

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

锁相关,两组锁,分别对应(take,poll)与(put,offer)操作

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

对比ArrayBlockingQueue可以发现,ArrayBlockingQueue中只使用了一把锁,添加和移除共用这把锁进行并发控制,所以这两个操作是互斥的,不能同时进行。而LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,添加和删除操作可以同时进行,所以吞吐量大大提高了。也因此,LinkedBlockingQueue在大多数并发应用程序中的可预测性能较差。

至于LinkedBlockingQueue的实现原理图与ArrayBlockingQueue是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的Condition条件对象作为等待队列,用于挂起take线程和put线程。

但是如果没有给LinkedBlockingQueue指定容量大小,其默认值将是Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。

通过观察入队出队操作来理解两把锁的应用:

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            // 疑惑点1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
         // 疑惑点2
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

逻辑比较简单,线程入队操作enqueue分析,这里盗用一张图,来看看往队列里依次放入元素A和元素B的过程。之前说过了,队列的头结点为一个item为null的伪节点。
在这里插入图片描述对于offer方法,主要的疑惑点有两点:

  1. 为什么添加完成后要继续唤醒在条件对象notFull上的添加线程?

在ArrayBlockingQueue中,添加完成后,并不会再次检测队列容量是否已满,从而去继续唤醒其他添加线程,这是因为ArrayBlockingQueue的添加操作和移除操作是同一把锁控制,是互斥的,即添加过程中元素个数是不会减少的,所以不存在唤醒其他添加线程的必要。同理,添加完成后也不能唤醒其他添加线程,那样会造成添加线程和消费线程的争抢锁行为,可能导致永远无法消费。

而在LinkedBlockingQueue中,添加和移除操作是可以同时进行,不会互斥的。稍后你会看到,在移除方法中,只有当队列是满的时候才会进行唤醒添加线程,且移除操作也是会自己唤醒其他的移除线程,这就意味着可能进行了多次移除操作,只进行了一次唤醒,所以自我唤醒就是很有必要的了。

那么为什么移除操作仅当队列满的时候才对添加线程进行唤醒呢?一是因为唤醒是需要加锁的,这样做可以减少锁的次数。二是因为正常来讲,当队列满了,此时才有可能有多个添加线程如A,B阻塞,移除线程a唤醒其中一个A后,此时后续自我唤醒的另一个移除线程b假如先工作,经过判断队列不满,不会主动唤醒添加线程B,但是添加线程A的自我唤醒机制会唤醒B,所以整体来说,各司其职,提高了吞吐量。

  1. 为什么要判断if (c == 0)时才去唤醒消费线程呢?

消费线程被唤醒后,在队列中有数据的前提下,会自我唤醒,一直消费,所以c值是一直在变化的,c值是添加完元素前队列的大小,此时c只可能是0或c>0,如果是c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后就直接唤醒等待消费线程。如果c>0那么消费线程就不会被唤醒,只能等待下一个消费操作(poll、take、remove)的调用,那为什么不是条件c>0才去唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前c>0,那么很可能上一次调用消费线程后,数据并没有被消费完,条件队列上也就不存在等待的消费线程了,所以c>0唤醒消费线程得意义不是很大。

至此,学习LinkedBlockingQueue主要可学习的就是对锁的运用,对并发的处理,后续就不再深入了。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

LinkedBlockingQueue和ArrayBlockingQueue迥异

通过上述的分析,对于LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及内部实现原理我们已较为熟悉了,这里我们就对它们的区别来个小结

  1. 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。

  2. 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。

  3. 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

  4. 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

标签:总结,队列,LinkedBlockingQueue,添加,线程,移除,唤醒
来源: https://blog.csdn.net/java_lifeng/article/details/120758625