swoole_process实现进程池的方法示例

作者:opso 时间:2024-06-05 15:40:23 

swoole —— 重新定义PHP

swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process 的研究在swoole中显得尤为重要。

预备知识

IO多路复用

swoole 中的io多路复用表现为底层的 epoll进程模型,在C语言中表现为 epoll 函数。

  • epoll 模型下会持续监听自己名下的素有socket 描述符 fd

  • 当触发了 socket 监听的事件时,epoll 函数才会响应,并返回所有监听该时间的 socket 集合

  • epoll 的本质是阻塞IO,它的优点在于能同事处理大量socket连接

Event loop 事件循环

swoole 对 epoll 实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)

  • Event loop 是一个Reactor线程,其中运行了一个epoll实例。

  • 通过swoole_event_add将socket描述符的一个事件添加到epoll监听中,事件发生时将执行回调函数

  • 不可用于fpm环境下,因为fpm在任务结束时可能会关掉进程。

swoole_process

  • 基于C语言封装的进程管理模块,方便php来调用

  • 内置管道、消息队列接口,方便实现进程间通信

我们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。

  • 静态模式 即初始化固定的进程数,当来了一个请求时,从中选取一个进程来处理。

  • 动态模式 指定最小、最大进程数,当请求量过大,进程数不超过最大限制时,新增线程去处理请求

接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通信、定时器等使用,实际情况使用封装好的swoole_server来实现task任务队列池会更方便。

假如有个定时投递的任务队列:


<?php

/**
* 动态进程池,类似fpm
* 动态新建进程
* 有初始进程数,最小进程数,进程不够处理时候新建进程,不超过最大进程数
*/

// 一个进程定时投递任务

/**
* 1. tick
* 2. process及其管道通讯
* 3. event loop 事件循环
*/
class processPool
{
 private $pool;

/**
  * @var swoole_process[] 记录所有worker的process对象
  */
 private $workers = [];

/**
  * @var array 记录worker工作状态
  */
 private $used_workers = [];

/**
  * @var int 最小进程数
  */
 private $min_woker_num = 5;

/**
  * @var int 初始进程数
  */
 private $start_worker_num = 10;

/**
  * @var int 最大进程数
  */
 private $max_woker_num = 20;

/**
  * 进程闲置销毁秒数
  * @var int
  */
 private $idle_seconds = 5;

/**
  * @var int 当前进程数
  */
 private $curr_num;

/**
  * 闲置进程时间戳
  * @var array
  */
 private $active_time = [];

public function __construct()
 {
   $this->pool = new swoole_process(function () {
     // 循环建立worker进程
     for ($i = 0; $i < $this->start_worker_num; $i++) {
       $this->createWorker();
     }
     echo '初始化进程数:' . $this->curr_num . PHP_EOL;
     // 每秒定时往闲置的worker的管道中投递任务
     swoole_timer_tick(1000, function ($timer_id) {
       static $count = 0;
       $count++;
       $need_create = true;
       foreach ($this->used_workers as $pid => $used) {
         if ($used == 0) {
           $need_create = false;
           $this->workers[$pid]->write($count . ' job');
           // 标记使用中
           $this->used_workers[$pid] = 1;
           $this->active_time[$pid] = time();
           break;
         }
       }
       foreach ($this->used_workers as $pid => $used)
         // 如果所有worker队列都没有闲置的,则新建一个worker来处理
         if ($need_create && $this->curr_num < $this->max_woker_num) {
           $new_pid = $this->createWorker();
           $this->workers[$new_pid]->write($count . ' job');
           $this->used_workers[$new_pid] = 1;
           $this->active_time[$new_pid] = time();
         }

// 闲置超过一段时间则销毁进程
       foreach ($this->active_time as $pid => $timestamp) {
         if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) {
           // 销毁该进程
           if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) {
             $this->workers[$pid]->write('exit');
             unset($this->workers[$pid]);
             $this->curr_num = count($this->workers);
             unset($this->used_workers[$pid]);
             unset($this->active_time[$pid]);
             echo "{$pid} destroyed\n";
             break;
           }
         }
       }

echo "任务{$count}/{$this->curr_num}\n";

if ($count == 20) {
         foreach ($this->workers as $pid => $worker) {
           $worker->write('exit');
         }
         // 关闭定时器
         swoole_timer_clear($timer_id);
         // 退出进程池
         $this->pool->exit(0);
         exit();
       }
     });

});

$master_pid = $this->pool->start();
   echo "Master $master_pid start\n";

while ($ret = swoole_process::wait()) {
     $pid = $ret['pid'];
     echo "process {$pid} existed\n";
   }
 }

/**
  * 创建一个新进程
  * @return int 新进程的pid
  */
 public function createWorker()
 {
   $worker_process = new swoole_process(function (swoole_process $worker) {
     // 给子进程管道绑定事件
     swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
       $data = trim($worker->read());
       if ($data == 'exit') {
         $worker->exit(0);
         exit();
       }
       echo "{$worker->pid} 正在处理 {$data}\n";
       sleep(5);
       // 返回结果,表示空闲
       $worker->write("complete");
     });
   });

$worker_pid = $worker_process->start();

// 给父进程管道绑定事件
   swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
     $data = trim($worker_process->read());
     if ($data == 'complete') {
       // 标记为空闲
//        echo "{$worker_process->pid} 空闲了\n";
       $this->used_workers[$worker_process->pid] = 0;
     }
   });

// 保存process对象
   $this->workers[$worker_pid] = $worker_process;
   // 标记为空闲
   $this->used_workers[$worker_pid] = 0;
   $this->active_time[$worker_pid] = time();
   $this->curr_num = count($this->workers);
   return $worker_pid;
 }

}

new processPool();

来源:https://opso.coding.me/2018/07/07/swoole-process/

标签:swoole,process,进程池
0
投稿

猜你喜欢

  • 7个流行的Python强化学习算法及代码实现详解

    2021-07-06 08:38:03
  • Springboot集成Camunda使用Mysql介绍

    2024-01-22 12:41:36
  • Zend Framework动作助手Redirector用法实例详解

    2024-05-13 09:53:15
  • Linux下安装MySQL5.7.19问题小结

    2024-01-16 06:21:37
  • python通过Seq2Seq实现闲聊机器人

    2021-09-02 13:39:15
  • 基于jQuery 实现bootstrapValidator下的全局验证

    2024-04-08 10:56:41
  • Python中变量的输入输出实例代码详解

    2022-11-26 00:47:56
  • MySQL数据库JDBC编程详解流程

    2024-01-15 09:39:55
  • Python在centos7.6上安装python3.9的详细教程(默认python版本为2.7.5)

    2022-05-09 18:58:43
  • Python实现前向和反向自动微分的示例代码

    2022-10-25 15:52:03
  • python实现桌面托盘气泡提示

    2023-05-16 21:27:15
  • Django中的FBV和CBV用法详解

    2023-09-15 10:41:06
  • ASP.NET对路径"xxxxx"的访问被拒绝的解决方法小结

    2023-07-07 20:08:41
  • Python Matplotlib绘图基础详细教程

    2024-01-16 04:34:41
  • Linux下MySQL数据库的主从同步复制配置

    2024-01-19 12:36:41
  • MySQL Order By语法介绍

    2024-01-19 03:29:48
  • selenium+python实现文件上传操作的方法实例

    2022-05-06 13:21:49
  • Python生成器常见问题及解决方案

    2023-01-23 19:09:43
  • python实现人机五子棋

    2022-06-15 08:07:49
  • Python time模块之时间戳与结构化时间的使用

    2024-01-02 09:07:51
  • asp之家 网络编程 m.aspxhome.com