定时任务
定时任务
根据任务的的类型,可以分为阻塞的,非阻塞的两大类,而根据任务量,又可以分为重io,重cpu,或者轻量任务
常用的解决方案
- arq被认为是rq的简单、现代和高性能的继任者
- 基于 asyncio
apscheduler 94% 的测试覆盖率
- django-apscheduler
- flask-apscheduler
Celery
subprocess.Popen
APScheduler
- 安装
# 安装 redis
# 安装依赖
pip install apscheduler
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
REDIS_DB = {
"db": 1,
"host": "127.0.0.1"
}
def func(name):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now + f" Hello world, {name}")
interval_task = {
# 配置存储器
"jobstores": {
# 使用Redis进行存储
'default': RedisJobStore(**REDIS_DB)
},
# 配置执行器
"executors": {
# 使用进程池进行调度,最大进程数是10个
'default': ProcessPoolExecutor(10)
},
# 创建job时的默认参数
"job_defaults": {
'coalesce': False, # 是否合并执行
'max_instances': 3, # 最大实例数
}
}
scheduler = AsyncIOScheduler(**interval_task)
# 添加一个定时任务
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="desire_job", replace_existing=True)