在FastAPI中,通过ConnectionManager类创建WebSocket会话与直接创建WebSocket会话的主要区别在于管理和维护活动连接的能力。
如果你直接创建WebSocket会话,那么你需要自己管理所有的活动连接,包括添加新的连接,删除旧的连接,以及在需要时向特定的连接发送消息。
而ConnectionManager类则为你提供了这些功能。在ConnectionManager中,所有的活动连接都被存储在active_connections列表中。当一个新的WebSocket连接被接受时,它会被添加到这个列表中。当一个WebSocket连接被断开时,它会从这个列表中删除。此外,ConnectionManager还提供了send_personal_message方法,允许你向特定的WebSocket连接发送消息。
因此,使用ConnectionManager类可以让你更方便地管理和维护WebSocket连接,而无需自己编写大量的代码。
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
async def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
manager = ConnectionManager()
创建WS链接时
@site_app.websocket("/YOURPATH")
async def websocket_endpoint(websocket: WebSocket, order_id: str):
await manager.connect(websocket)
try:
#do something.....
except WebSocketDisconnect:
await manager.disconnect(websocket)