Nonebot2插件高级

一、 工作流程

1、 概念

hook钩子函数,它们可以在Nonebot处理事件的不同时刻进行拦截、修改或者扩展。在Nonebot中,事件钩子函数分为事件预处理、运行预处理、运行后预处理和事件后处理

MatcherMatcher 并不是一个具体的实例 instance,而是一个具有特定属性的类 class。只有当 Matcher 响应事件时,才会实例化为具体的 instance,也就是 matchermatcher 可以认为是 NoneBot 处理 Event 的基本单位,运行 matcher 是 NoneBot 工作的主要内容。

handler:事件处理函数,它们可以认为是 NoneBot 处理 Event 的最小单位。在不考虑 hook 的情况下,运行 matcher 就是顺序运行 matcher.handlers,这句话换种表达方式就是,handler 只有添加到 matcher.handlers 时,才可以参与到 NoneBot 的工作中来。

2、 简介

NoneBot2 是一个可扩展的 Python 异步机器人框架,它会对机器人收到的事件进行解析和处理,并以插件化的形式,按优先级分发给事件所对应的事件响应器,来完成具体的功能。

在实际应用中,NoneBot 会充当一个高性能,轻量级的 Python 微服务框架。协议端可以通过 http、websocket 等方式与之通信,这个通信往往是双向的:一方面,协议端可以上报数据给 NoneBot,NoneBot 会处理数据并返回响应给协议端;另一方面,NoneBot 可以主动推送数据给协议端。而 NoneBot 便是围绕双向通信进行工作的。

Nonebot启动之前的准备工作:

  1. 运行nonebot.init(),读取配置文件,初始化Nonebot和Driver对象
  2. 注册协议适配器Adapter
  3. 加载插件

准备工作完成后,Nonebot会利用异步进行启动,并且会运行on_startup钩子函数

随后,协议端与Nonebot进行连接,Driver将数据提交给Adapter,然后,实例化Bot,Nonebot会利用Bot开始工作:

  1. 事件处理:Bot会将协议端上报的数据转换为Event事件,之后Nonebot根据一套既定的流程来处理事件
  2. 调用API:在事件处理的过程中,Nonebot可以通过Bot调用协议端指定的API来获取更多数据,或者反馈响应给协议端;Nonebot也可以通过调用API向协议端主动请求数据获取主动推送数据

3、 事件处理

首先,我们来看一下事件处理的流程图:

总的可以分为三个阶段:

  1. Driver 接收上报数据

    协议端会通过 websocket 或 http 等方式与 NoneBot 的后端驱动 Driver 连接,协议端上报数据后,Driver 会将原始数据交给 Adapter 处理

  2. Adapter 处理原始数据

    • Adapter 检查授权许可,并获取 self-id 作为唯一识别 id
    • 根据 self-id 实例化 Adapter 相应的 Bot
    • 根据 Event Model 将原始数据转化为 NoneBot 可以处理的 Event 对象
    • BotEvent 交由 NoneBot 进一步处理
  3. Nonebot 处理Event

    • 执行事件预处理 hook
  • 按优先级升序选出同一优先级的 Matcher
    • 根据 Matcher 定义的 Rule、Permission 判断是否运行
  • 实例化 matcher 并执行运行预处理 hook
    • 顺序运行 matcher 的所有 handlers
  • 执行运行后处理 hook
    • 判断是否停止事件传播
  • 执行事件后处理 hook

同时,Nonebot中还有一些特殊异常处理,可以自行到官网查看:https://v2.nonebot.dev/docs/advanced/#%E7%89%B9%E6%AE%8A%E5%BC%82%E5%B8%B8%E5%A4%84%E7%90%86

4、 调用协议端接口

NoneBot 调用 API 会有如下过程:

  1. 调用 calling_api_hook 预处理钩子
  2. adapter 将信息处理为原始数据,并转交 driverdriver 交给协议端处理
  3. driver 接收协议端的结果,交给adapter 处理之后将结果反馈给 NoneBot
  4. 调用 called_api_hook 后处理钩子

一般来说,我们可以用 bot.* 来调用 API(*是 APIaction 或者 endpoint)。

对于发送消息而言,一方面可以调用既有的 API ;另一方面 NoneBot 实现了两个便捷方法,bot.send(event, message, **kwargs) 方法和可以在 handler 中使用的 Matcher.send(message, **kwargs) 方法,来向事件主体发送消息。

二、 定时任务

1、 安装插件

这个插件是在APScheduler的基础上进行开发的,如果需要更加详细的使用方式,可以通过官方文档进行学习:https://apscheduler.readthedocs.io/en/3.x/,后面我会加一篇关于这个库的使用方式,欢迎大家关注哦!

这个定时任务的实现,是通过一个Nonebot插件来实现的

nb plugin install nonebot_plugin_apscheduler

使用poetry来安装:

poetry add nonebot-plugin-apscheduler

2、 快速使用

  1. 在需要设置定时任务的插件中,通过 nonebot.require 声明插件依赖
  2. 从 nonebot_plugin_apscheduler 导入 scheduler 对象
  3. 在该对象的基础上,根据 APScheduler 的使用方法进一步配置定时任务
from nonebot import require


require("nonebot_plugin_apscheduler")


from nonebot_plugin_apscheduler import scheduler


@scheduler.scheduled_job("cron", hour="*/2", id="xxx", args=[1], kwargs={"arg2": 2})
async def run_every_2_hour(arg1, arg2):
    pass


scheduler.add_job(run_every_day_from_program_start, "interval", days=1, id="xxx")  # 使用这种方法添加也可以

可以使用装饰器来添加定时任务,也可以通过函数来添加定时任务

3、 配置插件

根据项目的 .env 文件设置,向 .env.*bot.py 文件添加 nonebot_plugin_apscheduler 的可选配置项

apscheduler_autostart

  • 类型:bool
  • 默认值:True
  • 是否自动启动APScheduler

apscheduler_config

  • 类型:dict
  • 默认值:{"apscheduler.timezone": "Asia/Shanghai"}
  • APScheduler 相关配置。修改/增加其中配置项需要确保 prefix: apscheduler

三、 匹配规则

1、 创建规则

匹配规则可以是一个 Rule 对象,也可以是一个 RuleChecker 类型。Rule 是多个 RuleChecker 的集合,只有当所有 RuleChecker 检查通过时匹配成功。RuleChecker 是一个返回值为 Bool 类型的依赖函数,即,RuleChecker 支持依赖注入。

2、 创建RuleCheck

async def user_checker(event: Event) -> bool:
    return event.get_user_id() == "123123"  # 检测触发事件的用户的id是否为123123


matcher = on_message(rule=user_checker)

3、 创建Rule

async def user_checker(event: Event) -> bool:
    return event.get_user_id() == "123123"  # 检测触发事件的用户的id是否为123123


async def message_checker(event: Event) -> bool:
    return event.get_plaintext() == "hello"  # 检测用户输入的内容是否为hello


rule = Rule(user_checker, message_checker)  # 将两个RuleCheck进行合并
matcher = on_message(rule=rule)  

使用rule参数来注册到事件响应器当中

4、 合并匹配规则

在定义匹配规则时,我们往往希望将规则进行细分,来更好地复用规则。而在使用时,我们需要合并多个规则。除了使用 Rule 对象来组合多个 RuleChecker 外,我们还可以对 Rule 对象进行合并

rule1 = Rule(foo_checker)
rule2 = Rule(bar_checker)


rule = rule1 & rule2
rule = rule1 & bar_checker
rule = foo_checker & rule2

Rule会自动忽略合并的None值

四、 权限控制

1、 注册使用

如同 Rule 一样,Permission 可以在定义事件响应器时添加 permission 参数来加以应用,这样 NoneBot2 会在事件响应时检测事件主体的权限

from nonebot.permission import SUPERUSER  # 检测是否为超级管理员发送的信息
from nonebot import on_command


matcher = on_command("测试超管", permission=SUPERUSER)

@matcher.handle()
async def _():
    await matcher.send("超管命令测试成功")

@matcher.got("key1", "超管提问")
async def _():
    await matcher.send("超管命令 got 成功")

PermissionRule 的表现并不相同, Rule 只会在初次响应时生效,在余下的对话中并没有限制事件;但是 Permission 会持续生效,在连续对话中一直对事件主体加以限制。

2、 主动调用

Permission 除了可以在注册事件响应器时加以应用,还可以在编写事件处理函数 handler 时主动调用,我们可以利用这个特性在一个 handler 里对不同权限的事件主体进行区别响应

from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent
from nonebot.adapters.onebot.v11 import GROUP_ADMIN, GROUP_OWNER

matcher = on_command("测试权限")

@matcher.handle()
async def _(bot: Bot, event: GroupMessageEvent):
    if await GROUP_ADMIN(bot, event):  # 判断是否为群管理员调用的命令
        await matcher.send("管理员测试成功")
    elif await GROUP_OWNER(bot, event):  # 判断是否为群主调用的命令
        await matcher.send("群主测试成功")
    else:
        await matcher.send("群员测试成功")

3、 自定义

如同 Rule 一样,Permission 也是由非负数个 PermissionChecker 组成的,但只需其中一个返回 True 时就会匹配成功

from nonebot.adapters import Bot, Event
from nonebot.permission import Permission

async def async_checker(bot: Bot, event: Event) -> bool:
    
    return True
def sync_checker(bot: Bot, event: Event) -> bool:
    return True


def check(arg1, arg2):
    
    async def _checker(bot: Bot, event: Event) -> bool:
        return bool(arg1 + arg2)
    
    return Permission(_checker)  # 使用函数封装来生成一个自定义权限控制

# 组合使用
from nonebot.permission import Permission

Permission(async_checker1) | sync_checker | async_checker2

同样地,如果想用 Permission(*checkers) 包裹构造 Permission,函数必须是异步的;但是在利用 |(或符号)连接构造时,NoneBot2 会自动包裹同步函数为异步函数

五、 钩子函数

1、 全局钩子函数

全局钩子函数是指 NoneBot2 针对其本身运行过程的钩子函数。

这些钩子函数是由其后端驱动 Driver 来运行的,故需要先获得全局 Driver 对象:

from nonebot import get_driver

driver=get_driver()

1.1 启动准备

这个钩子函数会在Nonebot启动时运行

@driver.on_startup
async def do_something():
    pass

1.2 终止处理

这个钩子函数会在Nonebot终止时运行

@driver.on_shutdown
async def do_something():
    pass

1.3 bot 连接处理

这个钩子函数会在 Bot 通过 websocket 连接到 NoneBot2 时运行。

@driver.on_bot_connectasync 
def do_something(bot: Bot):    
    pass

1.4 bot 断开处理

这个钩子函数会在 Bot 断开与 NoneBot2 的 websocket 连接时运行。

@driver.on_bot_disconnectasync 
def do_something(bot: Bot):    pass

1.5 api调用钩子

这个钩子函数会在 Bot 调用 API 时运行。

from nonebot.adapters import Bot

@Bot.on_calling_apiasync 
def handle_api_call(bot: Bot, api: str, data: Dict[str, Any]):    
    pass

1.6 api调用后钩子

这个钩子函数会在 Bot 调用 API 后运行。

from nonebot.adapters import Bot

@Bot.on_called_apiasync 
def handle_api_result(bot: Bot, exception: Optional[Exception], api: str, data: Dict[str, Any], result: Any):    
    pass

2、 事件钩子函数

2.1 事件预处理

这个钩子函数会在 Event 上报到 NoneBot2 时运行

from nonebot.message import event_preprocessor

@event_preprocessorasync 
def do_something():    
    pass

2.2 事件后处理

这个钩子函数会在 NoneBot2 处理 Event 后运行

from nonebot.message import event_postprocessor

@event_postprocessorasync 
def do_something():    
    pass

2.3 运行预处理

这个钩子函数会在 NoneBot2 运行 matcher 前运行。

from nonebot.message import run_preprocessor

@run_preprocessorasync 
def do_something():    
    pass

2.4 运行后处理

这个钩子函数会在 NoneBot2 运行 matcher 后运行。

from nonebot.message import run_postprocessor

@run_postprocessorasync 
def do_something():    
    pass

六、 依赖注入

1、 简介

在软件工程中,依赖注入(dependency injection)的意思为,给予调用方它所需要的事物。 “依赖”是指可被方法调用的事物。依赖注入形式下,调用方不再直接使用“依赖”,取而代之是“注入” 。“注入”是指将“依赖”传递给调用方的过程。在“注入”之后,调用方才会调用该“依赖。 传递依赖给调用方,而不是让让调用方直接获得依赖,这个是该设计的根本需求。

依赖注入往往起到了分离依赖和调用方的作用,这样一方面能让代码更为整洁可读,一方面可以提升代码的复用性。

2、 使用依赖注入

from nonebot import on_command
from nonebot.params import Depends # 1.引用 Depends
from nonebot.adapters.onebot.v11 import MessageEvent


test = on_command("123")


async def depend(event: MessageEvent): # 2.编写依赖函数
    return {"uid": event.get_user_id(), "nickname": event.sender.nickname}


@test.handle()
async def _(x: dict = Depends(depend)): # 3.在事件处理函数里声明依赖项
    print(x["uid"], x["nickname"])

依赖注入流程:

  1. 引用 Depends
  2. 编写依赖函数。依赖函数和普通的事件处理函数并无区别,同样可以接收 bot, event, state 等参数,你可以把它当作一个普通的事件处理函数,但是去除了装饰器(没有使用 matcher.handle() 等来装饰),并且可以返回任何类型的值。
  3. 在这里我们接受了 event,并以 onebotMessageEvent 作为类型标注,返回一个新的字典,包括 uidnickname 两个键值。
  4. 在事件处理函数中声明依赖项。依赖项必须要 Depends 包裹依赖函数作为默认值。

事实上,bot、event、state 它们本身只是依赖注入的一个特例,它们无需声明这是依赖即可注入。

虽然声明依赖项的方式和其他参数如 bot, event 并无二样,但他的参数有一些限制,必须是可调用对象,函数自然是可调用对象,类和生成器也是。

一般来说,当接收到事件时,NoneBot2 会进行以下处理:

  1. 准备依赖函数所需要的参数
  2. 调用依赖函数并获得返回值
  3. 将返回值作为事件处理函数中的参数值传入

3、 依赖缓存

在使用 Depends 包裹依赖函数时,有一个参数 use_cache ,它默认为 True ,这个参数会决定 Nonebot2 在依赖注入的处理中是否使用缓存。

import random
from nonebot import on_command
from nonebot.params import Depends


test = on_command("123")


async def always_run():
    return random.randint(1, 100)


@test.handle()
async def _(x: int = Depends(always_run, use_cache=False)):
    print(x)

缓存是针对单次事件处理来说的,在事件处理中 Depends 第一次被调用时,结果存入缓存,在之后都会直接返回缓存中的值,在事件处理结束后缓存就会被清除。

当使用缓存时,依赖注入会这样处理:

  1. 查询缓存,如果缓存中有相应的值,则直接返回
  2. 准备依赖函数所需要的参数
  3. 调用依赖函数并获得返回值
  4. 将返回值存入缓存
  5. 将返回值作为事件处理函数中的参数值传入

我们在编写依赖函数时,可以简单的使用同步函数

4、 类和生成器

4.1 类作为依赖

from nonebot import on_command
from nonebot.params import Depends # 1.引用 Depends
from nonebot.adapters.onebot.v11 import MessageEvent


test = on_command("123")


class DependClass: # 2.编写依赖类
    def __init__(self, event: MessageEvent):
        self.uid = event.get_user_id()
        self.nickname = event.sender.nickname


@test.handle()
async def _(x: DependClass = Depends(DependClass)): # 3.在事件处理函数里声明依赖项
    print(x.uid, x.nickname)

依然可以用三步说明如何用类作为依赖项:

  1. 引用 Depends
  2. 编写依赖类。类的 __init__ 函数可以接收 bot, event, state 等参数,在这里我们接受了 event,并以 onebotMessageEvent 作为类型标注。
  3. 在事件处理函数中声明依赖项。当用类作为依赖项时,它会是一个对应的实例,在这里 x 就是 DependClass 实例。

4.2 生成器作为依赖

import httpx
from nonebot import on_command
from nonebot.params import Depends # 1.引用 Depends


test = on_command("123")


async def get_client(): # 2.编写异步生成器函数
    async with httpx.AsyncClient() as client:
        yield client
    print("调用结束")


@test.handle()
async def _(x: httpx.AsyncClient = Depends(get_client)): # 3.在事件处理函数里声明依赖项
    resp = await x.get("https://v2.nonebot.dev")
    # do something

我们用 yield 代码段作为生成器函数的“返回”,在事件处理函数里用返回出来的 client 做自己需要的工作。在 NoneBot2 结束事件处理时,会执行 yield 之后的代码

4.3 类函数作为依赖

from typing import Type
from nonebot.log import logger
from nonebot.params import Depends # 1.引用 Depends
from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent


test = on_command("123")


class EventChecker: # 2.编写需要的类
    def __init__(self, EventClass: Type[MessageEvent]):
        self.event_class = EventClass


    def __call__(self, event: MessageEvent) -> bool:
        return isinstance(event, self.event_class)


checker = EventChecker(GroupMessageEvent) # 3.将类实例化


@test.handle()
async def _(x: bool = Depends(checker)): # 4.在事件处理函数里声明依赖项
    if x:
        print("这是群聊消息")
    else:
        print("这不是群聊消息")

这是判断 onebot 的消息事件是不是群聊消息事件的一个例子,我们可以用四步来说明这个例子:

  1. 引用 Depends
  2. 编写需要的类。类的 __init__ 函数接收参数 EventClass,它将接收事件类本身。类的 __call__ 函数将接受消息事件对象,并返回一个 bool 类型的判定结果。
  3. 将类实例化。我们传入群聊消息事件作为参数实例化 checker
  4. 在事件处理函数里声明依赖项。NoneBot2 将会调用 checker__call__ 方法,返回给参数 x 相应的判断结果
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性