Skip to main content

文件上传

20200826153633171

基础使用

字段定义

from fastapi import FastAPI, File, Form, UploadFile

@app.post("/files/")
async def create_file(
file: bytes = File(),
fileb: UploadFile = File(),
token: str = Form(), # 这里定义了一个token的字符串参数,该参数是从form内获取的
):

常见方式

fastapi常见有两种上传文件类型:

  • file: bytes = File(...):bytes 类型,会直接进内存,适合临时文件
  • file: UploadFile = File(...):file 的类型区别,这种方式会混存到硬盘,还可以读取文件名等更多属性
from fastapi import File, UploadFile

# 这里的 file 是 bytes 类型,会直接进内存,适合临时文件
@app.route("/file")
async def file_upload(file: bytes = File(..., max_length=2097152)):
"""使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
with open("D:\\lufei.jpg", "wb") as f:
f.write(file)
return {"file_size": len(file)}


# 注意 file 的类型区别,这种方式会混存到硬盘,还可以读取文件名等更多属性
@app.route("/file2")
def create_file2(file: UploadFile = File(...)):
...

其中参数的名字也要和 <input type="file" name="xxx"> 中的名字对应上。

代码实例

小文件(bytes)

如果文件类型声明为 bytes,那么文件会直接缓存到内存中,可以直接用 io.BytesIO 打开读取。 如果文件类型定义为 UploadFile 类型,那么当文件过大的时候 FastAPI 会在硬盘中缓存文件。 使用 bytes 当然方便,但是如果每个用户都在上传 2G 大小的文件,可能你的内存一会儿就爆了。

所有数据都保存到内存中,比较快,但是遇到大文件将会出现非常明显的性能问题

# 小文件[单个]
@app.post("/files/")
async def file_upload(file: bytes = File(..., max_length=2097152)):
"""使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
with open("D:\\lufei.jpg", "wb") as f:
f.write(file)
return {"file_size": len(file)}

# 小文件[批量]
@app.post("/files/")
async def file_upload(file: List[bytes] = File(..., max_length=2097152)):
"""使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
return {"file_size": len(file)}

大文件(硬盘)

# 大文件[单个]
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
return {"filename": file.filename}

# 大文件[批量]
# 对应标签 <input name="files" type="file" multiple>
@app.post("/uploadfiles/")
async def create_upload_files(files: List[UploadFile]):
return {"filenames": [file.filename for file in files]}

大文件分割接收

每次读取的是10M数据,可自己调整,open函数是直接打开一个路径

  • 单文件
@app.post("/uploadfile/")
async def upload_file(tempfile: UploadFile = File(...)):
with open(f"/opt/{tempfile.filename}", 'wb') as f:
for i in iter(lambda : tempfile.file.read(1024*1024*10),b''):
f.write(i)
f.close()
return {"file_name":tempfile.filename}
  • 多文件
@app.post("/uploadfiles/")
async def upload_file(tempfiles: List[UploadFile] = File(...)):
for tempfile in tempfiles:
with open(f"/opt/{tempfile.filename}", 'wb') as f:
for i in iter(lambda : tempfile.file.read(1024*1024*10),b''):
f.write(i)
f.close()
return {"files_name":[x.filename x in tempfiles]}
  • 大文件【视频】上传
import uvicorn, cv2, os, datetime
from fastapi import FastAPI,UploadFile,File
from fastapi.responses import JSONResponse

@app.post("/uploadVideo",summary="上传视频",tags=["视频处理"])
async def getVideo(file: UploadFile = File(...)):
time_now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 当前时间字符串
temp_file_name = f"tempVideoFile_{time_now}.mp4" # 临时文件的文件名
batch_size = 10 * 2 ** 20 # 每次写入文件的数据大小,这里代表10 MiB
with open(temp_file_name, 'wb') as f: # 分批写入数据
# 从网络文件流分批读取数据到 b'',再写入文件
for i in iter(lambda: file.file.read(batch_size), b''):
f.write(i)

file_size = os.path.getsize(temp_file_name) # 统计文件大小

cap = cv2.VideoCapture(temp_file_name) # 读取视频数据
fps = cap.get(cv2.CAP_PROP_FPS) # 统计视频的帧率
total_s = cap.get(cv2.CAP_PROP_FRAME_COUNT) # 统计视频的帧数
total_time = total_s/fps # 计算视频的时长

断点续传

import base64, re, os
import shutil
import stat
from email.utils import formatdate
from mimetypes import guess_type
from pathlib import Path
from urllib.parse import quote

import aiofiles
from fastapi import Body, FastAPI, File, Path as F_Path, Request, UploadFile
from starlette.responses import StreamingResponse

app = FastAPI(docs_url="/docs")

base_dir = os.path.dirname(os.path.abspath(__file__))
upload_file_path = Path(base_dir, './uploads')


@app.post("/file-slice")
async def upload_file(
request: Request,
identifier: str = Body(..., description="文件唯一标识符"),
number: str = Body(..., description="文件分片序号(初值为0)"),
file: UploadFile = File(..., description="文件")
):
"""文件分片上传"""
path = Path(upload_file_path, identifier)
if not os.path.exists(path):
os.makedirs(path)
file_name = Path(path, f'{identifier}_{number}')
if not os.path.exists(file_name):
async with aiofiles.open(file_name, 'wb') as f:
await f.write(await file.read())
return {
'code': 1,
'chunk': f'{identifier}_{number}'
}


@app.put("/file-slice")
async def merge_file(
request: Request,
name: str = Body(..., description="文件名称(不含后缀)"),
file_type: str = Body(..., description="文件类型/后缀"),
identifier: str = Body(..., description="文件唯一标识符")
):
"""合并分片文件"""
target_file_name = Path(upload_file_path, f'{name}.{file_type}')
path = Path(upload_file_path, identifier)
try:
async with aiofiles.open(target_file_name, 'wb+') as target_file: # 打开目标文件
for i in range(len(os.listdir(path))):
temp_file_name = Path(path, f'{identifier}_{i}')
async with aiofiles.open(temp_file_name, 'rb') as temp_file: # 按序打开每个分片
data = await temp_file.read()
await target_file.write(data) # 分片内容写入目标文件
except Exception as e:
return {
'code': 0,
'error': f'合并失败:{e}'
}
shutil.rmtree(path) # 删除临时目录
return {
'code': 1,
'name': f'{name}.{file_type}'
}


@app.get("/file-slice/{file_name}")
async def download_file(request: Request, file_name: str = F_Path(..., description="文件名称(含后缀)")):
"""分片下载文件,支持断点续传"""
# 检查文件是否存在
file_path = Path(upload_file_path, file_name)
if not os.path.exists(file_path):
return {
'code': 0,
'error': '文件不存在'
}
# 获取文件的信息
stat_result = os.stat(file_path)
content_type, encoding = guess_type(file_path)
content_type = content_type or 'application/octet-stream'
# 读取文件的起始位置和终止位置
range_str = request.headers.get('range', '')
range_match = re.search(r'bytes=(\d+)-(\d+)', range_str, re.S) or re.search(r'bytes=(\d+)-', range_str, re.S)
if range_match:
start_bytes = int(range_match.group(1))
end_bytes = int(range_match.group(2)) if range_match.lastindex == 2 else stat_result.st_size - 1
else:
start_bytes = 0
end_bytes = stat_result.st_size - 1
# 这里 content_length 表示剩余待传输的文件字节长度
content_length = stat_result.st_size - start_bytes if stat.S_ISREG(stat_result.st_mode) else stat_result.st_size
# 构建文件名称
name, *suffix = file_name.rsplit('.', 1)
suffix = f'.{suffix[0]}' if suffix else ''
filename = quote(f'{name}{suffix}') # 文件名编码,防止中文名报错
# 打开文件从起始位置开始分片读取文件
return StreamingResponse(
file_iterator(file_path, start_bytes, 1024 * 1024 * 1), # 每次读取 1M
media_type=content_type,
headers={
'content-disposition': f'attachment; filename="{filename}"',
'accept-ranges': 'bytes',
'connection': 'keep-alive',
'content-length': str(content_length),
'content-range': f'bytes {start_bytes}-{end_bytes}/{stat_result.st_size}',
'last-modified': formatdate(stat_result.st_mtime, usegmt=True),
},
status_code=206 if start_bytes > 0 else 200
)


def file_iterator(file_path, offset, chunk_size):
"""
文件生成器
:param file_path: 文件绝对路径
:param offset: 文件读取的起始位置
:param chunk_size: 文件读取的块大小
:return: yield
"""
with open(file_path, 'rb') as f:
f.seek(offset, os.SEEK_SET)
while True:
data = f.read(chunk_size)
if data:
yield data
else:
break


if __name__ == '__main__':
import uvicorn
uvicorn.run(app=app, host="0.0.0.0", port=8000)