feat: websocket for payments events, remove /payments/sse and longpolling endpoint (#2704)

This commit is contained in:
dni ⚡ 2024-12-16 10:10:25 +01:00 committed by GitHub
parent 740180d332
commit 5f8ccee5b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 71 additions and 313 deletions

View File

@ -348,15 +348,6 @@ Assuming your LNbits is running on port `5000` add:
```
yourdomain.com {
handle /api/v1/payments/sse* {
reverse_proxy 0.0.0.0:5000 {
header_up X-Forwarded-Host yourdomain.com
transport http {
keepalive off
compression off
}
}
}
reverse_proxy 0.0.0.0:5000 {
header_up X-Forwarded-Host yourdomain.com
}

View File

@ -14,6 +14,7 @@ from fastapi.staticfiles import StaticFiles
from loguru import logger
from slowapi import Limiter
from slowapi.util import get_remote_address
from starlette.middleware.gzip import GZipMiddleware
from starlette.middleware.sessions import SessionMiddleware
from lnbits.core.crud import (
@ -55,7 +56,6 @@ from .core.models.extensions import Extension, ExtensionMeta, InstallableExtensi
from .core.services import check_admin_settings, check_webpush_settings
from .middleware import (
AuditMiddleware,
CustomGZipMiddleware,
ExtensionsRedirectMiddleware,
InstalledExtensionMiddleware,
add_first_install_middleware,
@ -153,10 +153,7 @@ def create_app() -> FastAPI:
app.add_middleware(
CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
app.add_middleware(
CustomGZipMiddleware, minimum_size=1000, exclude_paths=["/api/v1/payments/sse"]
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(AuditMiddleware, audit_queue=audit_queue)

View File

@ -11,7 +11,6 @@ from .views.extension_api import extension_router
from .views.generic import generic_router
from .views.node_api import node_router, public_node_router, super_node_router
from .views.payment_api import payment_router
from .views.public_api import public_router
from .views.tinyurl_api import tinyurl_router
from .views.user_api import users_router
from .views.wallet_api import wallet_router
@ -31,7 +30,6 @@ def init_core_routers(app: FastAPI):
app.include_router(extension_router)
app.include_router(super_node_router)
app.include_router(public_node_router)
app.include_router(public_router)
app.include_router(payment_router)
app.include_router(wallet_router)
app.include_router(api_router)

View File

@ -218,16 +218,16 @@ async def send_payment_notification(wallet: Wallet, payment: Payment):
# TODO: figure out why we send the balance with the payment here.
# cleaner would be to have a separate message for the balance
# and send it with the id of the wallet so wallets can subscribe to it
await websocket_manager.send_data(
json.dumps(
{
"wallet_balance": wallet.balance,
# use pydantic json serialization to get the correct datetime format
"payment": json.loads(payment.json()),
},
),
wallet.inkey,
payment_notification = json.dumps(
{
"wallet_balance": wallet.balance,
# use pydantic json serialization to get the correct datetime format
"payment": json.loads(payment.json()),
},
)
await websocket_manager.send_data(payment_notification, wallet.inkey)
await websocket_manager.send_data(payment_notification, wallet.adminkey)
await websocket_manager.send_data(
json.dumps({"pending": payment.pending}), payment.payment_hash
)

View File

@ -1,5 +1,4 @@
import asyncio
from typing import Dict
import httpx
from loguru import logger
@ -21,7 +20,6 @@ from lnbits.settings import get_funding_source, settings
from lnbits.tasks import send_push_notification
from lnbits.utils.exchange_rates import btc_rates
api_invoice_listeners: Dict[str, asyncio.Queue] = {}
audit_queue: asyncio.Queue = asyncio.Queue()
@ -85,8 +83,6 @@ async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue):
while settings.lnbits_running:
payment = await invoice_paid_queue.get()
logger.trace("received invoice paid event")
# dispatch api_invoice_listeners
await dispatch_api_invoice_listeners(payment)
# payment notification
wallet = await get_wallet(payment.wallet_id)
if wallet:
@ -98,21 +94,6 @@ async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue):
await send_payment_push_notification(payment)
async def dispatch_api_invoice_listeners(payment: Payment):
"""
Emits events to invoice listener subscribed from the API.
"""
for chan_name, send_channel in api_invoice_listeners.items():
try:
logger.debug(f"api invoice listener: sending paid event to {chan_name}")
send_channel.put_nowait(payment)
except asyncio.QueueFull:
logger.error(
f"api invoice listener: QueueFull, removing {send_channel}:{chan_name}"
)
api_invoice_listeners.pop(chan_name)
async def dispatch_webhook(payment: Payment):
"""
Dispatches the webhook to the webhook url.

View File

@ -37,12 +37,7 @@
<h3 class="q-my-none text-no-wrap">
<strong v-text="formattedBalance"></strong>
<small> {{LNBITS_DENOMINATION}}</small>
<lnbits-update-balance
:wallet_id="this.g.wallet.id"
flat
:callback="updateBalanceCallback"
round
/>
<lnbits-update-balance :wallet_id="this.g.wallet.id" flat round />
</h3>
<div class="row">
<div class="col">

View File

@ -30,9 +30,6 @@ self.addEventListener('activate', evt =>
// from the network before returning it to the page.
self.addEventListener('fetch', event => {
if (
!event.request.url.startsWith(
self.location.origin + '/api/v1/payments/sse'
) &&
event.request.url.startsWith(self.location.origin) &&
event.request.method == 'GET'
) {

View File

@ -14,7 +14,6 @@ from lnbits.core.services import (
get_balance_delta,
update_cached_settings,
)
from lnbits.core.tasks import api_invoice_listeners
from lnbits.decorators import check_admin, check_super_user
from lnbits.server import server_restart
from lnbits.settings import AdminSettings, Settings, UpdateSettings, settings
@ -45,7 +44,6 @@ async def api_auditor():
async def api_monitor():
return {
"invoice_listeners": list(invoice_listeners.keys()),
"api_invoice_listeners": list(api_invoice_listeners.keys()),
}

View File

@ -1,6 +1,4 @@
import asyncio
import json
import uuid
from http import HTTPStatus
from math import ceil
from typing import List, Optional
@ -13,11 +11,9 @@ from fastapi import (
Header,
HTTPException,
Query,
Request,
)
from fastapi.responses import JSONResponse
from loguru import logger
from sse_starlette.sse import EventSourceResponse
from lnbits import bolt11
from lnbits.core.models import (
@ -57,7 +53,6 @@ from ..services import (
pay_invoice,
update_pending_payments,
)
from ..tasks import api_invoice_listeners
payment_router = APIRouter(prefix="/api/v1/payments", tags=["Payments"])
@ -313,47 +308,6 @@ async def api_payments_pay_lnurl(
return payment
async def subscribe_wallet_invoices(request: Request, wallet: Wallet):
"""
Subscribe to new invoices for a wallet. Can be wrapped in EventSourceResponse.
Listenes invoming payments for a wallet and yields jsons with payment details.
"""
this_wallet_id = wallet.id
payment_queue: asyncio.Queue[Payment] = asyncio.Queue(0)
uid = f"{this_wallet_id}_{str(uuid.uuid4())[:8]}"
logger.debug(f"adding sse listener for wallet: {uid}")
api_invoice_listeners[uid] = payment_queue
try:
while settings.lnbits_running:
if await request.is_disconnected():
await request.close()
break
payment: Payment = await payment_queue.get()
if payment.wallet_id == this_wallet_id:
logger.debug("sse listener: payment received", payment)
yield {"data": payment.json(), "event": "payment-received"}
except asyncio.CancelledError:
logger.debug(f"removing listener for wallet {uid}")
except Exception as exc:
logger.error(f"Error in sse: {exc}")
finally:
api_invoice_listeners.pop(uid)
@payment_router.get("/sse")
async def api_payments_sse(
request: Request, key_info: WalletTypeInfo = Depends(require_invoice_key)
):
return EventSourceResponse(
subscribe_wallet_invoices(request, key_info.wallet),
ping=20,
media_type="text/event-stream",
)
# TODO: refactor this route into a public and admin one
@payment_router.get("/{payment_hash}")
async def api_payment(payment_hash, x_api_key: Optional[str] = Header(None)):

View File

@ -1,59 +0,0 @@
import asyncio
from http import HTTPStatus
from fastapi import APIRouter, HTTPException
from loguru import logger
from lnbits import bolt11
from ..crud import get_standalone_payment
from ..tasks import api_invoice_listeners
public_router = APIRouter(tags=["Core"])
@public_router.get("/public/v1/payment/{payment_hash}")
async def api_public_payment_longpolling(payment_hash):
payment = await get_standalone_payment(payment_hash)
if not payment:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Payment does not exist."
)
# TODO: refactor to use PaymentState
if payment.success:
return {"status": "paid"}
try:
invoice = bolt11.decode(payment.bolt11)
if invoice.has_expired():
return {"status": "expired"}
except Exception as exc:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Invalid bolt11 invoice."
) from exc
payment_queue = asyncio.Queue(0)
logger.debug(f"adding standalone invoice listener for hash: {payment_hash}")
api_invoice_listeners[payment_hash] = payment_queue
response = None
async def payment_info_receiver():
for payment in await payment_queue.get():
if payment.payment_hash == payment_hash:
nonlocal response
response = {"status": "paid"}
async def timeouter(cancel_scope):
await asyncio.sleep(45)
cancel_scope.cancel()
cancel_scope = asyncio.create_task(payment_info_receiver())
asyncio.create_task(timeouter(cancel_scope)) # noqa: RUF006
if response:
return response
else:
raise HTTPException(status_code=HTTPStatus.REQUEST_TIMEOUT, detail="timeout")

View File

@ -11,7 +11,6 @@ from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.gzip import GZipMiddleware
from starlette.types import ASGIApp, Receive, Scope, Send
from lnbits.core.db import core_app_extra
@ -93,18 +92,6 @@ class InstalledExtensionMiddleware:
)
class CustomGZipMiddleware(GZipMiddleware):
def __init__(self, *args, exclude_paths=None, **kwargs):
super().__init__(*args, **kwargs)
self.exclude_paths = exclude_paths or []
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if "path" in scope and scope["path"] in self.exclude_paths:
await self.app(scope, receive, send)
return
await super().__call__(scope, receive, send)
class ExtensionsRedirectMiddleware:
# Extensions are allowed to specify redirect paths. A call to a path outside the
# scope of the extension can be redirected to one of the extension's endpoints.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -6,6 +6,10 @@ window.i18n = new VueI18n.createI18n({
messages: window.localisation
})
const websocketPrefix =
window.location.protocol === 'http:' ? 'ws://' : 'wss://'
const websocketUrl = `${websocketPrefix}${window.location.host}/api/v1/ws`
window.LNbits = {
api: {
request(method, url, apiKey, data) {
@ -176,36 +180,14 @@ window.LNbits = {
},
events: {
onInvoicePaid(wallet, cb) {
let listener = ev => {
cb(JSON.parse(ev.data))
}
this.listenersCount = this.listenersCount || {[wallet.inkey]: 0}
this.listenersCount[wallet.inkey]++
this.listeners = this.listeners || {}
if (!(wallet.inkey in this.listeners)) {
this.listeners[wallet.inkey] = new EventSource(
'/api/v1/payments/sse?api-key=' + wallet.inkey
)
}
this.listeners[wallet.inkey].addEventListener(
'payment-received',
listener
)
return () => {
this.listeners[wallet.inkey].removeEventListener(
'payment-received',
listener
)
this.listenersCount[wallet.inkey]--
if (this.listenersCount[wallet.inkey] <= 0) {
this.listeners[wallet.inkey].close()
delete this.listeners[wallet.inkey]
ws = new WebSocket(`${websocketUrl}/${wallet.inkey}`)
ws.onmessage = ev => {
const data = JSON.parse(ev.data)
if (data.payment) {
cb(data)
}
}
return ws.onclose
}
},
map: {

View File

@ -484,7 +484,7 @@ window.app.component('lnbits-dynamic-chips', {
window.app.component('lnbits-update-balance', {
template: '#lnbits-update-balance',
mixins: [window.windowMixin],
props: ['wallet_id', 'callback'],
props: ['wallet_id'],
computed: {
denomination() {
return LNBITS_DENOMINATION
@ -498,21 +498,19 @@ window.app.component('lnbits-update-balance', {
credit: 0
}
},
watch: {
credit(val) {
this.updateBalance(val)
}
},
methods: {
updateBalance(credit) {
LNbits.api
.updateBalance(credit, this.wallet_id)
.then(res => {
if (res.data.status !== 'Success') {
if (res.data.success !== true) {
throw new Error(res.data)
}
this.callback({
success: true,
credit: parseInt(credit),
wallet_id: this.wallet_id
})
})
.then(_ => {
credit = parseInt(credit)
Quasar.Notify.create({
type: 'positive',

View File

@ -1,28 +1,19 @@
function eventReactionWebocket(event_id) {
function eventReaction(amount) {
localUrl = ''
reaction = localStorage.getItem('lnbits.reactions')
if (!reaction || reaction === 'None') {
return
}
if (location.protocol !== 'http:') {
localUrl = 'wss://' + location.host + '/api/v1/ws/' + event_id
} else {
localUrl = 'ws://' + location.host + '/api/v1/ws/' + event_id
}
connection = new WebSocket(localUrl)
connection.onmessage = function (e) {
try {
const parsedData = JSON.parse(e.data)
if (parsedData.payment.amount < 0) {
return
}
reaction = localStorage.getItem('lnbits.reactions')
if (reaction) {
window[reaction.split('|')[1]]()
}
} catch (e) {
console.log(e)
try {
if (amount < 0) {
return
}
reaction = localStorage.getItem('lnbits.reactions')
if (reaction) {
window[reaction.split('|')[1]]()
}
} catch (e) {
console.log(e)
}
}
function confettiBothSides() {

View File

@ -536,19 +536,6 @@ window.app = Vue.createApp({
})
})
},
fetchBalance() {
LNbits.api.getWallet(this.g.wallet).then(response => {
this.balance = Math.floor(response.data.balance / 1000)
document.dispatchEvent(
new CustomEvent('updateWalletBalance', {
detail: [this.g.wallet.id, this.balance]
})
)
})
if (this.g.wallet.currency) {
this.updateFiatBalance()
}
},
updateFiatBalance() {
if (!this.g.wallet.currency) return 0
LNbits.api
@ -561,11 +548,6 @@ window.app = Vue.createApp({
})
.catch(e => console.error(e))
},
updateBalanceCallback(res) {
if (res.success && wallet.id === res.wallet_id) {
this.balance += res.credit
}
},
pasteToTextArea() {
this.$refs.textArea.focus() // Set cursor to textarea
navigator.clipboard.readText().then(text => {
@ -687,7 +669,7 @@ window.app = Vue.createApp({
},
watch: {
updatePayments() {
this.fetchBalance()
this.updateFiatBalance()
}
},
mounted() {
@ -697,10 +679,14 @@ window.app = Vue.createApp({
this.$q.localStorage.set('lnbits.disclaimerShown', true)
}
// listen to incoming payments
LNbits.events.onInvoicePaid(this.g.wallet, payment => {
this.onPaymentReceived(payment.payment_hash)
LNbits.events.onInvoicePaid(this.g.wallet, data => {
console.log('Payment received:', data.payment.payment_hash)
console.log('Wallet balance:', data.wallet_balance)
console.log('Wallet ID:', this.g.wallet)
this.onPaymentReceived(data.payment.payment_hash)
this.balance = data.wallet_balance
eventReaction(data.payment.amount)
})
eventReactionWebocket(wallet.inkey)
}
})

View File

@ -515,7 +515,7 @@
v-model="scope.value"
dense
autofocus
@keyup.enter="updateBalance(scope.value)"
@keyup.enter="scope.set"
>
<template v-slot:append>
<q-icon name="edit" />

View File

@ -4,6 +4,7 @@ from typing import AsyncGenerator, Dict, Optional
import httpx
from loguru import logger
from websockets.client import connect
from lnbits.settings import settings
@ -36,6 +37,7 @@ class LNbitsWallet(Wallet):
"missing lnbits_key or lnbits_admin_key or lnbits_invoice_key"
)
self.endpoint = self.normalize_endpoint(settings.lnbits_endpoint)
self.ws_url = f"{self.endpoint.replace('http', 'ws', 1)}/api/v1/ws/{key}"
self.headers = {"X-Api-Key": key, "User-Agent": settings.user_agent}
self.client = httpx.AsyncClient(base_url=self.endpoint, headers=self.headers)
@ -194,38 +196,24 @@ class LNbitsWallet(Wallet):
return PaymentPendingStatus()
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
url = f"{self.endpoint}/api/v1/payments/sse"
while settings.lnbits_running:
try:
async with httpx.AsyncClient(
timeout=None, headers=self.headers
) as client:
del client.headers[
"accept-encoding"
] # we have to disable compression for SSEs
async with client.stream(
"GET", url, content="text/event-stream"
) as r:
sse_trigger = False
async for line in r.aiter_lines():
# The data we want to listen to is of this shape:
# event: payment-received
# data: {.., "payment_hash" : "asd"}
if line.startswith("event: payment-received"):
sse_trigger = True
continue
elif sse_trigger and line.startswith("data:"):
data = json.loads(line[len("data:") :])
sse_trigger = False
yield data["payment_hash"]
else:
sse_trigger = False
except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout):
pass
logger.error(
"lost connection to lnbits /payments/sse, retrying in 5 seconds"
)
await asyncio.sleep(5)
async with connect(self.ws_url) as ws:
logger.info("connected to LNbits fundingsource websocket.")
while settings.lnbits_running:
message = await ws.recv()
message_dict = json.loads(message)
if (
message_dict
and message_dict.get("payment")
and message_dict["payment"].get("payment_hash")
):
payment_hash = message_dict["payment"]["payment_hash"]
logger.info(f"payment-received: {payment_hash}")
yield payment_hash
except Exception as exc:
logger.error(
f"lost connection to LNbits fundingsource websocket: '{exc}'"
"retrying in 5 seconds"
)
await asyncio.sleep(5)

View File

@ -1,26 +0,0 @@
import pytest
from lnbits.core.models import Payment
# check if the client is working
@pytest.mark.anyio
async def test_core_views_generic(client):
response = await client.get("/")
assert response.status_code == 200
# check GET /public/v1/payment/{payment_hash}: correct hash [should pass]
@pytest.mark.anyio
async def test_api_public_payment_longpolling(client, invoice: Payment):
response = await client.get(f"/public/v1/payment/{invoice.payment_hash}")
assert response.status_code < 300
assert response.json()["status"] == "paid"
# check GET /public/v1/payment/{payment_hash}: wrong hash [should fail]
@pytest.mark.anyio
async def test_api_public_payment_longpolling_wrong_hash(client, invoice: Payment):
response = await client.get(f"/public/v1/payment/{invoice.payment_hash + '0'*64}")
assert response.status_code == 404
assert response.json()["detail"] == "Payment does not exist."