装饰器实现定时任务的方法可查看:FastApi定时任务发送钉钉消息
发现部署到服务器运行时同一时间会执行多次

通过日志可以看到同一时间执行了5次

这是因为使用gunicorn启动服务时设置的 workers 进程数是5个

from multiprocessing import cpu_count
​
reload_engine = 'inotify'
# //绑定与Nginx通信的端
bind = '0.0.0.0:8000'
daemon = True  # 守护进程
​
workers = cpu_count() * 2 + 1  # 进程数
​
worker_class = 'gevent'  # 默认为阻塞模式,最好选择gevent模式,默认的是sync模式
# 维持TCP链接
keepalive = 6
timeout = 65
graceful_timeout = 10
worker_connections = 65535
# 日志级别
loglevel = 'info'
# 访问日志路径
accesslog = '/www/wwwlogs/gunicorn_access.log'
# 错误日志路径
errorlog = '/www/wwwlogs/gunicorn_error.log'
# 设置gunicorn访问日志格式,错误日志无法设置
access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s %(f)s" "%(a)s"'

解决办法

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# 创 建 人: 李先生
# 文 件 名: tasks.py
# 创建时间: 2022/9/29 0029 20:32
# @Version:V 0.1
# @desc :
import asyncio
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union
from aioredis import Redis, create_redis_pool
​
from public.log import logger
​
NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]
​
redis = None # 这是redis全局变量
def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
​
    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        is_coroutine = asyncio.iscoroutinefunction(func)
​
        @wraps(func)
        async def wrapped() -> None:
            repetitions = 0  # 限制定时任务执行次数
            global redis
            if not redis: # redis为None就重新连接
                redis = await create_redis_pool(f"redis://:@127.0.0.1:6379/0", password="123456", encoding="utf-8")
​
            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        lock = await redis.get(key="LOCK") # 查看是否有lock标记
                        if not lock: # 没有就执行定时任务
                            await redis.setex(key="LOCK", value="lock", seconds=seconds) # 任务开始前先设置lock标记
                            if is_coroutine:
                                # 以协程方式执行
                                await func()  # type: ignore
                            else:
                                # 以线程方式执行
                                await run_in_threadpool(func)
                            repetitions += 1
                        else:
                            logger.info(f"多个进程同一时间多次执行定时任务的限制==>{lock}") # 未执行定时任务log打印
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)
​
            ensure_future(loop())
​
        return wrapped
​
    return decorator

然后再次执行定时任务

看样子是解决了~

原文地址:http://www.cnblogs.com/changqing8023/p/16823253.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性