> 文档中心 > 多线程--CompletableFuture

多线程--CompletableFuture

  • 大家在工作的时候,肯定会遇到很多多线程的场景。我们先简单的比较一下单线程和多线程之间的差距
public static void main(String[] args) throws ExecutionException, InterruptedException { //单线程计算累加 Instant start = Instant.now(); long num=0; for (long i = 0; i < 400000000; i++) {     num++; } Instant end = Instant.now(); System.out.println(num+"单线程耗时:"+(end.getNano()-start.getNano())); Instant startThread = Instant.now(); long num1=0; //多线程Callable计算累加 Callable<Long> add1= () -> {     long callableNum =0;     for (long i = 0; i < 200000000; i++) {  callableNum++;     }     return callableNum; }; Callable<Long> add2= () -> {     long callableNum =0;     for (long i = 0; i < 200000000; i++) {  callableNum++;     }     return callableNum; }; FutureTask<Long> integerFutureTask = new FutureTask<>(add1); new Thread(integerFutureTask).start(); FutureTask<Long> integerFutureTask2 = new FutureTask<>(add2); new Thread(integerFutureTask2).start(); Long integer = integerFutureTask.get(); Long integer1 = integerFutureTask2.get(); Instant endThread = Instant.now(); System.out.println(integer+integer1+"多线程耗时:"+(endThread.getNano()-startThread.getNano())); //jdk8 CompletableFuture 多线程计算累加 Instant startCompletableFuture = Instant.now(); CompletableFuture<Long> longCompletableFuture1 = CompletableFuture.supplyAsync(() -> {     long callableNum = 0;     for (long i = 0; i < 200000000; i++) {  callableNum++;     }     return callableNum; }); CompletableFuture<Long> longCompletableFuture2 = CompletableFuture.supplyAsync(() -> {     long callableNum = 0;     for (long i = 0; i < 200000000; i++) {  callableNum++;     }     return callableNum; }); Instant endCompletableFuture = Instant.now(); CompletableFuture.allOf(longCompletableFuture1,longCompletableFuture2).join(); System.out.println(longCompletableFuture1.get()+longCompletableFuture2.get()+"CompletableFuture多线程耗时:"+(endCompletableFuture.getNano()-startCompletableFuture.getNano()));    }

结果如下:

400000000单线程耗时:-788952400400000000多线程耗时:66014200400000000CompletableFuture多线程耗时:4001600

由此可见,当我们的循环次数累加到400000000的时候,单线程和多线程之间的差距就能够看出来了,而多线程CompletableFuture方式比Callable的方式效率更好一些。

  • 下面简单的介绍一下Callable方式和CompletableFuture创建多线程
  • 集成Thread和实现接口Runnable请参考 https://blog.csdn.net/qq_40597962/article/details/124490301
  • Callable和FutureTask方式创建多线程
 public static void main(String[] args)  throws ExecutionException, InterruptedException{ long start = System.currentTimeMillis(); FutureTask futureTask=new FutureTask(new Callable() {     @Override     public String call() throws Exception {  Thread.sleep(20000);  System.out.println("calld方法执行了");  return "call方法返回值";     } }); FutureTask futureTask1=new FutureTask(new Callable() {     @Override     public String call() throws Exception {  Thread.sleep(3000);  System.out.println("calld方法执行了1");  return "call方法返回值1";     } }); new Thread(futureTask).start(); new Thread(futureTask1).start(); System.out.println("获取返回值: " + futureTask.get()); System.out.println("获取返回值1: " + futureTask1.get()); long end = System.currentTimeMillis(); System.out.println(end - start);    }

执行结果

calld方法执行了1calld方法执行了获取返回值: call方法返回值获取返回值1: call方法返回值120016
  • CompletableFuture方式
 public static void main(String[] args) { Supplier<String> s1 = () -> "Hello World!"; System.out.println(s1.get()); Random random = new Random(); Supplier<Integer> s2 = () -> random.nextInt(10); System.out.println(s2.get()); System.out.println("-------------CompletableFuture---------"); CompletableFuture<String> future = CompletableFuture.supplyAsync(s1); try {     System.out.println(future.get()); } catch (Exception e) {   e.printStackTrace(); } System.out.println("-------------join---------"); future.complete("hello join"); System.out.println(future.join()); Supplier<String> s3 = MyUtil::getFavoriteBook; System.out.println(s3.get()); System.out.println("=============complete==============="); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(s1); try {     System.out.println(future1.get()); } catch (Exception e) {     e.printStackTrace(); } future1.complete("hello complete"); System.out.println(future1.join());    }    class MyUtil {    private Integer age = 30;    public static String getFavoriteBook(){ return "Mahabharat";    }    public Integer getAge(){ return age;    }}    

返回结果

Hello World!8-------------CompletableFuture---------Hello World!-------------join---------Hello World!Mahabharat=============complete===============Hello World!Hello World!

简单的分析一下,Supplier是java8的一个特性,不输入任何参数返回一个结果,这个可以通过lamda表达式或者函数式接口::接口进行赋值,比如

Supplier s1 = () -> "Hello World!";

CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

查看CompletableFuture的API,得知有下面四个创建任务的方式runAsync是没有返回值的,supplyAsync是有返回值的,在使用supplyAsync方法时,可以通过get()或者join()方法获取返回值,而这两个的方法区别在于,当返回值为null时,使用get()会抛出InterruptedException异常。

 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(ASYNC_POOL, runnable);}  public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) { return asyncRunStage(screenExecutor(executor), runnable);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(ASYNC_POOL, supplier);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier);}
  • thenApply / thenAccept / thenRun
    • thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。
    • thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。
    • thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。
public static void main(String[] args) {Supplier<String> s1 = () -> "Hello World!"; System.out.println("-------------CompletableFuture---------");     CompletableFuture<String> future = CompletableFuture.supplyAsync(s1);     try { System.out.println(future.get());     } catch (Exception e) {e.printStackTrace();     }     /**      * thenApply / thenAccept / thenRun      *      * thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。      * thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。      * thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。      *      */      CompletableFuture<String> future2 = future.thenApply((p)->{  System.out.println("compute 2");  return p+10;      });      System.out.println("result: " + future2.join());     }

执行结果

-------------CompletableFuture---------Hello World!compute 2result: Hello World!10

可以看到我们首先创建了一个future,然后又创建了一个future2,使用thenApply方法将future的结果作为入参,最后得到

Hello World!10。

  • thenCombine
 public static void main(String[] args) { /** * thenCombine  *  * thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型)  * 从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时    传递给下游处理任务,从而得到最终结果  *  *  */ System.out.println("------------------thenCombine-----------------------------"); CompletableFuture future11 = CompletableFuture.supplyAsync(()->{     System.out.println("compute 1");     return 1; }); CompletableFuture future12 = CompletableFuture.supplyAsync(()->{     System.out.println("compute 2");     return 10; }); CompletableFuture future13 = future11.thenCombine(future12, (r1, r2)->r1 + r2); System.out.println("result: " + future13.join()); }
  • allOf/anyOf

CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。

// completableFutureList是多个任务的集合,下面的代码的作用是:当所有的任务都执行完成后才会执行后续的逻辑CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).join();