asyncio
基础使用
主要模块
- 事件循环 - 管理所有时间
- Future对象 - 尚未完成的计算,还未完成的结果
- Task - Future对象的子类,用来并发运行多个任务
关键方法
几个重要函数比较
函数 | 传参 | 返回值 | 返回值顺序 | 函数意义 | |
---|---|---|---|---|---|
asyncio.gather | 可以传递多个协程或者Futures,函数会自动将协程包装成task,例如协程生成器。 | 包含Futures结果的list | 按照原始顺序排列 | 注重收集结果,等待一堆Futures并按照顺序返回结果 | |
asyncio.wait | a list of futures | 返回两个Future集合 (done, pending) | 无序(暂定) | 是一个协程等传给他的所有协程都运行完之后结束,并不直接返回结果 | |
asyncio.as_completed | a list of futures | 返回一个协程迭代器 | 按照完成顺序 | 返回的迭代器每次迭代只返回已经完成的Futures |
引入
import asyncio
执行异步任务
# python3.7+ 支持写法
asyncio.run(main())
# python3.6及以下版本写法
event_loop = asyncio.get_event_loop()
result, pending = event_loop.run_until_complete(asyncio.gather(main()))
event_loop.close()
关键方法
创建task
基础用例
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
asyncio.run(main())
延时执行
asyncio.sleep()
async def main():
await asyncio.sleep(1)
print('... World!')
coro = main()
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
loop.close()
异步执行
run_until_complete()
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
t = main()
task = asyncio.create_task(t)
# 3.6
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
loop.close()
# 3.7
asyncio.run(t)
res = task.result()
并发执行
create_task()
async def multiple():
async def test():
await asyncio.sleep(1)
task1 = asyncio.create_task(test())
task2 = asyncio.create_task(test())
await task1
await task2
asyncio.run(multiple())
gather()
通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
async def main():
task_list = set()
task1 = task_func()
task_list.add(task1)
task2 = task_func()
task_list.add(task2)
# this is also valid:
result_list = await asyncio.gather(
*task_list,
return_exceptions:bool, # 是否返回错误
)
return result_list
# 最终执行
asyncio.run(main())
asyncio.wait()
+ run_until_complete()
async def main(name):
await asyncio.sleep(1)
print('... World!', name)
task_list = [main(i) for i in range(10)]
task_list_run = asyncio.wait(task_list)
# 处理结果,处理状态
result, pendings = loop.run_until_complete(task_list_run)
print(result)
loop.close()
后台任务
background_tasks()
background_tasks = set()
for i in range(10):
task = asyncio.create_task(some_coro(param=i))
# Add task to the set. This creates a strong reference.
background_tasks.add(task)
# To prevent keeping references to finished tasks forever,
# make each task remove its own reference from the set after
# completion:
task.add_done_callback(background_tasks.discard)
同步阻塞的问代码如何处理
一些同步高io或者cpu耗时的函数,可以通过多线程或者多进程解决
import asyncio
import concurrent.futures
def blocking_io():
# 这是一个高耗时的IO任务
# 一般使用线程池
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound():
# 这是一个高耗时的CPU计算任务
# 使用进程池
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
# 还可以通过设置默认的处理形式来让执行高耗时的任务
# loop.set_default_executor(executor)
## Options:
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)
# 2. Run in a custom thread pool:
# 开启一个线程池来运行该任务
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)
# 3. Run in a custom process pool:
# 开启一个进程池来执行该任务
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
asyncio.run(main())
这个方法返回一个 asyncio.Future
对象。
常见错误
run() - Event loop is closed
原因
这是一个只会出现在 window 的错误,因为windows平台自身对事件处理的机制比较敏感?反正具体原因就是过早的关闭了事件循环,导致函数报错
Windows上的一个已知问题(请参见https://github.com/encode/httpx/issues/914)