其他分享
首页 > 其他分享> > ArrayBlockingQueue

ArrayBlockingQueue

作者:互联网

目录
阻塞队列可以用于线程池的等待队列,生产者消费者的通信通道,本文讲解ArrayBlockingQueue。参考Collection之BlockingQueue)

根据类名,可以知道这个数据结构是队列,因此数据的进出顺序是FIFO;阻塞的含义为,当需要生产/消费时,队列没有空间/数据,则对应的操作会阻塞住,直到其他操作解除这个阻塞状态

ArrayBlockingQueue作为阻塞队列,特点是内部使用Object数组(使用成了环形),并且创建时需要指定大小,有两个指针分别对应生产和消费操作

里面的迭代器弄个专题一起看吧

成员变量:

/** 队列里元素数量 */
int count;
/** 存储结构 */
final Object[] items;

/** 为下一次执行 take, poll, peek or remove 操作提供的index */
int takeIndex;

/** 为下一次执行 put, offer, or add 操作提供的index */
int putIndex;

/** 进行并发控制,以及线程间通信的锁(通信主要靠condition实现) */
final ReentrantLock lock;

/** 当队列为空时获取等待 */
private final Condition notEmpty;

/** 当队列满时插入等待 */
private final Condition notFull;

主要方法:

// 在不超过队列容量的情况下立即插入指定的元素,成功后返回true,如果队列已满则抛出IllegalStateException。
public boolean add(E e)

// 在不超过队列容量的情况下立即在队列末尾插入指定的元素,如果成功则返回true,如果队列已满则返回false。此方法通常比add(E)方法更好,后者插入元素失败只能抛出异常。
public boolean offer(E e)

// 将指定的元素插入到此队列的末尾,如果队列已满则等待直到有可用的空间。
public void put(E e) throws InterruptedException

// 将指定的元素插入到此队列的末尾,如果队列已满,则在指定的超时时间之内等待空间可用,超时返回false。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

// 检索并删除此队列的头,如果此队列为空,则返回null。
public E poll()

// 检索并删除此队列的头,如有必要则等待,直到某个元素可用为止。
public E take() throws InterruptedException

// 检索并删除此队列的头,如果有必要则在指定的等待时间之内等待元素可用,超时返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException

// 检索但不删除此队列的头,或在此队列为空时返回null。
public E peek()

// 返回此队列中的元素数量。
public int size()

// 返回此队列在理想情况下(在没有内存或资源约束的情况下)可以不阻塞地接受的新元素的数量。它总是等于这个队列的初始容量减去这个队列的当前大小。
public int remainingCapacity()

// 如果指定元素存在,则从此队列中移除该元素的单个实例。更正式地说,如果队列中包含一个或多个这样的元素,则只删除匹配到的第一个元素
public boolean remove(Object o)

// 如果此队列包含至少一个指定的元素,则返回true。
public boolean contains(Object o)

// 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列。返回的数组将是“安全的”,因为此队列不维护对它的引用。
public Object[] toArray()

// 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列;返回数组的运行时类型是指定数组的运行时类型。
public <T> T[] toArray(T[] a)

// 返回此集合的字符串表示形式。
public String toString()

// 删除此队列中的所有元素。此调用返回后,队列将为空。
public void clear()

// 从此队列中删除所有可用元素并将它们添加到给定集合中。此操作可能比重复轮询此队列更有效。在试图将元素添加到集合c时遇到失败抛出相关异常时可能会导致:元素不在原集合或者集合c中,或者两个集合中都没有。
public int drainTo(Collection<? super E> c)

// 从该队列中最多删除给定数量的可用元素,并将它们添加到给定集合中。异常情况同上
public int drainTo(Collection<? super E> c, int maxElements)

// 按适当的顺序返回此队列中元素的迭代器。元素将按从第一个(head)到最后一个(tail)的顺序返回。返回的迭代器是弱一致的。
public Iterator<E> iterator()

// 返回该队列中元素的Spliterator。返回的spliterator是弱一致的。
public Spliterator<E> spliterator()

方法中能和阻塞联系起来的,是put和take,同时offer和poll,也提供了对应的超时控制,重点关注这四个方法

checkNotNull(Object v)

    /**
     * Throws NullPointerException if argument is null.
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

enqueue(E x)

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        //获取到Object[]的引用
        final Object[] items = this.items;
        //将putIndex位置上位置为x
        items[putIndex] = x;
        //把数组当成了一个环形的去使用
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //通知说现在队列有数据了
        notEmpty.signal();
    }

put(E e)

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        //对e进行非空判断
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //核心是这个while循环,阻塞在notFull上,等待notFull.signal
            //关于conditon,还需要再搞搞AQS的源码,不然不懂为啥这里是while,不能用if
            while (count == items.length)
                notFull.await();
            //将e入队
            //这里为什么不需要对迭代器进行操作呢?
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

offer(E e, long timeout, TimeUnit unit)

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                //这个方法也是在AQS里面实现的,后面再专门弄下吧
                //我猜测是等待nanos时间,到点了就返回一个《=0的值
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

dequeue()

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //保存takeIndex对应的数据
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        //当作一个环形数组使用
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //这里对我来说也是很不熟练的地方,关于迭代器,后面也得专门弄一弄,比如ArrayList,Map之类的
        if (itrs != null)
            itrs.elementDequeued();
        //发送notFull的信号
        notFull.signal();
        //返回保存的中间结果
        return x;
    }

take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //当前队列没有数据,则等待直到notEmpty发来信号
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

poll(long timeout, TimeUnit unit)

和前面offer一样,都只是加上了一个等待时间的机制

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

removeAt(final int removeIndex)

    /**
     * Deletes item at array index removeIndex.
     * Utility for remove(Object) and iterator.remove.
     * Call only when holding lock.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        //如果要删除的下标刚好是takeIndex,当作一次普通的出队即可
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            //对迭代器进行操作
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            //把removeIndex后面的数据,都向前挪动一位
            for (int i = removeIndex;;) {
                int next = i + 1;
                //根据环形队列,来获取i的next下标
                if (next == items.length)
                    next = 0;
                //如果next不是putIndex,说明不为null,向前挪动
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    //此时 i == putIndex,将i处的数据置为null(上一步已经向前挪动了)
                    //将putIndex更新为i
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            //更新迭代器
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        //发送notFull的信号
        notFull.signal();
    }

remove(Object o)

特定删除,服用了removeAt()方法

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    //先找到o对应的下标i
                    if (o.equals(items[i])) {
                        //针对具体的下标,进行删除
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex) //如果i遍历到了putIndex,说明队列中没有o;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

标签:putIndex,队列,lock,public,items,ArrayBlockingQueue,final
来源: https://www.cnblogs.com/zjytrhy/p/15173889.html