简单聊一聊Java线程池ThreadPoolExecutor
作者:生命猿于运动 时间:2021-10-23 15:56:58
简介
ThreadPoolExecutor
是一个实现ExecutorService
接口的线程池,ExecutorService
是主要用来处理多线程任务的一个接口,通常比较简单是用法是由Executors
工厂类去创建。
线程池主要解决了两个不同的问题:
在执行大量异步任务时,为了能够提高性能,通常会减少每个任务的调用开销。
提供了一系列多线程任务的管理方法,便于多任务执行时合理分配资源以及一些异常情况的处理。每个ThreadPoolExecutor还维护一些基本统计信息。例如:已完成任务的数量,当前获得线程数等。
参数说明
ThreadPoolExecutor
提供了几个核心参数,方便开发人员根据具体场景合理分配线程资源。
corePoolSize
:核心线程数,在线程池创建时就已初始化好的n个核心线程,即使线程空闲着也会一直保留在线程池中不被销毁,除非调用线程池方法设置了java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(true)
(允许核心线程超时销毁)。maximumPoolSize
:线程池允许存在最大线程数。keepAliveTime
:当线程数大于核心线程数时,多余的线程在执行任务结束后等待新任务的最大等待时间。unit
:TimeUnit
类型,是keepAliveTime
多余线程最大空余时间单位。workQueue
:必须指定一个阻塞队列,在线程池执行execute
方法时新进来的任务在执行前都会保留到此队列里进入等待。threadFactory
:创建线程的工厂,默认采用Executors.defaultThreadFactory()
创建线程。handler
:拒绝策略,当最大线程数已占满,且队列已满,此时线程池将触发拒绝策略,对新进来的任务做拒绝处理,具体的处理方案在后面详细分析(默认使用java.util.concurrent.ThreadPoolExecutor.AbortPolicy
直接抛出异常拒绝处理)。
注:
maximumPoolSize
如果大于corePoolSize
,则多出的部分线程数只有在阻塞队列workQueue占满时才会创建核心线程之外的线程去执行任务,如果我们设置的阻塞队列为 * 队列(默认大小为Integer.MAX_VALUE
),则队列永远无法占满,就不会去创建额外的线程进行工作,一般情况如果任务数足够,那么也是在队列大小还没达到Integer.MAX_VALUE
时就已经出现内存溢出了。Executors
线程池工厂中的newFixedThreadPool()、newSingleThreadExecutor()
方法就是使用了 * 队列LinkedBlockingQueue
,防止内存溢出在日常开发过程中一般是不建议直接去使用Executors
去创建线程池。
如何创建线程池
上面我们提到的可以使用Executors
工厂直接创建线程池,但是Executors
提供的创建线程池都是不可控的,我们还是得按自己的业务做好分析自定义一个线程池。
以下是线程池创建的一个案例:
@Slf4j
@Configuration
public class ThreadPoolConfig {
@Value("${threadPool.corePoolSize:8}")
private int corePoolSize;
@Value("${threadPool.maximumPoolSize:16}")
private int maximumPoolSize;
@Value("${threadPool.keepAliveTime:60}")
private int keepAliveTime;
@Value("${threadPool.queueSize:99999}")
private int queueSize;
@Bean
public ThreadPoolExecutor testExecutor() {
LinkedBlockingQueue queue = new LinkedBlockingQueue(queueSize);
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.SECONDS, queue, getThreadFactory(), getRejectedExecutionHandler());
}
/**
* 自定义线程池创建线程工厂,用于线程池创建线程的工厂
* @return
*/
private ThreadFactory getThreadFactory() {
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
log.info("===> Create new thread ...");
return new Thread(r);
}
};
}
/**
* 自定义拒绝策略,继续往队列里添加任务进入等待
* @return
*/
private RejectedExecutionHandler getRejectedExecutionHandler() {
return new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 继续往队列里添加任务,这里只是一个案例,这种方式并不友好,会抛出队列已满的异常
log.info("===> Handler runnable ......");
executor.getQueue().add(r);
}
};
}
}
application.properties
配置文件
threadPool:
corePoolSize: 8
maximumPoolSize: 16
keepAliveTime: 60
# 为方便测试这里我们配置队列数小一点
queueSize: 99
由以上的线程池配置,我们写一个demo测试一下:
截取部分运行日志:
红框我们可以看到执行到了线程创建工厂部分代码块
蓝色框日志我们可以看到
largestPoolSize=11
,这是由于我们配置的maximumPoolSize=16 > corePoolSize=8
,我们demo执行的是110个任务并发,队列大小是99,由此分析得出(需要额外出创建线程数 = 并发任务总数110 - 核心线程数8 - 队列大小99 = 3),所以线程池在队列已满时会多创建3个线程用于执行任务,在达到keepAliveTime
配置的最大空闲时间后这3个线程即会自动销毁。
注:可能有的同学会想线程池使用后需要销毁吗?在这里补充一下,如果我们是作为局部变量创建出来的线程池(如:在执行的方法内使用
Executors.newFixedThreadPool(10)
创建临时的线程池),这种情况我们用完就必须将它立即销毁,否则主线程就会一直处于运行状态。如果是全局配置的线程池,那么就是为整个系统中诸多业务提供使用的,这种就不需要对线程池做销毁,因为一旦销毁了其他的任务就无法继续使用该线程池执行任务。
销毁线程池主要有两种方式:
shutdown()
:此方法对线程池做销毁,线程池会优先将剩余未完成的任务执行完才会执行销毁任务。shutdownNow()
:此方法会对线程池做立即销毁,无论线程池中的任务是否执行完成。
拒绝策略
通常我们在配置好有限队列大小后,就会有可能出现队列占满的情况,这时候我们的拒绝策略就会起到作用,接下来我们就来分析一下RejectedExecutionHandler
接口具体有哪一些实现方式:
AbortPolicy:线程池的默认拒绝策略,在JDK提供的
ThreadPoolExecutor
线程池中有一个默认线程池变量private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
作为默认拒绝策略,查看如下图源码可知它就是直接抛出RejectedExecutionException
异常,并且会直接丢弃当前任务,如果担心异常影响后续任务执行开发人员需自行捕获异常处理。
CallerRunsPolicy:只要在当线程池未被销毁的情况下,不丢弃任务直接使用主线程(调用线程池执行的线程)执行该任务。因为该策略是由主线程直接执行任务的,所以不建议在并发度高的情况下使用,建议在并发度较低且任务不允许失败的情况下才使用此策略。
DiscardPolicy:直接丢弃当前任务,不做任何处理。直接丢弃任务的情况下,开发人员也无法排查到哪些任务被丢弃掉,一般不建议使用,除非是无关紧要的任务即使丢弃也无所谓的。
DiscardOldestPolicy:在线程池未被销毁的情况下,丢弃最早进入队列的一个任务(即最久未执行的任务),然后再重新将此任务加入线程池,在此策略下需注意被丢弃的任务的重要性,如果任务不重要可直接丢弃。
自定义策略:在以上JDK提供的四种默认拒绝策略之外,我们还可以通过自定义的方式来处理被拒绝的任务。如果担心任务被拒绝或者被丢弃造成不可预估的问题,在时效性没有太大要求的情况下我们可以先将任务内容转换成数据入库做好日志记录,后续可以使用定时任务或者通过MQ消息延迟处理。由以上的线程池配置Demo中的拒绝策略改造伪代码如下:
private RejectedExecutionHandler getRejectedExecutionHandler() {
return new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 伪代码
log.info("===> 可根据任务的重要性区分对待,将任务做转换入库延迟处理 ......");
}
};
}
来源:https://juejin.cn/post/7107520546983641119