Python concurrent.futures模块使用实例

作者:小粉优化大师 时间:2023-10-06 07:30:22 

这篇文章主要介绍了Python concurrent.futures模块使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

concurrent.futures的作用:

管理并发任务池。concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池API都是一样,所以应用只做最小的修改就可以在线程和进程之间地切换

1、基于线程池使用map()

futures_thread_pool_map.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import threading
import time

def task(n):
 print('{}: 睡眠 {}'.format(threading.current_thread().name,n))
 time.sleep(n / 10)
 print('{}: 执行完成 {}'.format(threading.current_thread().name,n))
 return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 开始运行')
results = ex.map(task, range(5, 0, -1)) #返回值是generator 生成器
print('main: 未处理的结果 {}'.format(results))
print('main: 等待真实结果')
real_results = list(results)
print('main: 最终结果: {}'.format(real_results))

运行效果


[root@ mnt]# python3 futures_thread_pool_map.py
main: 开始运行
ThreadPoolExecutor-0_0: 睡眠 5
ThreadPoolExecutor-0_1: 睡眠 4
main: 未处理的结果 <generator object Executor.map.<locals>.result_iterator at 0x7f1c97484678>
main: 等待真实结果
ThreadPoolExecutor-0_1: 执行完成 4
ThreadPoolExecutor-0_1: 睡眠 3
ThreadPoolExecutor-0_0: 执行完成 5
ThreadPoolExecutor-0_0: 睡眠 2
ThreadPoolExecutor-0_0: 执行完成 2
ThreadPoolExecutor-0_0: 睡眠 1
ThreadPoolExecutor-0_1: 执行完成 3
ThreadPoolExecutor-0_0: 执行完成 1
main: 最终结果: [0.5, 0.4, 0.3, 0.2, 0.1]

2、futures执行单个任务

futures_thread_pool_submit.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import threading
import time

def task(n):
 print('{}: 睡眠 {}'.format(threading.current_thread().name, n))
 time.sleep(n / 10)
 print('{}: 执行完成 {}'.format(threading.current_thread().name, n))
 return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main :开始')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('等待运行结果')
results = f.result()
print('main: result:{}'.format(results))
print('main: future 之后的结果:{}'.format(f))

运行效果


[root@ mnt]# python3 futures_thread_pool_submit.py
main :开始
ThreadPoolExecutor-0_0: 睡眠 5
main: future: <Future at 0x7f40c0a6a400 state=running>
等待运行结果
ThreadPoolExecutor-0_0: 执行完成 5
main: result:0.5
main: future 之后的结果:<Future at 0x7f40c0a6a400 state=finished returned float>

3、futures.as_completed()按任意顺序运行结果

futures_as_completed.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import time
from concurrent import futures

def task(n):
 time.sleep(random.random())
 return (n, n / 10)

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 开始')
wait_for = [
 ex.submit(task, i) for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
 print('main: result:{}'.format(f.result()))

运行效果


[root@ mnt]# python3 futures_as_completed.py
main: 开始
main: result:(5, 0.5)
main: result:(4, 0.4)
main: result:(3, 0.3)
main: result:(1, 0.1)
main: result:(2, 0.2)

4、Future回调之futures.add_done_callback()

futures_future_callback.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import time

def task(n):
 print('task {} : 睡眠'.format(n))
 time.sleep(0.5)
 print('task {} : 完成'.format(n))
 return n / 10

def done(fn):
 if fn.cancelled():
   print('done {}:取消'.format(fn.arg))
 elif fn.done():
   error = fn.exception()
   if error:
     print('done {} : 错误返回 : {}'.format(fn.arg, error))
   else:
     result = fn.result()
     print('done {} : 正常返回 : {}'.format(fn.arg, result))

if __name__ == '__main__':
 ex = futures.ThreadPoolExecutor(max_workers=2)
 print('main : 开始')
 f = ex.submit(task, 5)
 f.arg = 5
 f.add_done_callback(done)
 result = f.result()

运行效果


[root@ mnt]# python3 futures_future_callback.py
main : 开始
task 5 : 睡眠
task 5 : 完成
done 5 : 正常返回 : 0.5

5、Future任务取消之futures.cancel()

futures_future_callback_cancel.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import time

def task(n):
 print('task {} : 睡眠'.format(n))
 time.sleep(0.5)
 print('task {} : 完成'.format(n))
 return n / 10

def done(fn):
 if fn.cancelled():
   print('done {}:取消'.format(fn.arg))
 elif fn.done():
   error = fn.exception()
   if error:
     print('done {} : 错误返回 : {}'.format(fn.arg, error))
   else:
     result = fn.result()
     print('done {} : 正常返回 : {}'.format(fn.arg, result))

if __name__ == '__main__':
 ex = futures.ThreadPoolExecutor(max_workers=2)
 print('main : 开始')
 tasks = []

for i in range(10, 0, -1):
   print('main: submitting {}'.format(i))
   f = ex.submit(task, i)
   f.arg = i
   f.add_done_callback(done)
   tasks.append((i, f))

for i, task_obj in reversed(tasks):
   if not task_obj.cancel():
     print('main: 不能取消{}'.format(i))
 ex.shutdown()

运行效果


[root@mnt]# python3 futures_future_callback_cancel.py
main : 开始
main: submitting 10
task 10 : 睡眠
main: submitting 9
task 9 : 睡眠
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
done 1:取消
done 2:取消
done 3:取消
done 4:取消
done 5:取消
done 6:取消
done 7:取消
done 8:取消
main: 不能取消9
main: 不能取消10
task 10 : 完成
done 10 : 正常返回 : 1.0
task 9 : 完成
done 9 : 正常返回 : 0.9

6、Future异常的处理

futures_future_exception


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures

def task(n):
 print('{} : 开始'.format(n))
 raise ValueError('这个值不太好 {}'.format(n))

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 开始...')

f = ex.submit(task, 5)

error = f.exception()
print('main: error:{}'.format(error))

try:
 result = f.result()
except ValueError as e:
 print('访问结果值的异常 {}'.format(e))

运行效果


[root@mnt]# python3 futures_future_exception.py
main: 开始...
5 : 开始
main: error:这个值不太好 5
访问结果值的异常 这个值不太好 5

7、Future上下文管理即利用with打开futures.ThreadPoolExecutor()

futures_context_manager.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures

def task(n):
 print(n)

with futures.ThreadPoolExecutor(max_workers=2) as ex:
 print('main: 开始')
 ex.submit(task, 1)
 ex.submit(task, 2)
 ex.submit(task, 3)
 ex.submit(task, 4)
print('main: 结束')

运行效果


[root@ mnt]# python3 futures_context_manager.py
main: 开始
2
4
main: 结束

8、基于进程池使用map()

futures_process_pool_map.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import os

def task(n):
 return (n, os.getpid())

if __name__ == '__main__':
 ex = futures.ProcessPoolExecutor(max_workers=2)
 results = ex.map(task, range(50, 0, -1))
 for n, pid in results:
   print('task {} in 进程id {}'.format(n, pid))

运行效果


[root@ mnt]# python3 futures_process_pool_map.py
task 5 in 进程id 9192
task 4 in 进程id 8668
task 3 in 进程id 9192
task 2 in 进程id 8668
task 1 in 进程id 9192

9、基于进程池异常处理

futures_process_pool_broken.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import os
import signal

def task(n):
 return (n, os.getpid())

if __name__ == '__main__':
 with futures.ProcessPoolExecutor(max_workers=2) as ex:
   print('获取工作进程的id')
   f1 = ex.submit(os.getpid)
   pid1 = f1.result()

print('结束进程 {}'.format(pid1))
   os.kill(pid1, signal.SIGHUP)

print('提交其它进程')
   f2 = ex.submit(os.getpid)
   try:
     pid2 = f2.result()
   except futures.process.BrokenProcessPool as e:
     print('不能开始新的任务:{}'.format(e))

运行效果


[root@ mnt]# python3 futures_process_pool_broken.py
获取工作进程的id
结束进程 104623
提交其它进程
不能开始新的任务:A process in the process pool was terminated abruptly while the future was running or pending.

来源:https://www.cnblogs.com/ygbh/p/12072085.html

标签:Python,concurrent,futures,模块
0
投稿

猜你喜欢

  • 仅IE9/10同时支持script元素的onload和onreadystatechange事件分析

    2024-04-16 09:27:54
  • python读写修改Excel之xlrd&xlwt&xlutils

    2022-04-03 16:35:43
  • Linux下编译安装MySQL-Python教程

    2021-05-03 05:05:40
  • 基于XML语言的来实现购物车的详细代码

    2008-05-29 13:57:00
  • python编程控制Android手机操作技巧示例

    2021-12-01 07:59:05
  • python多进程提取处理大量文本的关键词方法

    2022-02-17 13:07:36
  • SQL Server日志清除的两种方法

    2009-03-16 17:01:00
  • 简单的PHP缓存设计实现代码

    2023-10-25 19:58:08
  • 浅谈Python的正则表达式

    2022-05-11 00:54:16
  • python networkx 包绘制复杂网络关系图的实现

    2021-03-27 06:11:39
  • 学会sql数据库关系图(Petshop)

    2024-01-28 18:43:23
  • 浅谈webpack对样式的处理

    2024-04-23 09:06:51
  • Python实现好友全头像的拼接实例(推荐)

    2021-08-21 17:29:35
  • RDFa介绍——构建更友好的web页面

    2009-09-19 17:01:00
  • Ajax缓存和编码问题的最终解决方案

    2010-03-30 13:42:00
  • FrontPage2002简明教程四:网页超级链接

    2008-09-17 11:23:00
  • Pytorch Tensor基本数学运算详解

    2022-12-09 03:26:50
  • 使用python-cv2实现Harr+Adaboost人脸识别的示例

    2022-03-16 01:05:10
  • SQL server分页的四种方法思路详解(最全面教程)

    2024-01-16 20:19:52
  • JS统计Flash被网友点击过的代码

    2024-05-03 15:06:18
  • asp之家 网络编程 m.aspxhome.com