Netty分布式NioEventLoop任务队列执行源码分析

作者:向南是个万人迷 时间:2022-10-08 04:07:19 

前文传送门:NioEventLoop处理IO事件

执行任务队列

继续回到NioEventLoop的run()方法:

protected void run() {
   for (;;) {
       try {
           switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
               case SelectStrategy.CONTINUE:
                   continue;
               case SelectStrategy.SELECT:
                   //轮询io事件(1)
                   select(wakenUp.getAndSet(false));
                   if (wakenUp.get()) {
                       selector.wakeup();
                   }
               default:
           }
           cancelledKeys = 0;
           needsToSelectAgain = false;
           //默认是50
           final int ioRatio = this.ioRatio;
           if (ioRatio == 100) {
               try {
                   processSelectedKeys();
               } finally {
                   runAllTasks();
               }
           } else {
               //记录下开始时间
               final long ioStartTime = System.nanoTime();
               try {
                   //处理轮询到的key(2)
                   processSelectedKeys();
               } finally {
                   //计算耗时
                   final long ioTime = System.nanoTime() - ioStartTime;
                   //执行task(3)
                   runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
               }
           }
       } catch (Throwable t) {
           handleLoopException(t);
       }
       //代码省略
   }
}

我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

跟进runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {
   //定时任务队列中聚合任务
   fetchFromScheduledTaskQueue();
   //从普通taskQ里面拿一个任务
   Runnable task = pollTask();
   //task为空, 则直接返回
   if (task == null) {
       //跑完所有的任务执行收尾的操作
       afterRunningAllTasks();
       return false;
   }
   //如果队列不为空
   //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
   final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
   long runTasks = 0;
   long lastExecutionTime;
   //执行每一个任务
   for (;;) {
       safeExecute(task);
       //标记当前跑完的任务
       runTasks ++;
       //当跑完64个任务的时候, 会计算一下当前时间
       if ((runTasks & 0x3F) == 0) {
           //定时任务初始化到当前的时间
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           //如果超过截止时间则不执行(nanoTime()是耗时的)
           if (lastExecutionTime >= deadline) {
               break;
           }
       }
       //如果没有超过这个时间, 则继续从普通任务队列拿任务
       task = pollTask();
       //直到没有任务执行
       if (task == null) {
           //记录下最后执行时间
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           break;
       }
   }
   //收尾工作
   afterRunningAllTasks();
   this.lastExecutionTime = lastExecutionTime;
   return true;
}

首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中

我们跟进fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {
   long nanoTime = AbstractScheduledEventExecutor.nanoTime();
   //从定时任务队列中抓取第一个定时任务
   //寻找截止时间为nanoTime的任务
   Runnable scheduledTask  = pollScheduledTask(nanoTime);
   //如果该定时任务队列不为空, 则塞到普通任务队列里面
   while (scheduledTask != null) {
       //如果添加到普通任务队列过程中失败
       if (!taskQueue.offer(scheduledTask)) {
           //则重新添加到定时任务队列中
           scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
           return false;
       }
       //继续从定时任务队列中拉取任务
       //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
       scheduledTask = pollScheduledTask(nanoTime);
   }
   return true;
}

 long nanoTime = AbstractScheduledEventExecutor.nanoTime()

 代表从定时任务初始化到现在过去了多长时间

 Runnable scheduledTask= pollScheduledTask(nanoTime) 

代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {
   assert inEventLoop();
   //拿到定时任务队列
   Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
   //peek()方法拿到第一个任务
   ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
   if (scheduledTask == null) {
       return null;
   }
   if (scheduledTask.deadlineNanos() <= nanoTime) {
       //从队列中删除
       scheduledTaskQueue.remove();
       //返回该任务
       return scheduledTask;
   }
   return null;
}

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

我们继续回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {
   long nanoTime = AbstractScheduledEventExecutor.nanoTime();
   //从定时任务队列中抓取第一个定时任务
   //寻找截止时间为nanoTime的任务
   Runnable scheduledTask  = pollScheduledTask(nanoTime);
   //如果该定时任务队列不为空, 则塞到普通任务队列里面
   while (scheduledTask != null) {
       //如果添加到普通任务队列过程中失败
       if (!taskQueue.offer(scheduledTask)) {
           //则重新添加到定时任务队列中
           scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
           return false;
       }
       //继续从定时任务队列中拉取任务
       //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
       scheduledTask = pollScheduledTask(nanoTime);
   }
   return true;
}

弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过

scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)

重新添加到定时任务队列中

如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {
   //定时任务队列中聚合任务
   fetchFromScheduledTaskQueue();
   //从普通taskQ里面拿一个任务
   Runnable task = pollTask();
   //task为空, 则直接返回
   if (task == null) {
       //跑完所有的任务执行收尾的操作
       afterRunningAllTasks();
       return false;
   }
   //如果队列不为空
   //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
   final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
   long runTasks = 0;
   long lastExecutionTime;
   //执行每一个任务
   for (;;) {
       safeExecute(task);
       //标记当前跑完的任务
       runTasks ++;
       //当跑完64个任务的时候, 会计算一下当前时间
       if ((runTasks & 0x3F) == 0) {
           //定时任务初始化到当前的时间
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           //如果超过截止时间则不执行(nanoTime()是耗时的)
           if (lastExecutionTime >= deadline) {
               break;
           }
       }
       //如果没有超过这个时间, 则继续从普通任务队列拿任务
       task = pollTask();
       //直到没有任务执行
       if (task == null) {
           //记录下最后执行时间
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           break;
       }
   }
   //收尾工作
   afterRunningAllTasks();
   this.lastExecutionTime = lastExecutionTime;
   return true;
}

首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务

任务不为空, 则通过

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 

计算一个截止时间, 任务的执行时间不能超过这个时间

然后在for循环中通过safeExecute(task)执行task

我们跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {
   try {
       //直接调用run()方法执行
       task.run();
   } catch (Throwable t) {
       //发生异常不终止
       logger.warn("A task raised an exception. Task: {}", task, t);
   }
}

这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

回到runAllTasks(long timeoutNanos)方法

protected boolean runAllTasks(long timeoutNanos) {
   //定时任务队列中聚合任务
   fetchFromScheduledTaskQueue();
   //从普通taskQ里面拿一个任务
   Runnable task = pollTask();
   //task为空, 则直接返回
   if (task == null) {
       //跑完所有的任务执行收尾的操作
       afterRunningAllTasks();
       return false;
   }
   //如果队列不为空
   //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
   final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
   long runTasks = 0;
   long lastExecutionTime;
   //执行每一个任务
   for (;;) {
       safeExecute(task);
       //标记当前跑完的任务
       runTasks ++;
       //当跑完64个任务的时候, 会计算一下当前时间
       if ((runTasks & 0x3F) == 0) {
           //定时任务初始化到当前的时间
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           //如果超过截止时间则不执行(nanoTime()是耗时的)
           if (lastExecutionTime >= deadline) {
               break;
           }
       }
       //如果没有超过这个时间, 则继续从普通任务队列拿任务
       task = pollTask();
       //直到没有任务执行
       if (task == null) {
           //记录下最后执行时间
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           break;
       }
   }
   //收尾工作
   afterRunningAllTasks();
   this.lastExecutionTime = lastExecutionTime;
   return true;
}

每次执行完task, runTasks自增

这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行

这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

章节小结

本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:

        1.  NioEventLoopGroup如何选择分配NioEventLoop

        2.  NioEventLoop如何开启

        3.  NioEventLoop如何进行select操作

        4.  NioEventLoop如何执行task

来源:https://www.cnblogs.com/xiangnan6122/p/10203169.html

标签:Netty,分布式,NioEventLoop,任务队列
0
投稿

猜你喜欢

  • java interface的两个经典用法

    2021-08-17 06:20:56
  • jvm crash的崩溃日志详细分析及注意点

    2022-12-18 07:49:00
  • 详解获取Spring MVC中所有RequestMapping以及对应方法和参数

    2023-12-09 21:29:17
  • android实现图片橡皮擦和快速染色功能

    2023-09-08 01:30:02
  • idea的使用之关于tomcat热部署的教程

    2022-12-02 20:16:46
  • 详解Android Flutter中SliverAppBar的使用教程

    2023-06-23 12:11:27
  • ViewPager实现漂亮的引导页

    2022-12-27 21:56:58
  • Android开发之ProgressBar字体随着进度条的加载而滚动

    2023-12-28 03:06:53
  • Android多个TAB选项卡切换效果

    2022-04-10 03:03:15
  • java+jsp+struts2实现发送邮件功能

    2023-08-28 18:25:27
  • Android获取和读取短信验证码的实现方法

    2021-09-06 07:29:17
  • C#中new操作符的工作机制

    2023-12-08 13:57:20
  • elasticsearch head的安装及使用过程解析

    2021-10-10 10:24:59
  • Android使用 Retrofit 2.X 上传多文件和多表单示例

    2023-08-06 03:48:25
  • 详解Jackson 使用以及性能介绍

    2023-02-21 00:08:31
  • Unity通过代码修改按钮点击效果

    2022-07-29 13:19:37
  • OpenXml读写Excel实例代码

    2023-09-10 18:11:22
  • Mybatis插件之自动生成不使用默认的驼峰式操作

    2023-11-19 01:20:03
  • 深入理解Android MD5数据加密

    2022-03-16 07:48:03
  • 带你了解C++的数组与函数

    2023-12-10 20:44:18
  • asp之家 软件编程 m.aspxhome.com