Spring Boot配置线程池拒绝策略的场景分析(妥善处理好溢出的任务)

作者:程序猿DD 时间:2022-08-05 07:12:24 

通过之前三篇关于Spring Boot异步任务实现的博文,我们分别学会了用@Async创建异步任务、为异步任务配置线程池、使用多个线程池隔离不同的异步任务。今天这篇,我们继续对上面的知识进行完善和优化!

如果你已经看过上面几篇内容并已经掌握之后,一起来思考下面这个问题:

假设,线程池配置为核心线程数2、最大线程数2、缓冲队列长度2。此时,有5个异步任务同时开始,会发生什么?

场景重现

我们先来把上面的假设用代码实现一下:

第一步:创建Spring Boot应用,根据上面的假设写好线程池配置。


@EnableAsync
@SpringBootApplication
public class Chapter78Application {

public static void main(String[] args) {
       SpringApplication.run(Chapter78Application.class, args);
   }

@EnableAsync
   @Configuration
   class TaskPoolConfig {

@Bean
       public Executor taskExecutor1() {
           ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
           executor.setCorePoolSize(2);
           executor.setMaxPoolSize(2);
           executor.setQueueCapacity(2);
           executor.setKeepAliveSeconds(60);
           executor.setThreadNamePrefix("executor-1-");
           return executor;
       }

}

}

第二步:用@Async注解实现一个部分任务


@Slf4j
@Component
public class AsyncTasks {

public static Random random = new Random();

@Async("taskExecutor1")
   public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
       log.info("开始任务:{}", taskNo);
       long start = System.currentTimeMillis();
       Thread.sleep(random.nextInt(10000));
       long end = System.currentTimeMillis();
       log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
       return CompletableFuture.completedFuture("任务完成");
   }

}

第三步:编写测试用例


@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {

@Autowired
   private AsyncTasks asyncTasks;

@Test
   public void test2() throws Exception {
       // 线程池配置:core-2,max-2,queue=2,同时有5个任务,出现下面异常:
       // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
       // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9

long start = System.currentTimeMillis();

// 线程池1
       CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
       CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
       CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
       CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
       CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");

// 一起执行
       CompletableFuture.allOf(task1, task2, task3, task4, task5).join();

long end = System.currentTimeMillis();

log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
   }

}

执行一下,可以类似下面这样的日志信息:

2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-2] com.didispace.chapter78.AsyncTasks       : 开始任务:2
2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-1] com.didispace.chapter78.AsyncTasks       : 开始任务:1

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732

 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
 at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
 at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
 at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
 at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
 at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
 at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
 at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
 at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
 at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
 at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
 at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
 at java.util.ArrayList.forEach(ArrayList.java:1255)
 at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
 at java.util.ArrayList.forEach(ArrayList.java:1255)
 at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
 at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
 at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
 at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
 at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
 at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
 at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
 ... 74 more

从异常信息org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: 中,可以很明确的知道,第5个任务因为超过了执行线程+缓冲队列长度,而被拒绝了。

所有,默认情况下,线程池的拒绝策略是:当线程池队列满了,会丢弃这个任务,并抛出异常。

配置拒绝策略

虽然线程池有默认的拒绝策略,但实际开发过程中,有些业务场景,直接拒绝的策略往往并不适用,有时候我们可能会选择舍弃最早开始执行而未完成的任务、也可能会选择舍弃刚开始执行而未完成的任务等更贴近业务需要的策略。所以,为线程池配置其他拒绝策略或自定义拒绝策略是很常见的需求,那么这个要怎么实现呢?

下面就来具体说说今天的正题,如何为线程池配置拒绝策略、如何自定义拒绝策略。

看下面这段代码的最后一行,setRejectedExecutionHandler方法就是为线程池设置拒绝策略的方法:


ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//...其他线程池配置

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

ThreadPoolExecutor中提供了4种线程的策略可以供开发者直接使用,你只需要像下面这样设置即可:


// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

这四个策略对应的含义分别是:

  • AbortPolicy策略:默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。

  • DiscardPolicy策略:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。

  • DiscardOldestPolicy策略:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。

  • CallerRunsPolicy策略:如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。

而如果你要自定义一个拒绝策略,那么可以这样写:


executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   @Override
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
       // 拒绝策略的逻辑
   }
});

当然如果你喜欢用Lamba表达式,也可以这样写:


executor.setRejectedExecutionHandler((r, executor1) -> {
   // 拒绝策略的逻辑
});

来源:https://www.cnblogs.com/didispace/p/15324105.html

标签:Spring,Boot,线程池,拒绝策略
0
投稿

猜你喜欢

  • Java线程优先级和守护线程原理解析

    2023-03-27 16:45:30
  • C#常用数据结构和算法总结

    2021-10-31 22:05:29
  • 使用ehcache三步搞定springboot缓存的方法示例

    2021-06-25 04:44:12
  • Android使用PhotoView实现图片双击放大单击退出效果

    2022-10-10 04:52:11
  • C#多线程爬虫抓取免费代理IP的示例代码

    2023-02-08 01:22:01
  • java上乘武功入门--反射

    2021-06-08 21:32:27
  • android ToolBar的简单使用

    2023-03-05 10:44:33
  • 解决java启动时报线程占用报错:Exception in thread “Thread-14“ java.net.BindException: Address already in use: bind

    2021-07-05 04:26:23
  • C#中WebClient实现文件下载

    2022-10-11 18:04:57
  • android通过自定义toast实现悬浮通知效果的示例代码

    2022-08-11 03:23:54
  • C#修改IIS站点framework版本号的方法

    2023-08-29 19:14:13
  • C#递归算法之归并排序

    2023-01-01 14:49:36
  • C#用RabbitMQ实现消息订阅与发布

    2022-09-05 16:23:40
  • struts2拦截器_动力节点Java学院整理

    2023-06-11 10:11:36
  • mybatis中mapper-locations的作用

    2023-11-10 18:02:58
  • Android指纹识别API初试

    2023-01-15 20:16:11
  • Android开发之全屏与非全屏的切换设置方法小结

    2021-07-05 04:10:13
  • Java与C++分别用递归实现汉诺塔详解

    2021-10-23 01:28:59
  • c#使用正则表达式匹配字符串验证URL示例

    2023-01-01 10:40:10
  • java9版本特性资源自动关闭的语法增强

    2023-10-30 23:35:24
  • asp之家 软件编程 m.aspxhome.com