其他分享
首页 > 其他分享> > BlockingQueue 阻塞队列

BlockingQueue 阻塞队列

作者:互联网

抄录于 https://www.cnblogs.com/xieyanke/p/13441318.html

BlockingQueue原理

 

概念

BlockingQueue 翻译成中文阻塞队列,顾名思义就是线程使用队列时会阻塞当前线程;

BlockingQueue 继承了Collection,具有一般集合所具有的数据存取功能

BlockingQueue 是线程安全的队列,多线程访问时不会出现同一个数据集中的数据被多次取出,或者覆盖存放的事件

 

使用场景

可用于一个快速反馈的消息队列,无消息时阻塞线程让出CPU,有数据存入时通知线程取出数据,取完后继续阻塞,

比如用户下单后立刻在大屏上显示有客户下单,比较简单的做法是开启一个定时任务,定期扫订单表;或者接入消息中间件,下单时发送消息,大屏服务监听消息;或者借用reddis队列 解决方式有很多种不一一列举

示例模拟数据的存取 设置队列的容量为1 是为了更好展示 存取的阻塞特性

复制代码
public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(1);

    //模拟存入数据线程
    new Thread(()->{
        int i=0;
        while (true){
            try {
                //每次循环+1
                i++;
                queue.put(i);
                System.out.println("存入数据"+i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "存入数据线程").start();

    //模拟取出数据线程 1秒钟取一个
    new Thread(()->{
        while (true){
            try {
                //一秒钟取一个数据
                Thread.sleep(1000);
                Integer result = queue.take();
                System.out.println("取出数据"+result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "取数据线程").start();

}

打印结果:
存入数据1
取出数据1
存入数据2
取出数据2
存入数据3
取出数据3
存入数据4
取出数据4
存入数据5
取出数据5
存入数据6
取出数据6
存入数据7
取出数据7
复制代码

 

方法示例

阻塞队列的使用非常简单,基本上和普通集合一样对数据进行存和取

复制代码
public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(2);

        //存入一个数据 如果队列满了则一直阻塞到有数据取出
        queue.put(1);
        //取出一个数据 如果队列空了则一直阻塞到有数据存入
        queue.take();

        //存入一个数据 如果队列满了则阻塞若干时长(示例为10秒),超时则返回offerResult=false
        boolean offerResult = queue.offer(1, 10, TimeUnit.SECONDS);
        //取出一个数据 如果队列空了则阻塞若干时长(示例为10秒),超时则返回pollResult=null
        Integer pollResult = queue.poll(10, TimeUnit.SECONDS);
}
复制代码

 

源码分析

1、接口继承结构 

 

2、接口代码

复制代码
public interface BlockingQueue<E> extends Queue<E> {
    //向队列中添加元素, 若超过给定队列长度抛出异常
    boolean add(E e);

    //向队列中添加元素, 若超过给定队列长度抛出异常
    boolean offer(E e);

    //向队列中添加元素, 若超过队列长度则等待队列有剩余容量再加入元素
    void put(E e) throws InterruptedException;

    //向队列中添加元素, 若超过给定队列长度则等待给定时长
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //获取队列头部元素,并从队列头部移除,若队列为空,则阻塞当前获取线程,并等待新元素加入
    E take() throws InterruptedException;

    //获取队列头部元素,并从队列头部移除,若队列为空,则阻塞当前获取线程,并等待元素给定时长
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //返回队列剩余容量
    int remainingCapacity();

    //移除指定元素
    boolean remove(Object o);

    //返回是否存在指定元素
    public boolean contains(Object o);

    
    //将队列中的元素全部移除到给定的集合c中
    int drainTo(Collection<? super E> c);

    //将队列中的元素全部移除到给定的集合c中(最多不超过maxElements个)
    int drainTo(Collection<? super E> c, int maxElements);
}
复制代码

 

3、实现类 ArrayBlockingQueue 分析

 

复制代码
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    //数据集 用于存放元素 初始化固定数组长度 不再扩容
    final Object[] items;

    //数据集下一次取数据的下标
    //具体操作为 每次take加1 若take+1==items.length即take最后一个元素
    //则takeIndex重置为0 如此往复
    int takeIndex;

    //数据集下一次存数据下标
    int putIndex;

    //数据集中 存放元素的个数 即items[i]!=null的个数
    int count;

    //重入锁 可选公平与非公平 非本文重点
    final ReentrantLock lock;

    //Condition 
    //使用流程 1取数据为空(count==0) 则阻塞等待数据集存入数据 执行等待notEmpty.await 
    //         2存数据数据集肯定不为空(count!=0), 则通知取数据线程继续取数据 执行通知notEmpty.signal
    private final Condition notEmpty;

    //Condition 
    //使用流程 1存数据数据集存满(count==items.length)则等待消耗后重新存入 执行等待notFull.await 
    //         2取数据后则数据集未满肯定不满(count<items.length) 则通知存入数据 执行通知notFull.signal 
    private final Condition notFull;

    //用户维护ArrayBlockingQueue 作为集合的迭代(Iterator)功能
    //调用ArrayBlockingQueue.iterator()是初始化此属性 非本文重点
    transient Itrs itrs = null;



    //--------------------重点方法------------------------
    

    /**
     * 从队列中取一个元素
     * @return [description]
     * @throws InterruptedException [description]
     */
    public E take() throws InterruptedException {
        //对操作进行加锁 多线程时轮流取元素
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果队列中没有对象 则阻塞线程等待
            while (count == 0)
                //重点:等待存数据的线程通知
                notEmpty.await();
            //代码运行到此处说明count!=null 执行从队列中取元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        //从数据集数组items 取出下标takeIndex的数据
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        //取完数据之后 将数组对应下标应用置为空(GC对象)
        items[takeIndex] = null;
        //takeIndex+1等于数组长度表示当前下标为数组最后一个对象
        //则takeIndex重新归0
        if (++takeIndex == items.length)
            takeIndex = 0;
        //每次取数据 数据总量减1
        count--;
        //迭代器维护操作
        if (itrs != null)
            itrs.elementDequeued();
        //重点:通知存数据的线程 可以执行数据存放
        notFull.signal();
        return x;
    }


    /**
     * 存入一个数据
     * @param  e                    [description]
     * @throws InterruptedException [description]
     */
    public void put(E e) throws InterruptedException {
        //校验数据非空
        checkNotNull(e);
        //加锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //若数据集数组items满了 则阻塞线程等待
            while (count == items.length)
                //重点:等待取出数据的线程通知
                notFull.await();
            //存入数据
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        //存入数据到下标putIndex
        items[putIndex] = x;
        //如果存数据的下标已经到数据最后一个下标 则putIndex重新归0
        if (++putIndex == items.length)
            putIndex = 0;
        //数据总量加1
        count++;
        //重点:存入数据后通知等待取数据的线程
        notEmpty.signal();
    }




}
复制代码

 

 总结:

BlockingQueue 重点关注

1、阻塞方式

Condition notFull 和 Condition notEmpty 的使用,存通知取,取通知存;

从而达到存满阻塞,取完阻塞,存入通知取,取出通知存的功能

2、存取游标

takeIndex 和 putIndex的使用,每次取数据takeIndex加1,到了数据末尾则重新回到数组开始下标0,存数据原理相似逐次加1,到末尾归0

对于LinkedBlockingQueue实现方式则略有不同,链表式集合多线程取数据时只需要排队从头部节点获取,从末尾存数据,有个小优化,创建LinkedBlockingQueue

时创建一个虚拟头部节点,不做深究

标签:队列,items,阻塞,存入,线程,数据,BlockingQueue
来源: https://www.cnblogs.com/gaokeji/p/16068136.html