APScheduler调度器实现定时任务

APScheduler (Advanced Python Scheduler) 是一个功能强大的Python库,可以用来调度任何可调用的对象,如函数、方法等。

1.安装:

pip install apscheduler

2.新建定时任务目录和代码文件

在你的应用目录下创建一个新的文件夹 management/commands,然后在这个文件夹里创建一个名为 schedule.py 的文件。

xxx    xxx

schedule.py内代码如下

# myapp/management/commands/schedule.py
from django.core.management.base import BaseCommand
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_events
from myapp.tasks import my_scheduled_job  # 假设你已经在myapp/tasks.py中定义了这个函数

class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = BackgroundScheduler()
        scheduler.add_jobstore(DjangoJobStore(), "default")

        # 添加定时任务
        scheduler.add_job(
            my_scheduled_job,
            trigger=CronTrigger(second="*/10"),  # 每10秒执行一次
            id="my_scheduled_job",  # 任务唯一ID
            max_instances=1,
            replace_existing=True,
        )
        register_events(scheduler)

        scheduler.start()
        print("Scheduler started...")

        try:
            # 这里可以用其他方式保持程序运行,比如监听信号
            while True:
                pass
        except (KeyboardInterrupt, SystemExit):
            # 不要直接使用 scheduler.shutdown(),否则会关闭后台线程
            scheduler.remove_all_jobs()
            scheduler.shutdown()
            print("Scheduler shut down successfully!")

2. 定义任务

新建 myapp/tasks.py 文件中定义你的任务函数。例如:

# myapp/tasks.py
def my_scheduled_job():
    print("定时任务正在运行")
    # 在这里添加你的业务逻辑








Comment IconComment Icon
Icon
评论 0
时间↑ 点赞↓

Comment Icon