feat: handle stream

This commit is contained in:
Vlad Stan 2024-06-25 19:43:11 +03:00
parent 9d21c22aad
commit 9a84dd940f

View File

@ -3,7 +3,7 @@ import uuid
from asyncio import Queue, TimeoutError
from http import HTTPStatus
from json import dumps, loads
from typing import Any, Awaitable, Dict, Mapping, Optional
from typing import Any, AsyncIterator, Awaitable, Dict, Mapping, Optional
from urllib.parse import urlencode
import httpx
@ -27,6 +27,7 @@ class HTTPTunnelClient:
self._receive_fn = receive_fn
self._req_resp: Dict[str, Queue] = {}
self._chunks: Queue = Queue()
@property
def connected(self) -> bool:
@ -214,6 +215,11 @@ class HTTPTunnelClient:
timeout=timeout,
)
async def aiter_text(self) -> AsyncIterator[str]:
for chunk in await self._chunks.get():
print("### chunk", chunk)
yield chunk
async def aclose(self) -> None:
self.disconnect()
@ -224,7 +230,7 @@ class HTTPTunnelClient:
if request_id:
await self._handle_request_id(resp, request_id)
else:
self._handle_streaming()
self._handle_streaming(resp)
async def _handle_request_id(self, resp, request_id):
awaiting_req = self._req_resp.get(request_id)
@ -233,8 +239,9 @@ class HTTPTunnelClient:
else:
logger.warning(f"Unknown request id: '{request_id}'. Possible timeout!")
def _handle_streaming(self):
print("### handle streaming here")
def _handle_streaming(self, data: dict):
print("### handle streaming here", data)
self._chunks.put(data)
class HTTPTunnelResponse:
@ -400,6 +407,11 @@ class WebSocketReverseWallet:
logger.info(f"[Wallet: {self.wallet_id}] Diconnectig...")
self._ws_client.close()
def notify_payment(self, payment_hash: str):
print("### notify_payment", payment_hash)
if self._ws_client:
self._ws_client.send(dumps({"payment_hash": payment_hash}))
def _on_open(self, _):
logger.info(f"[Wallet: {self.wallet_id}] Connected as reverse funding source.")