From 9a84dd940feb14bc43328cdb224b93825b6e23c8 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Tue, 25 Jun 2024 19:43:11 +0300 Subject: [PATCH] feat: handle stream --- lnbits/utils/gateway.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/lnbits/utils/gateway.py b/lnbits/utils/gateway.py index cd7651479..9e38eec21 100644 --- a/lnbits/utils/gateway.py +++ b/lnbits/utils/gateway.py @@ -3,7 +3,7 @@ import uuid from asyncio import Queue, TimeoutError from http import HTTPStatus from json import dumps, loads -from typing import Any, Awaitable, Dict, Mapping, Optional +from typing import Any, AsyncIterator, Awaitable, Dict, Mapping, Optional from urllib.parse import urlencode import httpx @@ -27,6 +27,7 @@ class HTTPTunnelClient: self._receive_fn = receive_fn self._req_resp: Dict[str, Queue] = {} + self._chunks: Queue = Queue() @property def connected(self) -> bool: @@ -214,6 +215,11 @@ class HTTPTunnelClient: timeout=timeout, ) + async def aiter_text(self) -> AsyncIterator[str]: + for chunk in await self._chunks.get(): + print("### chunk", chunk) + yield chunk + async def aclose(self) -> None: self.disconnect() @@ -224,7 +230,7 @@ class HTTPTunnelClient: if request_id: await self._handle_request_id(resp, request_id) else: - self._handle_streaming() + self._handle_streaming(resp) async def _handle_request_id(self, resp, request_id): awaiting_req = self._req_resp.get(request_id) @@ -233,8 +239,9 @@ class HTTPTunnelClient: else: logger.warning(f"Unknown request id: '{request_id}'. Possible timeout!") - def _handle_streaming(self): - print("### handle streaming here") + def _handle_streaming(self, data: dict): + print("### handle streaming here", data) + self._chunks.put(data) class HTTPTunnelResponse: @@ -400,6 +407,11 @@ class WebSocketReverseWallet: logger.info(f"[Wallet: {self.wallet_id}] Diconnectig...") self._ws_client.close() + def notify_payment(self, payment_hash: str): + print("### notify_payment", payment_hash) + if self._ws_client: + self._ws_client.send(dumps({"payment_hash": payment_hash})) + def _on_open(self, _): logger.info(f"[Wallet: {self.wallet_id}] Connected as reverse funding source.")