由于之前一直未深入去了解过关于fastapi中websocket多进程问题,由于之前的测试有可能都是但进程的方式进行启动测试,即便有时候是多进程的方式启动,但是巧合的是估计刚好用户都注册到同一个进程上面了,所以两户之间通信是没啥问题。

刚好一位“老友粉”遇到这种情况问题,索性抽空实践一番。

1 老友的问题描述:

  • 线上生产环境使用的多进程的方式部署启动fastapi服务
  • 然后需要需要通过一个后台发送HTTP请求之后处理后通过服务端webscoket向客户端websocket进行一次发送数据。

2 问题现象:

  • 通过后台发送HTTP请求之后处理后端服务端webscoket发送数据,但是客户端没收到?

3 问题的分析

起初我以为fock出来的子进程内部会通过某种机制进行内部的通信,结果自己太嫩了!理解错了!悲催~哈哈

我们都知道多进程的情况下,每个启动的进程有自己独立的存储空间。所以此时我们的某个进程下的连接的管理对象,其实是不存在数据。

4 问题的图解

 

 

5 问题原因

基于独立内存空间下的,我们的每个进程中的保存的客户端连接的对象也是独享的,所以只能另辟蹊径!

6 使用消息队列机制

PS:为了简单使用了redis的消息队列机制

一开始我所能想到就是利于消息的发布和订阅机制处理,确保所有的进程都会进行消息订阅,这样就可以达到每个进行收到消息的时候都会执行相关的消息了!

其实思路是对滴!自己实践一番之后,所有有了以下的一些总结:

主要处理思路:

1:每个进行启动的时候,都进行一个消息的机制的订阅 2:http进行接口请求的时候,进行消息发布 3:再消息消费的时候,进行调用进行自身的消息广播机制 4:如果你想延伸的,还可以跨进行的进行连接的同时备份(但是这个还没实践,仅仅是想法,想来这样肯定会有问题,所以不推荐)

7 最终的代码示例:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.endpoints import WebSocketEndpoint
from fastapi.responses import HTMLResponse
from enum import Enum
from typing import Any, Dict, List, Optional
import asyncio

app = FastAPI()


import redis
import aioredis
import os

class ConnectionManager:
    def __init__(self):
        # 保存当前所有的链接的websocket对象
        # self.active_connections: List[WebSocket] = []
        self.active_connections = []

    async def connect(self, websocket: WebSocket):


        client = str(websocket)[1:-1].split(' ')[3]
        print("是后端还是兑换",client)
        await websocket.accept()
        # 添加到当前已链接成功的队列中进行管理
        self.active_connections.append(websocket)

    async def close(self, websocket: WebSocket):
        # 主动的断开的客户端的链接,不是抛出异常的方式断开
        await websocket.close()
        self.active_connections.remove(websocket)

    async def disconnect(self, websocket: WebSocket):
        # 从队列里面删除我们的已经断开链接的websocket对象
        self.active_connections.remove(websocket)
        # await websocket.close()

    async def send_personal_message(self, message: str, websocket: WebSocket):
        # 发现消息
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # 循环变量给所有在线激活的链接发送消息-全局广播
        print("当前的用户链接数,",len(self.active_connections))
        for connection in self.active_connections:
            await connection.send_text(message)




@app.get("/test")
async def get34545():
    print("全局广播!!!PID", os.getpid())
    app.state.pubmessage.publish('message_channel_http', "我要全局广播!!!!!!!!!!")
    return '我要全局广播!'





@app.websocket_route("/ws/{user_id}", name="ws")
class EchoSever(WebSocketEndpoint):
    encoding: str = "text"
    session_name: str = ""
    count: int = 0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 从args中提取对应的输入的参数信息
        print(args[0]['endpoint'])
        print(args[0]['path_params'])
        self.user_id: Optional[str] = args[0]['path_params'].get('user_id')

    # 开始有链接上来的时候对应的处理
    async def on_connect(self, websocket):
        await  app.state.manager.connect(websocket)
        print("进入房间的时候的pid",  os.getpid())
        await  app.state.manager.broadcast(f"游客: {self.user_id}进入了房间!")
        # await self.daojishi(websocket)

    # 客户端开始有数据发送过来的时候的处理
    async def on_receive(self, websocket, data):
        # timeout_count = getattr(websocket, 'timeout_count')
        # setattr(websocket, 'timeout_count', 0)
        print("说话时候的PID", os.getpid())
        await  app.state.manager.broadcast(f"游客:{self.user_id} 说》{data}")

    # 客户端断开链接的时候
    async def on_disconnect(self, websocket, close_code):
        # 进行全局的广播所有的在线链接的所有用户消息
        try:
            await  app.state.manager.disconnect(websocket)
            # 广播给其他所有在线的websocket
            await  app.state.manager.broadcast(f"游客: {self.user_id} 离开了聊天室")
        except ValueError:
            # 倒计时自动结束的之后,客户端再点击一次断开的时候异常处理!
            pass



@app.on_event('startup')
async def on_startup():

    # 异步redis消息的队列的处理机制
    # https://aioredis.readthedocs.io/en/v1.2.0/start.html
    pubmessage = await aioredis.create_redis( 'redis://localhost')
    await pubmessage.set("ceshi","我是测试数据")
    sadsa = await pubmessage.get("ceshi")
    print('读取测试数据,验证redis链接情况:',sadsa)
    print("读取测试数据,验证redis链接情况!!!PID", os.getpid())

    app.state.pubmessage = pubmessage

    # 执行消息订阅机制
    loop = asyncio.get_event_loop()
    loop.create_task(register_pubsub())



async def register_pubsub():
    pool = await aioredis.create_pool( 'redis://localhost',minsize=5, maxsize=10)
    async def reader(channel):
        # 进行消息的消费
        while (await channel.wait_message()):
            msg = await channel.get(encoding='utf-8')
            print("========================================>")
            print("全局的广播信息!!!essage in {}: {}".format(channel.name, msg))
            # 执行全局的消息广播
            await app.state.manager.broadcast(f"HTTP游客:接收到全局的广播信息!")

    with await pool as conn:
        # 执行消息注册
        await conn.execute_pubsub('subscribe', 'message_channel_http')
        channel = conn.pubsub_channels['message_channel_http']
        await reader(channel)  # wait for reader to complete
        await conn.execute_pubsub('unsubscribe', 'message_channel_http')

    # 加下面的的话就会容易断开!傻叉了!
    # pool.close()
    # await pool.wait_closed()



@app.on_event('startup')
async def on_startup():

    manager = ConnectionManager()
    # 设置发布者属性对象
    app.state.manager = manager
    # 设置任务渠道消费者


if __name__ == '__main__':
    import uvicorn
    # import threading
    # kkl =threading.Thread(target=doresubscribe)
    # kkl.start()


    uvicorn.run('wstest:app', host='0.0.0.0', port=9082, access_log=False, workers=2, use_colors=True)
    # uvicorn.run(app='wstest:app', host="127.0.0.1", port=8000, workers =5, reload=True, debug=True)

 

 

8 演示:

1:使用客户端连接我们的服务端上:

 

 

2:请求到我们的HTTP接口上进行广播处理:

http://127.0.0.1:9082/test

 

 

3:官网我们处于不同进行的情况的客户端接收信息的情况:

 

 

 

这样就可以完成跨进程的之间的处理了!打完收工!

转发自https://www.modb.pro/db/185487

 

原文地址:http://www.cnblogs.com/a00ium/p/16931133.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性