irpas技术客

CompletableFuture 使用及应用场景_再见丶孙悟空_completablefuture使用场景

网络投稿 8429

JDK1.8 之前,我们会通过 Future 和 Callable 采用轮询来实现异步获取结果

//定义一个异步任务 Future<String> future = executor.submit(()->{ Thread.sleep(2000); return "hello world"; }); //轮询获取结果 while (true){ if(future.isDone()) { System.out.println(future.get()); break; } }

JDK1.8 中提供的 CompletableFuture 提供了异步函数式编程。可以帮助我们简化异步编程的复杂性,通过回调的方式处理计算结果,并且提供了转换和组合的方法。

1 CompletableFuture 的使用 1.1 创建 CompletableFuture 对象

????????提供了四个静态方法来创建

public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

async 代表异步。

runAsync 和 supplyAsync 方法的区别在于,前者没有结果返回,后者会有结果返回

默认用的线程池为 ForkJoinPool.commonPool()

private static void completableFuture() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); // 阻塞的获取结果 while (true) { if (completableFuture.isDone()) { System.out.println(completableFuture.get()); break; } } }

1.2 阻塞获取

????????以下四个方法用于获取结果

public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent) public T join()

getNow() 代表计算完,如果返回结果或抛出异常就正常get,否则就返回给定的 valueIfAbsent 值

join() 返回计算的结果或者抛出一个 unchecked 异常(CompletionException)

? ? ? ? 主动触发计算

public boolean complete(T value) public boolean completeExceptionally(Throwable ex) private static void completableFuture3() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); // 设置 completableFuture.get() 获取到的值 completableFuture.complete("aaa"); System.out.println(completableFuture.get()); // 以异常的形式触发计算 // completableFuture.completeExceptionally(new Exception()); // System.out.println(completableFuture.get()); } 1.3 计算完成时处理 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

上面四个方法是计算阶段结束的时候触发

BiConsumer 有两个入参,分别代表计算的返回值,以及异常

private static void completableFuture4() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); completableFuture = completableFuture.whenCompleteAsync((v, e) -> { System.out.println("return value:" + v + " exception:" + e); }); System.out.println(completableFuture.get()); } public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

返回的不是原始返回的值,而是经过 BiFunction 处理返回的值

private static void completableFuture5() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); completableFuture = completableFuture.handle((v, e) -> { System.out.println("return value:" + v + " exception:" + e); return "handled " + v; }); System.out.println(completableFuture.get()); } 1.4 thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) private static void completableFuture6() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); completableFuture = completableFuture.thenApply((ele) -> { return "handled1 " + ele; }).thenApply((ele) -> { return "handled2 " + ele; }); System.out.println(completableFuture.get()); }

????????与 handle 方法的区别在于,handle 方法会处理正常值和异常。因此它可以屏蔽异常,避免异常继续抛出。

????????而 thenApply 方法只是用来处理正常值,因此一旦有异常就会抛出。

1.5 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

????????只对结果进行消费,没有返回值

1.6 thenAcceptBoth

public CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

????????用来组合两个 CompletableFuture,其中一个 CompletableFuture 等待另一个 CompletableFuture 的结果。

private static void completableFuture8() throws Exception { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world1"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world2"; }); CompletableFuture<Void> compose = completableFuture1.thenAcceptBoth(completableFuture2, (x, y) -> { System.out.println(x + " " + y); }); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(compose.get()); } 1.7 组合处理 public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) applyToEither 一个完成就触发 FunctionacceptEither 两个都完成才触发 ConsumerallOf 所有都执行完成才执行计算anyOf 任意一个执行完成就执行计算

2 CompletableFuture 的应用场景 2.1 创建异步任务

????????最初设计出来就是为了完成异步任务的功能。

2.2 简单任务异步回调

先说一下 async 和没有 async 的区别

有 async 的方法,前后任务共用一个线程池没有 async 的方法,第二个任务使用的是 ForkJoin 线程池

这部分很多都在前文中解释过了,就不再补充代码测试了。

1 thenRun/thenRunAsync 执行完第一个任务后,执行第二个任务。也就是第二个任务是第一个任务的回调。但是任务前后没有参数传递,第二个任务也没有返回值 2 thenAccept/thenAcceptAsync 执行完第一个任务后,执行第二个任务。会将第一个任务的结果当做入参传入第二个任务。第二个任务没有返回值,是 Consumer 3 thenApply/thenApplyAsync 执行完第一个任务后,执行第二个任务会将第一个任务的结果当做入参传入第二个任务。第二个任务也有返回值,是 Function 4 exceptionally 执行任务发生异常的回调。发生的异常作为参数,传递到回调方法中 5 whenComplete 类似于 thenAccept ,只是回调中会返回 exception而 thenAccept 方法有异常会直接对外抛出 6 handle 类似于 thenApply ,只是回调中会返回 exception而 thenApply 方法有异常会直接对外抛出 2.3 多个任务组合处理 1 and 组合关系

表示将两个任务组合起来,只有两个都正常执行完了,才会执行某个任务

thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值runAfterBoth 不会把执行结果当做方法入参,且没有返回值。 2 or 组合关系

表示将两个任务组合起来,只要其中有一个任务执行完了,就会执行某个任务

applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值runAfterEither: 不会把执行结果当做方法入参,且没有返回值。 3 allOf

????????所有任务都执行完成后,才执行返回的CompletableFuture

????????这边发现取不到 v 和 e 的值,因为可能存在多个任务,不确定最终执行哪个任务。

????????所以这个方法适用于各个任务的返回值没有关联关系

????????等待多个任务确认都完成后,再执行后续回调

????????有点类似 jemeter 中的设置集合点 的概念

private static void completableFuture10() throws Exception { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world1"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world2"; }); CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture1, completableFuture2) .whenComplete((v, e) -> System.out.println("value:" + v + "ex:" + e)); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(allOf.get()); } 4 anyOf

????????任意一个任务执行完,才执行返回的CompletableFuture

????????可以取到 v 和 e 的值,返回的是首先完成的任务的返回值

private static void completableFuture11() throws Exception { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world1"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world2"; }); CompletableFuture<Object> allOf = CompletableFuture.anyOf(completableFuture1, completableFuture2) .whenComplete((v, e) -> { System.out.println("value:" + v + "ex:" + e); }); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(allOf.get()); } 5 thenCompose

????????thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;如果该CompletableFuture实例为null,然后就执行这个新任务


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #jdk18 #之前我们会通过 #future # #callable