Adds universal websocket manager any extension can use

Connect to the `ws://<your-lnbits>/api/v1/ws/{item_id}` endpoint

POST data to the websocket with `https://<your-lnbits>/api/v1/ws/{item_id}`
This commit is contained in:
ben
2022-11-23 21:47:26 +00:00
parent c8e18ac4be
commit 5a96bcd558

View File

@@ -12,7 +12,7 @@ from urllib.parse import ParseResult, parse_qs, urlencode, urlparse, urlunparse
import async_timeout import async_timeout
import httpx import httpx
import pyqrcode import pyqrcode
from fastapi import Depends, Header, Query, Request from fastapi import Depends, Header, Query, Request, WebSocket, WebSocketDisconnect
from fastapi.exceptions import HTTPException from fastapi.exceptions import HTTPException
from fastapi.params import Body from fastapi.params import Body
from loguru import logger from loguru import logger
@@ -697,3 +697,47 @@ async def api_auditor(wallet: WalletTypeInfo = Depends(get_key_type)):
"delta_msats": delta, "delta_msats": delta,
"timestamp": int(time.time()), "timestamp": int(time.time()),
} }
##################UNIVERSAL WEBSOCKET MANAGER########################
class websocketConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket, item_id: str):
await websocket.accept()
websocket.id = item_id
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, item_id: str):
for connection in self.active_connections:
if connection.id == item_id:
await connection.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = websocketConnectionManager()
@core_app.websocket("/api/v1/ws/{item_id}")
async def websocket_endpoint(websocket: WebSocket, item_id: str):
await manager.connect(websocket, item_id)
try:
while True:
data = await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)
@core_app.post("/api/v1/ws/{item_id}")
async def websocket_endpoint(item_id: str, data: str):
await updater(item_id, data)
async def updater(item_id, data):
return await manager.send_personal_message(
f"{data}", item_id
)