Java并发编程之同步辅助类
作者:互联网
CountDownLatch
在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待,基于AbstractQueuedSynchronizer实现,state初始化为count,每countDown一次减1直到等于0,unpark唤醒await线程
重要方法:
- await():调用此方法线程会被阻塞,直到count为0
- await(long timeout, TimeUnit unit):同await(),可以设置最大等待时间,如超过最大等待时间则不再等待
- countDown():count减1,直至为0
public static void main(String[] args) throws InterruptedException {
int studentCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(studentCount);
for (int i = 0; i < studentCount; i++) {
String studentName = String.valueOf(i);
new Thread(() -> {
try {
System.out.println("学生" + studentName + "正在考试……");
Thread.sleep(2000);
System.out.println("学生" + studentName + "已交卷");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//finally中进行计数器减1,防止发生异常计数失败
countDownLatch.countDown();
}
}).start();
}
System.out.println("等待所有学生交卷");
//如果30秒内还不能收齐试卷,默认为学生弃权,防止发生异常一直等待
countDownLatch.await(30, TimeUnit.SECONDS);
System.out.println("全部学生已经交卷,正在计算平均分");
}
为什么不使用ReentrantLock?
countDown不需要堵塞,只需要在最后一次count=0时去唤醒堵塞的主线程(await),AQS+LockSupport完全够用
Semaphore
计数信号量,用于控制特定资源在同一个时间被访问的个数,基于AbstractQueuedSynchronizer实现,支持公平和非公平信号量,默认非公平信号量,state初始化为permits
重要方法:
- acquire():从信号量获取1个许可,信号量内部计数器减1,如果没有许可,线程将一直阻塞
- acquire(int permits):从信号量获取permits个许可,在提供这些许可前,线程一直阻塞
- release():释放1个许可,将其返回给信号量,信号量内部计数器加1
- release(int permits):释放permits个许可
- availablePermits():当前可用的许可数
- tryAcquire():尝试地获得1个许可,如果获取不到则返回false
- tryAcquire(long timeout, TimeUnit unit):在指定的时间内尝试地获得1个许可,如果获取不到则返回false
- tryAcquire(int permits):尝试地获得permits个许可,如果获取不到则返回false
- tryAcquire(int permits, long timeout, TimeUnit unit):在指定的时间内尝试地获得permits个许可,如果获取不到则返回false
public static void main(String[] args) {
int carCount = 10;
int lot = 5;
Semaphore semaphore = new Semaphore(lot);
for (int i = 0; i < carCount; i++) {
final int x = i;
new Thread(() -> {
try {
System.out.println("来了一辆车" + x);
semaphore.acquire();
System.out.println("有车位," + x + "停车入位");
Thread.sleep(2000);
System.out.println(x + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
CyclicBarrier
一个可循环使用(Cyclic)的屏障(Barrier),让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会放行,所有被屏障拦截的线程继续执行。基于ReentrantLock+Condition实现,await后先lock,然后--count,不等于0就执行Condition.await。反之,重置count并执行Condition.signalAll唤醒所有堵塞线程
重要方法:
- await():在CyclicBarrier上进行阻塞等待,并使count减1
- await(long timeout, TimeUnit unit):在CyclicBarrier上进行限时的阻塞等待,并使count减1,当时间到达限定时间后,线程继续执行
- getParties():获取CyclicBarrier通过屏障的线程数量,也称为方数
- getNumberWaiting():获取正在CyclicBarrier上等待的线程数量
public static void main(String[] args) throws InterruptedException {
int passenger = 5;
final CyclicBarrier cyclicBarrier = new CyclicBarrier(passenger,
() -> System.out.println("乘客已经满5人,准备上车"));
for (int i = 0; i < passenger; i++) {
new Thread(() -> {
try {
System.out.println("乘客+1,等待满员");
cyclicBarrier.await();
System.out.println("乘客已上车");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
Phaser
阶段器/多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance),是一个可控制任务阶段执行且可重复使用的同步器,包含了CountDownLatch和CyclicBarrier的功能,比他们更加灵活、强大。通过自旋+synchronized实现注册时同步问题,LockSupport实现堵塞与唤醒
基本概念
- parties(参与者):参与线程的个数,跟CountDownLatch或者CyclicBarrier的构造方法的参数的含义是一样的,不同Phaser提供了调整的方法
- register / deregister : register通知Phaser参与等待的线程数增加了,deregister通知Phaser参与等待的线程数减少了,然后相应调整parties
- arrive / advance:arrive跟CyclicBarrier中到达栅栏是一个意思,当所有parties个线程都arrive了,则触发advance
- phase:表示执行任务的阶段,初始值是0,每一次advance都会将该值加1,最大值是Integer.MAX_VALUE;如果Phaser被终止了,则该值为负数,此时所有的register、arrive或者await操作都会立即返回
- 父子Phaser:父子Phaser一方面可以避免parties线程过多时导致cas修改state容易失败,另一方面可以基于父子Phaser实现复杂的执行任务的阶段控制。
- 子Phaser的parties线程可以有多个,但是对于父Phaser只有一个
- 只有子Phaser所有的parties线程都到达的时候才通知父Phaser当前子Phaser已到达
- 只有子Phaser所有的parties线程都被注销(deregister)了才会向父Phaser注销当前子Phaser
- Phaser有root、parent两个属性,在多级父子Phaser下,所有的Phaser的root属性都指向同一个祖先Phaser,调用internalAwaitAdvance方法时也是在root Phaser上调用。即所有的子Phaser都共享祖先Phaser的等待线程链表,从而实现最后一个到达的子Phaser可以唤醒其他子Phaser关联的等待线程
上面图中的几点关键点:
- 树的根结点root链接着两个“无锁栈”,用于保存等待线程(比如当线程等待Phaser进入下一阶段时,会根据当前阶段的奇偶性,把自己挂到某个栈中),所有Phaser对象都共享这两个栈。
- 当首次将某个Phaser结点链接到树中时,会同时向该结点的父结点注册一个参与者(子phaser实际只承担了维护state的任务)
state
通过state字段来实现同步逻辑,state是volatile修饰的64位long变量,它有包含了四个维度的语义:
- 低16位,当前未到达的parties,调用arriveXXX时,该值-1,调用register时+1
- 中16位,当前总parties,调用register时+1,deRegister时-1
- 高32位,phase,即Phaser的年龄,当未到达的parties减到0(即所有parties已到达)时,phase自动加1,并且把16-31位的parties数复制到0-15位,从而该Phaser可以继续复用
使用样例
- 模拟CountDownLatch
public static void main(String[] args) {
int num = 6;
Phaser phaser = new Phaser(num);
for (int i = 0; i < num; i++) {
new Thread(() -> {
try {
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + " 已到达");
//表示当前线程已到达
phaser.arrive();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
phaser.awaitAdvance(phaser.getPhase());
System.out.println("大家都到达了");
}
- 模拟CyclicBarrier
public static void main(String[] args) {
int num = 6;
Phaser phaser = new Phaser(num){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个");
// 返回true表示需要终止Phaser,否则继续下一轮的phase
return registeredParties == 0;
}
};
for (int i = 0; i < num; i++) {
new Thread(() -> {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
}).start();
}
}
- 多阶段执行,并控制执行轮数Phase
public static void main(String[] args) {
// 最多执行3轮
int maxPhase = 3;
int num = 6;
Phaser phaser = new Phaser(num) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
return flag;
}
};
for (int i = 0; i < num; i++) {
new Thread(() -> {
// phaser关闭前循环执行
while (!phaser.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
}
}).start();
}
}
- 支持分层功能,减小同一Phaser上同步开销
public static void main(String[] args) {
// 最多执行3轮
int maxPhase = 3;
Phaser parent = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
// registeredParties等于2
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
return flag;
}
};
int num1 = 5;
final Phaser phaser1 = new Phaser(parent);
phaser1.bulkRegister(num1);
for (int i = 0; i < num1; i++) {
new Thread(() -> {
// phaser关闭前循环执行
while (!phaser1.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser1.arriveAndAwaitAdvance();
}
}).start();
}
int num2 = 4;
final Phaser phaser2 = new Phaser(parent);
phaser2.bulkRegister(num2);
for (int i = 0; i < num2; i++) {
new Thread(() -> {
// phaser关闭前循环执行
while (!phaser2.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser2.arriveAndAwaitAdvance();
}
}).start();
}
}
- 灵活调整parties
public static void main(String[] args) {
// 最多执行3轮
int maxPhase = 3;
int num = 6;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
return flag;
}
};
for (int i = 0; i < 2; i++) {
new Thread(() -> {
// 注册线:程parties+1
phaser.register();
// phaser关闭前循环执行
while (!phaser.isTerminated()) {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
phaser.arriveAndAwaitAdvance();
}
}).start();
}
// 增加等待的线程数
phaser.bulkRegister(num - 2);
for (int i = 0; i < num - 2; i++) {
new Thread(() -> {
// 到达并等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
// 达到后注销该线程
phaser.arriveAndDeregister();
}).start();
}
}
参考:
Exchanger
- 交换器,用于两个线程之间的数据交换,如下图:
- 交换示意图
每个线程通过ThreadLocal保存自己的Node信息,通过成功抢占slot的Node节点中item、match进行数据交换
- 完整流程
- 为什么slot占用失败的线程需要CAS修改slot=null?
- 可能存在多线程竞争,所以需要使用CAS
- slot=null代表新一轮的开始
- 堵塞/唤醒使用LockSupport
- 多槽位交换:同时出现了多个配对线程竞争修改slot槽位,导致某个线程CAS修改slot失败时,就会初始化arena多槽数组,后续所有的交换都会走arenaExchange,竞争分散到不同的槽位
- 代码示例
public static void main(String[] args) {
final Exchanger<Integer> exchanger = new Exchanger<>();
Random random = new Random();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
try {
Thread.sleep(random.nextInt(1000));
Integer v = random.nextInt(100);
System.out.println(Thread.currentThread().getName() + " 生产数据:" + v);
Integer message = exchanger.exchange(v);
System.out.println(Thread.currentThread().getName() + " 交换得到数据:" + message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
标签:Phaser,Java,Thread,int,编程,System,并发,线程,println 来源: https://www.cnblogs.com/sheung/p/14530061.html