mirror of
https://github.com/lnbits/lnbits.git
synced 2025-09-28 04:46:18 +02:00
feat: run_interval
in tasks and refactor check_pending_payments (#3245)
This commit is contained in:
@@ -57,6 +57,7 @@ from .core import init_core_routers
|
||||
from .core.db import core_app_extra
|
||||
from .core.models.extensions import Extension, ExtensionMeta, InstallableExtension
|
||||
from .core.services import check_admin_settings, check_webpush_settings
|
||||
from .core.services.payments import check_pending_payments
|
||||
from .middleware import (
|
||||
AuditMiddleware,
|
||||
ExtensionsRedirectMiddleware,
|
||||
@@ -66,9 +67,9 @@ from .middleware import (
|
||||
add_ratelimit_middleware,
|
||||
)
|
||||
from .tasks import (
|
||||
check_pending_payments,
|
||||
internal_invoice_listener,
|
||||
invoice_listener,
|
||||
run_interval,
|
||||
)
|
||||
|
||||
|
||||
@@ -466,7 +467,7 @@ def register_async_tasks():
|
||||
create_permanent_task(wait_for_audit_data)
|
||||
create_permanent_task(wait_notification_messages)
|
||||
|
||||
create_permanent_task(check_pending_payments)
|
||||
create_permanent_task(run_interval(30 * 60, check_pending_payments))
|
||||
create_permanent_task(invoice_listener)
|
||||
create_permanent_task(internal_invoice_listener)
|
||||
create_permanent_task(cache.invalidate_forever)
|
||||
|
@@ -336,6 +336,48 @@ async def update_pending_payment(payment: Payment) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
async def check_pending_payments():
|
||||
"""
|
||||
check_pending_payments is called during startup to check for pending payments with
|
||||
the backend and also to delete expired invoices. Incoming payments will be
|
||||
checked only once, outgoing pending payments will be checked regularly.
|
||||
"""
|
||||
funding_source = get_funding_source()
|
||||
if funding_source.__class__.__name__ == "VoidWallet":
|
||||
logger.warning("Task: skipping pending check for VoidWallet")
|
||||
return
|
||||
start_time = time.time()
|
||||
pending_payments = await get_payments(
|
||||
since=(int(time.time()) - 60 * 60 * 24 * 15), # 15 days ago
|
||||
complete=False,
|
||||
pending=True,
|
||||
exclude_uncheckable=True,
|
||||
)
|
||||
count = len(pending_payments)
|
||||
if count > 0:
|
||||
logger.info(f"Task: checking {count} pending payments of last 15 days...")
|
||||
for i, payment in enumerate(pending_payments):
|
||||
status = await payment.check_status()
|
||||
prefix = f"payment ({i+1} / {count})"
|
||||
if status.failed:
|
||||
payment.status = PaymentState.FAILED
|
||||
await update_payment(payment)
|
||||
logger.debug(f"{prefix} failed {payment.checking_id}")
|
||||
elif status.success:
|
||||
payment.fee = status.fee_msat or 0
|
||||
payment.preimage = status.preimage
|
||||
payment.status = PaymentState.SUCCESS
|
||||
await update_payment(payment)
|
||||
logger.debug(f"{prefix} success {payment.checking_id}")
|
||||
else:
|
||||
logger.debug(f"{prefix} pending {payment.checking_id}")
|
||||
await asyncio.sleep(0.01) # to avoid complete blocking
|
||||
logger.info(
|
||||
f"Task: pending check finished for {count} payments"
|
||||
f" (took {time.time() - start_time:0.3f} s)"
|
||||
)
|
||||
|
||||
|
||||
def fee_reserve_total(amount_msat: int, internal: bool = False) -> int:
|
||||
return fee_reserve(amount_msat, internal) + service_fee(amount_msat, internal)
|
||||
|
||||
|
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from collections.abc import Coroutine
|
||||
@@ -11,7 +10,6 @@ from typing import (
|
||||
from loguru import logger
|
||||
|
||||
from lnbits.core.crud import (
|
||||
get_payments,
|
||||
get_standalone_payment,
|
||||
update_payment,
|
||||
)
|
||||
@@ -153,51 +151,18 @@ def wait_for_paid_invoices(
|
||||
return wrapper
|
||||
|
||||
|
||||
async def check_pending_payments():
|
||||
"""
|
||||
check_pending_payments is called during startup to check for pending payments with
|
||||
the backend and also to delete expired invoices. Incoming payments will be
|
||||
checked only once, outgoing pending payments will be checked regularly.
|
||||
"""
|
||||
sleep_time = 60 * 30 # 30 minutes
|
||||
def run_interval(
|
||||
interval_seconds: int,
|
||||
func: Callable[[], Coroutine],
|
||||
) -> Callable[[], Coroutine]:
|
||||
"""Run a function at a specified interval in seconds, while the server is running"""
|
||||
|
||||
while settings.lnbits_running:
|
||||
funding_source = get_funding_source()
|
||||
if funding_source.__class__.__name__ == "VoidWallet":
|
||||
logger.warning("Task: skipping pending check for VoidWallet")
|
||||
await asyncio.sleep(sleep_time)
|
||||
continue
|
||||
start_time = time.time()
|
||||
pending_payments = await get_payments(
|
||||
since=(int(time.time()) - 60 * 60 * 24 * 15), # 15 days ago
|
||||
complete=False,
|
||||
pending=True,
|
||||
exclude_uncheckable=True,
|
||||
)
|
||||
count = len(pending_payments)
|
||||
if count > 0:
|
||||
logger.info(f"Task: checking {count} pending payments of last 15 days...")
|
||||
for i, payment in enumerate(pending_payments):
|
||||
status = await payment.check_status()
|
||||
prefix = f"payment ({i+1} / {count})"
|
||||
if status.failed:
|
||||
payment.status = PaymentState.FAILED
|
||||
await update_payment(payment)
|
||||
logger.debug(f"{prefix} failed {payment.checking_id}")
|
||||
elif status.success:
|
||||
payment.fee = status.fee_msat or 0
|
||||
payment.preimage = status.preimage
|
||||
payment.status = PaymentState.SUCCESS
|
||||
await update_payment(payment)
|
||||
logger.debug(f"{prefix} success {payment.checking_id}")
|
||||
else:
|
||||
logger.debug(f"{prefix} pending {payment.checking_id}")
|
||||
await asyncio.sleep(0.01) # to avoid complete blocking
|
||||
logger.info(
|
||||
f"Task: pending check finished for {count} payments"
|
||||
f" (took {time.time() - start_time:0.3f} s)"
|
||||
)
|
||||
await asyncio.sleep(sleep_time)
|
||||
async def wrapper() -> None:
|
||||
while settings.lnbits_running:
|
||||
await func()
|
||||
await asyncio.sleep(interval_seconds)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
async def invoice_callback_dispatcher(checking_id: str, is_internal: bool = False):
|
||||
|
Reference in New Issue
Block a user