编程语言
首页 > 编程语言> > BlockingQueue源码分析

BlockingQueue源码分析

作者:互联网

一、阻塞队列简介

队列常被用来解决生产——消费者问题,Java中定义了Queue接口以及通用的一些抽象方法

public interface Queue<E> extends Collection<E> {
    // 添加一个元素,添加成功返回true,如果队列满了就抛出异常
    boolean add(E e);

    //添加一个元素,添加成功返回true,如果队列满了返回false
    boolean offer(E e);

    // 删除并返回队首元素,队列为空则抛出异常
    E remove();

    // 移除并返回队首元素,队列为空则返回null
    E poll();

    // 返回队首元素,但并不移除,队列为空则抛出异常
    E element();

    // 返回队首元素,但并不移除,队列为空则返回null
    E peek();
}

上面所列举出来的只是普通的队列的通用方法,而Java中的阻塞队列BlockingQueue,继承了Queue接口,同时又添加了两个具有阻塞功能的抽象方法,同时又提供了offer()poll两个可阻塞的重载方法:

通过下面阻塞方法的定义可以看出,只要是会被阻塞的方法,都会抛出InterruptedException异常

public interface BlockingQueue<E> extends Queue<E> {
    // 添加元素,队列满时,插入线程会被阻塞,直到队列不满
    void put(E e) throws InterruptedException;
    
    // 移除并返回元素,队列为空时,获取元素线程会被阻塞,直到队列非空
    E take() throws InterruptedException;
    
	// 可以指定添加元素时线程被阻塞的超时时间
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    
    // 可以指定获取元素时线程被阻塞的超时时间
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
}

BlockingQueue的常用方法做一个归纳如下:

方法抛出异常返回特定值阻塞指定阻塞时间
入队add(e)offer(e)put(e)offer(e,time,unit)
出队remove()poll()take()poll(time,unit)
获取队首元素element()peek()不支持不支持

阻塞队列除了具有可阻塞的特性之外,还有另外一个重要的特性就是容量大小,分为有界和无界。没有绝对意义的上的无界,只是这个界限很大,可以放很多元素。以LinkedBlockingQueue为例,它的容量大小为Integer.MAX_VALUE,这是一个非常大的数字,我们通常认为它就是无界的。也有一些阻塞队列是有界的,比如ArrayBlockingQueue,如果达到最大容量之后,也不会进行扩容。所以一旦满了就无法再往里面放数据了。

BlockingQueue同时也是线程安全的,它可以保证多线程的情况下,保证生产者和消费者的线程安全,其内部大多都是采用CASReentrantLock来保证线程安全,业务代码无需再关注多线程安全的问题,直接向队列里面放或者取就可以了,如图所示:

同时,阻塞队列还启动了资源隔离的作用,在复杂业务中,业务A完成后,只需要将结果丢到队列中即可,不需要关心后面的步骤,业务B会从队列中获取任务来执行对应的业务,实现了业务之间的解耦,也可以提高安全性。

下面就介绍一些常用的阻塞队列和部分核心源码

二、常用阻塞队列及核心源码分析

2.1 ArrayBlockingQueue

ArrayBlockingQueue是一个典型的有界的线程安全的阻塞队列,初始化时需要指定其容量大小,其内部元素使用数组进行存储,以put()方法为例,使用ReentrantLock来保证线程安全,通过条件队列的两个条件notEmptynotFull来进行阻塞和唤醒

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;
    int takeIndex;
    int putIndex;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 加锁保证线程安全
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 使用while而不是if,是为了防止虚假唤醒
            while (count == items.length)
                // 队列满了,生产者阻塞
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        // 如果添加一个元素队列满了之后,会被putIndex置为0,典型的环形数组的实现
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 条件队列转同步队列并唤醒线程
        notEmpty.signal();
    }
}

由于ArrayBlockingQueueput()take()方法使用ReentrantLock进行同步,同时只有一个方法可以执行,所以在高并发的情况下,性能会比较差。

思考ArrayBlockingQueue为什么采用双指针环形数组的方式?

普通的数组,删除数组元素时需要进行移位操作,导致它的时间复杂度为O(n),而采用双指针环形数组,不需要进行移位,只需要分别移动两个指针即可。

2.2 LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    
    static class Node<E> {
        E item; // 元素内容
        Node<E> next; // 下一个元素节点 单链表结构
        Node(E x) { item = x; }
    }

    // 初始化容量,默认Integer.MAX_VALUE
    private final int capacity;
    // 元素个数,因为读写操作的锁分离,这里使用线程安全的计数变量
    private final AtomicInteger count = new AtomicInteger();
    // 链表头,本身不存储元素信息,其item为null
    transient Node<E> head;
	// 链表尾元素
    private transient Node<E> last;
	// 获取元素的锁,锁分离,提高效率
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
	// 添加元素的锁
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 使用put锁,可以被中断
        putLock.lockInterruptibly();
        try {
            // 队列满了,阻塞生产者线程
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            // 返回旧值
            c = count.getAndIncrement();
            // 可能有很多线程阻塞在notFull这个条件上,而取元素时才会唤醒notFull,此处不用等到取元素时才唤醒
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 队列之前为空,现在新增了一个元素后,可以直接去唤醒获取元素的线程
        if (c == 0)
            signalNotEmpty();
    }

}

LinkedBlockingQueue与ArrayBlockingQueue对比

2.3 LinkedBlockingDeque

LinkedBlockingDeque是对LinkedBlockingQueue的增强,其顶层接口为Deque,该接口定义了更加丰富的操作队列的方法,通过方法名就可以看出来,这些方法打破了队列先进先出的固有规则,提供了可以从头部或者尾部操作的API

public interface Deque<E> extends Queue<E> {
    void addFirst(E e);
    void addLast(E e);
    boolean offerFirst(E e);
    boolean offerLast(E e);
    E removeFirst();
    E removeLast();
    E pollFirst();
    E pollLast();
    E getFirst();
    E getLast();
    E peekFirst();
    E peekLast();
}

BlockingDeque接口继承了Deque,同时又提供了几个可阻塞的方法

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    void putFirst(E e) throws InterruptedException;
    void putLast(E e) throws InterruptedException;
    E takeFirst() throws InterruptedException;
    E takeLast() throws InterruptedException;
}

LinkedBlockingDeque实现了BlockingDeque接口,其内部通过双向链表来记录元素,通过一把ReentrantLock来保证线程安全,该类可以看成是ArrayBlockingQueueLinkedBlockingQueue的结合与增强

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {

    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;
        Node(E x) {
            item = x;
        }
    }
    transient Node<E> first;
    transient Node<E> last;
    private transient int count;
    private final int capacity;
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

    private boolean linkFirst(Node<E> node) {
        // 超过容量,直接返回
        if (count >= capacity)
            return false;
        Node<E> f = first;
        node.next = f;
        first = node;
        if (last == null)
            last = node;
        else
            f.prev = node;
        ++count;
        // 唤醒被阻塞的获取元素的线程
        notEmpty.signal();
        return true;
    }
}

2.4 SynchronousQueue

SynchronousQueue是一个没有缓冲的BlockingQueue,生产者线程对元素的插入操作put()必须等待消费者的移除操作take(),其模型如下图:

如上图所示,SynchronousQueue最大的不同在于,它的容量为0,没有地方来缓存元素,这就导致了每次添加元素都会被阻塞,直到有线程来取元素;同理,取元素也是一样,取元素的线程也会被阻塞,直到有线程添加元素。

由于SynchronousQueue不需要持有元素,它的作用在于直接传递,所以它非常适用于传递性场景做交换工作,生产者线程和消费者线程同步传递某些信息、事务或任务

SynchronousQueue常见的一个场景就是在Executors.newCachedThreadPool()中,因为不确定生产者的请求数量(创建任务),而这些请求又需要被及时处理,那么使用SynchronousQueue为每个生产者线程分配一个消费者线程就是处理效率最高的方式。线程池会根据需要(新任务到来)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60s之后会被回收。

下面结合源码来分析它的实现原理:

SynchronousQueue内部抽象类Transferer提供了任务传递的方法transfer(),而该方法内部包含了线程阻塞与唤醒的逻辑,而Transferer有两个实现类TransferQueueTransferStack,可以理解为存储阻塞线程的方式有两种:队列和栈。根据这两者的特性,可以分为公平和非公平的实现,队列满足FIFO(先进先出)的特性,所以是公平的实现;而栈满足LIFO(后进先出)的特性,所以是非公平的实现。

下面SynchronousQueue的构造方法,提供了公平和非公平的选项,默认为非公平实现

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

下面以TransferQueue为例简要分析元素入队和出队的操作,SynchronousQueueput()take()都会去调用transfer()方法添加元素或获取元素

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

下面分析TransferQueuetransfer()方法,在分析该方法之前,先看一下它的内部类QNodeTransferQueue中使用QNode来记录元素和被阻塞的线程,其中还利用UNSAFE来获取元素和下一个节点的偏移量,直接通过CAS修改对应的数值,QNode中还有很多的CAS方法这里没有一一列举出来。

static final class TransferQueue<E> extends Transferer<E> {
    static final class QNode {
        // 下一个节点的指针
        volatile QNode next;          // next node in queue
        // 元素内容
        volatile Object item;         // CAS'ed to or from null
        // 被阻塞的线程
        volatile Thread waiter;       // to control park/unpark
        // 用于区分节点类型,false表示为取元素,true为添加元素
        final boolean isData;

        // 通过CAS修改下一个节点(多线程安全)
        boolean casNext(QNode cmp, QNode val) {
            return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // QNode属性的偏移量
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                // 根据Unsafe类计算属性在QNode类中的偏移量
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = QNode.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }   
    }

    // 头节点
    transient volatile QNode head;
    // 尾节点
    transient volatile QNode tail;

    // 初始化时就创建一个元素为null,数据类型为false的节点,头尾节点都指向该该节点
    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }

    // 计算头尾节点的偏移量,通过CAS直接修改(保证线程安全)
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long cleanMeOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
            cleanMeOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("cleanMe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

上面介绍了TransferQueue大致的内部构造,下面重点看transfer()方法实现,

E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        // 自旋等待初始化完成
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin
		// 为空或者当前节点
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 防止其他线程修改,这里再次判断
            if (t != tail)                  // inconsistent read
                continue;
            // 如果当前尾节点后面还有节点,则通过CAS把后面的节点修改为尾节点
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            // 如果需要超时阻塞,但超时时间小于0(不能阻塞),直接返回null
            // put或take方法中中断该线程并抛出中断异常
            if (timed && nanos <= 0)        // can't wait
                return null;
            // 创建一个节点,通过CAS添加到尾节点后面,这个节点可以是取元素的节点,也可以是添加元素的节点
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;
			// 新的节点添加完成之后,通过CAS将其修改为尾节点
            advanceTail(t, s);              // swing tail and wait
            // 自旋阻塞线程 下面重点介绍
            Object x = awaitFulfill(s, e, timed, nanos);
            
            // 如果返回的节点为当前节点,表示该节点被取消了,直接清除掉
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            // 队列不为空,且新的节点类型与队列里面的节点类型不一致(说明可以唤醒线程了)
            QNode m = h.next;               // node to fulfill
            
            // 为了保证线程安全,再次判断自旋
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            // 如果m节点已经被别的线程处理了,这里就修改头节点自旋
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
			// 当前线程去修改头节点,阻塞线程节点出队
            advanceHead(h, m);              // successfully fulfilled
            // 唤起m节点被阻塞的线程
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

transfer()中有一个重要的方法awaitFulfill,它会去进行自旋阻塞

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 根据处理器的核数计算自旋次数
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果当前线程被中断了,就修改S节点的item属性为当前节点
        //然后在判断节点是否被取消时就直接判断其item值是否为当前节点即可
        if (w.isInterrupted())
            s.tryCancel(e);
        // 当节点取消就返回
        Object x = s.item;
        if (x != e)
            return x;
        // 过了超时时间就取消
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        // 自旋,达到一定次数之后,填充S节点的waiter属性为当前线程,然后就阻塞
        // 至此一个节点的内容就完整了
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

2.5 PriorityBlockingQueue

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,虽然它是无界的,但在初始化的时候,它是可以指定数组初始化容量的,它的无界是基于它可以进行动态扩容而言的。

如果没有指定初始化容量,它默认的容量为11,最大容量为Integer.MAX_VALUE - 8

private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

同时,PriorityBlockingQueue是一个优先级队列,它每次出队都会返回优先级最高或最低的元素,它的构造方法中提供了自定义Comparator比较器,默认情况下使用自然顺序升序排序。

通过下面的构造方法也可以看出,该队列线程安全是由ReentrantLock来保证的,同时需要注意的是PriorityBlockingQueue不能保证同等优先级元素的顺序

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

那么PriorityBlockingQueue如果只是简单的使用数组操作来对插入元素移除进行排序,其性能将是非常低的,而它采用的是最大最小堆的方式来插入或移除数据,大小堆只是逻辑上的一种操作方式而已,其储存结构依然是数组

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序

二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。

最大最小堆满足以下特性:

下图展示了最小二叉堆的情况:

最大最小堆按照从上到下,从左到右来一次表示索引位置,上图中右下角的数字表示该元素在数组中的索引下标

在最大最小二叉堆中,插入或移除元素时,都可能涉及到元素位置调整,而在二叉堆中,利用元素的下标索引,可以很简单的计算其父节点以及左右节点的下标(以索引下标为t的元素为例):

父节点:P(t) = (t-1) >>> 1 <=> (t-1)/2

左节点:L(t) = t <<< 1 +1 <=> t*2 +1

右节点:R(t) = t <<< 1 + 2 <=> t*2 +2

下面结合源码分析它是如何添加和移除元素的

由于PriorityBlockingQueue是无界队列,所以添加元素时线程不需要阻塞,容量不够进行扩容就可以了

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 加锁保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 如果已经达到当前容量就进行扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        
        if (cmp == null)
            // 需要注意如果没有执行比较器,元素类必须实现Comparable接口
            siftUpComparable(n, e, array);
        else
            // 指定了比较器,就是用自定义的来做比较
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 添加元素后,直接唤醒被阻塞的获取元素的线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

扩容的代码如下,tryGrow()方法在offer()方法的while循环体内部,就实现了CAS+自旋的方式来实现线程安全的扩容

private void tryGrow(Object[] array, int oldCap) {
    // 释放锁,下面通过CAS来进行扩容
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 如果原来的容量小于64,则容量就扩大一倍再+2,否则容量直接扩大为原来的三倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 容量不能超过最大值
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    // 其他线程在扩容时,当前线程就让出CPU
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

核心的方法在于siftUpComparable()siftUpUsingComparator()这两个方法,这两个方法才是二叉堆入队的核心方法,以siftUpUsingComparator()为例来分析

这里面是一个while循环,进行元素的上浮操作,每次都是获取当前节点的父节点,然后与插入的元素进行比对,如果比较的结果满足最大最小堆的结构,就直接退出循环,否则就换位,继续进行比较,知道满足条件

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

获取元素的代码如下,因为获取元素的线程会被阻塞,所以这个方法会抛出中断异常

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            // 如果没有元素就进行阻塞
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

private E dequeue() {
    // 最后一个元素的索引下标
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        // 取出最后一个元素
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            // 第一次进来时,取得是第二层的两个节点进行比较
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            // 如果左节点与右节点比较,满足比较条件,就把右节点的值作为与最后一个节点比较的值
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}

2.6 DelayQueue

DelayQueue是一个支持延时获取元素的阻塞队列,内部采用PriorityQueue存储元素,同时元素必须实现Delayed接口,接口的getDelay()方法可以返回延时时间延时的时间,方法参数为时间工具类TimeUnit

在获取元素是,只有延迟时间到了才能从队列中提取元素。

延迟队列的特点:并不是先进先出,而是按照延迟时间的长短进行排序,下一个被执行的任务排在队列的最前面

由于队列元素必须实现Delayed接口,而该接口又继承自Comparable接口,所以,元素类还要去实现compareTo()方法,这样在创建队列时就不需要在额外创建Comparator对象了,元素本身就具有了排序的能力。

下面定义了一个元素类

class DelayObject implements Delayed {
    private String name;
    private long time;   //延时时间

    public DelayObject(String name, long delayTime) {
        this.name = name;
        this.time = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = time - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed obj) {
        if (this.time < ((DelayObject) obj).time) {
            return -1;
        }
        if (this.time > ((DelayObject) obj).time) {
            return 1;
        }
        return 0;
    }
}

使用Demo:

//实例化一个DelayQueue
BlockingQueue<DelayObject> blockingQueue = new DelayQueue<>();

//向DelayQueue添加2个元素对象,注意延时时间不同
blockingQueue.put(new DelayObject("lizhi", 1000 * 10));  //延时10秒
blockingQueue.put(new DelayObject("linan", 1000 * 30));  //延时30秒

//  取出lizhi
DelayObject lizhi = blockingQueue.take();
// 取出linan
DelayObject linan = blockingQueue.take();

下面看一下DelayQueue的构造,使用ReentrantLock来保证线程安全,取元素需要进行阻塞,底层使用PriorityQeue进行存储,这是一个优先级队列,与上面PriorityBlockingQueue是一样的,只是没有阻塞功能

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();

下面看一下具体的put()take()

public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁保证线程安全
    lock.lock();
    try {
        // 调用PriorityQueue添加元素
        // 与PriorityBlockingQueue的逻辑基本一致
        q.offer(e);
        // 如果当前队列只有这一个元素,就去唤醒阻塞的线程
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

下面是take()方法,要比put()方法复杂一些

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁保证线程安全,由于线程可能会被阻塞,所以这里可中断
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 取出队列第一个元素,如果没有,直接让当前线程阻塞
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                // 如果延迟时间已到,直接取出该元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 如果有线程已经在阻塞了,就让当前线程直接去阻塞
                if (leader != null)
                    available.await();
                else {
                    // 没有线程阻塞,则记录当前线程,然后让当前线程阻塞,阻塞的时间等于最近元素的延迟时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // 当前线程被唤醒后,重置leader,然后自旋
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 出队成功后,如果leader为空,并且当前对了还有元素,就去唤醒下一个被阻塞的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

延迟队列的应用场景**:**

三、选择合适的阻塞队列

我们接触的比较多的就是线程中使用阻塞队列,线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。

注:ScheduledThreadPool中使用的阻塞队列并不是DelayQueue,而是自定义实现的DelayedWorkQueue

一般从以下几个维度来选择合适的阻塞队列

标签:分析,队列,元素,阻塞,源码,线程,null,final,BlockingQueue
来源: https://blog.csdn.net/sermonlizhi/article/details/123237387