> 文档中心 > CompletableFuture异步编排

CompletableFuture异步编排

异步

  • 一、线程基本了解
    • 1、创建线程的四种方式
      • 1.1、继承Thread类
      • 1.2、实现Runnable接口
      • 1.3、实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
      • 1.4、使用线程池
    • 2、线程池
      • 2.1、线程池七大参数
      • 2.2、运行流程
      • 2.3、常见的4种线程池特点
  • 二、CompletableFuture异步编排
    • 1、创建CompletableFuture
      • 1.1、runAsync(无返回值)
      • 1.2、supplyAsync(有返回值)
    • 2、各个方法使用
      • 1、`whenComplete`
      • 2、`handle`
      • 3、线程串行化 thenApply / thenAccept / thenRun
      • 4、任务组合 thenCombine / thenAcceptBoth / runAfterBoth
      • 5、组合任务(一个完成)applyToEither / acceptEither / runAfterEither
      • 6、多任务组合 allOf / anyOf

一、线程基本了解

1、创建线程的四种方式

1.1、继承Thread类

public static void main(String[] args) {Thread1 thread1 = new Thread1();thread1.start();}// 继承Thread类public static class Thread1 extends Thread{@Overridepublic void run() {System.out.println("当前线程 = " + Thread.currentThread().getName());}}

1.2、实现Runnable接口

public static void main(String[] args) {Runnable1 runnable1 = new Runnable1();new Thread(runnable1).start();}public static class Runnable1 implements Runnable{@Overridepublic void run() {System.out.println("当前线程 = " + Thread.currentThread().getName());}}

1.3、实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)

public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new Callable1());// 线程启动new Thread(futureTask).start();// 堵塞等待整个线程执行完成,获取返回结果String name = futureTask.get();System.out.println("name = " + name);}public static class Callable1 implements Callable<String>{@Overridepublic String call() throws Exception {String name = Thread.currentThread().getName();System.out.println("当前线程 = " + name);return name;}}

1.4、使用线程池

前三种启动线程方式,我们的业务代码都不会使用,在高并发情况下,线程一直创建,最终会导致资源耗尽。
.
使用线程池方式,分配给线程池设置的线程,等它处理完了,再处理下一个

// 整个系统最好只要一两个池public static ExecutorService executorService = Executors.newFixedThreadPool(10);// 直接把任务提交给线程池,让它执行。public static void main(String[] args) {//executorService.submit() // 有返回值executorService.execute(new Runnable() { // 无返回值@Overridepublic void run() {System.out.println("当前线程 = " + Thread.currentThread());}});}

2、线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(10000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

2.1、线程池七大参数

  • corePoolSize:[5] 核心线程数(一直存在,除非allowCoreThreadTimeOut);线程池创建好就准备了5个new Thread。

  • maximumPoolSize:[200]最大线程数量;控制资源用,不管多高的并发,也只有200个正在运行。

  • keepAliveTime:存活时间。只要线程空闲大于指定的keepAliveTime,释放空闲的线程(除核心线程外)。

  • unit:时间单位,给上面存活时间用。

  • workQueue:堵塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要线程空闲,就会去队列里取出新的任务继续执行。

  • threadFactory:线程创建工厂。

  • RejectedExecutionHandler handler: 如果队列满了,按照我们指定的拒绝策略拒绝任务。

    四种拒绝策略:
    CompletableFuture异步编排


2.2、运行流程

1、线程池创建,准备好core数量的核心线程,准备接收任务。

2、core核心线程满了,就将再进来的任务放入堵塞队列中。空闲的core就会自己去堵塞队列获取任务执行。

3、堵塞队列满了,就直接开启新线程执行,最大只能开到max指定的数量。

4、max满了,就有RejectedExecutionHandler拒绝策略拒接任务。

5、max都执行完成,有很多空闲,在指定的时间keepAliveTime以后,释放除核心线程外的线程。

2.3、常见的4种线程池特点

该部分参考:http://blog.csdn.net/czd3355/article/details/52608567

Executors.newFixedThreadPool:
1、线程数量固定

2、只有核心线程切并且不会被回收

3、当所有线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来

Executors.newCachedThreadPool:
1、线程数量不定的线程池

2、只有非核心线程,最大线程数量为Integer.MAX_VALUE,可视为任意大

3、有超时机制,时长为60s,即超过60s的空闲线程就会被回收

4、当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。因此任何任务都会被立即执行

5、该线程池比较适合执行大量耗时较少的任务

Executors.newScheduledThreadPool
1、核心线程数量是固定的,而非核心线程数不固定的,并且非核心线程有超时机制,只要处于闲置状态就会被立即回收

2、该线程池主要用于执行定时任务和具有固定周期的重复任务

Executors.newSingleThreadPool

只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。因此在这些任务之间不需要处理线程同步的问题

二、CompletableFuture异步编排

CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。

1、创建CompletableFuture

1.1、runAsync(无返回值)

CompletableFuture.runAsync(()->{System.out.println("当前线程 = " + Thread.currentThread().getName());});

指定线程池

public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) {CompletableFuture.runAsync(()->{System.out.println("当前线程 = " + Thread.currentThread().getName());},executorService);}

CompletableFuture异步编排

1.2、supplyAsync(有返回值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{String name = Thread.currentThread().getName();System.out.println("当前线程 = " + name);return name;});// 获取返回值future.get();

指定线程池

public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{String name = Thread.currentThread().getName();System.out.println("当前线程 = " + name);return name;},executorService);// 获取返回值future.get();}

CompletableFuture异步编排

2、各个方法使用

1、whenComplete

翻译为:当任务完成时
whenComplete主要用于注入任务完成时的回调通知逻辑

// 创建不写,就是上方创建的代码/ * res 异步返回结果 * excption 异常信息 */// 当任务完成时的逻辑 future.whenComplete((res,excption)->{System.out.println("异步任务成功完成了,结果是 = " + res +" 异常是:"+excption);});

exceptionally处理异常时默认返回

// 当任务完成时的逻辑future.whenComplete((res,excption)->{System.out.println("异步任务成功完成了,结果是 = " + res +" 异常是:"+excption);}).exceptionally((throwable -> { // 异常处理return "当出现异常时,默认返回这个";})); 

2、handle

handle与whenComplete的作用有些类似,但是handle可以处理返回结果。

/ * res 异步返回结果 * excption 异常信息 */future.handle((res,excption)->{if (res!=null){ // 正常,直接返结果。return res;}if (excption!=null){return "如果异常,返这个";}return "两个都不走返这个";});

3、线程串行化 thenApply / thenAccept / thenRun

在这里插入图片描述以下有加async就是开启另一个线程执行这些方法。

如现在有A、B两个方法。

thenApply:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。(拿A的返回值,处理后,返回处理后的值)

future.thenApply((res)->{return "这个任务完成后,执行这个方法,可以接收任务返回值,还可以把这个方法执行后的返回值返回去";});// 用get方法获取返回值future.get();

thenAccept:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。(拿A的返回值,B还要再处理,用这个方法)

future.thenAccept((res)->{System.out.println("这个任务完成后,执行这个方法,可以接收任务返回值,但是这个方法执行完没有返回值");});

thenRun:只要上面的任务执行完成,就开始执行thenRun,只是处理完成任务后,执行thenRun的后续结果。(不接收A返回值,直接执行B,用这个)

future.thenRun(()->{System.out.println("这个任务完成后,执行这个方法,不能拿到这个任务的返回值");});

4、任务组合 thenCombine / thenAcceptBoth / runAfterBoth

thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值。

/ * future1 任务1 * future2 任务2 */CompletableFuture<String> future = future1.thenCombine(future2, (f1, f2) -> {// 获取两个的结果后处理逻辑,无返回值System.out.println("任务1、2都结束后,获取两个的结果,任务1结果:" + f1 + "任务2结果" + f2);return "返回这个结果";});future.get(); // 用get获取结果

thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

future1.thenAcceptBoth(future2,(f1,f2)->{// 获取两个的结果后处理逻辑,无返回值System.out.println("任务1、2都结束后,获取两个的结果,任务1结果:"+ f1 +"任务2结果" + f2); });

runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完成任务后,处理该任务。

/ * future1 任务1 * future2 任务2 */future1.runAfterBoth(future2,()->{System.out.println("任务1、2都结束后,来处理这个任务");},executorService);//指定线程池。

5、组合任务(一个完成)applyToEither / acceptEither / runAfterEither

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

CompletableFuture<String> future3 = future1.applyToEither(future2, (res) -> {return "任务1、2有一个做完,就来执行这个,可以获取到返回结果,返回新的结果";});future3.get(); // 用get方法获取返回值

acceptEither:两个任务有一个执行完成,获取他的返回值,处理任务,没有新的返回值。

future1.acceptEither(future2,(res)->{System.out.println("任务1、2有一个做完,就来执行这个,可以获取到返回结果,无新的结果返回");});

runAfterEither:两个任务有一个执行完成,不需要获取futrue结果,处理任务,没有返回值。

future1.runAfterEither(future2,()->{System.out.println("任务1、2有一个做完,就来执行这个,无返回值");});

6、多任务组合 allOf / anyOf

allOf:等待所有任务完成

CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);allOf.get(); // 等待所有结果完成System.out.println("所有结果都完成,才会打印我");

anyOf:只要有一个任务完成

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);anyOf.get(); // 拿的是完成的那个的返回值。System.out.println("任意一个完成,都会打印我");

小吃零食网