Python3多进程 multiprocessing 模块实例详解

作者:Citizen_Wang 时间:2022-05-27 11:11:51 

本文实例讲述了Python3多进程 multiprocessing 模块。分享给大家供大家参考,具体如下:

多进程 Multiprocessing 模块

multiprocessing 模块官方说明文档

Process 类

Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。

star() 方法启动进程,
join() 方法实现进程间的同步,等待所有进程退出。
close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

target 是函数名字,需要调用的函数
args 函数需要的参数,以 tuple 的形式传入

示例:


import multiprocessing
import os
def run_proc(name):
 print('Child process {0} {1} Running '.format(name, os.getpid()))
if __name__ == '__main__':
 print('Parent process {0} is Running'.format(os.getpid()))
 for i in range(5):
   p = multiprocessing.Process(target=run_proc, args=(str(i),))
   print('process start')
   p.start()
 p.join()
 print('Process close')

结果:

Parent process 809 is Running
process start
process start
process start
process start
process start
Child process 0 810 Running
Child process 1 811 Running
Child process 2 812 Running
Child process 3 813 Running
Child process 4 814 Running
Process close

Pool

Pool 可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。

- Pool 对象调用 join 方法会等待所有的子进程执行完毕
- 调用 join 方法之前,必须调用 close
- 调用 close 之后就不能继续添加新的 Process 了

pool.apply_async

apply_async 方法用来同步执行进程,允许多个进程同时进入池子。


import multiprocessing
import os
import time
def run_task(name):
 print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
 time.sleep(1)
 print('Task {0} end.'.format(name))
if __name__ == '__main__':
 print('current process {0}'.format(os.getpid()))
 p = multiprocessing.Pool(processes=3)
 for i in range(6):
   p.apply_async(run_task, args=(i,))
 print('Waiting for all subprocesses done...')
 p.close()
 p.join()
 print('All processes done!')

结果:

current process 921
Waiting for all subprocesses done...
Task 0 pid 922 is running, parent id is 921
Task 1 pid 923 is running, parent id is 921
Task 2 pid 924 is running, parent id is 921
Task 0 end.
Task 3 pid 922 is running, parent id is 921
Task 1 end.
Task 4 pid 923 is running, parent id is 921
Task 2 end.
Task 5 pid 924 is running, parent id is 921
Task 3 end.
Task 4 end.
Task 5 end.
All processes done!

pool.apply

apply(func[, args[, kwds]])

该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。


import multiprocessing
import os
import time
def run_task(name):
 print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
 time.sleep(1)
 print('Task {0} end.'.format(name))
if __name__ == '__main__':
 print('current process {0}'.format(os.getpid()))
 p = multiprocessing.Pool(processes=3)
 for i in range(6):
   p.apply(run_task, args=(i,))
 print('Waiting for all subprocesses done...')
 p.close()
 p.join()
 print('All processes done!')

结果:

Task 0 pid 928 is running, parent id is 927
Task 0 end.
Task 1 pid 929 is running, parent id is 927
Task 1 end.
Task 2 pid 930 is running, parent id is 927
Task 2 end.
Task 3 pid 928 is running, parent id is 927
Task 3 end.
Task 4 pid 929 is running, parent id is 927
Task 4 end.
Task 5 pid 930 is running, parent id is 927
Task 5 end.
Waiting for all subprocesses done...
All processes done!

Queue 进程间通信

Queue 用来在多个进程间通信。Queue 有两个方法,get 和 put。

put 方法

Put 方法用来插入数据到队列中,有两个可选参数,blocked 和 timeout。
- blocked = True(默认值),timeout 为正

该方法会阻塞 timeout 指定的时间,直到该队列有剩余空间。如果超时,抛出 Queue.Full 异常。

blocked = False

如果 Queue 已满,立刻抛出 Queue.Full 异常

get 方法

get 方法用来从队列中读取并删除一个元素。有两个参数可选,blocked 和 timeout
- blocked = False (默认),timeout 正值

等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常。

blocked = True

Queue 有一个值可用,立刻返回改值;Queue 没有任何元素,


from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def proc_write(q,urls):
 print('Process(%s) is writing...' % os.getpid())
 for url in urls:
   q.put(url)
   print('Put %s to queue...' % url)
   time.sleep(random.random())
# 读数据进程执行的代码:
def proc_read(q):
 print('Process(%s) is reading...' % os.getpid())
 while True:
   url = q.get(True)
   print('Get %s from queue.' % url)
if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
 proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
 proc_reader = Process(target=proc_read, args=(q,))
 # 启动子进程proc_writer,写入:
 proc_writer1.start()
 proc_writer2.start()
 # 启动子进程proc_reader,读取:
 proc_reader.start()
 # 等待proc_writer结束:
 proc_writer1.join()
 proc_writer2.join()
 # proc_reader进程里是死循环,无法等待其结束,只能强行终止:
 proc_reader.terminate()

结果:

Process(1083) is writing...
Put url_1 to queue...
Process(1084) is writing...
Put url_4 to queue...
Process(1085) is reading...
Get url_1 from queue.
Get url_4 from queue.
Put url_5 to queue...
Get url_5 from queue.
Put url_2 to queue...
Get url_2 from queue.
Put url_6 to queue...
Get url_6 from queue.
Put url_3 to queue...
Get url_3 from queue.

Pipe 进程间通信

常用来在两个进程间通信,两个进程分别位于管道的两端。

multiprocessing.Pipe([duplex])

示例一和示例二,也是网上找的别人的例子,尝试理解并增加了注释而已。网上的例子,大多是例子一和例子二在一起的,这里分开来看,比较容易理解。

示例一:


from multiprocessing import Process, Pipe
def send(pipe):
 pipe.send(['spam'] + [42, 'egg'])  # send 传输一个列表
 pipe.close()
if __name__ == '__main__':
 (con1, con2) = Pipe()              # 创建两个 Pipe 实例
 sender = Process(target=send, args=(con1, ))   # 函数的参数,args 一定是实例化之后的 Pip 变量,不能直接写 args=(Pip(),)
 sender.start()                  # Process 类启动进程
 print("con2 got: %s" % con2.recv())       # 管道的另一端 con2 从send收到消息
 con2.close()                   # 关闭管道

结果:

con2 got: ['spam', 42, 'egg']

示例二:


from multiprocessing import Process, Pipe
def talk(pipe):
 pipe.send(dict(name='Bob', spam=42))      # 传输一个字典
 reply = pipe.recv()               # 接收传输的数据
 print('talker got:', reply)
if __name__ == '__main__':
 (parentEnd, childEnd) = Pipe()         # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
 child = Process(target=talk, args=(childEnd,)) # 创建一个 Process 进程,名称为 child
 child.start()                  # 启动进程
 print('parent got:', parentEnd.recv())     # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
 parentEnd.send({x * 2 for x in 'spam'})     # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
 child.join()                  # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
 print('parent exit')

结果:

parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
parent exit

希望本文所述对大家Python程序设计有所帮助。

来源:https://blog.csdn.net/cityzenoldwang/article/details/78584175

标签:Python3,多进程,multiprocessing
0
投稿

猜你喜欢

  • exe反编译为.py文件的方法

    2022-06-10 16:18:16
  • python http通信接口开发示例

    2022-06-07 05:15:29
  • Keras多线程机制与flask多线程冲突的解决方案

    2023-09-12 02:10:51
  • python中split(), os.path.split()和os.path.splitext()的用法

    2022-03-23 09:06:40
  • MySQL中处理各种重复的一些方法

    2024-01-12 17:45:14
  • PyQt5每天必学之切换按钮

    2023-06-14 09:07:06
  • mysql 8.0.18 安装配置图文教程

    2024-01-21 09:46:33
  • 戏说编码发展史

    2022-12-12 02:48:05
  • 基于K-Means聚类算法演示及可视化展示

    2023-02-08 16:01:39
  • python 协程 gevent原理与用法分析

    2021-10-12 23:36:19
  • QCon大会散记

    2010-05-03 14:19:00
  • Python求正态分布曲线下面积实例

    2021-01-28 18:20:07
  • python 正则式使用心得

    2021-09-17 14:39:49
  • python调用百度语音REST API

    2022-09-16 18:19:07
  • TensorFlow使用Graph的基本操作的实现

    2023-04-10 22:22:37
  • Vue的v-if和v-show的区别图文介绍

    2024-04-30 10:41:40
  • photoshop快捷键大全及使用技巧

    2007-10-26 07:40:00
  • python获取外网ip地址的方法总结

    2022-11-09 18:40:06
  • Swoole webSocket消息服务系统方案设计详解

    2023-06-12 16:16:32
  • 用python求一个数组的和与平均值的实现方法

    2021-01-10 20:11:24
  • asp之家 网络编程 m.aspxhome.com