Skip to main content

asyncio

基础使用

主要模块

  • 事件循环 - 管理所有时间
  • Future对象 - 尚未完成的计算,还未完成的结果
  • Task - Future对象的子类,用来并发运行多个任务

关键方法

几个重要函数比较

函数传参返回值返回值顺序函数意义
asyncio.gather可以传递多个协程或者Futures,函数会自动将协程包装成task,例如协程生成器。包含Futures结果的list按照原始顺序排列注重收集结果,等待一堆Futures并按照顺序返回结果
asyncio.waita list of futures返回两个Future集合 (done, pending)无序(暂定)是一个协程等传给他的所有协程都运行完之后结束,并不直接返回结果
asyncio.as_completeda list of futures返回一个协程迭代器按照完成顺序返回的迭代器每次迭代只返回已经完成的Futures

引入

import asyncio

执行异步任务

# python3.7+ 支持写法
asyncio.run(main())

# python3.6及以下版本写法
event_loop = asyncio.get_event_loop()
result, pending = event_loop.run_until_complete(asyncio.gather(main()))
event_loop.close()

关键方法

创建task


基础用例

async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')

asyncio.run(main())

延时执行

asyncio.sleep()

async def main():
await asyncio.sleep(1)
print('... World!')

coro = main()

loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
loop.close()

异步执行

run_until_complete()

async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')

t = main()
task = asyncio.create_task(t)

# 3.6
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
loop.close()

# 3.7
asyncio.run(t)
res = task.result()

并发执行

create_task()

async def multiple():
async def test():
await asyncio.sleep(1)

task1 = asyncio.create_task(test())
task2 = asyncio.create_task(test())

await task1
await task2

asyncio.run(multiple())

gather()

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

async def main():
task_list = set()

task1 = task_func()
task_list.add(task1)

task2 = task_func()
task_list.add(task2)

# this is also valid:
result_list = await asyncio.gather(
*task_list,
return_exceptions:bool, # 是否返回错误
)
return result_list

# 最终执行
asyncio.run(main())

asyncio.wait() + run_until_complete()

async def main(name):
await asyncio.sleep(1)
print('... World!', name)

task_list = [main(i) for i in range(10)]
task_list_run = asyncio.wait(task_list)

# 处理结果,处理状态
result, pendings = loop.run_until_complete(task_list_run)
print(result)
loop.close()

后台任务

background_tasks()

background_tasks = set()

for i in range(10):
task = asyncio.create_task(some_coro(param=i))

# Add task to the set. This creates a strong reference.
background_tasks.add(task)

# To prevent keeping references to finished tasks forever,
# make each task remove its own reference from the set after
# completion:
task.add_done_callback(background_tasks.discard)

同步阻塞的问代码如何处理

官方文档

一些同步高io或者cpu耗时的函数,可以通过多线程或者多进程解决

import asyncio
import concurrent.futures

def blocking_io():
# 这是一个高耗时的IO任务
# 一般使用线程池
with open('/dev/urandom', 'rb') as f:
return f.read(100)

def cpu_bound():
# 这是一个高耗时的CPU计算任务
# 使用进程池
return sum(i * i for i in range(10 ** 7))

async def main():
loop = asyncio.get_running_loop()

# 还可以通过设置默认的处理形式来让执行高耗时的任务
# loop.set_default_executor(executor)

## Options:

# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)

# 2. Run in a custom thread pool:
# 开启一个线程池来运行该任务
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)

# 3. Run in a custom process pool:
# 开启一个进程池来执行该任务
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)

asyncio.run(main())

这个方法返回一个 asyncio.Future 对象。

常见错误

run() - Event loop is closed

这是一个只会出现在 window 的错误,因为windows平台自身对事件处理的机制比较敏感?反正具体原因就是过早的关闭了事件循环,导致函数报错

Windows上的一个已知问题(请参见https://github.com/encode/httpx/issues/914)

  • 解决1(官方)

    要避免此错误,只需将事件循环策略设置为WindowsSelectorEventLoopPolicy

    if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  • 解决2

    # 使用 get_event_loop 代替 run
    asyncio.get_event_loop().run_until_complete(xxxxx)