APScheduler调度器实现定时任务
APScheduler (Advanced Python Scheduler) 是一个功能强大的Python库,可以用来调度任何可调用的对象,如函数、方法等。
1.安装:
pip install apscheduler
2.新建定时任务目录和代码文件
在你的应用目录下创建一个新的文件夹 management/commands,然后在这个文件夹里创建一个名为 schedule.py 的文件。
schedule.py内代码如下
# myapp/management/commands/schedule.py
from django.core.management.base import BaseCommand
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_events
from myapp.tasks import my_scheduled_job # 假设你已经在myapp/tasks.py中定义了这个函数
class Command(BaseCommand):
help = "Runs APScheduler."
def handle(self, *args, **options):
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加定时任务
scheduler.add_job(
my_scheduled_job,
trigger=CronTrigger(second="*/10"), # 每10秒执行一次
id="my_scheduled_job", # 任务唯一ID
max_instances=1,
replace_existing=True,
)
register_events(scheduler)
scheduler.start()
print("Scheduler started...")
try:
# 这里可以用其他方式保持程序运行,比如监听信号
while True:
pass
except (KeyboardInterrupt, SystemExit):
# 不要直接使用 scheduler.shutdown(),否则会关闭后台线程
scheduler.remove_all_jobs()
scheduler.shutdown()
print("Scheduler shut down successfully!")
2. 定义任务
新建 myapp/tasks.py
文件中定义你的任务函数。例如:
# myapp/tasks.py
def my_scheduled_job():
print("定时任务正在运行")
# 在这里添加你的业务逻辑