多线程-并发工具类之CyclicBarrier详解
作者:互联网
文章目录
简介
从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier 的状态后它可以被重用。之所以叫作屏障是因为线程调用await 方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。
CyclicBarrier是一种同步辅助工具,允许一组线程相互等待,直到达到共同的障碍点.
经常用于一组固定数量的线程必须相互等待的程序.
假如计数器值为N,那么随后调用await方法的N-1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为 0了,这时候第N个线程才会发出通知唤醒前面的N-1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。
线程进入屏障通过CyclicBarrier的await()方法。
CyclicBarrier实例是可重复使用的:所有等待线程被唤醒的时候,任何线程再次执行CyclicBarrier.await()又会被暂停,直到这些线程中的最后一个线程执行了CyclicBarrier.await().
例子
如下例子,新建10个线程,直到10个线程都调用了await方法,即都到达屏障点后,就调用CyclicBarrier初始化时定义的方法(召唤神龙).
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
System.out.println("召唤神龙");
});
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"收集到龙珠");
cyclicBarrier.await(); //等待其他线程执行完自己的操作,当等待线程数量达到10时,会召唤神龙
} catch (Exception e) {
e.printStackTrace();
}
}, Thread.currentThread().getName()+":"+i).start();
}
如下例子:假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行。该例子利用了CyclicBarrier的可复用性.
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " step3");
} catch (Exception e) {
}
});
}
executorService.shutdown();
}
输出结果:
pool-1-thread-1 step1
pool-1-thread-3 step1
pool-1-thread-2 step1
pool-1-thread-2 step2
pool-1-thread-1 step2
pool-1-thread-3 step2
pool-1-thread-3 step3
pool-1-thread-1 step3
pool-1-thread-2 step3
在如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段 1后才会开始执行阶段2。然后在阶段 2后面调用了await方法,这保证了所有线程都完成了阶段2后 ,才能开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的 。
实现原理
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。
parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行。而count一开始等于parties,每当有线程调用await方法就递减1,当count为0时就表示所有线程都到了屏障点。
你可能会疑惑,为何维护parties和count两个变量,只使用
count不就可以了?另外别忘了CyclieBarrier是可以被复用的,使用两个变量的原因是,parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,从而进行复用。这两个变量是在构造CyclicBarrier对象时传递的.如下所示:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
还有一个变量barrierCommand也通过构造函数传递,这是一个任务,这个任务的执行时机是当所有线程都到达屏障点后。使用lock首先保证了更新计数器count的原子性。另外使用lock 的条件变量trip支持线程间使用await和signal操作进行同步。
最后,在变量generation内部有一个变量broken,其用来记录当前屏障是否被打破。注意,这里的broken并没有被声明为volatile的,因为是在锁内使用变量,所以不需要声明。
private static class Generation {
boolean broken = false;
}
几个重要方法
- int await()方法
当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足 下面条件之一才会返回:
- parties个线程都调用了await()方法,也就是线程都到了屏障点;
- 其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常而返回; - - 与当前屏障点关联的Generation对象的broken标志被设置为 true时,会抛出BrokenBarrierException异常,然后返回。
由如下代码可知,在内部调用了dowait方法。第一个参数为false,则说明不设置超时时间,这时候第二个参数没有意义。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
- int dowait(boolean timed, long nanos)方法
该方法实现了CyclicBarrier的核心功能,其代码如下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//(1)如果index==O则说明所有线程都到了屏障点,此时执行初始化时传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//(2)执行任务
if (command != null)
command.run();
ranAction = true;
//(3)激活其他因调用await方法而被阻塞的线程,并重置CyclieBarrier
nextGeneration();
//返回
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//(4)如果index不为0
for (;;) {
try {
//没有设置超时时间
if (!timed)
trip.await();
//设置了超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
//(7)唤醒条件队列里面阻塞线程
trip.signalAll();
// set up next generation
//重置CyclicBarrier
count = parties;
generation = new Generation();
}
当一个线程调用了dowait方法后,首先会获取独占锁lock,如果创建CycleBarrier时传递的参数为10,那么后面9个调用钱程会被阻塞。然后当前获取到锁的线程会对计数器count进行递减操作,递减后count=index=9,因为index!=O所以当前线程会执行代码(4)。如果当前线程调用的是无参数的await() 方法 ,则这里timed=false,所以当前线程会被放入条件变量 的trip的条件阻塞队列,当前线程会被挂起并释放获取的lock 锁。如果调用的是有参数的await方法则timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。
当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的9个线程中有一个会竞争到lock锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到lock锁,此时己经有9个线程被放入了条件变量trip的条件队列里面。最后count=index等于 0,所以执行代码(2),如果创建CyclicBarrier时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他9个线程,并重置 CyclicBarrier,然后这 10个线程就可以继续向下运行了。
小结
CycleBarrier与CountDownLatch的不同在于,前者是可以复用 的,并且前者特别适合分段任务有序执行的场景。
CycleBarrier其底层通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。CyclicBarrier内部使用了一个条件变量trip来实现等待/通知.使用了分代(Generation)的概念用于表示CyclicBarrier实例是可以重复使用的.
尚凯辉的博客 发布了25 篇原创文章 · 获赞 8 · 访问量 957 私信 关注标签:调用,CyclicBarrier,lock,await,详解,线程,parties,多线程 来源: https://blog.csdn.net/kaihuishang666/article/details/103995252