011Java并发包009辅助类
作者:互联网
1 CountDownLatch
1.1 简介
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,允许一个或多个线程一致等待。
在当前计数到达零之前,等待线程会一直阻塞,计数为零时会释放所有等待线程。
计数器无法被重置,因此只能使用一次,不能重复使用。
1.2 源码分析
1.2.1 构造方法
构造方法需要传入一个非负整数,否则会抛出异常。
通过构造方法将传入的值,设置为AQS中state字段的值。
1 // CountDownLatch的构造方法 2 public CountDownLatch(int count) { 3 if (count < 0) throw new IllegalArgumentException("count < 0"); 4 this.sync = new Sync(count); 5 } 6 ... 7 // Sync继承自AQS 8 private static final class Sync extends AbstractQueuedSynchronizer 9 ... 10 // Sync的构造方法 11 Sync(int count) { 12 setState(count); 13 } 14 ... 15 // AQS类的setState方法 16 protected final void setState(int newState) { 17 state = newState; 18 }
1.2.2 await方法
调用await方法时,线程会阻塞,直到计数器的值变为0时,才会继续执行。
通过await方法,判断AQS中state字段的值是否为0,若为0则不被阻塞,若不为0则阻塞入列当前线程。
1 // CountDownLatch的await方法 2 public void await() throws InterruptedException { 3 sync.acquireSharedInterruptibly(1); 4 } 5 ... 6 // AQS的acquireSharedInterruptibly方法 7 public final void acquireSharedInterruptibly(int arg) 8 throws InterruptedException { 9 if (Thread.interrupted()) 10 throw new InterruptedException(); 11 if (tryAcquireShared(arg) < 0) 12 doAcquireSharedInterruptibly(arg); 13 } 14 ... 15 // Sync的tryAcquireShared方法 16 protected int tryAcquireShared(int acquires) { 17 return (getState() == 0) ? 1 : -1; 18 }
1.2.3 countDown方法
调用countDown方法时,计数器数值会减1,当计数器被减为0时,唤醒等待的线程。
通过await方法,对AQS中state字段的值进行双层判断。
判断AQS中state字段的值是否为0,为0则返回,不为0则减1并判断。
判断AQS中state字段的值是否为0,为0则唤醒队列中的线程,不为0则返回。
1 // CountDownLatch的countDown方法 2 public void countDown() { 3 sync.releaseShared(1); 4 } 5 // AQS的releaseShared方法 6 public final boolean releaseShared(int arg) { 7 if (tryReleaseShared(arg)) { 8 doReleaseShared(); 9 return true; 10 } 11 return false; 12 } 13 // Sync的tryReleaseShared方法 14 protected boolean tryReleaseShared(int releases) { 15 // Decrement count; signal when transition to zero 16 for (;;) { 17 int c = getState(); 18 if (c == 0) 19 return false; 20 int nextc = c-1; 21 if (compareAndSetState(c, nextc)) 22 return nextc == 0; 23 } 24 }
1.3 使用举例
1 public static void main(String[] args) throws InterruptedException { 2 CountDownLatch cdl = new CountDownLatch(3); 3 for (int i = 0; i < cdl.getCount(); i++) { 4 new Thread(() -> { 5 System.out.println(Thread.currentThread().getName() + "离开课堂"); 6 cdl.countDown(); 7 }, i + "号").start(); 8 } 9 cdl.await(); 10 System.out.println("所有学生都已离开课堂"); 11 }
2 CyclicBarrier
2.1 简介
一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点,并且这种屏障是可循环利用的。
2.2 源码分析
2.2.1 构造方法
构造方法有两个,最终调用的是同一个。
要求传入一个整型的数量,以及一个屏障操作,当等待的线程达到指定数量时,由最后一个进入屏障的线程执行屏障操作。
1 public CyclicBarrier(int parties, Runnable barrierAction) { 2 if (parties <= 0) throw new IllegalArgumentException(); 3 this.parties = parties; 4 this.count = parties; 5 this.barrierCommand = barrierAction; 6 } 7 8 public CyclicBarrier(int parties) { 9 this(parties, null); 10 }
2.2.2 await方法
调用await方法后,线程会进入等待状态,直到达到指定数量后,执行屏障操作。
1 public int await() throws InterruptedException, BrokenBarrierException { 2 try { 3 return dowait(false, 0L); 4 } catch (TimeoutException toe) { 5 throw new Error(toe); // cannot happen 6 } 7 }
2.3 使用举例
1 public static void main(String[] args) { 2 CyclicBarrier cb = new CyclicBarrier(7, () -> { 3 System.out.println(Thread.currentThread().getName() + "龙珠是最后一颗,龙珠集齐,召唤神龙"); 4 }); 5 for (int i = 1; i <= 7; i++) { 6 new Thread(() -> { 7 try { 8 System.out.println(Thread.currentThread().getName() + "龙珠被收集"); 9 cb.await(); 10 } catch (Exception e) { 11 e.printStackTrace(); 12 } 13 }, String.valueOf(i)).start(); 14 } 15 }
3 Semaphore
3.1 简介
一个同步辅助类,可以看做是一个计数信号量,信号量维护了一个许可集。
3.2 源码分析
3.2.1 构造方法
构造方法有两个,都需要传入整型的许可数量,第一个使用非公平的锁,第二个可以传入参数指定使用公平锁还是非公平锁。
1 public Semaphore(int permits) { 2 sync = new NonfairSync(permits); 3 } 4 5 public Semaphore(int permits, boolean fair) { 6 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 7 }
3.2.2 acquire方法
调用acquire方法会尝试获取许可,获取不到会一直阻塞。
1 public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }
3.2.3 release方法
调用release方法会释放许可。
1 public void release() { 2 sync.releaseShared(1); 3 }
3.3 使用举例
1 public static void main(String[] args) { 2 Semaphore s = new Semaphore(3); 3 for (int i = 1; i <= 6; i++) { 4 new Thread(() -> { 5 try { 6 s.acquire(); 7 System.out.println(Thread.currentThread().getName() + "抢到了"); 8 Thread.sleep(100 * new Random().nextInt(5)); 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } finally { 12 System.out.println(Thread.currentThread().getName() + "释放了"); 13 s.release(); 14 } 15 }, String.valueOf(i)).start(); 16 } 17 }
标签:构造方法,int,await,public,线程,new,发包,009,011Java 来源: https://www.cnblogs.com/zhibiweilai/p/15381083.html