Java线程池ThreadPoolExecutor源码深入分析

作者:niuyongzhi 时间:2023-11-09 19:49:36 

1.线程池Executors的简单使用

1)创建一个线程的线程池。
Executors.newSingleThreadExecutor();
//创建的源码
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
2)创建固定大小的线程池,参数为int,是线程池核心线程和最大线程的数量
Executors.newFixedThreadPool(2);
 //创建的源码
  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
3)创建一个线程数不设限的线程池,
 //创建的源码,核心线程是0,最大线程是Integer.MAX_VALUE
  Executors.newCachedThreadPool();
      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
  }

使用方法,使用同步代码块,保证线程池实例是唯一的。

使用方法:
 private static ExecutorService sSingleThreadExecutor = null; // lazy, guarded by class
    public static ExecutorService singleThreadExecutor() {
       //当前的类对象为锁
        synchronized (ThreadPool.class) {
            if (sSingleThreadExecutor == null) {
                sSingleThreadExecutor = Executors.newSingleThreadExecutor();
            }
            return sSingleThreadExecutor;
        }
  }

通过以上三种方式,可以创建一个简单的线程池。

但是有弊端:

newSingleThreadExecutor和newFixedThreadPool,运行的请求队列是长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而造成oom。

而newCachedThreadPool允许的线程数量为最大值Integer.MAX_VALUE,也会造成oom。

2.通过ThreadPoolExecutor创建线程池

下面是OkHttp中Dispatcher.java线程池:

ExecutorService executorService;
public synchronized ExecutorService executorService() {
   if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   }
   return executorService;
}

OkHttp中ConnectionPool.java

private static final Executor executor = new ThreadPoolExecutor(0 ,
      Integer.MAX_VALUE , 60L , TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

使用方式:

//call 实现 Runnable 接口。调用execute方法即可将入线程池,执行run方法中的代码。
executorService().execute(call);

3.ThreadPoolExecutor各个参数的含义

corePoolSize:核心线程数,即使是空闲线程也不会销毁。这样做的目的是为了降低执行任务时创建线程的时间和性能开销。
maximumPoolSize:最大线程数。当核心线程被用完时,会创建新的线程来执行任务,但是创建的数量不能超过这个最大值。
keepAliveTime:线程的存活时间。除核心线程外,其他线程一旦执行完任务,就会处于空闲状态,超过这个时间就会被销毁。
unit:keepAliveTime设置的时间单位。
workQueue:任务的阻塞队列。线程数量有限,当任务过多来不及执行时,就会加入到这个阻塞队列中,等到有空闲进程,
           就会从这个队列取出任务去执行。队列都是先进先出的FIFO。
threadFactory:新线程产生的方式。
handler:拒绝策略,超过任务队列设置的最大值时。再有新的任务进来,就会执行这个拒绝策略。

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
}

线程池的阻塞队列:

ArrayBlockingQueue:
    是基于数组的任务队列。里面用一个数组来存放任务。当我们new的时候,需要指定数组大小。
    还有两个int变量putIndex和takeIndex用来表示队列的头部和尾部在数组中的位置。
LinkedBlockingQueue:
    是基于链表的,内部用一个单向链表来存放任务。创建时可以指定大小,如果不指定则是Integer.MAX_VALUE
PriorityBlockingQueue:
   基于优先级的阻塞队列。
SynchronousQueue:
   一种无缓冲的等待队列。有新任务进来直接交给线程执行。
   OkHttp中使用的就是这种队列,他的最大线程数为Integer.MAX_VALUE。保证有任务进来就能马上执行。

RejectedExecutionHandler拒绝策略,这是一个接口。不同的实现执行不同的策略。

public interface RejectedExecutionHandler {
   void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
AbortPolicy:拒绝行为直接抛出异常 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           throw new RejectedExecutionException("Task " + r.toString() +
                                                " rejected from " +
                                                e.toString());
       }
DiscardPolicy:保持静默,什么也不做。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       }
DiscardOldestPolicy:丢弃任务队里中最老的任务,尝试将新任务加入队列
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               e.getQueue().poll();
               e.execute(r);
           }
}
CallerRunsPolicy:直接由提交任务这执行这个任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               r.run();
           }
}
如果在创建线程池的时候,不知道具体的拒绝策略。那么ThreadPoolExecutor默认的策略是AbortPolicy。
private static final RejectedExecutionHandler defaultHandler =  new AbortPolicy();

线程池可以执行两种类型的任务:Runable和Callable

class MyRunable implements Runnable{
       @Override
       public void run() {
       }
}
class MyCallable implements Callable{
   @Override
   public Object call() throws Exception {
       return null;
   }
}
Runnable 没有返回值,返回的是void,不允许抛出异常。
Callable 有返回值,返回的是Object,允许抛出异常。

4.线程池的源码分析

线程池的状态:

//运行状态,可以接受新任务,并且处理排队任务。
private static final int RUNNING    = -1 << COUNT_BITS;
//关闭状态,不再接受新任务,不过仍然会处理排队任务。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止状态,不再接受新任务,也不处理排队任务,同时中断处理中的任务
private static final int STOP       =  1 << COUNT_BITS;
//整理状态,当前所有任务终止,workerCount计数为0,线程切换为TIDYING状态,并且执行terminal()方法
private static final int TIDYING    =  2 << COUNT_BITS;
//终止状态,说明terminal()方法执行完成。
private static final int TERMINATED =  3 << COUNT_BITS;

ctlof是得到新的ctl值。通过ctl可以计算线程池的状态和数量

runStateOf 计算当前线程池的状态。

workerCountOf计算线程池的数量。

// ctlOf计算ctl的新值,也就是线程池状态和线程池中线程数量。
private static int ctlOf(int rs, int wc) { return rs | wc; }
//获取ctl的高三位,也就是线程池的状态。
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取ctl的低29位,也就是线程池中的线程数。
private static int workerCountOf(int c)  { return c & CAPACITY; }
其中runStateOf(int c)和workerCountOf(int c)的参数c就是通过ctlOf(int rs, int wc)获得的ctl值。

向线程池中添加一个任务:executorService().execute(call);

然后看看源码中是如何执行的,是如何添加任务的。

ctl 用来表示线程池的状态和线程数量,
在ThreadPoolExcutor中使用32位二进制数来表示线程池的状态和线程中线程数量。
其中前3位表示线程池的状态,后29位表示线程池中的线程数。
public void execute(Runnable command) {
        int c = ctl.get();
       //如果工作线程数量小于核心线程数,
       //提交的任务会通过addWorker(command, true)创建一个新的核心线程来执行, 这个参数传的是true,表示去新增核心线程。
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true)){
               //添加成功则return
               return;
           }
          //添加核心线程失败则重新获取线程池的状态和数量
           c = ctl.get();
       }
       //进入到下面说明当前工作线程大于或等于核心线程。
       //如果线程池处于运行状态,则加入队列
       if (isRunning(c) && workQueue.offer(command)) {
           //如果入队成功,则重新获取线程池的状态
           int recheck = ctl.get();
           //如果线程池不处于运行状态,则从队列中remove
           if (!isRunning(recheck) && remove(command)){
             //成功删除,则执行拒绝策略
             reject(command);
           }else if (workerCountOf(recheck) == 0){
            //进入这个分支有两种情况1.线程池处于运行状态 2.线程从不处于运行状态,但是remove失败
               则会判断workerCountOf如果工作线程为0,则会创建非核心线程去执行任务。
             addWorker为null,和false。false表示非核心线程。null说明创建的线程去执行队列里的任务。
             addWorker(null, false);
           }
        //进入到这个分支有两种情况1.线程池处于非运行状态2.运行状态但是入队失败了。
        这时候创建非核心线程去执行任务
       }else if (!addWorker(command, false)){
           如果创建非核心线程失败了,则执行拒绝策略。
          reject(command);
       }
}

通过以上源码分析,线程池的运行原理可以总结为一下几点:

1.通过execute方法提交任务时,运行线程小于corePoolSize时,则会创建新的核心线程来执行这个任务。

2.通过excute方法提交任务时,运行线程大于等于corePoolSize时,则会加入到队列中,等待线程调度执行。

3.通过excuete方法提交任务时,运行线程大于等于corePoolSize时,并且加入队列失败(队列满了),新提交的任务将会通过创建新的线程执行。

4.通过excute方法提交任务时,运行线程大于maximumPoolSize时,队列也满了,则会执行拒绝策略。

5.当线程池中的线程执行完任务处于空闲状态时,则会尝试从任务队列中取头结点任务执行。

接下来看addWorker如何添加任务。

private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);

// 如果线程池处于非运行状态,则不会创建线程。
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty())){
                       return false;
                  }
           //如果线程池处于运行状态,则直接走下面的创建添加逻辑。
           for (;;) {
               //获取工作线程数量
               int wc = workerCountOf(c);
               //wc >= CAPACITY 工作线程大于最大容量
               // wc >= (core ? corePoolSize : maximumPoolSize) 如果工作线程大于了核心线程或最大线程,
               //只要这两个条件有一个成立则return。
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize)){
                   return false;
                }
               //创建线程数量+1,这里用到了CAS。关于CAS后面再写文章分析。
               if (compareAndIncrementWorkerCount(c)){
                   break retry;
                }
                //如果CAS操作失败,线程数量没有加1,则重新获取线程的状态。
               c = ctl.get();  // Re-read ctl
               //判断当前状态和之前状态,如果不同,说明线程池状态发生了变化。重新跳到retry的外层循环。
               //如果相同,则说明线程池没有变化,继续进行内层循环。
               if (runStateOf(c) != rs){
                   continue retry;
               }
               // else CAS failed due to workerCount change; retry inner loop
           }
       }
       //执行到这说明线程数量已经完成+1,接下来进行线程的创建。
       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           //这个创建一个worker对象。在worker构造方法中,会利用ThreadPoolExecutor中传递过了的ThreadFactory创建一个Thread
           //默认是通过Executors.defaultThreadFactory(),创建一个线程。
           w = new Worker(firstTask);
           final Thread t = w.thread;
           if (t != null) {
               //拿到一个重入锁对象。
               final ReentrantLock mainLock = this.mainLock;
               mainLock.lock();
               try {
                   //拿到线程池的状态
                   int rs = runStateOf(ctl.get());
                   //如果线程池处于运行状态或者处于关闭状态并且firstTask == null
                   if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) {
                           throw new IllegalThreadStateException();
                       }
                       //添加到work集合
                       workers.add(w);
                       int s = workers.size();
                       if (s > largestPoolSize){ //更新一下最大线程数
                           largestPoolSize = s;
                       }
                       //标志位,添加成功
                       workerAdded = true;
                   }
               } finally {
                   mainLock.unlock();
               }
               if (workerAdded) {
                   //添加成功则启动线程
                   t.start();
                   //启动成功
                   workerStarted = true;
               }
           }
       } finally {
           //如果没有启动成功则从线程池中移除。
           if (! workerStarted){
               addWorkerFailed(w);
           }
       }
       return workerStarted;
   }

关键代码看看 w = new Worker(firstTask);做了啥

Worker(Runnable firstTask) {
       setState(-1);
       //将传进来的任务赋值给成员变量
       this.firstTask = firstTask;
       //创建一个线程,并把Worker本身当做Runnable传进了Thread中去。
       this.thread = getThreadFactory().newThread(this);
}
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

注意newThread(this)。Worker把自己当做Runnable传到了线程中去。当调用t.start()方法时会调用Worker的run方法。

public void run() {
   runWorker(this);
}
final void runWorker(Worker w) {
   Thread wt = Thread.currentThread();
   Runnable task = w.firstTask;
   w.firstTask = null;
   w.unlock(); // allow interrupts
   boolean completedAbruptly = true;
   try {
       //如果task不为null,则先执行当前任务
       //如果task传进来是null则从队列中取任务,执行队列里的任务。
       //getTask()就是从任务队列中提取在等待的队伍。
       while (task != null || (task = getTask()) != null) {
           w.lock();
           //(runStateAtLeast(ctl.get(), STOP) 线程池处于STOP,TIDYING,TERMINATED状态
            处于这些状态的线程池是无法执行任务的。
         if ((runStateAtLeast(ctl.get(), STOP) ||
              (Thread.interrupted() &&
               runStateAtLeast(ctl.get(), STOP))) &&
             !wt.isInterrupted()){
                //中断线程
                 wt.interrupt();
             }
         //执行到下面说明线程池处于RUNNING或SHUTDOWN状态
         //由此也可以看出SHUTDOWN状态的线程池,是可以执行队列里的任务的,但是队列不在接收新的任务添加
           try {
               beforeExecute(wt, task);
               Throwable thrown = null;
               try {
                   //执行任务的
                   task.run();
               } catch (RuntimeException x) {
                   thrown = x; throw x;
               } catch (Error x) {
                   thrown = x; throw x;
               } catch (Throwable x) {
                   thrown = x; throw new Error(x);
               } finally {
                   afterExecute(task, thrown);
               }
           } finally {
               task = null;
               w.completedTasks++;
               w.unlock();
           }
       }
       completedAbruptly = false;
   } finally {
       processWorkerExit(w, completedAbruptly);
   }
}

getTask()从任务队列中,提取任务。

private Runnable getTask() {
  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
     try {
           //从任务队列中取出任务
            Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
 }

通过以上源码分析,可以总结一下几点。

addWorker(Runnable firstTask, boolean core)

1.如果firstTask为null,则会创建线程去执行队列里的任务。

2.如果不为null,则会去执行当前任务,然后再执行队列里的任务。

3.core 如果为true,则会创建核心线程,如果为false,则会创建非核心线程。

4.addWorker 会创建线程,启动线程,执行任务。

在创建线程之前会判断线程池的状态、以及核心线程或最大线程数。

如果创建成功启动线程的start方法,然后调用worker的runWorker()方法。

来源:https://blog.csdn.net/niuyongzhi/article/details/126460731

标签:Java,ThreadPoolExecutor,线程池
0
投稿

猜你喜欢

  • C# WORD操作实现代码

    2023-11-20 05:16:25
  • 详解mybatis-plus配置找不到Mapper接口路径的坑

    2022-03-12 10:37:45
  • 桌面浮动窗口(类似恶意广告)的实现详解

    2023-04-28 06:02:27
  • 一篇文章带你搞定JAVA反射

    2023-09-18 02:27:51
  • android ToolBar的简单使用

    2023-03-05 10:44:33
  • java实现文件拷贝的七种方式

    2023-07-20 19:01:41
  • 基于java集合中的一些易混淆的知识点(详解)

    2023-08-29 03:06:26
  • AndroidStudio项目打包成jar的简单方法

    2023-07-07 05:33:27
  • 详解Android Flutter中SliverAppBar的使用教程

    2023-06-23 12:11:27
  • Hutool Java工具类库_ExcelUtil的使用

    2022-09-08 07:30:28
  • java实现1M图片压缩优化到100kb实现示例

    2022-08-08 03:59:43
  • java实现抽奖功能解析

    2021-08-29 16:08:21
  • Flutter源码分析之自定义控件(RenderBox)指南

    2022-09-08 21:17:28
  • Android Spinner和GridView组件的使用示例

    2022-07-01 15:03:40
  • Android账号注册实现点击获取验证码倒计时效果

    2023-05-18 05:46:33
  • C#实现IP摄像头的方法

    2023-12-09 03:42:51
  • Java IO流—异常及捕获异常处理 try…catch…finally

    2023-03-14 07:35:52
  • Java类和对象的设计原理

    2023-10-06 07:00:32
  • c++函数转c#函数示例程序分享

    2023-08-13 14:45:26
  • Android实现可点击展开的TextView

    2022-04-02 04:58:01
  • asp之家 软件编程 m.aspxhome.com