我孤身走在路上, 石子在雾中发亮,夜很安静,荒原面对太空,星星互诉衷肠
FASTAPI建立WebSocket管理
FASTAPI建立WebSocket管理

FASTAPI建立WebSocket管理

在FastAPI中,通过ConnectionManager类创建WebSocket会话与直接创建WebSocket会话的主要区别在于管理和维护活动连接的能力。

如果你直接创建WebSocket会话,那么你需要自己管理所有的活动连接,包括添加新的连接,删除旧的连接,以及在需要时向特定的连接发送消息。

ConnectionManager类则为你提供了这些功能。在ConnectionManager中,所有的活动连接都被存储在active_connections列表中。当一个新的WebSocket连接被接受时,它会被添加到这个列表中。当一个WebSocket连接被断开时,它会从这个列表中删除。此外,ConnectionManager还提供了send_personal_message方法,允许你向特定的WebSocket连接发送消息。

因此,使用ConnectionManager类可以让你更方便地管理和维护WebSocket连接,而无需自己编写大量的代码。

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)


manager = ConnectionManager()

创建WS链接时

@site_app.websocket("/YOURPATH")
async def websocket_endpoint(websocket: WebSocket, order_id: str):
    await manager.connect(websocket)
          try:

          #do something.....

    except WebSocketDisconnect:
        await manager.disconnect(websocket)

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

17 + = 20