irpas技术客

JAVA8中并发类CompletableFuture使用遇到的坑-守护线程_pp_lan_completablefuture的坑

irpas 1083

1. 前言

最近在看Java8中对并发的支持CompletableFuture类,觉得挺好的,在单一线程执行的时候可以省略很多代码,手动验证后,发现其中还是有一部分坑的,此处记录一下。此处主要描述其创建线程为守护线程的问题,会随着主线程消亡直接消亡,导致任务失败。

2. CompletableFuture优点

简洁,后面以代码为例

3. 案例简述

3.1 CompletableFuture创建一个线程,内部进行轮询读取任务(如:模拟kafka之类的),会发现,执行结束后,整个线程一起关闭了,无法打到一直轮询的目的。传统直接创建的线程(默认为非守护线程)会由于子线程未结束,阻止主线程关闭。

4. 代码 package com.hz.threadpool.thread; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** * @description: * @author: pp_lan * @date: 2022/3/24 */ public class CompletableFutureTest { /** * java8并发接口 */ public static void main(String[] args) throws InterruptedException { System.out.println("============== testAsyncRunnable ==============="); CompletableFuture.runAsync((() -> { while (true) { try { TimeUnit.MILLISECONDS.sleep(1000); System.out.println("**************kafka读取消息结束**************"); } catch (InterruptedException e) { e.printStackTrace(); } } })); TimeUnit.SECONDS.sleep(3); } } 5. 执行结果验证

主线程等待了3秒,子线程内部第3秒还未执行完成,随着主线程一起关闭了

Connected to the target VM, address: '127.0.0.1:64615', transport: 'socket' ============== testAsyncRunnable =============== **************kafka读取消息结束************** **************kafka读取消息结束************** Disconnected from the target VM, address: '127.0.0.1:64615', transport: 'socket' 6. 源码分析

?默认的线程池为java.util.concurrent.ForkJoinPool

线程池内部创建线程的时候将线程设置为了守护线程?

7. 解决方案(不完毕待补充)

既然问题出在了自定义的线程池创建Thread的时候,那么我们可以自定义线程池(此处也可以不可以覆写ThreadFactory),规避该问题 ,代码如下:

package com.hz.threadpool.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @description: * @author: pp_lan * @date: 2022/3/24 */ public class CompletableFutureTest { /** * java8并发接口 */ public static void main(String[] args) throws InterruptedException { System.out.println("============== testAsyncRunnable ==============="); ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), r -> new Thread(r)); CompletableFuture.runAsync((() -> { while (true) { try { TimeUnit.MILLISECONDS.sleep(1000); System.out.println("**************kafka读取消息结束**************"); } catch (InterruptedException e) { e.printStackTrace(); } } }), pool); TimeUnit.SECONDS.sleep(3); } }

使用了自定义线程池虽然可以解决创建守护线程的问题。但是,经过测试,使用了自定义线程池后,线程池没有及时关闭,该方案极不推荐。如果不是配合get()使用,本人目前还是愿意使用自定义线程池直接执行线程。CompletableFuture该部分后面再进行补充。

相关文档:守护线程(Daemon)、钩子线程(Hook)简述


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #1 #2 #案例简述31