refactor: move registering of tasks into app.py (#2290)

* refactor: move registering of tasks into `app.py`

let the app have the responsibility to create the tasks not a helper function inside tasks.py.
makes it way easier to follow on how those background tasks are ran.
This commit is contained in:
dni ⚡
2024-03-22 12:29:05 +01:00
committed by GitHub
parent 2a1505bc0d
commit 1398857688
2 changed files with 30 additions and 46 deletions

View File

@@ -26,12 +26,16 @@ from starlette.responses import JSONResponse
from lnbits.core.crud import get_dbversions, get_installed_extensions from lnbits.core.crud import get_dbversions, get_installed_extensions
from lnbits.core.helpers import migrate_extension_database from lnbits.core.helpers import migrate_extension_database
from lnbits.core.services import websocketUpdater from lnbits.core.services import websocketUpdater
from lnbits.core.tasks import ( # register_watchdog,; unregister_watchdog, from lnbits.core.tasks import ( # watchdog_task
register_killswitch, killswitch_task,
register_task_listeners, wait_for_paid_invoices,
) )
from lnbits.settings import settings from lnbits.settings import settings
from lnbits.tasks import cancel_all_tasks, create_permanent_task from lnbits.tasks import (
cancel_all_tasks,
create_permanent_task,
register_invoice_listener,
)
from lnbits.utils.cache import cache from lnbits.utils.cache import cache
from lnbits.wallets import get_wallet_class, set_wallet_class from lnbits.wallets import get_wallet_class, set_wallet_class
@@ -470,9 +474,15 @@ def register_async_tasks(app):
create_permanent_task(invoice_listener) create_permanent_task(invoice_listener)
create_permanent_task(internal_invoice_listener) create_permanent_task(internal_invoice_listener)
create_permanent_task(cache.invalidate_forever) create_permanent_task(cache.invalidate_forever)
register_task_listeners()
register_killswitch() # core invoice listener
# await run_deferred_async() # calle: doesn't do anyting? invoice_queue = asyncio.Queue(5)
register_invoice_listener(invoice_queue, "core")
create_permanent_task(lambda: wait_for_paid_invoices(invoice_queue))
# TODO: implement watchdog properly
# create_permanent_task(watchdog_task)
create_permanent_task(killswitch_task)
def register_exception_handlers(app: FastAPI): def register_exception_handlers(app: FastAPI):

View File

@@ -17,26 +17,16 @@ from lnbits.core.services import (
switch_to_voidwallet, switch_to_voidwallet,
) )
from lnbits.settings import get_wallet_class, settings from lnbits.settings import get_wallet_class, settings
from lnbits.tasks import ( from lnbits.tasks import send_push_notification
create_permanent_task,
create_task,
register_invoice_listener,
send_push_notification,
)
api_invoice_listeners: Dict[str, asyncio.Queue] = {} api_invoice_listeners: Dict[str, asyncio.Queue] = {}
def register_killswitch(): async def killswitch_task():
""" """
Registers a killswitch which will check lnbits-status repository for a signal from killswitch will check lnbits-status repository for a signal from
LNbits and will switch to VoidWallet if the killswitch is triggered. LNbits and will switch to VoidWallet if the killswitch is triggered.
""" """
logger.debug("Starting killswitch task")
create_permanent_task(killswitch_task)
async def killswitch_task():
while True: while True:
WALLET = get_wallet_class() WALLET = get_wallet_class()
if settings.lnbits_killswitch and WALLET.__class__.__name__ != "VoidWallet": if settings.lnbits_killswitch and WALLET.__class__.__name__ != "VoidWallet":
@@ -59,17 +49,11 @@ async def killswitch_task():
await asyncio.sleep(settings.lnbits_killswitch_interval * 60) await asyncio.sleep(settings.lnbits_killswitch_interval * 60)
async def register_watchdog(): async def watchdog_task():
""" """
Registers a watchdog which will check lnbits balance and nodebalance Registers a watchdog which will check lnbits balance and nodebalance
and will switch to VoidWallet if the watchdog delta is reached. and will switch to VoidWallet if the watchdog delta is reached.
""" """
# TODO: implement watchdog properly
# logger.debug("Starting watchdog task")
# create_permanent_task(watchdog_task)
async def watchdog_task():
while True: while True:
WALLET = get_wallet_class() WALLET = get_wallet_class()
if settings.lnbits_watchdog and WALLET.__class__.__name__ != "VoidWallet": if settings.lnbits_watchdog and WALLET.__class__.__name__ != "VoidWallet":
@@ -84,36 +68,23 @@ async def watchdog_task():
await asyncio.sleep(settings.lnbits_watchdog_interval * 60) await asyncio.sleep(settings.lnbits_watchdog_interval * 60)
def register_task_listeners():
"""
Registers an invoice listener queue for the core tasks. Incoming payments in this
queue will eventually trigger the signals sent to all other extensions
and fulfill other core tasks such as dispatching webhooks.
"""
invoice_paid_queue = asyncio.Queue(5)
# we register invoice_paid_queue to receive all incoming invoices
register_invoice_listener(invoice_paid_queue, "core/tasks.py")
# register a worker that will react to invoices
create_task(wait_for_paid_invoices(invoice_paid_queue))
async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue): async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue):
""" """
This worker dispatches events to all extensions, This task dispatches events to all api_invoice_listeners,
dispatches webhooks and balance notifys. webhooks, push notifications and balance notifications.
""" """
while True: while True:
payment = await invoice_paid_queue.get() payment = await invoice_paid_queue.get()
logger.trace("received invoice paid event") logger.trace("received invoice paid event")
# send information to sse channel # dispatch api_invoice_listeners
await dispatch_api_invoice_listeners(payment) await dispatch_api_invoice_listeners(payment)
# payment notification
wallet = await get_wallet(payment.wallet_id) wallet = await get_wallet(payment.wallet_id)
if wallet: if wallet:
await send_payment_notification(wallet, payment) await send_payment_notification(wallet, payment)
# dispatch webhook # dispatch webhook
if payment.webhook and not payment.webhook_status: if payment.webhook and not payment.webhook_status:
await dispatch_webhook(payment) await dispatch_webhook(payment)
# dispatch balance_notify # dispatch balance_notify
url = await get_balance_notify(payment.wallet_id) url = await get_balance_notify(payment.wallet_id)
if url: if url:
@@ -136,6 +107,7 @@ async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue):
logger.warning(f"Could not send balance_notify to {url}") logger.warning(f"Could not send balance_notify to {url}")
logger.warning(exc) logger.warning(exc)
# dispatch push notification
await send_payment_push_notification(payment) await send_payment_push_notification(payment)
@@ -145,10 +117,12 @@ async def dispatch_api_invoice_listeners(payment: Payment):
""" """
for chan_name, send_channel in api_invoice_listeners.items(): for chan_name, send_channel in api_invoice_listeners.items():
try: try:
logger.debug(f"sending invoice paid event to {chan_name}") logger.debug(f"api invoice listener: sending paid event to {chan_name}")
send_channel.put_nowait(payment) send_channel.put_nowait(payment)
except asyncio.QueueFull: except asyncio.QueueFull:
logger.error(f"removing sse listener {send_channel}:{chan_name}") logger.error(
f"api invoice listener: QueueFull, removing {send_channel}:{chan_name}"
)
api_invoice_listeners.pop(chan_name) api_invoice_listeners.pop(chan_name)