Skip to main content

aiohttp

基础使用

官方文档

安装

pip install aiohttp

引入

import aiohttp, asyncio

使用规范

  • 不要为每个请求创建会话,会话内部包含一个连接池。连接重用和保持活动(默认情况下都打开)可以提高整体性能。

发起请求

官方示例

# 创建一个链接
session = aiohttp.ClientSession()

# 发送请求
session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

# 如果不使用上下文调用,需要配合 .close()来实时关闭链接
await session.close()

为了使对同一个站点的多个请求更简单,可以使用ClientSession构造函数的base_url参数。例如,请求http://httpbin.org的不同端点可以使用以下代码:

base_url = 'http://httpbin.org'

async with aiohttp.ClientSession(base_url) as session:
async with session.get('/get'): # 相当于 'http://httpbin.org/get'
pass
async with session.post('/post', data=b'data'): # 相当于 'http://httpbin.org/post'
pass
async with session.put('/put', data=b'data'): # 相当于 'http://httpbin.org/put'
pass

get

# 不带参
async def get_v1(url:HttpUrl):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.status)
res = await resp.text()
print(res[:100])

# 带传参
async def get_v2(url, params):
""" 以下三种方式均可以 """
params = {'key1': 'value1', 'key2': 'value2'} # 一个键一个值
params = [('key', 'value1'), ('key', 'value2')] # 一个键多个值
params = 'key=value+1'
async with aiohttp.ClientSession() as sess:
async with sess.get(url, params=params) as resp:
assert str(resp.url) == expect
print(resp.status) # 本次响应的状态码

post

async def post_v1(url):
data = b'\x00Binary-data\x00' # 未经编码的数据通过bytes数据上传
data = 'text' # 传递文本数据
data = {'key': 'value'} # 传递form表单
async with aiohttp.ClientSession() as sess:
async with sess.post('http://httpbin.org/post', data=data) as resp:
print(resp.status)

# 复杂的 post 请求
async def post_v2():
payload = {'key': 'value'} # 传递 pyload
async with aiohttp.ClientSession() as sess:
async with sess.post('http://httpbin.org/post', json=payload) as resp:
print(resp.status)
print(await resp.text())
# 发送 json 格式的参数
async with aiohttp.ClientSession() as session:
async with session.post(url, json={'test': 'object'})

# 不使用的默认的json格式
import ujson
async with aiohttp.ClientSession(
json_serialize=ujson.dumps) as session:
await session.post(url, json={'test': 'object'})
  • 流式上传

如果您将文件对象作为数据参数传递,aiohttp 将自动将其流式传输到服务器。检查StreamReader 支持的格式信息。

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)
  • 指定content_type
url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
open('report.xls', 'rb'),
filename='report.xls',
content_type='application/vnd.ms-excel')

await session.post(url, data=data)

内容响应

async def get_v2(url, params):
""" 以下三种方式均可以 """
params = {'key1': 'value1', 'key2': 'value2'} # 一个键一个值
async with aiohttp.ClientSession() as sess:
async with sess.get('http://httpbin.org/get', params=params) as resp:
assert str(resp.url) == expect
print(resp.status) # 本次响应的状态码
print(await resp.text()) # 本次返回的结果
# await resp.text(encoding='windows-1251') # 配合encoding

# 二进制响应 和传输编码会自动为您解码gzip。deflate,
# 您可以启用brotli传输编码支持,只需安装 brotli。
print(await resp.read())

# json(小文件)
print(await resp.json()) # 可以为json()调用指定自定义编码和解码器功能。

# json (流文件)

下载文件

import aiohttp
from aiohttp import ClientResponse
from helper import print_dict # 个人工具库

async def download_as_chunk(
resp: ClientResponse, output_path: str, chunk_size: int = 1024
):
"""
一边下载一边保存

- param resp :{ClientResponse} 来自 aiohttp.resp
- param output_path :{str} {description}
- param chunk_size=1024 :{int} 每次使用多少内存

@returns `{}` {description}

"""
with open(output_path, "wb") as fd:
async for each_chunk in resp.content.iter_chunked(chunk_size):
fd.write(each_chunk)


async def download(resp: ClientResponse, output_path: str):
"""
一次性下载
"""
data_str = await resp.text()
with open(output_path, "w") as f:
f.write(data_str)


async def download_url(
url: str, output_path: str, timeout: int = 5 * 60, method: str = "get"
):
timeout = aiohttp.ClientTimeout(total=timeout)

async with aiohttp.ClientSession(timeout=timeout) as session:
method = session.get if method.lower() == "get" else session.post
async with method(url) as resp:
print_dict(resp.headers)

# 一边下载一边保存,节约内存
await download_as_chunk(resp, output_path)

# 一次性下载
# await download(resp, output_path)
  • 使用
if __name__ == "__main__":
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

url = r"https://packagecontrol.io/channel_v3.json"

run = download_url(url, "channel_v32.json")
asyncio.run(run)

上传文件

有时候,我们确实是有想服务器传文件的需求,eg:上传回执单;

上传图片...... 100张 10000张的量级的时候我们会想用多线程去处理,但量再大 你再使用 多线程+requests 的方式就会发现有大量的报错,若有类似的使用场景,可以用以下 case 处理

async def main():
""" 传递文件 """
files = {'file': open('report.xls', 'rb')}
async with aiohttp.ClientSession() as sess:
async with sess.post('http://httpbin.org/post', data=files) as resp:
print(resp.status)
print(await resp.text())


async def main2():
""" 实例化 FormData 可以指定 filename 和 content_type """
data = aiohttp.FormData()
data.add_field('file',
open('report.xls', 'rb'),
filename='report.xls',
content_type='application/vnd.ms-excel')
async with aiohttp.ClientSession() as sess:
async with sess.post('http://httpbin.org/post', data=data) as resp:
print(resp.status)
print(await resp.text())


async def main3():
""" 流式上传文件 """
async with aiohttp.ClientSession() as sess:
with open('report.xls', 'rb') as f:
async with sess.post('http://httpbin.org/post', data=f) as resp:
print(resp.status)
print(await resp.text())


async def main4():
""" 因为 content属性是 StreamReader(提供异步迭代器协议),
所以可以将 get 和 post 请求链接在一起。python3.6+能使用"""
async with aiohttp.ClientSession() as sess:
async with sess.get('http://python.org') as resp:
async with sess.post('http://httpbin.org/post', data=resp.content) as r:
print(r.status)
print(await r.text())

流式上传

aiohttp支持多种类型的流式上传,这使您可以发送大文件而无需将它们读入内存。

作为一个简单的案例,只需为您的 body 提供一个类似文件的对象:

with open('massive-body', 'rb') as f:
await session.post('http://httpbin.org/post', data=f)
  • 使用异步的生成器
# 首先生成读取文件的生成器
async def file_sender(file_name=None, chunk_size = 64*1024):
async with aiofiles.open(file_name, 'rb') as f:
chunk = await f.read(chunk_size)
while chunk:
yield chunk
chunk = await f.read(chunk_size)

# Then you can use file_sender as a data provider:
# 将生成器直接发送
url = 'http://httpbin.org/post'
async with session.post(
url,
data=file_sender(file_name='huge_file')) as resp:
print(await resp.text())

因为该content属性是一个 StreamReader(提供异步迭代器协议),所以您可以将 get 和 post 请求链接在一起:

resp = await session.get('http://python.org')
await session.post('http://httpbin.org/post',
data=resp.content)

常用配置

定义请求头

headers={'User-Agent': 'your agent'"refer":"http://httpbin.org"}

async with aiohttp.ClientSession(headers=headers) as session:
async with session.get('http://httpbin.org/headers') as resp:
print(resp.status)
print(await resp.text())

URL规范化

例如URL('http://example.com/путь/%30?a=%31')

转换为 URL('http://example.com/%D0%BF%D1%83%D1%82%D1%8C/0?a=1')

await session.get(
URL('http://example.com/%30', encoded=True))

SSL验证警告问题

默认情况下,aiohttp对HTTPS协议使用严格检查,如果你不想上传SSL证书,可将ssl设置为False。

r = await session.get('https://example.com', ssl=False)

请求超时

有时候,我们向服务器发送请求,若没有设置超时时间,此请求就会一直阻塞直到系统报错,这对于我们的系统是无法容忍的,所以发请求的时候千万要记得加上超时时间。

timeout = aiohttp.ClientTimeout(
total:int=5*60 # 整个操作的最大秒数,包括建立连接、发送请求和读取响应。
connect:int=None # 如果超出池连接限制,
# 则建立新连接或等待池中的空闲连接的最大秒数。
sock_connect:int=None # 为新连接连接到对等点的最大秒数,不是从池中给出的。
sock_read:int=None # 从对等点读取新数据部分之间允许的最大秒数。
)
timeout = aiohttp.ClientTimeout(total=60)

async def main():
async with aiohttp.ClientSession(timeout=timeout) as sess:
async with sess.get('http://httpbin.org/get') as resp:
print(resp.status)
print(await resp.text())

定义cookie

cookies = {'cookies_are': 'working'}

async def main():
async with aiohttp.ClientSession(cookies=cookies) as session:
async with session.get('http://httpbin.org/cookies') as resp:
print(resp.status)
print(await resp.text())
assert await resp.json() == {"cookies": {"cookies_are": "working"}}

多请求共享cookie

async def main():
async with aiohttp.ClientSession() as session:
await session.get('http://httpbin.org/cookies/set?my_cookie=my_value')
filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
print(filtered)
assert filtered['my_cookie'].value == 'my_value'
async with session.get('http://httpbin.org/cookies') as r:
json_body = await r.json()
print(json_body)
assert json_body['cookies']['my_cookie'] == 'my_value'

Cookie 的安全性问题: 默认 ClientSession 使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受url和ip地址产生的 cookie,只能接受 DNS 解析IP产生的cookie。

可以通过设置 aiohttp.CookieJar 的 unsafe=True 来配置

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

使用虚假Cookie Jar: 有时不想处理cookie。这时可以在会话中使用aiohttp.DummyCookieJar来达到目的。

jar = aiohttp.DummyCookieJar()
session = aiohttp.ClientSession(cookie_jar=jar)

设置代理

# 第一种
async with aiohttp.ClientSession() as session:
proxy_auth = aiohttp.BasicAuth('user', 'pass')
async with session.get("http://python.org", proxy="http://proxy.com", proxy_auth=proxy_auth) as resp:
print(resp.status)
# 第二种
session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

aoihttp 连接池

  • 使用连接器

想要调整请求的传输层你可以为ClientSession及其同类组件传递自定义的连接器。例如:

conn = aiohttp.TCPConnector()
session = aiohttp.ClientSession(connector=conn)
  • 限制连接池的容量

限制同一时间打开的连接数可以传递limit参数:

conn = aiohttp.TCPConnector(limit=30)

这样就将总数限制在30,默认情况下是100.如果你不想有限制,传递0即可:

conn = aiohttp.TCPConnector(limit=0)

并发

import asyncio

async def fetch(session:ClientSession, params:dict, method:Literal["get", "post"] = "get"):
async with session.get(url, params=params) as resp:
return await resp.json()

async def main(urls)
async with aiohttp.ClientSession(headers=headers) as session:
task_list = [fetch(url) for __ in urls]
response_list = await asyncio.gather(*task_list)