多线程--CompletableFuture
- 大家在工作的时候,肯定会遇到很多多线程的场景。我们先简单的比较一下单线程和多线程之间的差距
public static void main(String[] args) throws ExecutionException, InterruptedException { //单线程计算累加 Instant start = Instant.now(); long num=0; for (long i = 0; i < 400000000; i++) { num++; } Instant end = Instant.now(); System.out.println(num+"单线程耗时:"+(end.getNano()-start.getNano())); Instant startThread = Instant.now(); long num1=0; //多线程Callable计算累加 Callable<Long> add1= () -> { long callableNum =0; for (long i = 0; i < 200000000; i++) { callableNum++; } return callableNum; }; Callable<Long> add2= () -> { long callableNum =0; for (long i = 0; i < 200000000; i++) { callableNum++; } return callableNum; }; FutureTask<Long> integerFutureTask = new FutureTask<>(add1); new Thread(integerFutureTask).start(); FutureTask<Long> integerFutureTask2 = new FutureTask<>(add2); new Thread(integerFutureTask2).start(); Long integer = integerFutureTask.get(); Long integer1 = integerFutureTask2.get(); Instant endThread = Instant.now(); System.out.println(integer+integer1+"多线程耗时:"+(endThread.getNano()-startThread.getNano())); //jdk8 CompletableFuture 多线程计算累加 Instant startCompletableFuture = Instant.now(); CompletableFuture<Long> longCompletableFuture1 = CompletableFuture.supplyAsync(() -> { long callableNum = 0; for (long i = 0; i < 200000000; i++) { callableNum++; } return callableNum; }); CompletableFuture<Long> longCompletableFuture2 = CompletableFuture.supplyAsync(() -> { long callableNum = 0; for (long i = 0; i < 200000000; i++) { callableNum++; } return callableNum; }); Instant endCompletableFuture = Instant.now(); CompletableFuture.allOf(longCompletableFuture1,longCompletableFuture2).join(); System.out.println(longCompletableFuture1.get()+longCompletableFuture2.get()+"CompletableFuture多线程耗时:"+(endCompletableFuture.getNano()-startCompletableFuture.getNano())); }
结果如下:
400000000单线程耗时:-788952400400000000多线程耗时:66014200400000000CompletableFuture多线程耗时:4001600
由此可见,当我们的循环次数累加到400000000的时候,单线程和多线程之间的差距就能够看出来了,而多线程CompletableFuture方式比Callable的方式效率更好一些。
- 下面简单的介绍一下Callable方式和CompletableFuture创建多线程
- 集成Thread和实现接口Runnable请参考 https://blog.csdn.net/qq_40597962/article/details/124490301
- Callable和FutureTask方式创建多线程
public static void main(String[] args) throws ExecutionException, InterruptedException{ long start = System.currentTimeMillis(); FutureTask futureTask=new FutureTask(new Callable() { @Override public String call() throws Exception { Thread.sleep(20000); System.out.println("calld方法执行了"); return "call方法返回值"; } }); FutureTask futureTask1=new FutureTask(new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); System.out.println("calld方法执行了1"); return "call方法返回值1"; } }); new Thread(futureTask).start(); new Thread(futureTask1).start(); System.out.println("获取返回值: " + futureTask.get()); System.out.println("获取返回值1: " + futureTask1.get()); long end = System.currentTimeMillis(); System.out.println(end - start); }
执行结果
calld方法执行了1calld方法执行了获取返回值: call方法返回值获取返回值1: call方法返回值120016
- CompletableFuture方式
public static void main(String[] args) { Supplier<String> s1 = () -> "Hello World!"; System.out.println(s1.get()); Random random = new Random(); Supplier<Integer> s2 = () -> random.nextInt(10); System.out.println(s2.get()); System.out.println("-------------CompletableFuture---------"); CompletableFuture<String> future = CompletableFuture.supplyAsync(s1); try { System.out.println(future.get()); } catch (Exception e) { e.printStackTrace(); } System.out.println("-------------join---------"); future.complete("hello join"); System.out.println(future.join()); Supplier<String> s3 = MyUtil::getFavoriteBook; System.out.println(s3.get()); System.out.println("=============complete==============="); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(s1); try { System.out.println(future1.get()); } catch (Exception e) { e.printStackTrace(); } future1.complete("hello complete"); System.out.println(future1.join()); } class MyUtil { private Integer age = 30; public static String getFavoriteBook(){ return "Mahabharat"; } public Integer getAge(){ return age; }}
返回结果
Hello World!8-------------CompletableFuture---------Hello World!-------------join---------Hello World!Mahabharat=============complete===============Hello World!Hello World!
简单的分析一下,Supplier是java8的一个特性,不输入任何参数返回一个结果,这个可以通过lamda表达式或者函数式接口::接口进行赋值,比如
Supplier s1 = () -> "Hello World!";
CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
查看CompletableFuture的API,得知有下面四个创建任务的方式runAsync是没有返回值的,supplyAsync是有返回值的,在使用supplyAsync方法时,可以通过get()或者join()方法获取返回值,而这两个的方法区别在于,当返回值为null时,使用get()会抛出InterruptedException异常。
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(ASYNC_POOL, runnable);} public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) { return asyncRunStage(screenExecutor(executor), runnable);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(ASYNC_POOL, supplier);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier);}
- thenApply / thenAccept / thenRun
- thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。
- thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。
- thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。
public static void main(String[] args) {Supplier<String> s1 = () -> "Hello World!"; System.out.println("-------------CompletableFuture---------"); CompletableFuture<String> future = CompletableFuture.supplyAsync(s1); try { System.out.println(future.get()); } catch (Exception e) {e.printStackTrace(); } /** * thenApply / thenAccept / thenRun * * thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。 * thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。 * thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。 * */ CompletableFuture<String> future2 = future.thenApply((p)->{ System.out.println("compute 2"); return p+10; }); System.out.println("result: " + future2.join()); }
执行结果
-------------CompletableFuture---------Hello World!compute 2result: Hello World!10
可以看到我们首先创建了一个future,然后又创建了一个future2,使用thenApply方法将future的结果作为入参,最后得到
Hello World!10。
- thenCombine
public static void main(String[] args) { /** * thenCombine * * thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型) * 从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时 传递给下游处理任务,从而得到最终结果 * * */ System.out.println("------------------thenCombine-----------------------------"); CompletableFuture future11 = CompletableFuture.supplyAsync(()->{ System.out.println("compute 1"); return 1; }); CompletableFuture future12 = CompletableFuture.supplyAsync(()->{ System.out.println("compute 2"); return 10; }); CompletableFuture future13 = future11.thenCombine(future12, (r1, r2)->r1 + r2); System.out.println("result: " + future13.join()); }
- allOf/anyOf
CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
// completableFutureList是多个任务的集合,下面的代码的作用是:当所有的任务都执行完成后才会执行后续的逻辑CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).join();