CompletableFuture异步编排
异步
- 一、线程基本了解
-
- 1、创建线程的四种方式
-
- 1.1、继承Thread类
- 1.2、实现Runnable接口
- 1.3、实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
- 1.4、使用线程池
- 2、线程池
-
- 2.1、线程池七大参数
- 2.2、运行流程
- 2.3、常见的4种线程池特点
- 二、CompletableFuture异步编排
-
- 1、创建CompletableFuture
-
- 1.1、runAsync(无返回值)
- 1.2、supplyAsync(有返回值)
- 2、各个方法使用
-
- 1、`whenComplete`
- 2、`handle`
- 3、线程串行化 thenApply / thenAccept / thenRun
- 4、任务组合 thenCombine / thenAcceptBoth / runAfterBoth
- 5、组合任务(一个完成)applyToEither / acceptEither / runAfterEither
- 6、多任务组合 allOf / anyOf
一、线程基本了解
1、创建线程的四种方式
1.1、继承Thread类
public static void main(String[] args) {Thread1 thread1 = new Thread1();thread1.start();}// 继承Thread类public static class Thread1 extends Thread{@Overridepublic void run() {System.out.println("当前线程 = " + Thread.currentThread().getName());}}
1.2、实现Runnable接口
public static void main(String[] args) {Runnable1 runnable1 = new Runnable1();new Thread(runnable1).start();}public static class Runnable1 implements Runnable{@Overridepublic void run() {System.out.println("当前线程 = " + Thread.currentThread().getName());}}
1.3、实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new Callable1());// 线程启动new Thread(futureTask).start();// 堵塞等待整个线程执行完成,获取返回结果String name = futureTask.get();System.out.println("name = " + name);}public static class Callable1 implements Callable<String>{@Overridepublic String call() throws Exception {String name = Thread.currentThread().getName();System.out.println("当前线程 = " + name);return name;}}
1.4、使用线程池
前三种启动线程方式,我们的业务代码都不会使用,在高并发情况下,线程一直创建,最终会导致资源耗尽。
.
使用线程池方式,分配给线程池设置的线程,等它处理完了,再处理下一个
// 整个系统最好只要一两个池public static ExecutorService executorService = Executors.newFixedThreadPool(10);// 直接把任务提交给线程池,让它执行。public static void main(String[] args) {//executorService.submit() // 有返回值executorService.execute(new Runnable() { // 无返回值@Overridepublic void run() {System.out.println("当前线程 = " + Thread.currentThread());}});}
2、线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(10000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
2.1、线程池七大参数
-
corePoolSize
:[5] 核心线程数(一直存在,除非allowCoreThreadTimeOut);线程池创建好就准备了5个new Thread。 -
maximumPoolSize
:[200]最大线程数量;控制资源用,不管多高的并发,也只有200个正在运行。 -
keepAliveTime
:存活时间。只要线程空闲大于指定的keepAliveTime,释放空闲的线程(除核心线程外)。 -
unit
:时间单位,给上面存活时间用。 -
workQueue
:堵塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要线程空闲,就会去队列里取出新的任务继续执行。 -
threadFactory
:线程创建工厂。 -
RejectedExecutionHandler handler
: 如果队列满了,按照我们指定的拒绝策略拒绝任务。四种拒绝策略:
2.2、运行流程
1、线程池创建,准备好core数量的核心线程,准备接收任务。
2、core核心线程满了,就将再进来的任务放入堵塞队列中。空闲的core就会自己去堵塞队列获取任务执行。
3、堵塞队列满了,就直接开启新线程执行,最大只能开到max指定的数量。
4、max满了,就有RejectedExecutionHandler拒绝策略拒接任务。
5、max都执行完成,有很多空闲,在指定的时间keepAliveTime以后,释放除核心线程外的线程。
2.3、常见的4种线程池特点
该部分参考:http://blog.csdn.net/czd3355/article/details/52608567
Executors.newFixedThreadPool
:
1、线程数量固定
2、只有核心线程切并且不会被回收
3、当所有线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来
Executors.newCachedThreadPool
:
1、线程数量不定的线程池
2、只有非核心线程,最大线程数量为Integer.MAX_VALUE,可视为任意大
3、有超时机制,时长为60s,即超过60s的空闲线程就会被回收
4、当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。因此任何任务都会被立即执行
5、该线程池比较适合执行大量耗时较少的任务
Executors.newScheduledThreadPool
:
1、核心线程数量是固定的,而非核心线程数不固定的,并且非核心线程有超时机制,只要处于闲置状态就会被立即回收
2、该线程池主要用于执行定时任务和具有固定周期的重复任务
Executors.newSingleThreadPool
:
只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。因此在这些任务之间不需要处理线程同步的问题
二、CompletableFuture异步编排
CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。
1、创建CompletableFuture
1.1、runAsync(无返回值)
CompletableFuture.runAsync(()->{System.out.println("当前线程 = " + Thread.currentThread().getName());});
指定线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) {CompletableFuture.runAsync(()->{System.out.println("当前线程 = " + Thread.currentThread().getName());},executorService);}
1.2、supplyAsync(有返回值)
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{String name = Thread.currentThread().getName();System.out.println("当前线程 = " + name);return name;});// 获取返回值future.get();
指定线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{String name = Thread.currentThread().getName();System.out.println("当前线程 = " + name);return name;},executorService);// 获取返回值future.get();}
2、各个方法使用
1、whenComplete
翻译为:当任务完成时
whenComplete主要用于注入任务完成时的回调通知逻辑
// 创建不写,就是上方创建的代码/ * res 异步返回结果 * excption 异常信息 */// 当任务完成时的逻辑 future.whenComplete((res,excption)->{System.out.println("异步任务成功完成了,结果是 = " + res +" 异常是:"+excption);});
exceptionally
处理异常时默认返回
// 当任务完成时的逻辑future.whenComplete((res,excption)->{System.out.println("异步任务成功完成了,结果是 = " + res +" 异常是:"+excption);}).exceptionally((throwable -> { // 异常处理return "当出现异常时,默认返回这个";}));
2、handle
handle与whenComplete的作用有些类似,但是handle可以处理返回结果。
/ * res 异步返回结果 * excption 异常信息 */future.handle((res,excption)->{if (res!=null){ // 正常,直接返结果。return res;}if (excption!=null){return "如果异常,返这个";}return "两个都不走返这个";});
3、线程串行化 thenApply / thenAccept / thenRun
以下有加async就是开启另一个线程执行这些方法。
如现在有A、B两个方法。
thenApply
:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。(拿A的返回值,处理后,返回处理后的值)
future.thenApply((res)->{return "这个任务完成后,执行这个方法,可以接收任务返回值,还可以把这个方法执行后的返回值返回去";});// 用get方法获取返回值future.get();
thenAccept
:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。(拿A的返回值,B还要再处理,用这个方法)
future.thenAccept((res)->{System.out.println("这个任务完成后,执行这个方法,可以接收任务返回值,但是这个方法执行完没有返回值");});
thenRun
:只要上面的任务执行完成,就开始执行thenRun,只是处理完成任务后,执行thenRun的后续结果。(不接收A返回值,直接执行B,用这个)
future.thenRun(()->{System.out.println("这个任务完成后,执行这个方法,不能拿到这个任务的返回值");});
4、任务组合 thenCombine / thenAcceptBoth / runAfterBoth
thenCombine
:组合两个future,获取两个future的返回结果,并返回当前任务的返回值。
/ * future1 任务1 * future2 任务2 */CompletableFuture<String> future = future1.thenCombine(future2, (f1, f2) -> {// 获取两个的结果后处理逻辑,无返回值System.out.println("任务1、2都结束后,获取两个的结果,任务1结果:" + f1 + "任务2结果" + f2);return "返回这个结果";});future.get(); // 用get获取结果
thenAcceptBoth
:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
future1.thenAcceptBoth(future2,(f1,f2)->{// 获取两个的结果后处理逻辑,无返回值System.out.println("任务1、2都结束后,获取两个的结果,任务1结果:"+ f1 +"任务2结果" + f2); });
runAfterBoth
:组合两个future,不需要获取future的结果,只需两个future处理完成任务后,处理该任务。
/ * future1 任务1 * future2 任务2 */future1.runAfterBoth(future2,()->{System.out.println("任务1、2都结束后,来处理这个任务");},executorService);//指定线程池。
5、组合任务(一个完成)applyToEither / acceptEither / runAfterEither
applyToEither
:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
CompletableFuture<String> future3 = future1.applyToEither(future2, (res) -> {return "任务1、2有一个做完,就来执行这个,可以获取到返回结果,返回新的结果";});future3.get(); // 用get方法获取返回值
acceptEither
:两个任务有一个执行完成,获取他的返回值,处理任务,没有新的返回值。
future1.acceptEither(future2,(res)->{System.out.println("任务1、2有一个做完,就来执行这个,可以获取到返回结果,无新的结果返回");});
runAfterEither
:两个任务有一个执行完成,不需要获取futrue结果,处理任务,没有返回值。
future1.runAfterEither(future2,()->{System.out.println("任务1、2有一个做完,就来执行这个,无返回值");});
6、多任务组合 allOf / anyOf
allOf
:等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);allOf.get(); // 等待所有结果完成System.out.println("所有结果都完成,才会打印我");
anyOf
:只要有一个任务完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);anyOf.get(); // 拿的是完成的那个的返回值。System.out.println("任意一个完成,都会打印我");