Skip to main content

定时任务

定时任务

根据任务的的类型,可以分为阻塞的,非阻塞的两大类,而根据任务量,又可以分为重io,重cpu,或者轻量任务

常用的解决方案

  • arq

    • arq被认为是rq的简单、现代和高性能的继任者
    • 基于 asyncio
  • apscheduler 94% 的测试覆盖率

    • django-apscheduler
    • flask-apscheduler
  • Celery

  • subprocess.Popen

APScheduler

  • 安装
# 安装 redis

# 安装依赖
pip install apscheduler
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

REDIS_DB = {
"db": 1,
"host": "127.0.0.1"
}

def func(name):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now + f" Hello world, {name}")

interval_task = {
# 配置存储器
"jobstores": {
# 使用Redis进行存储
'default': RedisJobStore(**REDIS_DB)
},
# 配置执行器
"executors": {
# 使用进程池进行调度,最大进程数是10个
'default': ProcessPoolExecutor(10)
},
# 创建job时的默认参数
"job_defaults": {
'coalesce': False, # 是否合并执行
'max_instances': 3, # 最大实例数
}

}
scheduler = AsyncIOScheduler(**interval_task)
# 添加一个定时任务
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="desire_job", replace_existing=True)