以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了
前言
大家好,我是小郭,之前分享了CountDownLatch的使用,我们知道用来控制并发流程的同步工具,主要的作用是为了等待多个线程同时完成任务后,在进行主线程任务。
万万没想到,在生产环境中竟然翻车了,因为没有考虑到一些场景,导致了CountDownLatch出现了问题,接下来来分享一下由于CountDownLatch导致的问题。
# 【线程】并发流程控制的同步工具-CountDownLatch
需求背景
先简单介绍下业务场景,针对用户批量下载的文件进行修改上传
为了提高执行的速度,所以在采用线程池去执行 下载-修改-上传 的操作,并在全部执行完之后统一提交保存文件地址到数据库,于是加入了CountDownLatch来进行控制。
具体实现
根据服务本身情况,自定义一个线程池
public static ExecutorService testExtcutor() {
return new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1));
}模拟执行
public static void main(String[] args) {
// 下载文件总数
List<Integer> resultList = new ArrayList<>(100);
IntStream.range(0,100).forEach(resultList::add);
// 下载文件分段
List<List<Integer>> split = CollUtil.split(resultList, 10);
ExecutorService executorService = BaseThreadPoolExector.testExtcutor();
CountDownLatch countDownLatch = new CountDownLatch(100);
for (List<Integer> list : split) {
executorService.execute(() -> {
list.forEach(i ->{
try {
// 模拟业务操作
Thread.sleep(500);
System.out.println("任务进入");
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(e.getMessage());
} finally {
System.out.println(countDownLatch.getCount());
countDownLatch.countDown();
}
});
});
}
try {
countDownLatch.await();
System.out.println("countDownLatch.await()");
} catch (InterruptedException e) {
e.printStackTrace();
}
}一开始我个人感觉没有什么问题,反正finally都能够做减一的操作,到最后调用await方法,进行主线程任务
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at Thread.executor.executorTestBlock.main(executorTestBlock.java:28)
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown由于任务数量较多,阻塞队列中已经塞满了,所以默认的拒绝策略,当队列满时,处理策略报错异常,
要注意这个异常是线程池,自己抛出的,不是我们循环里面打印出来的,
这也造成了,线上这个线程池被阻塞了,他永远也调用不到await方法,
利用jstack,我们就能够看到有问题
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)解决方案
调大阻塞队列,但是问题来了,到底多少阻塞队列才是大呢,如果太大了会不由又造成内存溢出等其他的问题
在第一个的基础上,我们修改了拒绝策略,当触发拒绝策略的时候,用调用者所在的线程来执行任务
public static ThreadPoolExecutor queueExecutor(BlockingQueue<Runnable> workQueue){
return new ThreadPoolExecutor(
size,
size,
0L,
TimeUnit.SECONDS,
workQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
}你可能又会想说,会不会任务数量太多,导致调用者所在的线程执行不过来,任务提交的性能急剧下降
那我们就应该自定义拒绝策略,将这下排队的消息记录下来,采用补偿机制的方式去执行
同时也要注意上面的那个异常是线程池抛出来的,我们自己也需要将线程池进行try catch,记录问题数据,并且在finally中执行countDownLatch.countDown来避免,线程池的使用
总结
目前根据业务部门的反馈,业务实际中任务数不很特别多的情况,所以暂时先采用了第二种方式去解决这个线上问题
在这里我们也可以看到,如果没有正确的关闭countDownLatch,可能会导致一直等待,这也是我们需要注意的。
工具虽然好,但是依然要注意他带来的问题,没有正确的去处理好,引发的一系列连锁反应。
链接:https://juejin.cn/post/7129116234804625421
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。