diff --git a/lnbits/utils/gateway.py b/lnbits/utils/gateway.py index c84ee2690..beb33e6af 100644 --- a/lnbits/utils/gateway.py +++ b/lnbits/utils/gateway.py @@ -1,9 +1,12 @@ +import asyncio import uuid +from asyncio import Queue, TimeoutError from http import HTTPStatus from json import dumps, loads -from typing import Any, Awaitable, Mapping, Optional +from typing import Any, Awaitable, Dict, Mapping, Optional from urllib.parse import urlencode +import httpx from fastapi import HTTPException, WebSocket, WebSocketDisconnect from fastapi.routing import APIRouter from loguru import logger @@ -22,22 +25,26 @@ class HTTPTunnelClient: self._send_fn = send_fn self._receive_fn = receive_fn + self._req_resp: Dict[str, Queue] = {} + + @property + def connected(self) -> bool: + return self._send_fn is not None and self._receive_fn is not None + async def connect(self, send_fn: Awaitable, receive_fn: Awaitable): self._send_fn = send_fn self._receive_fn = receive_fn - while settings.lnbits_running: - req = await self._receive_fn() - print("### req", req) + while settings.lnbits_running and self.connected: + resp = await self._receive_fn() + print("### resp", resp) + + await self._handle_response(resp) def disconnect(self): self._send_fn = None self._receive_fn = None - @property - def connected(self) -> bool: - return self._send_fn and self._receive_fn - async def request( self, method: str, @@ -47,11 +54,12 @@ class HTTPTunnelClient: json: Optional[dict] = None, params: Optional[Mapping[str, str]] = None, headers: Optional[Mapping[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[int], ) -> "HTTPTunnelResponse": try: assert self.connected, "Tunnel connection not established." request_id = uuid.uuid4().hex + self._req_resp[request_id] = Queue() body = data if json: body = dumps(json) @@ -65,8 +73,22 @@ class HTTPTunnelClient: "headers": headers, } ) + resp = await asyncio.wait_for( + self._req_resp[request_id].get(), timeout or 30 + ) + print("### resp", resp) + + del self._req_resp[request_id] + except TimeoutError as exc: + logger.warning(exc) + return HTTPTunnelResponse({"status": int(HTTPStatus.REQUEST_TIMEOUT)}) except Exception as exc: logger.warning(exc) + return HTTPTunnelResponse( + {"status": int(HTTPStatus.INTERNAL_SERVER_ERROR), "detail": str(exc)} + ) + finally: + del self._req_resp[request_id] async def get( self, @@ -193,28 +215,65 @@ class HTTPTunnelClient: ) async def aclose(self) -> None: - print("### todo close fn") + self.disconnect() + + async def _handle_response(self, resp: Optional[dict]): + if not resp: + return + request_id = resp.get("request_id") + if request_id: + await self._handle_request_id(resp, request_id) + else: + self._handle_streaming() + + async def _handle_request_id(self, resp, request_id): + awaiting_req = self._req_resp.get(request_id) + if awaiting_req: + await awaiting_req.put(resp) + else: + logger.warning(f"Unknown request id: '{request_id}'. Possible timeout!") + + def _handle_streaming(self): + print("### handle streaming here") class HTTPTunnelResponse: # status code, detail - def __init__(self): - pass + def __init__(self, resp: Optional[dict]): + self._resp = resp @property def is_error(self) -> bool: - pass + status = self._resp.get("status", 500) + return 400 <= status <= 599 + + @property + def is_success(self) -> bool: + status = self._resp.get("status", 500) + return 200 <= status <= 299 @property def text(self) -> str: - pass + if not self._resp or "body" not in self._resp: + return "" + return self._resp["body"] def raise_for_status(self) -> "HTTPTunnelResponse": - pass + if self._resp is None: + raise RuntimeError( + "Cannot call `raise_for_status` as the response " + "instance has not been set on this response." + ) + if self.is_success: + return self + + # todo add request, test flow + raise httpx.HTTPStatusError(self.text, request=None, response=self) def json(self, **kwargs: Any) -> Any: - pass + body = self.text + return loads(body, **kwargs) if body else None class HTTPInternalCall: