其他分享
首页 > 其他分享> > CyclicBarrier(同步屏障)

CyclicBarrier(同步屏障)

作者:互联网

CyclicBarrier(同步屏障)

应用场景:一个等多个,当一组线程到达一个屏障,调用awaite,告诉CyclicBarrier我已经到了,然后当前线程会被阻塞,当最后一个线程到达时就会开始执行。下面demo,模拟三个人到达5道门的情景,每次都必须所有人都到一道门才能去下一道门。

使用demo

package com.w.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo implements Runnable{

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    @Override
    public void run() {
        try {
            Thread.sleep((long)Math.random()*10000);
            System.out.println(Thread.currentThread()+"我到了,我在等待其它人");
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Thread(new CyclicBarrierDemo(),"一号").start();
            new Thread(new CyclicBarrierDemo(),"二号").start();
            new Thread(new CyclicBarrierDemo(),"三号").start();
            Thread.sleep(2000);
            System.out.println("所有人都到了第"+i+"道门");
        }

        new Thread(new CyclicBarrierDemo(),"四号").start();
        new Thread(new CyclicBarrierDemo(),"五号").start();
        Thread.sleep(2000);
        System.out.println("缺一个人");
    }
}

除了隐式的重置计数器,还可以在出现问题后调用reset()方法重置。

结构

  1. 构造参数

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    //当最后一个线程到达屏障,优先执行,barrierAction
    public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
    }
    
  2. 成员变量

    //因为用到的是条件队列,所以需要lock
    private final ReentrantLock lock = new ReentrantLock();
    //通过lock获取条件队列,除最后一个到达的线程,其余线程都会阻塞到这个队列   
    private final Condition trip = lock.newCondition();
    //需要等待的线程数量,通过构造参数初始化
    private final int parties;
    // 当所有线程到达时要执行的任务(构造时可选,即可为null)
    private final Runnable barrierCommand;
    //Generation 实例,在此处进行初始化,表示代的概念
    private Generation generation = new Generation();
    //当前代还有多少个线程未到位
    private int count;
    

方法

  1. awaite() 必用方法,等待,直到所有各方都已调用此屏障。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    //主要方法
    //如果任何线程在等待时被中断,那么所有其他正在等待的线程将抛出BrokenBarrierException,并将barrier置于中断状态
    //其他线程调用该屏障上的重置,如果任何一方正在等待屏障,他们将返回一个BrokenBarrierException
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();//使用condition的线程必须获取到锁
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    //线程被中断,唤醒trip条件队列中所有等待的线程,并将broken置为true,
                    //其它调用awaite的线程会抛出BrokenBarrierException()
                    breakBarrier();
                    throw new InterruptedException();
                }
    			
                int index = --count;
                if (index == 0) {  // tripped
                    //index为0,代表当前代最后一个线程已经到达
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)//如果构造方法时指定了最后一个线程到达时要执行的任务,在这里调用
                            command.run();
                        ranAction = true;
                        //这里新new一个代,重置count,唤醒等待队列中的线程
                        nextGeneration();
                        return 0;//已经是最后一个线程了就直接返回0
                    } finally {
                        if (!ranAction)
                           	//打破当前一代,唤醒其它线程
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                //循环直到触发、中断、中断或超时,只有非最后一个线程会执行到这里
                for (;;) {
                    try {
                        //没有设置超时,直接就阻塞到等待队列
                        if (!timed)
                            trip.await();//调用awaite会释放锁
                        else if (nanos > 0L)
                            //阻塞到超时等待队列
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    				//这下面的方法,都只有当前线程被唤醒后才会执行
                    
                    
                    //	如果任何线程在等待时被中断,那么所有其他正在等待的线程将抛出BrokenBarrierException,并将barrier置于中断状态
                    if (g.broken)
                        throw new BrokenBarrierException();
                    //一旦!=成立,代表最后一个线程到达,唤醒了其它所有线程。正常情况下
                 	//有线程唤醒此线程,代表着肯定执行了nextGeneration()方法,也就新new了一代。
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    
     public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }
    

与countDownLatch比较

  1. 相同点:

    都是控制线程协做的,一个构造函数传入预期值,当有多少个线程调用时,统计然后到达一个点,让所有线程正常执行。

  2. 不同点:

    countDownLatch,只有一个构造函数。而且需要手动调用countDown去更改计数器值,直到减为0,依次让所有在同步队列的线程执行,并不是等待(条件)队列。

    CyclicBarrier则是自动的减,只需要线程调用awaite()。当一组线程的最后一个线程到达一个代时,唤醒阻塞在等待队列中的所有线程。而且计数器的值可以被重置使用,可以显示的调用reset或者当到达一代之后直接调用awaite进入新的一代。另外,是有提供两个构造方法,其中一个支持当最后一个线程到达屏障时要制作的任务。

    countDownLatch底层实现的是AQS,使用的是共享锁模式,操作的是AQS的state

    CyclicBarrier则是使用的是ReetrantLock的Condition,使用的是互斥锁模式。

标签:同步,Thread,lock,屏障,线程,new,CyclicBarrier,BrokenBarrierException
来源: https://blog.csdn.net/weixin_45862170/article/details/120596121