浅谈一下python线程池简单应用
作者:Heney33 时间:2023-08-27 10:48:37
一、线程池简介
传统多线程方案会使用“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务时执行时间较短,而且执行次数及其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
一个线程的运行时间可以分为三部分:线程的启动时间、线程体的运行时间和线程的销毁时间。
在多线程处理的情景中,如果线程不能被重用,就意味着每次线程运行都要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,减低了效率。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。
当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和稳定性。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致Python解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
服务器CPU数有限,能够同时并发的线程数有限,并不是开得越多越好,以及线程切换时有开销的,如果线程切换过于频繁,反而会使性能降低。
线程池适用于:突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短的场景
二、线程池在python中的应用
从python3.2开始,标准库提供了concurrent.futures模块,它提供了两个子类:ThreadPoolExecutor和ProcessPoolExecutor。其中ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor用于创建进程池。不仅可以自动调度线程,还可以做到:
主线程可以获取某一个线程(或任务)的状态,以及返回值
当一个线程完成的时候,主线程能够立即知道
让多线程和多进程编码接口一致
使用线程池/进程池来管理并发编程,只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
ThreadPoolExecutor构造函数有两个参数:
一个是max_workers参数,用于指定线程池的最大线程数,如果不指定的话则默认是CPU核数的5倍。
另一个参数是thread_name_prefix,它用来指定线程池中线程的名称前缀(可选),如下:
threadPool = ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="test_")
Exectuor 提供了如下常用方法:
方法 | 描述 |
submit(fn, *args, **kwargs) | 将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,**kwargs 代表以关键字参数的形式为 fn 函数传入参数 |
map(func,*iterables, timeout=None, chunksize=1) | 该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理 |
shutdown(wait=True) | 关闭线程池。wait=True,等待池内所有任务执行完毕回收完资源后才继续;wait=False,立即返回,并不会等待池内的任务执行完毕。但不管wait参数为何值,整个程序都会等到所有任务执行完毕 |
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
方法 | 描述 |
cancel() | 取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True |
cancelled() | 返回 Future 代表的线程任务是否被成功取消 |
running() | 如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True |
done() | 如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True |
result(timeout=None) | 获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒 |
exception(timeout=None) | 获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None |
add_done_callback(fn) | 为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数 |
线程池用完后,应调用线程池的shutdown()方法,关闭线程池。也可使用with语句来管理线程池,可避免手动关闭线程池
示例一(使用submit方式):
from concurrent.futures import ThreadPoolExecutor # 导入ThreadPoolExecutor模块
import time
max_workers = 5
t = []
t1 = time.time()
# 作为线程任务的函数
def task(x, y):
return x + y
threadPool = ThreadPoolExecutor(max_workers) # 创建最大线程数为max_workers的线程池
for i in range(20): # 循环向线程池中提交task任务
future = threadPool.submit(task, i, i+1)
t.append(future)
# 若不需要获取返回值,则可不需要下面两行代码
for i in t:
print(i.result()) # 获取每个任务的返回值,result()会阻塞主线程
threadPool.shutdown() # 阻塞主线程,所有任务执行完后关闭线程池
print(time.time() - t1)
示例二(使用map方式):
from concurrent.futures import ThreadPoolExecutor# 导入ThreadPoolExecutor模块
max_workers = 5
t = []
t1 = time.time()
# 作为线程任务的函数
def task(x):
return x + (x + 1)
args = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
with ThreadPoolExecutor(max_workers) as threadPool: # 创建最大线程数为max_workers的线程池
results = threadPool.map(task, args) # 启动线程,并收集每个线任务的返回结果
# 若无返回值,则可不需要下面两行代码
for i in results:
print(i)
示例三:
as_complete():是一个生成器,在没有任务完成的时候会阻塞,在有某个任务完成的时候会yield这个任务,执行语句,继续阻塞,循环到所有任务结束,先完成的任务会先通知主线程
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
max_workers = 5
t = []
t1 = time.time()
# 作为线程任务的函数
def task(x, y):
return x + y
def handle_result(future):
print(future.result())
with ThreadPoolExecutor(max_workers) as threadPool: # 创建最大线程数为max_workers的线程池
for i in range(20): # 循环向线程池中提交task任务
future = threadPool.submit(task, i, i+1)
t.append(future)
# 若不需要获取返回值,则可不需要下面两行代码
for future in as_completed(t): # as_completed,哪个先完成就先处理哪个,会阻塞主线程,直到完成所有,除非设置timeout
future.add_done_callback(handle_result)
来源:https://blog.csdn.net/Heney33/article/details/124493267