Python进程间通讯与进程池超详细讲解

作者:alwaysrun 时间:2023-09-05 16:50:41 

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

  • 若设定了超时且队列已满,会抛出queue.Full异常;

  • 队列已关闭时,抛出ValueError异常

get([block[, timeout]]):读取并删除一个元素;

  • 若设定了超时且队列为空,会抛出queue.Empty异常;

  • 队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
   try:
       for i in range(10):
           qu.put(f"{name}-{i + 1}")
           time.sleep(.1)
   except ValueError:
       print("queue closed")
   print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
   try:
       while True:
           r = qu.get()
           print(f"{name} recv: {r}")
   except ValueError:
       print("queue closed")
   print(f"{name}: get complete")
if __name__ == '__main__':
   qu = multiprocessing.Queue(100)
   puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
   gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
   list(map(lambda f: f.start(), puts))
   list(map(lambda f: f.start(), gets))
   for f in puts:
       f.join()
   print("To close")
   qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

  • send():发送消息;

  • recv():接收消息;

进程间的Pipe基于fork机制建立:

  • 主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;

  • 创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;

  • 主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。

def pipeProc(pipe):
   outPipe, inPipe = pipe
   inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
   try:
       while True:
           r = outPipe.recv()
           print("Recv:", r)
   except EOFError:
       print("RECV end")
if __name__ == '__main__':
   outPipe, inPipe = multiprocessing.Pipe()
   sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
   sub.start()
   outPipe.close() # 必须在进程成功运行后,才可关闭
   with inPipe:
       for x in range(10):
           inPipe.send(x)
           time.sleep(.1)
   print("send complete")
   sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

  • 请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;

  • 当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;

  • 池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。

multiprocessing.Pool([processes[, initializer[, initargs]]])

  • processes:要创建进程数量(默认os.cpu_count()个),在需要时才会创建;

  • initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);

Pool类中主要方法:

  • apply(func[, args[, kwds]]):以阻塞方式,从池中获取进程并执行func(*args,**kwargs)

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步方式(从池中获取一个进程)执行func(*args,**kwargs),返回AsyncResult;

  • map(func, iterable[, chunksize])/map_async:map的并行版本(可同时处理多个任务),异步时返回MapResult;

  • starmap(func, iterable[, chunksize])/starmap_async:与map的区别是允许传入多个参数;

  • imap(func, iterable[, chunksize]):map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;

  • imap_unordered(func, iterable[, chunksize]):imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;

  • close():关闭,禁止继续提交任务(已提交任务会继续执行完成);

  • terminate():立即终止所有任务;

  • join():等待工作进程完成(必须已close或terminate了);

def poolWorker():
   print(f"worker in process {os.getpid()}")
   time.sleep(1)
def poolWorkerOne(name):
   print(f"worker one {name} in process {os.getpid()}")
   time.sleep(random.random())
   return name
def poolWorkerTwo(first, second):
   res = first + second
   print(f"worker two {res} in process {os.getpid()}")
   time.sleep(1./(first+1))
   return res
def poolInit():
   print("pool init")
if __name__ == '__main__':
   workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
   with workers:
       for i in range(5):
           workers.apply_async(poolWorker)
       arg = [(i, i) for i in range(10)]
       workers.map_async(poolWorkerOne, arg)
       results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
       print("Starmap:", results.get())
       results = workers.imap_unordered(poolWorkerOne, arg)
       for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
           print("Unordered:", r)
   # 必须保证workers已close了
   workers.join()

来源:https://blog.csdn.net/alwaysrun/article/details/127185356

标签:Python,进程间通讯,进程池
0
投稿

猜你喜欢

  • Python网络编程详解

    2022-01-09 15:25:10
  • php+mysqli实现批量替换数据库表前缀的方法

    2023-11-22 10:15:55
  • Python 不设计 do-while 循环结构的理由

    2021-08-04 11:55:19
  • Python去除、替换字符串空格的处理方法

    2022-07-17 05:38:25
  • 解决python-redis-lock分布式锁的问题

    2023-05-23 18:57:49
  • 详解MySQL数据库安全配置

    2010-01-26 15:19:00
  • 简单理解vue中track-by属性

    2024-04-30 10:21:05
  • JavaScript实现动态数字时钟

    2024-04-10 11:01:09
  • 浅谈tensorflow1.0 池化层(pooling)和全连接层(dense)

    2021-11-09 05:06:17
  • MySQL数据备份之mysqldump的使用详解

    2024-01-18 20:46:57
  • Windows XP操作系统下的MYSQL安装过程

    2008-11-24 12:52:00
  • Python实现批量压缩文件/文件夹zipfile的使用

    2021-03-08 07:08:21
  • python学习之第三方包安装方法(两种方法)

    2021-02-20 03:29:40
  • Python基本数据类型详细介绍

    2021-10-14 07:02:50
  • Python中使用HTMLParser解析html实例

    2023-01-17 11:47:16
  • Python下载网络小说实例代码

    2023-08-01 18:22:17
  • PyCharm上安装Package的实现(以pandas为例)

    2021-09-21 12:26:30
  • Django ORM 多表查询示例代码

    2021-07-25 05:22:02
  • php ajax异步读取rss文档数据

    2023-10-17 19:59:02
  • python 获得任意路径下的文件及其根目录的方法

    2022-02-02 17:32:15
  • asp之家 网络编程 m.aspxhome.com