Python多进程并发与同步机制超详细讲解

作者:alwaysrun 时间:2022-11-22 08:35:02 

在《多线程与同步》中介绍了多线程及存在的问题,而通过使用多进程而非线程可有效地绕过全局解释器锁。 因此,通过multiprocessing模块可充分地利用多核CPU的资源。

多进程

多进程是通过multiprocessing包来实现的,multiprocessing.Process对象(和多线程的threading.Thread类似)用来创建一个进程对象:

  • 在类UNIX平台上,需要对每个Process对象调用join()方法 (实际上等同于wait)避免其成为僵尸进程。

  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式。

  • 多进程应尽量避免共享资源。必要时可以通过共享内存和Manager的方法来共享资源。

僵尸进程

在unix或unix-like系统中,当一个子进程退出后,它就会变成一个僵尸进程,如果父进程没有通过wait系统调用来读取这个子进程的退出状态的话,这个子进程就会一直维持僵尸进程状态(占据部分系统资源,无法释放)。

要清除僵尸进程,有:

结束父进程(一般是主进程):当父进程退出的时候僵尸进程也会被随之清除。

读取子进程退出状态:如通过multiprocessing.Process产出的进程可以:

  • 调用join()来等待子进程的方法来(内部会wait子进程);

  • 在父进程中处理SIGCHLD信号:在处理程序中调用wait系统调用或者直接设置为SIG_IGN来清除僵尸进程;

把进程变成孤儿进程,这样进程就会自动交由init进程来自动处理。

通过设定signal.signal(signal.SIGCHLD, signal.SIG_IGN)或join进程可避免僵尸进程的产生

def zombieProc():
   print("zombie running")
   time.sleep(5)
   print("zombie exit")
if __name__ == '__main__':
   signal.signal(signal.SIGCHLD, signal.SIG_IGN)
   proc = multiprocessing.Process(target=zombieProc)
   proc.start()
   # proc.join()
   time.sleep(30)

Process类

Process([group [, target [, name [, args [, kwargs]]]]]),实例化得到的对象,表示一个子进程任务:

  • group参数未使用,值始终为None;

  • target表示调用对象,即子进程要执行的任务;

  • args表示调用对象的位置参数元组,args=(1, ‘test’, [‘one’]);

  • kwargs表示调用对象的字典参数,kwargs={‘name’:‘mike’,‘age’:18};

  • name为子进程的名称;

Process类的属性与方法:

  • start():启动进程,并执行其run方法;

  • run():进程启动时运行的方法,继承Process类时必须要实现方法;

  • terminate():强制终止进程,不会进行任何清理操作(若p创建了子进程,则子进程就成了僵尸进程);如进程还持有锁等,那么也不会被释放,进而导致死锁;

  • is_alive():返回进程是否在运行状态;

  • join([timeout]):等待进程终止;

  • daemon:默认值为False,如果设为True,代表为守护进程(当父进程终止时,随之终止;并且不能创建自己的新进程),必须在start()之前设置;

  • name:进程的名称;

  • pid/ident:进程的pid;

  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束;

  • authkey:进程的身份验证码(默认是由os.urandom()随机生成的32字符的字符串),在涉及网络连接的底层进程间通信时提供安全性;

也可通过os.getpid()获取进程的PID,os.getppid()获取父进程的PID。

函数方式

通过Process类直接运行函数:

def simpleRoutine(name, delay):
   print(f"routine {name} starting...")
   time.sleep(delay)
   print(f"routine {name} finished")
if __name__ == '__main__':
   thrOne = multiprocessing.Process(target=simpleRoutine, args=("First", 1))
   thrTwo = multiprocessing.Process(target=simpleRoutine, args=("Two", 2))
   thrOne.start()
   thrTwo.start()
   thrOne.join()
   thrTwo.join()

继承方式

通过继承Process类,并实现run方法来启动进程:

class SimpleProcess(multiprocessing.Process):
   def __init__(self, name, delay):
       super().__init__()
       self.name = name
       self.delay = delay
   def run(self):
       print(f"Process {self.name} starting...")
       time.sleep(self.delay)
       print(f"Process {self.name} finished")
if __name__ == '__main__':
   thrOne = SimpleProcess("First", 2)
   thrTwo = SimpleProcess("Second", 1)
   thrOne.start()
   thrTwo.start()
   thrOne.join()
   thrTwo.join()

同步机制

进程间同步与线程间同步类似(只是所有对象都在multiprocessing模块中):

  • Lock/Rlock: 通过acquire()和release()来获取与释放锁;

  • Event: 事件信号,通过set()和clear()来设定与清楚信号;通过wait()来等待信号;

  • Condition: 条件变量;通过wait()用来等待条件,通过notify/notify_all()来通知等待此条件的进程(等待与通知前,都需先持有锁);

  • Semaphore: 信号量;维护一个计数器;

  • Barrier: 屏障;只有等待进程数量达到要求数量,才会同时开始执行屏障保护后的代码。

屏障示例:

def waitBarrier(name, barr: multiprocessing.Barrier):
   print(f"{name} waiting for open")
   try:
       barr.wait()
       print(f"{name} running")
       time.sleep(2)
   except multiprocessing.BrokenBarrierError:
       print(f"{name} exception")
   print(f"{name} finished")
def openFun():  # 屏障满足条件时,执行一次
   print("barrier opened")
if __name__ == '__main__':
   signal = multiprocessing.Barrier(5, openFun)
   for i in range(10):
       multiprocessing.Process(target=waitBarrier, args=(i, signal)).start()  
       time.sleep(1)  

当第5个进程启动时,前面5个进程会同时开始执行(openFun函数会执行一次);当第10个进程启动时,后面5个进程会同时开始执行一次(openFun函数又会执行一次)。

状态管理Managers

Managers提供了一种创建由多进程(包括跨机器间进程共享)共享的数据的方式:

  • multiprocessing.Manager()返回一个SyncManager对象;此对象对应着一个管理者子进程(manager process)以及代理(其他子进程使用);

  • 它确保当某一进程修改了共享对象之后,其他进程中的共享对象也会得到更新;

  • 其支持的类型有:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value和Array。

多进程进共享字典与列表(每个进程中都能看到其他进程修改过的内容)

def worker(dictContext: dict, lstContext: list, name):
   pid = os.getpid()
   dictContext[name] = pid
   lstContext.append(pid)
   print(f"{name} worker: {lstContext}")
def managerContext():
   mgr = multiprocessing.Manager()
   multiprocessing.managers
   dictContext = mgr.dict()
   lstContext = mgr.list()
   jobs = [multiprocessing.Process(target=worker, args=(dictContext, lstContext, i)) for i in range(10)]
   for j in jobs:
       j.start()
   for j in jobs:
       j.join()
   print('Results:', dictContext)

来源:https://blog.csdn.net/alwaysrun/article/details/127164976

标签:Python,多进程,并发,同步
0
投稿

猜你喜欢

  • PYQT5开启多个线程和窗口,多线程与多窗口的交互实例

    2023-07-19 04:21:21
  • 网页屏蔽鼠标左右键和键盘按键功能

    2007-10-17 21:30:00
  • 利用Python计算质数与完全数的方法实例

    2022-12-30 23:35:26
  • Linux下安装MySQL5.7.19问题小结

    2024-01-16 06:21:37
  • Python基于FTP模块实现ftp文件上传操作示例

    2024-01-02 00:04:38
  • python 从list中随机取值的方法

    2021-10-06 04:08:11
  • 哪种Python框架适合你?简单介绍几种主流Python框架

    2023-04-27 03:21:52
  • python 统计代码耗时的几种方法分享

    2023-11-03 19:51:06
  • 在PyCharm的 Terminal(终端)切换Python版本的方法

    2021-10-31 08:37:07
  • Oracle常见错误代码的分析与解决

    2024-01-14 20:28:51
  • django 环境变量配置过程详解

    2021-11-19 03:07:52
  • processlist命令 查看mysql 线程

    2024-01-24 02:58:30
  • 原生js实现对Ajax的封装(仿jquery)

    2024-04-22 12:57:06
  • windows下python安装小白入门教程

    2022-01-05 08:45:44
  • 未知高度的图片垂直居中

    2010-12-17 12:36:00
  • golang 字符串比较是否相等的方法示例

    2024-02-05 14:45:07
  • python实现猜数字游戏

    2022-12-23 17:30:06
  • 2007/12/23更新创意无限,简单实用(javascript log)

    2024-04-26 17:11:46
  • 数据库中两张表之间的数据同步增加、删除与更新实现思路

    2024-01-21 18:50:24
  • MySQL如何快速创建800w条测试数据表

    2024-01-19 09:27:28
  • asp之家 网络编程 m.aspxhome.com