正在进入ing...

FastAPI 实现定时任务

发布时间:2024-01-21 浏览量: 2185 文章分类: python

FastAPI 实现定时任务

关于定时任务,我基本都是使用apscheduler,详情可以翻我之前的文章,我有介绍这个框架如何在Django或者python中单独使用的教程。

这次在FastAPI中也是直接整合吧,非常方便。

首先安装这些我就都不讲了,网上很多都是很老的方法,我目前理解的使用事件监听方法add_event_handler来实现的。

直接上代码吧

from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import datetime
import uvicorn
from typing import Callable

app = FastAPI()

# 初始化调度器
scheduler = AsyncIOScheduler()


def startup(app: FastAPI) -> Callable:
    async def app_start() -> None:
        # 定时任务
        scheduler.add_job(
            lambda: print("定时任务执行时间:", datetime.datetime.now()), "interval", seconds=10
        )
        scheduler.start()

    return app_start


def stopping(app: FastAPI) -> Callable:
    async def app_stop() -> None:
        # 不停止调度器,直到所有任务完成
        await scheduler.shutdown()

    return app_stop


# 添加事件监听
app.add_event_handler("startup", startup(app))
app.add_event_handler("shutdown", stopping(app))


@app.get("/")
async def read_root():
    return {"message": "Hello, World!"}


# 运行应用
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)