ArrayBlockingQueue是什么?
作者:互联网
前置知识【1】什么是线程安全?
1、线程安全:
指多个线程在执行同一段代码的时候采用加锁机制,使每次的执行结果和单线程执行的结果都是一样的,不存在执行程序时出现意外结果。
2、线程不安全:
是指不提供加锁机制保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据。
前置知识【2】什么是队列?什么是生产者消费者模式?
由图可知,多个生产者向队列里放元素,多个消费者在从队列里取出元素。那么问题来了:
- 当队列里是空的时候,消费者来取怎么办?同理当队列里满的,生产者产出元素怎么办?
- 当多个消费者/生产者同时工作时候,会不会拿到脏数据?比如:多个消费者会不会都拿到了某个元素?
ArrayBlockingQueue简介
ArrayBlockingQueue是JUC中的BlockingQueue中的用数组做数据结构的线程安全的阻塞队列。最常使用场景为生产者和消费者模式。让以上两个问题得到控制和解决。
- ArrayBlockingQueue保证线程安全。
- ArrayBlockingQueue阻塞机制保证:
- 队列空时:阻塞消费者。
- 队列满时:阻塞生产者。
代码示例生产者消费者模式
public class TestMain2 {
private static AtomicInteger count = new AtomicInteger();
//private static volatile ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10, true);
public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10, true);
Object o = new Object();
//生产者
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
synchronized (o){
Integer i = new Integer(count.incrementAndGet());
queue.put(i);
System.out.println("生产者:"+Thread.currentThread().getName()+"将---"+i+"---入队");
}
}
}).start();
}
//消费者
for (int j = 0; j < 10; j++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
synchronized (o) {
Integer take = queue.take();
System.out.println("消费者:" + Thread.currentThread().getName() + "将---" + take + "---取出");
}
}
}).start();
}
}
}
ArrayBlockingQueue源码解析
1. 使用reentrantlock 对 入队 进行安全保护
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
2.notFull.await() 队列满时 的 执行阻塞和排队
实现可中断条件等待。
实现可中断条件等待。
-
如果当前线程被中断,抛出InterruptedException。
-
保存由getState返回的锁状态。
-
用保存状态作为参数调用release,如果失败抛出IllegalMonitorStateException。
-
阻塞直到收到信号或中断。
-
通过调用以保存状态作为参数的acquire的专一版本来重新获取。
-
如果在步骤4中阻塞时被中断,抛出InterruptedException。
3. AQS的CLH等待队列之外还需要一个队列
用于进入CLH队列的那些阻塞的线程的存放
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
3. 取出同样 使用reentrantlock 保证线程安全,notEmpty.await() 与入队时候的一样实现可中断条件等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
标签:node,队列,lock,什么,线程,new,ArrayBlockingQueue 来源: https://blog.csdn.net/weixin_43885975/article/details/118386113