Java ThreadPoolExecutor 线程池的使用介绍

作者:sc_ik 时间:2021-06-28 12:40:35 

Executors

Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池.

Java ThreadPoolExecutor 线程池的使用介绍

从上图中也可以看出, Executors的创建线程池的方法, 创建出来的线程池都实现了 ExecutorService接口. 常用方法有以下几个:

  • newFixedThreadPool(int Threads): 创建固定数目线程的线程池, 超出的线程会在队列中等待.

  • newCachedThreadPool(): 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60秒), 若无可回收,则新建线程.

  • newSingleThreadExecutor(): 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行. 如果某一个任务执行出错, 将有另一个线程来继续执行.

  • newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池, 多数情况下可用来替代Timer类.

Executors 例子

newCachedThreadPool

线程最大数为 Integer.MAX_VALUE, 当我们往线程池添加了 n 个任务, 这 n 个任务都是一起执行的.


 ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
   cachedThreadPool.execute(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           Thread.currentThread().sleep(1000);
           System.out.println(Thread.currentThread().getName());
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   });

cachedThreadPool.execute(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           Thread.currentThread().sleep(1000);
           System.out.println(Thread.currentThread().getName());
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   });

newFixedThreadPool


 ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
   cachedThreadPool.execute(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           Thread.currentThread().sleep(1000);
           System.out.println(Thread.currentThread().getName());
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   });

cachedThreadPool.execute(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           Thread.currentThread().sleep(1000);
           System.out.println(Thread.currentThread().getName());
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   });

newScheduledThreadPool

三秒执行一次, 只有执行完这一次后, 才会执行.


ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
   scheduledExecutorService.schedule(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           Thread.currentThread().sleep(2000);
           System.out.println(Thread.currentThread().getName());
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

顺序执行各个任务, 第一个任务执行完, 才会执行下一个.


ExecutorService executorService = Executors.newSingleThreadExecutor();
   executorService.execute(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           System.out.println(Thread.currentThread().getName());
           Thread.currentThread().sleep(10000);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   });

executorService.execute(new Runnable() {
     @Override
     public void run() {
       for (;;) {
         try {
           System.out.println(Thread.currentThread().getName());
           Thread.currentThread().sleep(2);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
     }
   });

Executors存在什么问题

Java ThreadPoolExecutor 线程池的使用介绍

在阿里巴巴Java开发手册中提到,使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出),但是并没有说明为什么,那么接下来我们就来看一下到底为什么不允许使用Executors?

我们先来一个简单的例子,模拟一下使用Executors导致OOM的情况.


/**
* @author Hollis
*/
public class ExecutorsDemo {
 private static ExecutorService executor = Executors.newFixedThreadPool(15);
 public static void main(String[] args) {
   for (int i = 0; i < Integer.MAX_VALUE; i++) {
     executor.execute(new SubThread());
   }
 }
}

class SubThread implements Runnable {
 @Override
 public void run() {
   try {
     Thread.sleep(10000);
   } catch (InterruptedException e) {
     //do nothing
   }
 }
}

通过指定JVM参数:-Xmx8m -Xms8m 运行以上代码,会抛出OOM:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上代码指出,ExecutorsDemo.java 的第16行,就是代码中的 executor.execute(new SubThread());

Java中的 BlockingQueue 主要有两种实现, 分别是 ArrayBlockingQueueLinkedBlockingQueue.

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列, 必须设置容量.


public ArrayBlockingQueue(int capacity, boolean fair) {
 if (capacity <= 0)
   throw new IllegalArgumentException();
 this.items = new Object[capacity];
 lock = new ReentrantLock(fair);
 notEmpty = lock.newCondition();
 notFull = lock.newCondition();
}

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列, 容量可以选择进行设置, 不设置的话, 将是一个无边界的阻塞队列, 最大长度为 Integer.MAX_VALUE.


public LinkedBlockingQueue() {
 this(Integer.MAX_VALUE);
}

这里的问题就出在如果我们不设置 LinkedBlockingQueue 的容量的话, 其默认容量将会是 Integer.MAX_VALUE.

newFixedThreadPool 中创建 LinkedBlockingQueue 时, 并未指定容量. 此时, LinkedBlockingQueue 就是一个无边界队列, 对于一个无边界队列来说, 是可以不断的向队列中加入任务的, 这种情况下就有可能因为任务过多而导致内存溢出问题.

newCachedThreadPoolnewScheduledThreadPool 这两种方式创建的最大线程数可能是Integer.MAX_VALUE, 而创建这么多线程, 必然就有可能导致OOM.

ThreadPoolExecutor 创建线程池

避免使用 Executors 创建线程池, 主要是避免使用其中的默认实现, 那么我们可以自己直接调用 ThreadPoolExecutor 的构造函数来自己创建线程池. 在创建的同时, 给 BlockQueue 指定容量就可以了.


ExecutorService executor = new ThreadPoolExecutor(10, 10,
   60L, TimeUnit.SECONDS,
   new ArrayBlockingQueue(10));

这种情况下, 一旦提交的线程数超过当前可用线程数时, 就会抛出 java.util.concurrent.RejectedExecutionException, 这是因为当前线程池使用的队列是有边界队列, 队列已经满了便无法继续处理新的请求.

除了自己定义 ThreadPoolExecutor 外. 还有其他方法. 如apache和guava等.

四个构造函数


 public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory)

public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
              RejectedExecutionHandler handler)

int corePoolSize => 该线程池中核心线程数最大值

线程池新建线程的时候,如果当前线程总数小于corePoolSize, 则新建的是核心线程, 如果超过corePoolSize, 则新建的是非核心线程

核心线程默认情况下会一直存活在线程池中, 即使这个核心线程啥也不干(闲置状态).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

如果设置 allowCoreThreadTimeOut = true, 则会作用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:


TimeUnit.DAYS;        //天
TimeUnit.HOURS;       //小时
TimeUnit.MINUTES;      //分钟
TimeUnit.SECONDS;      //秒
TimeUnit.MILLISECONDS;   //毫秒
TimeUnit.MICROSECONDS;   //微妙
TimeUnit.NANOSECONDS;    //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误, 使用这个类型队列的时候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 Integer.MAX_VALUE , 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效, 因为总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corePoolSize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来创建新线程.

默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

来源:https://segmentfault.com/a/1190000018758205

标签:Java,ThreadPoolExecutor
0
投稿

猜你喜欢

  • Spring Boot缓存实战之Redis 设置有效时间和自动刷新缓存功能(时间支持在配置文件中配置)

    2023-11-11 01:57:18
  • 最优雅地整合 Spring & Spring MVC & MyBatis 搭建 Java 企业级应用(附源码)

    2023-09-26 18:27:57
  • Android高手进阶教程(二十六)之---Android超仿Path菜单的功能实现!

    2022-05-13 01:16:58
  • SpringBoot配置和切换Tomcat流程详解

    2022-07-13 02:35:51
  • Java Spring5学习之JdbcTemplate详解

    2023-11-25 20:17:23
  • C#实现IP摄像头的方法

    2023-12-09 03:42:51
  • 基于SpringBoot生成二维码的几种实现方式

    2022-02-27 16:24:31
  • MyBatis 多个条件使用Map传递参数进行批量删除方式

    2023-11-29 08:09:01
  • SpringBoot登录用户权限拦截器

    2022-07-15 04:18:04
  • android中Activity横竖屏切换的那些事

    2023-04-02 22:32:33
  • Android实现图片自动轮播并且支持手势左右无限滑动

    2021-11-04 22:56:46
  • Mybatis-Plus自动填充更新操作相关字段的实现

    2023-06-04 22:37:12
  • 关于MyBatis模糊查询的几种实现方式

    2023-05-09 04:23:12
  • 基于C#的socket编程的TCP异步的实现代码

    2023-04-13 06:42:05
  • Java map 优雅的元素遍历方式说明

    2022-11-12 16:57:28
  • Spring session 获取当前账户登录数的实例代码

    2022-10-17 10:02:05
  • Android开发之设置开机自动启动的几种方法

    2021-12-04 20:35:37
  • 关于MD5算法原理与常用实现方式

    2023-03-18 11:09:04
  • 使用spring框架中的组件发送邮件功能说明

    2022-12-29 03:53:55
  • C#实现的字符串相似度对比类

    2023-08-08 20:35:10
  • asp之家 软件编程 m.aspxhome.com