Python中的asyncio代码详解

作者:且听风吟 时间:2022-03-14 00:01:33 

asyncio介绍

熟悉c#的同学可能知道,在c#中可以很方便的使用 async 和 await 来实现异步编程,那么在python中应该怎么做呢,其实python也支持异步编程,一般使用 asyncio 这个库,下面介绍下什么是 asyncio :

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。 asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。 asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio中的基本概念

可以看见,使用asyncio库我们也可以在python代码中使用 async 和 await 。在 asyncio 中,有四个基本概念,分别是:

Eventloop

Eventloop 可以说是 asyncio 应用的核心,中央总控, Eventloop 实例提供了注册、取消、执行任务和回调 的方法。 简单来说,就是我们可以把一些异步函数注册到这个事件循环上,事件循环回循环执行这些函数(每次只能执行一个),如果当前正在执行的函数在等待I/O返回,那么事件循环就会暂停它的执行去执行其他函数。当某个函数完成I/O后会恢复,等到下次循环到它的时候就会继续执行。

Coroutine

协程本质就是一个函数,


import asyncio
import time
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
async def main():
start = time.perf_counter()
await asyncio.gather(a(), b())
print(f'{main.__name__} Cost: {time.perf_counter() - start}')
if __name__ == '__main__':
asyncio.run(main())

执行上述代码,可以看到类似这样的输出:

Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997

关于协程的具体介绍,可以参考我以前的文章python中的协程 不过以前的那种写法,需要使用装饰器,已经过时了。

Future

Future 是表示一个“未来”对象,类似于 javascript 中的 promise ,当异步操作结束后会把最终结果设置到这个 Future 对象上, Future 是对协程的封装。


>>> import asyncio
>>> def fun():
...  print("inner fun")
...  return 111
...
>>> loop = asyncio.get_event_loop()
>>> future = loop.run_in_executor(None, fun) #这里没有使用await
inner fun
>>> future #可以看到,fun方法状态是pending
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]>
>>> future.done() # 还没有完成
False
>>> [m for m in dir(future) if not m.startswith('_')]
['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
>>> future.result() #这个时候如果直接调用result()方法会报错
Traceback (most recent call last):
File "<input>", line 1, in <module>
asyncio.base_futures.InvalidStateError: Result is not set.
>>> async def runfun():
...  result=await future
...  print(result)
...  
>>>loop.run_until_complete(runfun()) #也可以通过 loop.run_until_complete(future) 来执行,这里只是为了演示await
111
>>> future
<Future finished result=111>
>>> future.done()
True
>>> future.result()
111
Task

Eventloop 除了支持协程,还支持注册 Future 和 Task 2种类型的对象,而 Future 是协程的封装, Future 对象提供了很多任务方法(如完成后的回调,取消,设置任务结果等等),但是一般情况下开发者不需要操作 Future 这种底层对象,而是直接用 Future 的子类 Task 协同的调度协程来实现并发。那么什么是 Task 呢?下面介绍下:

一个与 Future 类似的对象,可运行 Python 协程。非线程安全。 Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象, Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象完成被打包的协程将恢复执行。 事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task 、回调或执行IO操作。

下面看看用法:


>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  
>>> task = asyncio.ensure_future(a())
>>> loop.run_until_complete(task)
Suspending a
Resuming a

asyncio中一些常见用法的区别

Asyncio.gather和asyncio.wait

我们在上面的代码中用到过 asyncio.gather ,其实还有另外一种用法是 asyncio.wait ,他们都可以让多个协程并发执行,那么他们有什么区别呢?下面介绍下。


>>> import asyncio
>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  return 'A'
...
...
... async def b():
...  print('Suspending b')
...  await asyncio.sleep(1)
...  print('Resuming b')
...  return 'B'
...
>>> async def fun1():
...  return_value_a, return_value_b = await asyncio.gather(a(), b())
...  print(return_value_a,return_value_b)
...  
>>> asyncio.run(fun1())
Suspending a
Suspending b
Resuming b
Resuming a
A B
>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()])
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending b
Suspending a
Resuming b
Resuming a
{<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>}
set()
<Task finished coro=<a() done, defined at <input>:1> result='A'>
A

根据上述代码,我们可以看出两者的区别:

asyncio.gather 能收集协程的结果,而且会按照输入协程的顺序保存对应协程的执行结果,而 asyncio.wait 的返回值有两项,第一项是完成的任务列表,第二项表示等待完成的任务列表。

asyncio.wait 支持接受一个参数 return_when ,在默认情况下, asyncio.wait 会等待全部任务完成 (return_when='ALL_COMPLETED') ,它还支持 FIRST_COMPLETED (第一个协程完成就返回)和 FIRST_EXCEPTION (出现第一个异常就返回):


>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED)
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending a
Suspending b
Resuming b
{<Task finished coro=<b() done, defined at <input>:8> result='B'>}
{<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>}
<Task finished coro=<b() done, defined at <input>:8> result='B'>
B

一般情况下,用 asyncio.gather 就足够了。

asyncio.create_task和loop.create_task以及asyncio.ensure_future

这三种方法都可以创建 Task ,从Python3.7开始可以统一的使用更高阶的 asyncio.create_task .其实 asyncio.create_task 就是用的 loop.create_task . loop.create_task 接受的参数需要是一个协程,但是 asyncio.ensure_future 除了接受协程,还可以是 Future 对象或者 awaitable 对象:

  1. 如果参数是协程,其底层使用 loop.create_task ,返回 Task 对象

  2. 如果是 Future 对象会直接返回

  3. 如果是一个 awaitable 对象,会 await 这个对象的 __await__ 方法,再执行一次 ensure_future ,最后返回 Task 或者 Future 。

所以 ensure_future 方法主要就是确保这是一个 Future 对象,一般情况下直接用 asyncio.create_task 就可以了。

注册回调和执行同步代码

可以使用 add_done_callback 来添加成功回调:


def callback(future):
print(f'Result: {future.result()}')
def callback2(future, n):
print(f'Result: {future.result()}, N: {n}')
async def funa():
await asyncio.sleep(1)
return "funa"
async def main():
task = asyncio.create_task(funa())
task.add_done_callback(callback)
await task
#这样可以为callback传递参数
task = asyncio.create_task(funa())
task.add_done_callback(functools.partial(callback2, n=1))
await task
if __name__ == '__main__':
asyncio.run(main())

执行同步代码

如果有同步逻辑,想要用 asyncio 来实现并发,那么需要怎么做呢?下面看看:


def a1():
time.sleep(1)
return "A"
async def b1():
await asyncio.sleep(1)
return "B"
async def main():
loop = asyncio.get_running_loop()
await asyncio.gather(loop.run_in_executor(None, a1), b1())
if __name__ == '__main__':
start = time.perf_counter()
asyncio.run(main())
print(f'main method Cost: {time.perf_counter() - start}')
# 输出: main method Cost: 1.0050589740000002

可以使用 run_into_executor 来将同步函数逻辑转化成一个协程,第一个参数是要传递 concurrent.futures.Executor 实例的,传递 None 会选择默认的 executor 。

总结

以上所述是小编给大家介绍的Python中的asyncio代码详解,网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

来源:https://www.lylinux.net/article/2019/6/9/57.html

标签:python,asyncio
0
投稿

猜你喜欢

  • 编写Python脚本抓取网络小说来制作自己的阅读器

    2022-11-06 18:51:03
  • Django项目中model的数据处理以及页面交互方法

    2022-12-26 20:12:25
  • windows 10 设定计划任务自动执行 python 脚本的方法

    2023-11-11 20:45:57
  • 浅析PyTorch中nn.Module的使用

    2021-10-29 14:04:53
  • pandas删除行删除列增加行增加列的实现

    2022-10-27 13:22:55
  • python实现堆排序的实例讲解

    2023-01-06 20:50:38
  • Navicat数据存放位置和备份数据库路径设置方式

    2024-01-16 14:32:53
  • 恢复被删除的数据 Log Explorer for SQL Server 4.2 (一)

    2010-07-01 19:24:00
  • 在ASP.NET 2.0中操作数据之四十:自定义DataList编辑界面

    2023-07-07 04:45:20
  • js DNA动态序列比对代码

    2024-04-16 10:41:26
  • python实战之Scrapy框架爬虫爬取微博热搜

    2022-07-08 02:09:50
  • python编写adb截图工具的实现源码

    2021-03-24 08:50:15
  • python之pyinstaller组件打包命令和异常解析实战

    2023-08-05 05:34:23
  • JavaScript来实现打开链接页面的简单实例

    2024-04-30 09:51:32
  • Python实现监控Nginx配置文件的不同并发送邮件报警功能示例

    2023-09-22 11:07:33
  • 详解Python中while无限迭代循环方法

    2022-08-17 12:53:48
  • Perl下应当如何连接Access数据库

    2008-12-04 13:06:00
  • 详解Django中类视图使用装饰器的方式

    2023-12-20 15:35:57
  • Python中elasticsearch插入和更新数据的实现方法

    2023-02-25 02:14:20
  • Oracle数据库中的级联查询、级联删除、级联更新操作教程

    2023-06-25 19:01:42
  • asp之家 网络编程 m.aspxhome.com