编程语言
首页 > 编程语言> > Java并发编程之同步辅助类

Java并发编程之同步辅助类

作者:互联网

CountDownLatch

在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待,基于AbstractQueuedSynchronizer实现,state初始化为count,每countDown一次减1直到等于0,unpark唤醒await线程

重要方法:

  • await():调用此方法线程会被阻塞,直到count为0
  • await(long timeout, TimeUnit unit):同await(),可以设置最大等待时间,如超过最大等待时间则不再等待
  • countDown():count减1,直至为0
public static void main(String[] args) throws InterruptedException {
  int studentCount = 10;
  final CountDownLatch countDownLatch = new CountDownLatch(studentCount);
  for (int i = 0; i < studentCount; i++) {
    String studentName = String.valueOf(i);
    new Thread(() -> {
      try {
        System.out.println("学生" + studentName + "正在考试……");
        Thread.sleep(2000);
        System.out.println("学生" + studentName + "已交卷");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        //finally中进行计数器减1,防止发生异常计数失败
        countDownLatch.countDown();
      }
    }).start();
  }
  System.out.println("等待所有学生交卷");
  //如果30秒内还不能收齐试卷,默认为学生弃权,防止发生异常一直等待
  countDownLatch.await(30, TimeUnit.SECONDS);
  System.out.println("全部学生已经交卷,正在计算平均分");
}

为什么不使用ReentrantLock?

countDown不需要堵塞,只需要在最后一次count=0时去唤醒堵塞的主线程(await),AQS+LockSupport完全够用

Semaphore

计数信号量,用于控制特定资源在同一个时间被访问的个数,基于AbstractQueuedSynchronizer实现,支持公平和非公平信号量,默认非公平信号量,state初始化为permits

重要方法:

  • acquire():从信号量获取1个许可,信号量内部计数器减1,如果没有许可,线程将一直阻塞
  • acquire(int permits):从信号量获取permits个许可,在提供这些许可前,线程一直阻塞
  • release():释放1个许可,将其返回给信号量,信号量内部计数器加1
  • release(int permits):释放permits个许可
  • availablePermits():当前可用的许可数
  • tryAcquire():尝试地获得1个许可,如果获取不到则返回false
  • tryAcquire(long timeout, TimeUnit unit):在指定的时间内尝试地获得1个许可,如果获取不到则返回false
  • tryAcquire(int permits):尝试地获得permits个许可,如果获取不到则返回false
  • tryAcquire(int permits, long timeout, TimeUnit unit):在指定的时间内尝试地获得permits个许可,如果获取不到则返回false
public static void main(String[] args) {
  int carCount = 10;
  int lot = 5;
  Semaphore semaphore = new Semaphore(lot);
  for (int i = 0; i < carCount; i++) {
    final int x = i;
    new Thread(() -> {
      try {
        System.out.println("来了一辆车" + x);
        semaphore.acquire();
        System.out.println("有车位," + x + "停车入位");
        Thread.sleep(2000);
        System.out.println(x + "离开车位");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        semaphore.release();
      }
    }).start();
  }
}

CyclicBarrier

一个可循环使用(Cyclic)的屏障(Barrier),让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会放行,所有被屏障拦截的线程继续执行。基于ReentrantLock+Condition实现,await后先lock,然后--count,不等于0就执行Condition.await。反之,重置count并执行Condition.signalAll唤醒所有堵塞线程

重要方法:

  • await():在CyclicBarrier上进行阻塞等待,并使count减1
  • await(long timeout, TimeUnit unit):在CyclicBarrier上进行限时的阻塞等待,并使count减1,当时间到达限定时间后,线程继续执行
  • getParties():获取CyclicBarrier通过屏障的线程数量,也称为方数
  • getNumberWaiting():获取正在CyclicBarrier上等待的线程数量
public static void main(String[] args) throws InterruptedException {
  int passenger = 5;
  final CyclicBarrier cyclicBarrier = new CyclicBarrier(passenger,
    () -> System.out.println("乘客已经满5人,准备上车"));
  for (int i = 0; i < passenger; i++) {
    new Thread(() -> {
      try {
        System.out.println("乘客+1,等待满员");
        cyclicBarrier.await();
        System.out.println("乘客已上车");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }).start();
  }
}

Phaser

阶段器/多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance),是一个可控制任务阶段执行且可重复使用的同步器,包含了CountDownLatch和CyclicBarrier的功能,比他们更加灵活、强大。通过自旋+synchronized实现注册时同步问题,LockSupport实现堵塞与唤醒

基本概念

image

image

上面图中的几点关键点:

state

image

通过state字段来实现同步逻辑,state是volatile修饰的64位long变量,它有包含了四个维度的语义:

使用样例

public static void main(String[] args) {
    int num = 6;
    Phaser phaser = new Phaser(num);
    for (int i = 0; i < num; i++) {
        new Thread(() -> {
            try {
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName() + " 已到达");
                //表示当前线程已到达
                phaser.arrive();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
    phaser.awaitAdvance(phaser.getPhase());
    System.out.println("大家都到达了");
}
public static void main(String[] args) {
    int num = 6;
    Phaser phaser = new Phaser(num){
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个");
            // 返回true表示需要终止Phaser,否则继续下一轮的phase
            return registeredParties == 0;
        }
    };
    for (int i = 0; i < num; i++) {
        new Thread(() -> {
            // 到达并等待其他线程到达
            System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " 开始执行任务");
        }).start();
    }
}
public static void main(String[] args) {
    // 最多执行3轮
    int maxPhase = 3;
    int num = 6;
    Phaser phaser = new Phaser(num) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
            System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
            return flag;
        }
    };
    for (int i = 0; i < num; i++) {
        new Thread(() -> {
            // phaser关闭前循环执行
            while (!phaser.isTerminated()) {
                // 到达并等待其他线程到达
                System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " 开始执行任务");
            }
        }).start();
    }
}
public static void main(String[] args) {
    // 最多执行3轮
    int maxPhase = 3;
    Phaser parent = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
            // registeredParties等于2
            System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
            return flag;
        }
    };
    int num1 = 5;
    final Phaser phaser1 = new Phaser(parent);
    phaser1.bulkRegister(num1);
    for (int i = 0; i < num1; i++) {
        new Thread(() -> {
            // phaser关闭前循环执行
            while (!phaser1.isTerminated()) {
                // 到达并等待其他线程到达
                System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
                phaser1.arriveAndAwaitAdvance();
            }
        }).start();
    }

    int num2 = 4;
    final Phaser phaser2 = new Phaser(parent);
    phaser2.bulkRegister(num2);
    for (int i = 0; i < num2; i++) {
        new Thread(() -> {
            // phaser关闭前循环执行
            while (!phaser2.isTerminated()) {
                // 到达并等待其他线程到达
                System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
                phaser2.arriveAndAwaitAdvance();
            }
        }).start();
    }
}
public static void main(String[] args) {
    // 最多执行3轮
    int maxPhase = 3;
    int num = 6;
    Phaser phaser = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            boolean flag = phase + 1 >= maxPhase || registeredParties == 0;
            System.out.println("Phase " + phase + " 结束,目前有Parties:" + registeredParties + "个,阶段器是否停止:" + flag);
            return flag;
        }
    };
    for (int i = 0; i < 2; i++) {
        new Thread(() -> {
            // 注册线:程parties+1
            phaser.register();
            // phaser关闭前循环执行
            while (!phaser.isTerminated()) {
                // 到达并等待其他线程到达
                System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
                phaser.arriveAndAwaitAdvance();
            }
        }).start();
    }
    // 增加等待的线程数
    phaser.bulkRegister(num - 2);
    for (int i = 0; i < num - 2; i++) {
        new Thread(() -> {
            // 到达并等待其他线程到达
            System.out.println(Thread.currentThread().getName() + " 已到达,等待其他线程到达");
            // 达到后注销该线程
            phaser.arriveAndDeregister();
        }).start();
    }
}

参考:

Exchanger

image

image

每个线程通过ThreadLocal保存自己的Node信息,通过成功抢占slot的Node节点中item、match进行数据交换

image

  • 为什么slot占用失败的线程需要CAS修改slot=null?
    • 可能存在多线程竞争,所以需要使用CAS
    • slot=null代表新一轮的开始
  • 堵塞/唤醒使用LockSupport
  • 多槽位交换:同时出现了多个配对线程竞争修改slot槽位,导致某个线程CAS修改slot失败时,就会初始化arena多槽数组,后续所有的交换都会走arenaExchange,竞争分散到不同的槽位
public static void main(String[] args) {
    final Exchanger<Integer> exchanger = new Exchanger<>();
    Random random = new Random();
    for (int i = 0; i < 2; i++) {
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(random.nextInt(1000));
                    Integer v = random.nextInt(100);
                    System.out.println(Thread.currentThread().getName() + " 生产数据:" + v);
                    Integer message = exchanger.exchange(v);
                    System.out.println(Thread.currentThread().getName() + " 交换得到数据:" + message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

标签:Phaser,Java,Thread,int,编程,System,并发,线程,println
来源: https://www.cnblogs.com/sheung/p/14530061.html