实现java简单的线程池

作者:小胖java攻城狮 时间:2023-08-09 06:05:15 

拆分实现流程

请看下面这张图

实现java简单的线程池

首先我们得对线程池进行一个功能拆分

  • Thread Pool 就是我们的线程池,t1,t2,t3代表三个线程

  • Blocking Queue代表阻塞队列

  • main代表main方法的线程

  • task1,task2,task3代表要执行的每个任务

现在我们梳理一下执行的流程,注意这里是简略版的,文章后面我会给出详细版的

实现java简单的线程池

所以此时,我们发现了需要创建几个类,或者说几个角色,分别是

  • 线程池

  • 工作线程

  • 阻塞队列

  • 拒绝策略(干嘛的?就是当线程数已经满了,并且阻塞队列也满了,还有任务想进入阻塞队列的时候,就可以拒绝这个任务)

实现方式

1.拒绝策略


/**
* 拒绝策略
*/
@FunctionalInterface
interface RejectPolicy<T>{
//queue就是我们自己实现的阻塞队列,task是任务
   void reject(BlockingQueue<T> queue,T task);
}

2.阻塞队列

我们需要实现四个方法,获取和添加,超时获取和超时添加,至于方法实现的细节,我都备注了大量的注释进行解释。


/**
* 阻塞队列
*/
class BlockingQueue<T>{
   //阻塞队列
   private Deque<T> queue = new ArrayDeque<>();

//锁
   private ReentrantLock lock = new ReentrantLock();

//生产者条件变量
   private Condition fullWaitSet = lock.newCondition();

//消费者条件变量
   private Condition emptyWaitSet = lock.newCondition();

//容量
   private int capacity;

public BlockingQueue(int capacity){
       this.capacity = capacity;
   }

//带有超时阻塞获取
   public T poll(long timeout, TimeUnit timeUnit){
       lock.lock();
       try {
           //将timeout统一转换为纳秒
           long nanos = timeUnit.toNanos(timeout);
           while(queue.isEmpty()){
               try {
                   if(nanos <= 0){
                       //小于0,说明上次没有获取到,代表已经超时了
                       return null;
                   }
                   //返回值是剩余的时间
                   nanos = emptyWaitSet.awaitNanos(nanos);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           T t = queue.removeFirst();
           //通知生产者
           fullWaitSet.signal();
           return t;
       }finally {
           lock.unlock();
       }
   }

//阻塞获取
   public T take(){
       lock.lock();
       try{
           while(queue.isEmpty()){ //如果任务队列为空,代表线程池没有可以执行的内容
               try {
                    /*
                   也就说此时进来的线程是执行不了任务的,所以此时emptyWaitSet消费者要进行阻塞状态
                   等待下一次唤醒,然后继续判断队列是否为空
                    */
                   emptyWaitSet.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           /*
           代码执行到这里。说明任务队列不为空,线程池就从任务队列拿出一个任务出来执行
           也就是说把阻塞队列的一个任务出队
            */
           T t = queue.removeFirst();
           /*
           然后唤醒之前存放在生成者Condition休息室,因为由于之前阻塞队列已满,fullWaitSet才会进入阻塞状态
           所以当阻塞队列删除了任务,就要唤醒之前进入阻塞状态的fullWaitSet
            */
           fullWaitSet.signal();
           //返回任务
           return t;
       }finally {
           lock.unlock();
       }
   }

//阻塞添加
   public void put(T task){
       lock.lock();
       try {
           while(queue.size() == capacity){    //任务队列满了
               try {
                   System.out.println("等待加入任务队列"+task);
                   /*
                   也就说此时进来的任务是进不了阻塞队列的,已经满了,所以此时生产者Condition要进入阻塞状态
                   等待下一次唤醒,然后继续判断队列是否为空
                    */
                   fullWaitSet.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           //任务队列还未满
           System.out.println("加入任务队列"+task);
           //把任务加入阻塞队列
           queue.addLast(task);
           /*
           然后唤醒之前存放在消费者Condition休息室,因为由于之前阻塞队列为空,emptyWaitSet才会进入阻塞状态
           所以当阻塞队列加入了任务,就要唤醒之前进入阻塞状态的emptyWaitSet
            */
           emptyWaitSet.signal();
       }finally {
           lock.unlock();
       }
   }

//带超时阻塞时间添加
   public boolean offer(T task,long timeout,TimeUnit timeUnit){
       lock.lock();
       try {
           long nanos = timeUnit.toNanos(timeout);
           while(queue.size() == capacity){
               try {
                   if(nanos < 0){
                       return false;
                   }
                   System.out.println("等待加入任务队列"+task);
                   //不会一直阻塞,超时就会继续向下执行
                   nanos = fullWaitSet.awaitNanos(nanos);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           System.out.println("加入任务队列"+task);
           queue.addLast(task);
           emptyWaitSet.signal();
           return true;
       }finally {
           lock.unlock();
       }
   }

//获取任务数量
   public int size(){
       lock.lock();
       try{
           return queue.size();
       }finally {
           lock.unlock();
       }
   }

//尝试添加任务,如果阻塞队列已经满了,就使用拒绝策略
   public void tryPut(RejectPolicy<T> rejectPolicy, T task){
       lock.lock();
       try {
           //判断队列是否已满
           if(queue.size() == capacity){
               rejectPolicy.reject(this,task);
           }else{  //有空闲
               System.out.println("加入任务队列"+task);
               queue.addLast(task);
               emptyWaitSet.signal();
           }
       }finally {
           lock.unlock();
       }
   }
}

3.线程池和工作线程

我把工作线程当成线程池的内部类去实现。方便调用变量。


/**
* 线程池
*/
class ThreadPool{
   //阻塞队列
   private BlockingQueue<Runnable> taskQueue;

//线程集合
   private HashSet<Worker> workers = new HashSet<>();

//核心线程数
   private int coreSize;

//获取任务的超时时间
   private long timeout;

private TimeUnit timeUnit;

private RejectPolicy<Runnable> rejectPolicy;

public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
       this.coreSize = coreSize;
       this.timeout = timeout;
       this.timeUnit = timeUnit;
       this.taskQueue = new BlockingQueue<>(queueCapacity);
       this.rejectPolicy = rejectPolicy;
   }

//执行任务
   public void execute(Runnable task){
       synchronized (workers){
           if(workers.size() <= coreSize){  //当前的线程数小于核心线程数
               Worker worker = new Worker(task);
               workers.add(worker);
               //让线程开始工作,执行它的run方法
               worker.start();
           }else{
               // 1) 死等
               // 2) 带超时等待
               // 3) 让调用者放弃任务执行
               // 4) 让调用者抛出异常
               // 5) 让调用者自己执行任务
               taskQueue.tryPut(rejectPolicy,task);
           }
       }
   }

/**
    * 工作线程,也就是线程池里面的线程
    */
   class Worker extends Thread{
       private Runnable task;
       public Worker(Runnable task){
           this.task = task;
       }

@Override
       public void run() {
           //执行任务
           // 1) 当 task 不为空,执行任务
           // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
           while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
               try {
                   System.out.println("正在执行的任务" + task);
                   task.run();
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
                   //代表这个任务已经执行完了
                   task = null;
               }
           }
           synchronized (workers) {
               System.out.println("worker 被移除" + this);
               workers.remove(this);
           }
       }
   }
}

策略模式

细心的小伙伴已经发现,我在拒绝策略这里使用了23种设计模式的策略模式,因为我没有将拒绝的方式写死,而是交给了调用者去实现。

对比JDK的线程池

下面是JDK自带的线程池

实现java简单的线程池

经典的七大核心参数

  • corePoolSize:核心线程数

  • queueCapacity:任务队列容量(阻塞队列)

  • maxPoolSize:最大线程数

  • keepAliveTime:线程空闲时间

  • TimeUnit unit:超时时间单位

  • ThreadFactory threadFactory:线程工程

  • rejectedExecutionHandler:任务拒绝处理器

实际上我们自己实现的也大同小异,只不过JDK官方的更为复杂。

JDK线程执行的流程图

实现java简单的线程池

实现java简单的线程池

线程池的状态转化

线程我们知道在操作系统层面有5种状态

实现java简单的线程池

  • 初始状态:仅是在语言层面创建了线程对象,还未与操作系统线程关联

  • 可运行状态(就绪状态):指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行

  • 运行状态:指获取了 CPU 时间片运行中的状态,当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换

  • 阻塞状态

  • 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】

  • 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】

  • 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们

  • 终止状态:表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

线程在Java API层面有6种状态

实现java简单的线程池

  • NEW 线程刚被创建,但是还没有调用 start() 方法

  • RUNNABLE 当调用了 start() 方法之后,注意,Java API 层面的

  • RUNNABLE 状态涵盖了 操作系统 层面的【可运行状态】、【运行状态】

  • BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分

  • TERMINATED 当线程代码运行结束

线程池有5种状态

  • RUNNING:能接受新任务,并处理阻塞队列中的任务

  • SHUTDOWN:不接受新任务,但是可以处理阻塞队列中的任务

  • STOP:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接不干了!

  • TIDYING:所有任务都终止,并且工作线程也为0,处于关闭之前的状态

  • TERMINATED:已关闭。

实现java简单的线程池

来源:https://blog.csdn.net/qq_45798556/article/details/118703927

标签:java,线程池
0
投稿

猜你喜欢

  • .net使用Aspose.Words进行Word替换操作的实现代码

    2023-03-23 21:28:59
  • Java 执行CMD命令或执行BAT批处理方式

    2022-10-15 03:57:38
  • 使用Spring Expression Language (SpEL)全面解析表达式

    2021-11-19 08:07:10
  • NancyFx框架检测任务管理器详解

    2023-02-18 13:10:53
  • Android studio开发实现计算器功能

    2022-02-12 19:43:05
  • Intellij IDEA 2017新特性之Spring Boot相关特征介绍

    2023-06-22 15:13:02
  • 六款值得推荐的android(安卓)开源框架简介

    2023-06-24 01:46:54
  • Android串口通信之串口读写实例

    2023-11-04 00:11:03
  • C#实现窗口之间的传值

    2022-05-26 08:28:14
  • Android自定义View实现绘制虚线的方法详解

    2022-06-24 01:18:10
  • C#控制台实现飞行棋小游戏

    2023-04-27 05:13:43
  • 解决使用RestTemplate时报错RestClientException的问题

    2023-05-27 19:46:36
  • Java实战之医院管理系统的实现

    2022-04-13 17:39:27
  • C# ConfigHelper 辅助类介绍

    2023-11-20 21:53:09
  • C++类中的六大默认成员函数详解

    2022-09-18 01:33:01
  • Android自定义view仿淘宝快递物流信息时间轴

    2022-02-20 18:51:16
  • Android实现在一个activity中添加多个listview的方法

    2023-10-13 14:41:22
  • java读取文件字符集示例方法

    2023-11-09 12:35:39
  • Mybatis中Mapper映射文件使用详解

    2023-02-13 17:57:07
  • OpenCV实现直线检测并消除

    2023-07-12 20:44:36
  • asp之家 软件编程 m.aspxhome.com