fastapi   task.py 

# task.py
# 网上百度的,地址:https://blog.csdn.net/hekaiyou/article/details/125072249
import asyncio
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union

from public.log import logger

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]


def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """
    返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
    其装饰的函数不能接受任何参数并且不返回任何内容.
    参数:
        seconds: float
            等待重复执行的秒数
        wait_first: bool (默认 False)
            如果为 True, 该函数将在第一次调用前先等待一个周期.
        raise_exceptions: bool (默认 False)
            如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
        max_repetitions: Optional[int] (默认 None)
            该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
    """

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        """
        将修饰函数转换为自身重复且定期调用的版本.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
        had_run = False

        @wraps(func)
        async def wrapped() -> None:
            nonlocal had_run
            if had_run:
                return
            had_run = True
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            # 以协程方式执行
                            await func()  # type: ignore
                        else:
                            # 以线程方式执行
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)

            ensure_future(loop())

        return wrapped

    return decorator
# task.py
# 网上百度的,地址:https://blog.csdn.net/hekaiyou/article/details/125072249
import asyncio
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union

from public.log import logger

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]


def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """
    返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
    其装饰的函数不能接受任何参数并且不返回任何内容.
    参数:
        seconds: float
            等待重复执行的秒数
        wait_first: bool (默认 False)
            如果为 True, 该函数将在第一次调用前先等待一个周期.
        raise_exceptions: bool (默认 False)
            如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
        max_repetitions: Optional[int] (默认 None)
            该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
    """

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        """
        将修饰函数转换为自身重复且定期调用的版本.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
        had_run = False

        @wraps(func)
        async def wrapped() -> None:
            nonlocal had_run
            if had_run:
                return
            had_run = True
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            # 以协程方式执行
                            await func()  # type: ignore
                        else:
                            # 以线程方式执行
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)

            ensure_future(loop())

        return wrapped

    return decorator

main.py 

from fastapi import FastAPI

from tasks import repeat_task
from public.shares import shares

app = FastAPI()

@app.on_event('startup')
@repeat_task(seconds=60 * 5, wait_first=False)
def repeat_task_aggregate_request_records() -> None:
    # 发送钉钉消息
    shares()

shares.py

from datetime import datetime, date
import efinance as ef
from chinese_calendar import is_workday
import matplotlib.pyplot as plt
import time

from public.send_ding import send_ding
from public.log import logger, BASE_PATH, os


def shares():
    stock_code = "601069"
    year = date.today().year
    month = date.today().month
    day = date.today().day
    now_time = datetime.now()
    now_img = int(round(time.time() * 1000))
    weekday = date(year, month, day).strftime("%A")
    # 非法定节假日,周六、周日也需要去掉
    if (not is_workday(date(year, month, day)) or weekday in ["Saturday", "Sunday"]) and not make:
        logger.info(f"当前时间 {now_time} 休市日!!!")
        return
    start_time = datetime(year, month, day, 9, 15, 0)
    end_time = datetime(year, month, day, 15, 5, 0)
    am_time = datetime(year, month, day, 11, 35, 0)
    pm_time = datetime(year, month, day, 13, 00, 0)
    if (now_time < start_time or now_time > end_time or am_time < now_time < pm_time) and not make:
        logger.info(f"当前时间 {now_time} 未开盘!!!")
        return
    # 数据间隔时间为 1 分钟
    freq = 1
    # 获取最新一个交易日的分钟级别股票行情数据
    df = ef.stock.get_quote_history(stock_code, klt=freq)
    if df.empty:
        logger.info(f"当前时间 {now_time} 未获取到股票数据!!!")
        return
    # 绘制折线图
    logger.info(f"当前时间戳: {now_img}")
    plt.plot(df["开盘"].values, linewidth=1, color="red")
    plt.savefig(os.path.join(BASE_PATH, "media", f"Chart-{now_img}.jpg"), bbox_inches='tight')
    # 保存后需要清除图形,不然会重叠
    plt.clf()
    
    share_name = df["股票名称"].values[0]
    open_price = df["开盘"].values[0]
    new_price = df["收盘"].values[-1]
    new_time = df["日期"].values[-1]
    top_price = df["最高"].max()
    down_price = df["最低"].min()
    turnover = df["成交量"].sum()
    average = round(df["开盘"].mean(), 2)
    rise_and_fall = round(df["涨跌幅"].sum(), 2)
    rise_and_price = round(df["涨跌额"].sum(), 2)
    turnover_rate = round(df["换手率"].sum(), 2)
    # markdown 发送字体颜色
    rise_and_fall_color = "#FF0000" if rise_and_fall > 0 else "#00FF00"
    rise_and_price_color = "#FF0000" if rise_and_price > 0 else "#00FF00"
    new_price_color = "#FF0000" if new_price > open_price else "#00FF00"
    top_price_color = "#FF0000" if top_price > open_price else "#00FF00"
    down_price_color = "#FF0000" if down_price > open_price else "#00FF00"

    body = {
        "msgtype": "markdown",
        "markdown": {
            "title": share_name,
            "text": f"### {share_name}\n\n"
                    f"> **开盘价** <font>{open_price}</font> 元/股\n\n"
                    f"> **最高价** <font color={top_price_color}>{top_price}</font> 元/股\n\n"
                    f"> **最低价** <font color={down_price_color}>{down_price}</font> 元/股\n\n"
                    f"> **平均价** <font color=''>{average}</font> 元/股\n\n"
                    f"> **涨跌幅** <font color={rise_and_fall_color}>{rise_and_fall}</font> %\n\n"
                    f"> **涨跌额** <font color={rise_and_price_color}>{rise_and_price}</font> 元\n\n"
                    f"> **成交量** <font>{turnover}</font> 手\n\n"
                    f"> **换手率** <font>{turnover_rate}</font> %\n\n"
                    f"> **时间** <font>{new_time}</font>\n\n"
                    f"> **最新价** <font color={new_price_color}>{new_price}</font> 元/股\n\n"
                    f"> **状态** <font>开盘中</font> \n\n"
                    f"> **折线图:** ![screenshot](http://121.41.54.234/Chart-{now_img}.jpg) @15235514553\n\n"
        },
        "at": {
            "atMobiles": ["15235514553"],
            "isAtAll": False,
        }}
    send_ding(body)

send_ding.py

import hmac
import urllib.parse
import hashlib
import base64
import requests
import urllib3
import time
from public.log import logger

urllib3.disable_warnings()


def ding_sign():
    """
    发送钉钉消息加密
    :return:
    """
    timestamp = str(round(time.time() * 1000))
    secret = "xxx"
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign


def send_ding(body: dict):
    """
    发送钉钉消息
    :param body
    :return:
    """
    headers = {"Content-Type": "application/json"}
    access_token = "xxx"
    timestamp, sign = ding_sign()

    res = requests.post(
        "https://oapi.dingtalk.com/robot/send?access_token={}&timestamp={}&sign={}".format(
            access_token, timestamp, sign), headers=headers, json=body, verify=False).json()
    if res["errcode"] == 0 and res["errmsg"] == "ok":
        logger.info("钉钉通知发送成功!info:{}".format(body["text"]["content"]))
    else:
        logger.error("钉钉通知发送失败!返回值:{}".format(res))

 

原文地址:http://www.cnblogs.com/changqing8023/p/16817217.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性