> 文档中心 > CyclicBarrier的理解与应用

CyclicBarrier的理解与应用


一、概述

CyclicBarrier类是JUC框架中的工具类,也是一个同步辅助装置:允许多个线程去等待直到全部线程抵达了公共的栅栏点。它的一个很明显的特点就是Cyclic 循环,也就是说栅栏是可以循环使用的,激活循环使用的条件是当所有线程通过了栅栏并释放。

二、源码分析

//静态内部类,用于表示屏障的状态private static class Generation { boolean broken = false;    }//重入锁,用于并发场景下的加锁和释放锁    private final ReentrantLock lock = new ReentrantLock();//通过重入锁拿到条件对象,调用await方法会进入到条件队列中    private final Condition trip = lock.newCondition();//表示线程数,可以立即为这些线程数都执行完后,屏障才会打破进入下一代    private final int parties;//这个是屏障被打破后,执行的一个任务    private final Runnable barrierCommand;//内部内实例,存储屏障状态    private Generation generation = new Generation();  //等待执行的线程数    private int count;    //进入下一代,其实就是下一个新的循环,触发条件是所有的线程都执行完,屏障被打破时    private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation();    }//打破屏障,唤醒条件队列中线程private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll();    }====================================================  核心方法   =================================================================private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //上锁 lock.lock(); try {     final Generation g = generation;     //判断屏障的状态     if (g.broken)  throw new BrokenBarrierException();    //判断线程是否被打断     if (Thread.interrupted()) {  breakBarrier();  throw new InterruptedException();     }//被执行的线程数减一     int index = --count;     //如果当前的线程都执行完毕     if (index == 0) {  // tripped  boolean ranAction = false;  try {      final Runnable command = barrierCommand;      if (command != null)      //执行垫底的barrierCommand方法   command.run();      ranAction = true;      //开始下一个循环,其实也就是重置一下屏障的状态,将parties的值重新复制给count并唤醒在条件队列中的线程      nextGeneration();      return 0;  } finally {      if (!ranAction)      //打破屏障   breakBarrier();  }     }//无限循环     for (;;) {  try {      if (!timed)      //调用await方法等待,其实就是到了条件队列condition中去了,等待被唤醒   trip.await();      else if (nanos > 0L)   nanos = trip.awaitNanos(nanos);  } catch (InterruptedException ie) {      if (g == generation && ! g.broken) {   breakBarrier();   throw ie;      } else {   // We're about to finish waiting even if we had not   // been interrupted, so this interrupt is deemed to   // "belong" to subsequent execution.   Thread.currentThread().interrupt();      }  }  if (g.broken)      throw new BrokenBarrierException();  if (g != generation)      return index;  if (timed && nanos <= 0L) {      breakBarrier();      throw new TimeoutException();  }     } } finally {     lock.unlock(); }    }===================================================================================================================================//构造函数,传入线程数和执行的方法public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction;    }public CyclicBarrier(int parties) { this(parties, null);    }public int await() throws InterruptedException, BrokenBarrierException { try {     return dowait(false, 0L); } catch (TimeoutException toe) {     throw new Error(toe); // cannot happen }    }//重置count,其实就是开启一个新的循环public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try {     breakBarrier();   // break the current generation     nextGeneration(); // start a new generation } finally {     lock.unlock(); }    }

三、练习

public class MyThread extends Thread{    private String name;    private CyclicBarrier cb;    public MyThread(String name,CyclicBarrier cb){ this.name = name; this.cb = cb;    }    @Override    public void run() { try{     System.out.println(Thread.currentThread().getName()+"开始执行");     cb.await(); }catch(Exception e){     e.printStackTrace(); }finally {     System.out.println(Thread.currentThread().getName()+"继续"); }    }}public static void main(String[] args) throws InterruptedException, BrokenBarrierException { CyclicBarrier cb = new CyclicBarrier(3, new Runnable() {     @Override     public void run() {  System.out.println("屏障被打破,重新开始==========");     } }); MyThread t1 = new MyThread("t1",cb); MyThread t2 = new MyThread("t2",cb); t1.start(); t2.start(); cb.await();    }

四、总结

1、CyclicBarrier和CountDownLatch极为类似,两者都是栅栏工具类,不同的是,CyclicBarrier可以循环使用栅栏,而CountDownLatch只能使用一次。两者都是减法计数,但是数值的存储属性却不相同,CyclicBarrier使用的是自己的私有属性parties,而CountDownLatch使用的是AQS类的state属性。

2、CyclicBarrier和CountDownLatch还有一个不同点在于:CyclicBarrier在所有线程执行完后,屏障被打破时,会执行barrierCommand最终任务,当然不是必须的。

3、在何时使用CountDownLatch和CountDownLatch,还是有稍微差异:CyclicBarrier适合那种跨栏比赛,但是要求所有参赛选手跨过同一个栏后才能跨下一个栏。CountDownLatch适合那种游戏,选手都准备好了,才能开始游戏主线程。
其实说来说去,无非就是CountDownLatch适合循环的场景,CountDownLatch适合一次性的场景!

彭州一中