From 139885768882fc142a0713bfef03074a0fc36cb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?dni=20=E2=9A=A1?= Date: Fri, 22 Mar 2024 12:29:05 +0100 Subject: [PATCH] refactor: move registering of tasks into `app.py` (#2290) * refactor: move registering of tasks into `app.py` let the app have the responsibility to create the tasks not a helper function inside tasks.py. makes it way easier to follow on how those background tasks are ran. --- lnbits/app.py | 24 ++++++++++++++------ lnbits/core/tasks.py | 52 +++++++++++--------------------------------- 2 files changed, 30 insertions(+), 46 deletions(-) diff --git a/lnbits/app.py b/lnbits/app.py index c3a6dc989..ba6f7aef6 100644 --- a/lnbits/app.py +++ b/lnbits/app.py @@ -26,12 +26,16 @@ from starlette.responses import JSONResponse from lnbits.core.crud import get_dbversions, get_installed_extensions from lnbits.core.helpers import migrate_extension_database from lnbits.core.services import websocketUpdater -from lnbits.core.tasks import ( # register_watchdog,; unregister_watchdog, - register_killswitch, - register_task_listeners, +from lnbits.core.tasks import ( # watchdog_task + killswitch_task, + wait_for_paid_invoices, ) from lnbits.settings import settings -from lnbits.tasks import cancel_all_tasks, create_permanent_task +from lnbits.tasks import ( + cancel_all_tasks, + create_permanent_task, + register_invoice_listener, +) from lnbits.utils.cache import cache from lnbits.wallets import get_wallet_class, set_wallet_class @@ -470,9 +474,15 @@ def register_async_tasks(app): create_permanent_task(invoice_listener) create_permanent_task(internal_invoice_listener) create_permanent_task(cache.invalidate_forever) - register_task_listeners() - register_killswitch() - # await run_deferred_async() # calle: doesn't do anyting? + + # core invoice listener + invoice_queue = asyncio.Queue(5) + register_invoice_listener(invoice_queue, "core") + create_permanent_task(lambda: wait_for_paid_invoices(invoice_queue)) + + # TODO: implement watchdog properly + # create_permanent_task(watchdog_task) + create_permanent_task(killswitch_task) def register_exception_handlers(app: FastAPI): diff --git a/lnbits/core/tasks.py b/lnbits/core/tasks.py index 46edd2653..c35a5ad3f 100644 --- a/lnbits/core/tasks.py +++ b/lnbits/core/tasks.py @@ -17,26 +17,16 @@ from lnbits.core.services import ( switch_to_voidwallet, ) from lnbits.settings import get_wallet_class, settings -from lnbits.tasks import ( - create_permanent_task, - create_task, - register_invoice_listener, - send_push_notification, -) +from lnbits.tasks import send_push_notification api_invoice_listeners: Dict[str, asyncio.Queue] = {} -def register_killswitch(): +async def killswitch_task(): """ - Registers a killswitch which will check lnbits-status repository for a signal from + killswitch will check lnbits-status repository for a signal from LNbits and will switch to VoidWallet if the killswitch is triggered. """ - logger.debug("Starting killswitch task") - create_permanent_task(killswitch_task) - - -async def killswitch_task(): while True: WALLET = get_wallet_class() if settings.lnbits_killswitch and WALLET.__class__.__name__ != "VoidWallet": @@ -59,17 +49,11 @@ async def killswitch_task(): await asyncio.sleep(settings.lnbits_killswitch_interval * 60) -async def register_watchdog(): +async def watchdog_task(): """ Registers a watchdog which will check lnbits balance and nodebalance and will switch to VoidWallet if the watchdog delta is reached. """ - # TODO: implement watchdog properly - # logger.debug("Starting watchdog task") - # create_permanent_task(watchdog_task) - - -async def watchdog_task(): while True: WALLET = get_wallet_class() if settings.lnbits_watchdog and WALLET.__class__.__name__ != "VoidWallet": @@ -84,36 +68,23 @@ async def watchdog_task(): await asyncio.sleep(settings.lnbits_watchdog_interval * 60) -def register_task_listeners(): - """ - Registers an invoice listener queue for the core tasks. Incoming payments in this - queue will eventually trigger the signals sent to all other extensions - and fulfill other core tasks such as dispatching webhooks. - """ - invoice_paid_queue = asyncio.Queue(5) - # we register invoice_paid_queue to receive all incoming invoices - register_invoice_listener(invoice_paid_queue, "core/tasks.py") - # register a worker that will react to invoices - create_task(wait_for_paid_invoices(invoice_paid_queue)) - - async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue): """ - This worker dispatches events to all extensions, - dispatches webhooks and balance notifys. + This task dispatches events to all api_invoice_listeners, + webhooks, push notifications and balance notifications. """ while True: payment = await invoice_paid_queue.get() logger.trace("received invoice paid event") - # send information to sse channel + # dispatch api_invoice_listeners await dispatch_api_invoice_listeners(payment) + # payment notification wallet = await get_wallet(payment.wallet_id) if wallet: await send_payment_notification(wallet, payment) # dispatch webhook if payment.webhook and not payment.webhook_status: await dispatch_webhook(payment) - # dispatch balance_notify url = await get_balance_notify(payment.wallet_id) if url: @@ -136,6 +107,7 @@ async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue): logger.warning(f"Could not send balance_notify to {url}") logger.warning(exc) + # dispatch push notification await send_payment_push_notification(payment) @@ -145,10 +117,12 @@ async def dispatch_api_invoice_listeners(payment: Payment): """ for chan_name, send_channel in api_invoice_listeners.items(): try: - logger.debug(f"sending invoice paid event to {chan_name}") + logger.debug(f"api invoice listener: sending paid event to {chan_name}") send_channel.put_nowait(payment) except asyncio.QueueFull: - logger.error(f"removing sse listener {send_channel}:{chan_name}") + logger.error( + f"api invoice listener: QueueFull, removing {send_channel}:{chan_name}" + ) api_invoice_listeners.pop(chan_name)