编程语言
首页 > 编程语言> > java phaser

java phaser

作者:互联网

Phaser

  java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser.

  Phaser拥有与CyclicBarrier和CountDownLatch类似的功劳.但是这个类提供了更加灵活的应用.CountDownLatch和CyclicBarrier都是只适用于固定数量的参与者.移相器适用于可变数目的屏障,在这个意义上,可以在任何时间注册新的参与者.并且在抵达屏障是可以注销已经注册的参与者.因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化

  如CyclicBarrier一样,移相器可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达.因此,移相器会有多代.一旦为某个特定相位注册的所有参与者都到达移相器,就增加相数.相数从零开始,在达到Integer.MAX_VALUE后,再次绕回0.当移相器发生变化时,通过重写onAdvance方法,可以自行可选操作.这个方法也可用于终止移相器.移相器一旦被终止,所有的同步方法就会立即返回,并尝试注册新的失败的参与者


  移相器的另一个重要特征是:移相器可能是分层的,这允许你以树形结构来安排移相器以减少竞争.很明显,更小的组将拥有更少的竞争同步的参与者.因此,将大量的参与者分成较小的组可以减少竞争.虽然创建移相器能增加中的吞吐量,但是这需要更多的开销.最后,移相器的另一个重要的特征在于监控功能,使用独立的对象可以监视移相器的当前状态.监视器可以查询注册到移相器的参与者的数量,以及已经到达和还没有到达某个特定相数的参与者的数量

  Phaser中是通过计数器来控制。在Phaser中计数器叫做parties, 我们可以通过Phaser的构造函数或者register()方法来注册

  通过调用register()方法,我们可以动态的控制phaser的个数。如果我们需要取消注册,则可以调用arriveAndDeregister()方法,我们看下arrive:

  public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }

  Phaser中arrive实际上调用了doArrive方法,doArrive接收一个adjust参数,ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister

  Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞,但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为

  下面看一个基本的使用:

void runTasks(List<Runnable> tasks) {
        final Phaser phaser = new Phaser(1); // "1" to register self
        // create and start threads
        for (final Runnable task : tasks) {
            phaser.register();
            new Thread() {
                public void run() {
                    phaser.arriveAndAwaitAdvance(); // await all creation
                    task.run();
                }
            }.start();
        }

        // allow threads to start and deregister self
        phaser.arriveAndDeregister();
    }

  上面的例子中,我们在执行每个Runnable之前调用register()来注册, 然后调用arriveAndAwaitAdvance()来等待这一个Phaser周期结束。最后我们调用 phaser.arriveAndDeregister()来取消注册主线程

  下面来详细的分析一下运行步骤:

    final Phaser phaser = new Phaser(1);

  这一步我们初始化了一个Phaser,并且指定其现在party的个数为1

    phaser.register();

  这一步注册Runnable task到phaser,同时将party+1

    phaser.arriveAndAwaitAdvance()

  这一步将会等待直到所有的party都arrive。这里只会将步骤2中注册的party标记为arrive,而步骤1中初始化的party一直都没有被arrive。

    phaser.arriveAndDeregister();

  在主线程中,arrive了步骤1中的party,并且将party的个数减一

    步骤3中的phaser.arriveAndAwaitAdvance()将会继续执行,因为最后一个phaser在步骤4中arrive了

多个Phaser周期

  Phaser的值是从0到Integer.MAX_VALUE,每个周期过后该值就会加一,如果到达Integer.MAX_VALUE则会继续从0开始。

  如果我们执行多个Phaser周期,则可以重写onAdvance方法:

  protected boolean onAdvance(int phase, int registeredParties) {
        return registeredParties == 0;
    }

  onAdvance将会在最后一个arrive()调用的时候被调用,如果这个时候registeredParties为0的话,该Phaser将会调用isTerminated方法结束该Phaser

  如果要实现多周期的情况,我们可以重写这个方法:

      protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= iterations || registeredParties == 0;
            }

  上面的例子中,如果phase次数超过了指定的iterations次数则就会自动终止

  我们看下实际的例子:

void startTasks(List<Runnable> tasks, final int iterations) {
        final Phaser phaser = new Phaser() {
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= iterations || registeredParties == 0;
            }
        };
        phaser.register();
        for (final Runnable task : tasks) {
            phaser.register();
            new Thread() {
                public void run() {
                    do {
                        task.run();
                        phaser.arriveAndAwaitAdvance();
                    } while (!phaser.isTerminated());
                }
            }.start();
        }
        phaser.arriveAndDeregister(); // deregister self, don't wait
    }

  上面的例子将会执行iterations次

 

标签:Phaser,java,移相器,phaser,phase,party,arrive
来源: https://www.cnblogs.com/hzzjj/p/15844688.html