feat: map response to request

This commit is contained in:
Vlad Stan 2024-06-20 14:40:23 +03:00
parent 271b673d41
commit fb167686d2

View File

@ -1,9 +1,12 @@
import asyncio
import uuid import uuid
from asyncio import Queue, TimeoutError
from http import HTTPStatus from http import HTTPStatus
from json import dumps, loads from json import dumps, loads
from typing import Any, Awaitable, Mapping, Optional from typing import Any, Awaitable, Dict, Mapping, Optional
from urllib.parse import urlencode from urllib.parse import urlencode
import httpx
from fastapi import HTTPException, WebSocket, WebSocketDisconnect from fastapi import HTTPException, WebSocket, WebSocketDisconnect
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from loguru import logger from loguru import logger
@ -22,22 +25,26 @@ class HTTPTunnelClient:
self._send_fn = send_fn self._send_fn = send_fn
self._receive_fn = receive_fn self._receive_fn = receive_fn
self._req_resp: Dict[str, Queue] = {}
@property
def connected(self) -> bool:
return self._send_fn is not None and self._receive_fn is not None
async def connect(self, send_fn: Awaitable, receive_fn: Awaitable): async def connect(self, send_fn: Awaitable, receive_fn: Awaitable):
self._send_fn = send_fn self._send_fn = send_fn
self._receive_fn = receive_fn self._receive_fn = receive_fn
while settings.lnbits_running: while settings.lnbits_running and self.connected:
req = await self._receive_fn() resp = await self._receive_fn()
print("### req", req) print("### resp", resp)
await self._handle_response(resp)
def disconnect(self): def disconnect(self):
self._send_fn = None self._send_fn = None
self._receive_fn = None self._receive_fn = None
@property
def connected(self) -> bool:
return self._send_fn and self._receive_fn
async def request( async def request(
self, self,
method: str, method: str,
@ -47,11 +54,12 @@ class HTTPTunnelClient:
json: Optional[dict] = None, json: Optional[dict] = None,
params: Optional[Mapping[str, str]] = None, params: Optional[Mapping[str, str]] = None,
headers: Optional[Mapping[str, str]] = None, headers: Optional[Mapping[str, str]] = None,
timeout: Optional[int] = None, timeout: Optional[int],
) -> "HTTPTunnelResponse": ) -> "HTTPTunnelResponse":
try: try:
assert self.connected, "Tunnel connection not established." assert self.connected, "Tunnel connection not established."
request_id = uuid.uuid4().hex request_id = uuid.uuid4().hex
self._req_resp[request_id] = Queue()
body = data body = data
if json: if json:
body = dumps(json) body = dumps(json)
@ -65,8 +73,22 @@ class HTTPTunnelClient:
"headers": headers, "headers": headers,
} }
) )
resp = await asyncio.wait_for(
self._req_resp[request_id].get(), timeout or 30
)
print("### resp", resp)
del self._req_resp[request_id]
except TimeoutError as exc:
logger.warning(exc)
return HTTPTunnelResponse({"status": int(HTTPStatus.REQUEST_TIMEOUT)})
except Exception as exc: except Exception as exc:
logger.warning(exc) logger.warning(exc)
return HTTPTunnelResponse(
{"status": int(HTTPStatus.INTERNAL_SERVER_ERROR), "detail": str(exc)}
)
finally:
del self._req_resp[request_id]
async def get( async def get(
self, self,
@ -193,28 +215,65 @@ class HTTPTunnelClient:
) )
async def aclose(self) -> None: async def aclose(self) -> None:
print("### todo close fn") self.disconnect()
async def _handle_response(self, resp: Optional[dict]):
if not resp:
return
request_id = resp.get("request_id")
if request_id:
await self._handle_request_id(resp, request_id)
else:
self._handle_streaming()
async def _handle_request_id(self, resp, request_id):
awaiting_req = self._req_resp.get(request_id)
if awaiting_req:
await awaiting_req.put(resp)
else:
logger.warning(f"Unknown request id: '{request_id}'. Possible timeout!")
def _handle_streaming(self):
print("### handle streaming here")
class HTTPTunnelResponse: class HTTPTunnelResponse:
# status code, detail # status code, detail
def __init__(self): def __init__(self, resp: Optional[dict]):
pass self._resp = resp
@property @property
def is_error(self) -> bool: def is_error(self) -> bool:
pass status = self._resp.get("status", 500)
return 400 <= status <= 599
@property
def is_success(self) -> bool:
status = self._resp.get("status", 500)
return 200 <= status <= 299
@property @property
def text(self) -> str: def text(self) -> str:
pass if not self._resp or "body" not in self._resp:
return ""
return self._resp["body"]
def raise_for_status(self) -> "HTTPTunnelResponse": def raise_for_status(self) -> "HTTPTunnelResponse":
pass if self._resp is None:
raise RuntimeError(
"Cannot call `raise_for_status` as the response "
"instance has not been set on this response."
)
if self.is_success:
return self
# todo add request, test flow
raise httpx.HTTPStatusError(self.text, request=None, response=self)
def json(self, **kwargs: Any) -> Any: def json(self, **kwargs: Any) -> Any:
pass body = self.text
return loads(body, **kwargs) if body else None
class HTTPInternalCall: class HTTPInternalCall: