JDK1.8 之前,我们会通过 Future 和 Callable 采用轮询来实现异步获取结果
//定义一个异步任务 Future<String> future = executor.submit(()->{ Thread.sleep(2000); return "hello world"; }); //轮询获取结果 while (true){ if(future.isDone()) { System.out.println(future.get()); break; } }JDK1.8 中提供的 CompletableFuture 提供了异步函数式编程。可以帮助我们简化异步编程的复杂性,通过回调的方式处理计算结果,并且提供了转换和组合的方法。
1 CompletableFuture 的使用 1.1 创建 CompletableFuture 对象????????提供了四个静态方法来创建
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)async 代表异步。
runAsync 和 supplyAsync 方法的区别在于,前者没有结果返回,后者会有结果返回
默认用的线程池为 ForkJoinPool.commonPool()
private static void completableFuture() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); // 阻塞的获取结果 while (true) { if (completableFuture.isDone()) { System.out.println(completableFuture.get()); break; } } } 1.2 阻塞获取????????以下四个方法用于获取结果
public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent) public T join()getNow() 代表计算完,如果返回结果或抛出异常就正常get,否则就返回给定的 valueIfAbsent 值
join() 返回计算的结果或者抛出一个 unchecked 异常(CompletionException)
? ? ? ? 主动触发计算
public boolean complete(T value) public boolean completeExceptionally(Throwable ex) private static void completableFuture3() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); // 设置 completableFuture.get() 获取到的值 completableFuture.complete("aaa"); System.out.println(completableFuture.get()); // 以异常的形式触发计算 // completableFuture.completeExceptionally(new Exception()); // System.out.println(completableFuture.get()); } 1.3 计算完成时处理 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)上面四个方法是计算阶段结束的时候触发
BiConsumer 有两个入参,分别代表计算的返回值,以及异常
private static void completableFuture4() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); completableFuture = completableFuture.whenCompleteAsync((v, e) -> { System.out.println("return value:" + v + " exception:" + e); }); System.out.println(completableFuture.get()); } public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)返回的不是原始返回的值,而是经过 BiFunction 处理返回的值
private static void completableFuture5() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); completableFuture = completableFuture.handle((v, e) -> { System.out.println("return value:" + v + " exception:" + e); return "handled " + v; }); System.out.println(completableFuture.get()); } 1.4 thenApply public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) private static void completableFuture6() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); completableFuture = completableFuture.thenApply((ele) -> { return "handled1 " + ele; }).thenApply((ele) -> { return "handled2 " + ele; }); System.out.println(completableFuture.get()); }????????与 handle 方法的区别在于,handle 方法会处理正常值和异常。因此它可以屏蔽异常,避免异常继续抛出。
????????而 thenApply 方法只是用来处理正常值,因此一旦有异常就会抛出。
1.5 thenAccept public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)????????只对结果进行消费,没有返回值
1.6 thenAcceptBoth public CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)????????用来组合两个 CompletableFuture,其中一个 CompletableFuture 等待另一个 CompletableFuture 的结果。
private static void completableFuture8() throws Exception { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world1"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world2"; }); CompletableFuture<Void> compose = completableFuture1.thenAcceptBoth(completableFuture2, (x, y) -> { System.out.println(x + " " + y); }); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(compose.get()); } 1.7 组合处理 public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) applyToEither 一个完成就触发 FunctionacceptEither 两个都完成才触发 ConsumerallOf 所有都执行完成才执行计算anyOf 任意一个执行完成就执行计算 2 CompletableFuture 的应用场景 2.1 创建异步任务????????最初设计出来就是为了完成异步任务的功能。
2.2 简单任务异步回调先说一下 async 和没有 async 的区别
有 async 的方法,前后任务共用一个线程池没有 async 的方法,第二个任务使用的是 ForkJoin 线程池这部分很多都在前文中解释过了,就不再补充代码测试了。
1 thenRun/thenRunAsync 执行完第一个任务后,执行第二个任务。也就是第二个任务是第一个任务的回调。但是任务前后没有参数传递,第二个任务也没有返回值 2 thenAccept/thenAcceptAsync 执行完第一个任务后,执行第二个任务。会将第一个任务的结果当做入参传入第二个任务。第二个任务没有返回值,是 Consumer 3 thenApply/thenApplyAsync 执行完第一个任务后,执行第二个任务会将第一个任务的结果当做入参传入第二个任务。第二个任务也有返回值,是 Function 4 exceptionally 执行任务发生异常的回调。发生的异常作为参数,传递到回调方法中 5 whenComplete 类似于 thenAccept ,只是回调中会返回 exception而 thenAccept 方法有异常会直接对外抛出 6 handle 类似于 thenApply ,只是回调中会返回 exception而 thenApply 方法有异常会直接对外抛出 2.3 多个任务组合处理 1 and 组合关系表示将两个任务组合起来,只有两个都正常执行完了,才会执行某个任务
thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值runAfterBoth 不会把执行结果当做方法入参,且没有返回值。 2 or 组合关系表示将两个任务组合起来,只要其中有一个任务执行完了,就会执行某个任务
applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值runAfterEither: 不会把执行结果当做方法入参,且没有返回值。 3 allOf????????所有任务都执行完成后,才执行返回的CompletableFuture
????????这边发现取不到 v 和 e 的值,因为可能存在多个任务,不确定最终执行哪个任务。
????????所以这个方法适用于各个任务的返回值没有关联关系
????????等待多个任务确认都完成后,再执行后续回调
????????有点类似 jemeter 中的设置集合点 的概念
private static void completableFuture10() throws Exception { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world1"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world2"; }); CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture1, completableFuture2) .whenComplete((v, e) -> System.out.println("value:" + v + "ex:" + e)); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(allOf.get()); } 4 anyOf????????任意一个任务执行完,才执行返回的CompletableFuture
????????可以取到 v 和 e 的值,返回的是首先完成的任务的返回值
private static void completableFuture11() throws Exception { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world1"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world2"; }); CompletableFuture<Object> allOf = CompletableFuture.anyOf(completableFuture1, completableFuture2) .whenComplete((v, e) -> { System.out.println("value:" + v + "ex:" + e); }); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(allOf.get()); } 5 thenCompose????????thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例
如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;如果该CompletableFuture实例为null,然后就执行这个新任务
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |