CyclicBarrier 分析


简介

  1. CyclicBarrier 是什么?
  2. CyclicBarrier 使用
  3. CyclicBarrier 源码解析
  4. CyclicBarrier 简单实现
  5. barrierAction 是由哪个线程执行的?

CyclicBarrier 是什么?

CyclicBarrier 开始于JDK 1.5, 一个同步工具类,允许一组线程都等待彼此到达公共屏障点。CyclicBarrier 在程序中非常有用,涉及到固定参数的线程数量等待彼此,这个 barrier 被称为
cyclic 是由于它可以所有的等待线程释放之后,重复使用。
CyclicBarrier 支持一个可选的 Runnable 在每一个屏障点执行一次,在所有参与的线程到达之后,但是在执行之前所有的线程都释放了, barrierAction非常有用的对于在任何参与者继续之前更新共享状态。

CyclicBarrier 使用

public class Test2 {

  static class A extends Thread {

    private CyclicBarrier cyclicBarrier;

    public A(CyclicBarrier cyclicBarrier, String name) {
      this.cyclicBarrier = cyclicBarrier;
      setName(name);
    }

    @Override
    public void run() {
      try {
        System.out.println(Thread.currentThread().getName() + " 准备完毕");
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }
  }

  static class B extends Thread {

    private CyclicBarrier cyclicBarrier;

    public B(CyclicBarrier cyclicBarrier, String name) {
      this.cyclicBarrier = cyclicBarrier;
      setName(name);
    }

    @Override
    public void run() {
      try {
        System.out.println(Thread.currentThread().getName() + " 准备完毕");
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }
  }

  static class C extends Thread {

    private CyclicBarrier cyclicBarrier;

    public C(CyclicBarrier cyclicBarrier, String name) {
      this.cyclicBarrier = cyclicBarrier;
      setName(name);
    }

    @Override
    public void run() {
      try {
        System.out.println(Thread.currentThread().getName() + " 准备完毕");
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }
  }


  public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> System.out.println("完成"));
    new A(cyclicBarrier, "A").start();
    new B(cyclicBarrier, "B").start();
    new C(cyclicBarrier, "C").start();
  }
}
/**
output:

A 准备完毕
B 准备完毕
C 准备完毕
完成
*/

CyclicBarrier 源码解析

在看源码前,我们可以对源码的实现进行一些猜想,根据 CyclicBarrier 前面的定义,可以猜想里面有一个变量来表示参与者的数量,在使用调用 await 方法是时候,参与者的数量减一,
知道参与者数量为 0,存在 barrierAction,就执行barrierAction,由于可以重复使用,所以在barrierAction执行对参与者的数量进行恢复。
下面看一下源码实现是否于猜想的类似。

构造方法

parties 参与者的数量
barrierAction 最后执行的动作(可选)

public CyclicBarrier(int parties) {
        this(parties, null);
    }

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

await 方法

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 减去一个参与者
            int index = --count;
           // 如果参与者数量为0,判断barrierAction是否为null,不为null, 将执行run方法,调用nextGeneration恢复状态
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await(); // 在屏障点等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

nextGeneration 方法

private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();  // 唤醒参与者
        // set up next generation
        count = parties; // 恢复参与者的数量
        generation = new Generation(); // 下一代
    }

CyclicBarrier 简单实现

借助AtomicInteger 来简单实现

public class SimpleBarrier {

  private AtomicInteger count;
  int size;
  private Runnable command;

  public SimpleBarrier(int n) {
    this.count = new AtomicInteger(n);
    this.size = n;
  }

  public SimpleBarrier(int n, Runnable barrierAction) {
    this(n);
    this.command = barrierAction;
  }

  public void await() {
    int position = count.getAndDecrement();
    if (position == 1) {
      command.run();
      count.set(size);
    } else {
      while (count.get() != 0) {
      }
    }
  }
}

public class Test2 {

  static class A extends Thread {

    private SimpleBarrier cyclicBarrier;

    public A(SimpleBarrier cyclicBarrier, String name) {
      this.cyclicBarrier = cyclicBarrier;
      setName(name);
    }

    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + " 准备完毕");
      cyclicBarrier.await();
    }
  }

  static class B extends Thread {

    private SimpleBarrier cyclicBarrier;

    public B(SimpleBarrier cyclicBarrier, String name) {
      this.cyclicBarrier = cyclicBarrier;
      setName(name);
    }

    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + " 准备完毕");
      cyclicBarrier.await();
    }
  }

  static class C extends Thread {

    private SimpleBarrier cyclicBarrier;

    public C(SimpleBarrier cyclicBarrier, String name) {
      this.cyclicBarrier = cyclicBarrier;
      setName(name);
    }

    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + " 准备完毕");
      cyclicBarrier.await();
    }
  }


  public static void main(String[] args) {
    SimpleBarrier cyclicBarrier = new SimpleBarrier(3, () -> System.out.println("完成"));
    new A(cyclicBarrier, "A").start();
    new B(cyclicBarrier, "B").start();
    new C(cyclicBarrier, "C").start();
  }
}
/**
output:
A 准备完毕
B 准备完毕
C 准备完毕
完成
*/

barrierAction 是由哪个线程执行的?

最后一个线程来执行。

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

  • 分享:
评论
还没有评论
    发表评论 说点什么