> 文档中心 > (JUC 下典型的类)Java 并发包中线程同步器

(JUC 下典型的类)Java 并发包中线程同步器

Java 并发包中线程同步器

  • CountDownLatch
    • CountDownLatch 与 join 方法的区别
    • CountDownLatch 中的方法介绍
  • 回环屏障 CyclicBarrier
    • CyclicBarrier 中几个重要方法
  • 信号量 Semaphore
    • Semaphore 主要方法
  • 总结

CountDownLatch

在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。在 CountDownLatch 出现之前一般都使用线程 join() 方法来实现这一点,但是 join 方法不够灵活,不能满足不同场景的需要,所以 JDK 开发提供了 CountDownLatch 这个类,使用 CountDownLatch 代码如下:

// 计数器:判断线程池的任务是否已经全部执行完import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class CountDownLatchDemo1 {    public static void main(String[] args) throws InterruptedException { // 创建计数器 CountDownLatch countDownLatch = new CountDownLatch(5); // 创建新线程执行任务 for (int i = 0; i < 5; i++) {     new Thread(() -> {  Thread currThread = Thread.currentThread();  System.out.println(currThread.getName() + "开始执行");  // 线程执行所用时间  int runTime = (1 + new Random().nextInt(5));  try {      TimeUnit.SECONDS.sleep(runTime);  } catch (InterruptedException e) {      e.printStackTrace();  }  System.out.println(currThread.getName() + "执行完成,用时" + runTime);  //计数器-1  countDownLatch.countDown();     },"线程" + i+"-> ").start(); } countDownLatch.await(); // 阻塞等待,直到所有线程执行完 System.out.println("执行结果");    }}

在如上代码中,创建了一个 CountDownLatch 实例,用 for 循环创建 5 个线程,所以给构造函数传递参数为 5。主线程调用 countDownLatch.await() 方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown() 方法让countDownLatch 内部的计数器减 1,所有子线程执行完毕后并调用 countDown() 方法后计数器会变为 0,这时候主线程的 await() 方法才会返回。

以上代码是用直接循环创建 5 个线程实现的,其实在项目实践中一般都避免直接操作线程,而是使用 ExecutorService 线程池来管理,使用 ExecutorService 时传递的参数是 Runnable 或者 Callable 对象,这时候你就没有办法调用这些线程的 join() 方法,这就需要选择使用 CountDownLatch 了,将上面代码修改如下:

import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CountDownLatchDemo2 {    // 创建计数器    private static CountDownLatch countDownLatch = new CountDownLatch(5);    public static void main(String[] args) throws InterruptedException { // 创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) {     executorService.submit(new Runnable() {  @Override  public void run() {      Thread currThread = Thread.currentThread();      System.out.println(currThread.getName() + "开始执行");      // 线程执行所用时间      int runTime = (1 + new Random().nextInt(5));      try {   TimeUnit.SECONDS.sleep(runTime);      } catch (InterruptedException e) {   e.printStackTrace();      }      System.out.println(currThread.getName() + "执行完成,用时" + runTime);      //计数器-1      countDownLatch.countDown();  }     }); } countDownLatch.await(); // 阻塞等待,直到所有线程执行完 System.out.println("执行结果");    }}

CountDownLatch 与 join 方法的区别

一个区别是,调用一个子线程的 join() 方法后,该线程会一直被阻塞直到子线程执行完毕,而 CountDownLatch 可以在子线程运行的任何时候让 await 方法返回而不一定必须等到线程结束。另外,使用线程池来管理线程时一般都是直接添加 Runnable 到线程池,这时候就没有办法再调用线程的 join 方法了,就是说 countDownLatch 相比 join 方法让我们对线程同步有更灵活的控制。

CountDownLatch 中的方法介绍

  1. void await(): 当线程调用 CountDownLatch 对象的 await 方法后,当前线程会被阻塞,直到下面情况之一发生才会返回:当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是当计数器的值为 0 时;其他线程调用了当前线程的 interupt() 方法中断了当前线程,当前线程会抛出 InterruptedException 异常,然后返回。
  2. boolean await(long timeout, TimeUnit unit):当线程调用了 CountDownLatch 对象的该方法后,当前线程会被阻塞,直到下面情况之一发生才会返回:当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是当计数器的值为 0 时;其他线程调用了当前线程的 interupt() 方法中断了当前线程,当前线程会抛出 InterruptedException 异常,然后返回。
  3. void countDown():线程调用该方法后,计数器的值递减,递减后如果计数器值为 0 则唤醒所有因调用 await 方法而被阻塞的线程,否则什么都不做。
  4. long getCount():获取当前计数器的值。

回环屏障 CyclicBarrier

上面介绍的 CountDownLatch 在解决多个线程同步方面相对于调用 join 方法已经有了不少优化,但是 CountDownLatch 的计数器是一次性的,也就是等到计数器变为 0 后,再调用 CountDownLatch 的 await 和 countdownLatch 方法都会立即返回,这就起不到线程同步的效果了。所以为了满足计数器可以重置的需要,JDK 开发组提供了 CyclicBarrier 类,并且 CyclicBarrier 类的功能并不限于 CountDownLatch 的功能。从字面意思理解,CyclicBarrier 是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为当所有等待线程执行完毕,并重置 CyclicBarrier 的状态后它可以被重用。之所以叫做屏障是因为所有线程调用 await 方法后会被阻塞,这个阻塞点就称为屏蔽点,等所有线程都调用了 await 方法后,线程们就会冲破屏障,继续向下执行。

下面是一个演示:

import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;// 循环屏障public class CyclicBarrierDemo1 {    public static void main(String[] args) { // 循环屏障 CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {     @Override     public void run() {  System.out.println("计数器为 0 了");     } }); // 创建线程池 ExecutorService service = Executors.newFixedThreadPool(4); for (int i = 0; i < 4; i++) {     int finalI = i;     service.submit(() -> {  Thread currThread = Thread.currentThread();  System.out.println("执行线程:" + currThread.getName());  try {      Thread.sleep(500 * finalI);      cyclicBarrier.await(); // 执行阻塞等待(直到循环屏障的计数器为0的时候,再执行后面的代码)  } catch (InterruptedException e) {      e.printStackTrace();  } catch (BrokenBarrierException e) {      e.printStackTrace();  }  System.out.println("线程执行完成:"+currThread.getName());     }); }    }}

在这里插入图片描述

上面代码中,每个子线程在开始执行后都调用了 await 方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程的阶段性执行。

CyclicBarrier 中几个重要方法

  1. int await():当线程调用 CyclicBarrier 的方法时会被阻塞,直到满足下面条件之一才会返回:parties 个线程都调用了 await() 方法,也就是线程都到了屏障点;其他线程调用了当前线程的 interrupt() 方法中断了当前线程,则当前线程会抛出 InterruptedException 异常而返回;与当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时,会抛出 BrokenBarrierException 异常,然后返回。
  2. boolean await(long timeout, TimeUnit unit):当线程调用 CyclicBarrier 的方法时会被阻塞,直到满足下面条件之一才会返回:parties 个线程都调用了 await() 方法,也就是线程都到了屏障点,这时候返回true;设置的超时时间到了后返回 false;其他线程调用了当前线程的 interrupt() 方法中断了当前线程,则当前线程会抛出 InterruptedException 异常而返回;与当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时,会抛出 BrokenBarrierException 异常,然后返回。
  3. int dowait(boolean timed, long nanos):该方法是 CyclicBarrier 的核心功能,当一个线程调用了 dowait 方法后,首先会获取独占锁 lock,如果创建 CyclicBarrier 时传递的参数为 10,那么后面 9 个调用线程会被阻塞。然后当前获取到的锁的线程会对计数器 count 进行递减操作,递减后 count = index = 9。如果当前线程调用了 await(),由于被阻塞释放锁后,其他被阻塞的 9 个线程中有一个会竞争到 lock 锁,然后执行同样的操作,直到最后一个线程获取到 lock 锁。最后 count = index 等于 0,会重置 CyclicBarrier,然后这 10 个线程就可以继续向下运行了。

信号量 Semaphore

Semaphore 信号量也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同的是,它内部的计数器是递增的,并且在一开始初始化 Semaphore 时可以指定一个初始值,但是并不需要知道同步的线程个数,而是在需要同步的地方调用 acquire 方法时指定需要同步的线程个数。

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class SemaphoreDemo2 {    private static Semaphore semaphore = new Semaphore(0);    public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 2; i++) {     executorService.submit(new Runnable() {  @Override  public void run() {      System.out.println(Thread.currentThread()+ "开始执行");      semaphore.release();  }     }); } // 等待子线程执行完毕,返回 semaphore.acquire(2); System.out.println("所有线程执行完毕"); // 关闭线程池 executorService.shutdown();    }}

在这里插入图片描述
如上代码首先创建了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器值为 0。然后 mian 函数向线程池添加两个线程任务,在每个线程内部调用信号量的 release 方法,这相当于让计数器值递增 1。最后在 main线程里面调用信号量的 acquire 方法,传参为 2 说明调用 acquire 方法的线程会一直阻塞,知道信号量的计数变为 2 才会返回。看到这里也就明白了,如果构造方法 Semaphore 时传递的参数为 N,并在 M 个线程中调用了该信号量的 release 方法,那么在调用 acquire 使 M 个线程同步时传递的参数应该是 M+N。

Semaphore 主要方法

  1. void acquire():当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于0,则当前信号量的计数会减 1,然后该方法直接返回。否则如果当前信号量个数等于 0,则当前线程会被放入 AQS 的阻塞队列。当其他线程调用了当前线程的 interrupt() 方法中断了当前线程时,则会抛出 InterruptedException 异常返回。
  2. void acquire(int permits): 该方法与 void acquire() 方法不同,后者只需要获取一个信号量值,而前者则获取 permits 个。
  3. void acquireUninterruptibly():该方法与 void acquire() 类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源是(包含被阻塞后),其他线程调用了当前线程的 interrupt() 方法设置了当前线程的中断标志,此时当前线程并不会抛出 InterruptedException 异常而返回。
  4. void acquireUninterruptibly(int permits):该方法与 void acquire(int permits) 方法不同之处在于,该方法对中断不响应。
  5. void release():该方法的作用是把当前 Semaphore 对象的信号量值增加 1,如果当前有线程因为调用 aquire 方法被阻塞而被放入了 AQS 的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
  6. void release(int permits):该方法与不带参数的 release 方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加 permits,而后者每次增加 1。

总结

此文介绍了并发包中关于线程协作的一些重要类。首先 CountDownLatch 通过计数器提供了更灵活的控制,只要检测到计数器值为 0,就可以往下执行,这相比于 join 必须等待线程执行完毕后主线程才会继续向下运行更灵活。另外,CyclicBarrier 也可以达到 CountDownLatch 的效果,但是后者在计数器值变为 0 后,就不能再被复用,而前者则可以使用 reset 方法重置后复用,前者对同一个算法但是输入参数不同的类似场景比较使用。而 Semaphore 采用了信号量递增的策略,一开始并不需要关心同步的线程个数,等调用 aquire 方法时再指定需要同步的个数,并且提供了获取信号量的公平性策略。