chore: fix code format

This commit is contained in:
Vlad Stan 2025-05-14 13:00:01 +03:00
parent d58198b16c
commit 7db2b52051

View File

@ -1,24 +1,24 @@
import asyncio
import random
import time
import traceback
import uuid
from decimal import Decimal
from typing import Any, AsyncGenerator, Dict, List, Optional
from typing import Any, AsyncGenerator, Dict, Optional
import httpx
from loguru import logger
from lnbits.settings import settings
from .base import (
InvoiceResponse,
PaymentFailedStatus,
PaymentPendingStatus,
PaymentResponse,
PaymentStatus,
PaymentSuccessStatus,
StatusResponse,
Wallet,
PaymentFailedStatus,
)
@ -26,10 +26,11 @@ class TokenBucket:
"""
Token bucket rate limiter for Strike API endpoints.
"""
def __init__(self, rate: int, period_seconds: int):
"""
Initialize a token bucket.
Args:
rate: Max requests allowed in the period
period_seconds: Time period in seconds.
@ -39,25 +40,25 @@ class TokenBucket:
self.tokens = rate
self.last_refill = time.monotonic()
self.lock = asyncio.Lock()
async def consume(self) -> None:
"""Wait until a token is available and consume it."""
async with self.lock:
# Refill tokens based on elapsed time
now = time.monotonic()
elapsed = now - self.last_refill
if elapsed > 0:
new_tokens = (elapsed / self.period) * self.rate
self.tokens = min(self.rate, self.tokens + new_tokens)
self.last_refill = now
# If no tokens available, calculate wait time
if self.tokens < 1:
wait_time = (self.period / self.rate) * (1 - self.tokens)
await asyncio.sleep(wait_time)
self.tokens = 0 # Reset after waiting
# Consume a token
self.tokens -= 1
@ -147,7 +148,7 @@ class StrikeWallet(Wallet):
await self._payment_limiter.consume()
else:
await self._general_limiter.consume()
async with self._sem:
start = time.perf_counter()
@ -156,12 +157,8 @@ class StrikeWallet(Wallet):
resp = await self.client.request(method, path, **kw)
resp.raise_for_status()
logger.trace(
"Strike {m} {p} {s} in {t:.1f} ms".format(
m=method.upper(),
p=path,
s=resp.status_code,
t=(time.perf_counter() - start) * 1000,
)
f"Strike {method.upper()} {path} - {resp.status_code} "
f"in {(time.perf_counter() - start) * 1000:.1f} ms"
)
return resp
@ -172,30 +169,26 @@ class StrikeWallet(Wallet):
):
raise
logger.warning(
"Strike {m} {p} -> {c}; retry {a}/{n}".format(
m=method.upper(),
p=path,
c=e.response.status_code,
a=attempt + 1,
n=self._MAX_RETRIES,
)
f"Strike {method.upper()} {path} -> {e.response.status_code}; "
f"retry {attempt + 1}/{self._MAX_RETRIES}"
)
except httpx.TransportError as e:
if attempt == self._MAX_RETRIES: # No more retries left.
raise
logger.warning(
"Transport error contacting Strike ({err}); retry {a}/{n}".format(
err=e, a=attempt + 1, n=self._MAX_RETRIES
)
f"Transport error contacting Strike ({e}); "
f"retry {attempt + 1}/{self._MAX_RETRIES}"
)
delay = (self._RETRY_BACKOFF_BASE ** attempt) + (0.1 * random.random()) # Exponential backoff with jitter.
delay = (self._RETRY_BACKOFF_BASE**attempt) + (
0.1 * random.random()
) # Exponential backoff with jitter.
await asyncio.sleep(delay)
raise RuntimeError("exceeded retry budget in _req")
# Typed wrappers so call-sites stay tidy.
# Typed wrappers - so call-sites stay tidy.
async def _get(self, path: str, **kw) -> httpx.Response: # GET request.
return await self._req("GET", path, **kw)
@ -223,14 +216,24 @@ class StrikeWallet(Wallet):
try:
r = await self._get("/balances") # Get balances from Strike API.
data = r.json() # Parse JSON response.
balances = data.get("data", []) if isinstance(data, dict) else data # Extract balances or use an empty list.
btc = next((b for b in balances if b.get("currency") == "BTC"), None) # Find BTC balance.
if btc and "available" in btc: # Check if BTC balance and available amount exist.
balances = (
data.get("data", []) if isinstance(data, dict) else data
) # Extract balances or use an empty list.
btc = next(
(b for b in balances if b.get("currency") == "BTC"), None
) # Find BTC balance.
if (
btc and "available" in btc
): # Check if BTC balance and available amount exist.
available_btc = Decimal(btc["available"]) # Get available BTC amount.
msats = int(available_btc * Decimal(1e11)) # Convert BTC to millisatoshis.
msats = int(
available_btc * Decimal(1e11)
) # Convert BTC to millisatoshis.
self._cached_balance = msats # Update cached balance.
self._cached_balance_ts = now # Update cache timestamp.
return StatusResponse(None, msats) # Return successful status with balance.
return StatusResponse(
None, msats
) # Return successful status with balance.
# No BTC balance found.
return StatusResponse(None, 0)
except httpx.HTTPStatusError as e:
@ -249,11 +252,18 @@ class StrikeWallet(Wallet):
**kwargs,
) -> InvoiceResponse:
try:
idem = kwargs.get("idempotency_key") or str(uuid.uuid4()) # Use provided idempotency key or generate a new one.
btc_amt = (Decimal(amount) / Decimal(1e8)).quantize(Decimal("0.00000001")) # Convert amount from millisatoshis to BTC.
idem = kwargs.get("idempotency_key") or str(
uuid.uuid4()
) # Use provided idempotency key or generate a new one.
btc_amt = (Decimal(amount) / Decimal(1e8)).quantize(
Decimal("0.00000001")
) # Convert amount from millisatoshis to BTC.
payload: Dict[str, Any] = {
"bolt11": {
"amount": {"currency": "BTC", "amount": str(btc_amt)}, # Set amount in BTC.
"amount": {
"currency": "BTC",
"amount": str(btc_amt),
}, # Set amount in BTC.
"description": memo or "",
},
"targetCurrency": "BTC",
@ -269,13 +279,21 @@ class StrikeWallet(Wallet):
resp = r.json() # Parse JSON response.
invoice_id = resp.get("receiveRequestId") # Get the receive request ID.
bolt11 = resp.get("bolt11", {}).get("invoice") # Get the bolt11 invoice.
if not invoice_id or not bolt11: # Check if both invoice ID and bolt11 are present.
if (
not invoice_id or not bolt11
): # Check if both invoice ID and bolt11 are present.
return InvoiceResponse(False, None, None, "Invalid invoice response")
self.pending_invoices.append(invoice_id) # Add invoice ID to pending invoices.
return InvoiceResponse(True, invoice_id, bolt11, None) # Return successful invoice response.
self.pending_invoices.append(
invoice_id
) # Add invoice ID to pending invoices.
return InvoiceResponse(
True, invoice_id, bolt11, None
) # Return successful invoice response.
except httpx.HTTPStatusError as e:
msg = e.response.json().get("message", e.response.text) # Get error message from response.
msg = e.response.json().get(
"message", e.response.text
) # Get error message from response.
return InvoiceResponse(False, None, None, f"Strike API error: {msg}")
except Exception:
logger.exception("Error in create_invoice()")
@ -298,33 +316,55 @@ class StrikeWallet(Wallet):
# 2) Execute the payment quote.
e = await self._patch(f"/payment-quotes/{quote_id}/execute")
data = e.json() if e.content else {} # Parse JSON response or use an empty dictionary.
data = (
e.json() if e.content else {}
) # Parse JSON response or use an empty dictionary.
payment_id = data.get("paymentId") # Get the payment ID.
state = data.get("state", "").upper() # Get the payment state and convert it to uppercase.
state = data.get(
"state", ""
).upper() # Get the payment state and convert it to uppercase.
# Network fee → msat.
fee_obj = data.get("lightningNetworkFee") or data.get("totalFee") or {} # Get fee object.
fee_obj = (
data.get("lightningNetworkFee") or data.get("totalFee") or {}
) # Get fee object.
fee_btc = Decimal(fee_obj.get("amount", "0")) # Get fee amount in BTC.
fee_msat = int(fee_btc * Decimal(1e11)) # Convert fee from BTC to millisatoshis.
fee_msat = int(
fee_btc * Decimal(1e11)
) # Convert fee from BTC to millisatoshis.
# Store mapping for later polling.
if payment_id: # If payment ID is present.
self.pending_payments[payment_id] = quote_id
if state in {"SUCCEEDED", "COMPLETED"}: # If payment succeeded.
preimage = data.get("preimage") or data.get("preImage") # Get payment preimage.
return PaymentResponse(True, payment_id, fee_msat, preimage, None) # Return successful payment response.
preimage = data.get("preimage") or data.get(
"preImage"
) # Get payment preimage.
return PaymentResponse(
True, payment_id, fee_msat, preimage, None
) # Return successful payment response.
# Explicitly check for known failure states.
failed_states = {"CANCELED", "FAILED", "TIMED_OUT"} # Add any other known failure states here.
failed_states = {
"CANCELED",
"FAILED",
"TIMED_OUT",
} # Add any other known failure states here.
if state in failed_states:
return PaymentResponse(False, payment_id, None, None, f"State: {state}") # Return failed payment response with state.
return PaymentResponse(
False, payment_id, None, None, f"State: {state}"
) # Return failed payment response with state.
# Treat all other states as pending (including unknown states).
return PaymentResponse(None, payment_id, None, None, None) # Return pending payment response.
return PaymentResponse(
None, payment_id, None, None, None
) # Return pending payment response.
except httpx.HTTPStatusError as e:
error_message = e.response.json().get("message", e.response.text) # Get error message from response.
error_message = e.response.json().get(
"message", e.response.text
) # Get error message from response.
return PaymentResponse(
ok=False,
checking_id=None,
@ -336,10 +376,11 @@ class StrikeWallet(Wallet):
logger.exception("Error in pay_invoice()")
return PaymentResponse(False, None, None, None, "Connection error")
async def get_invoice_status(self, checking_id: str) -> PaymentStatus:
try:
r = await self._get(f"/receive-requests/{checking_id}/receives") # Get receive requests for the invoice.
r = await self._get(
f"/receive-requests/{checking_id}/receives"
) # Get receive requests for the invoice.
for itm in r.json().get("items", []): # Iterate through received items.
if itm.get("state") == "COMPLETED": # If an item is completed.
# Extract preimage from lightning object if available
@ -347,15 +388,23 @@ class StrikeWallet(Wallet):
lightning_data = itm.get("lightning")
if lightning_data:
preimage = lightning_data.get("preimage")
return PaymentSuccessStatus(fee_msat=0, preimage=preimage) # Return successful payment status with preimage.
return PaymentPendingStatus() # Return pending payment status if no completed items.
return PaymentSuccessStatus(
fee_msat=0, preimage=preimage
) # Return successful payment status with preimage.
return (
PaymentPendingStatus()
) # Return pending payment status if no completed items.
except httpx.HTTPStatusError as e:
if e.response.status_code == 404: # If invoice not found.
try:
r2 = await self._get(f"/v1/invoices/{checking_id}") # Try getting invoice from the old endpoint with correct path.
r2 = await self._get(
f"/v1/invoices/{checking_id}"
) # Try getting invoice from the old endpoint with correct path.
st = r2.json().get("state", "") # Get invoice state.
if st == "PAID": # If invoice is paid.
return PaymentSuccessStatus(fee_msat=0) # Return successful payment status.
return PaymentSuccessStatus(
fee_msat=0
) # Return successful payment status.
if st == "CANCELLED": # If invoice is cancelled.
return PaymentStatus(False) # Return failed payment status.
except Exception:
@ -369,16 +418,24 @@ class StrikeWallet(Wallet):
quote_id = self.pending_payments.get(checking_id)
if not quote_id: # Payment not found in pending list.
if checking_id in self.failed_payments:
return PaymentFailedStatus(paid=False) # Payment is known to have failed.
return PaymentFailedStatus(
paid=False
) # Payment is known to have failed.
return PaymentPendingStatus()
try: # Try to get payment quote.
r = await self._get(f"/payment-quotes/{quote_id}") # Get payment quote from Strike API.
r = await self._get(
f"/payment-quotes/{quote_id}"
) # Get payment quote from Strike API.
r.raise_for_status()
data = r.json() # Parse JSON response.
state = data.get("state") # Get payment state.
preimage = data.get("preimage") or data.get("preImage") # Get payment preimage.
preimage = data.get("preimage") or data.get(
"preImage"
) # Get payment preimage.
if state in ("SUCCEEDED", "COMPLETED"): # If payment succeeded.
return PaymentSuccessStatus(fee_msat=0, preimage=preimage) # Return successful payment status.
return PaymentSuccessStatus(
fee_msat=0, preimage=preimage
) # Return successful payment status.
if state == "PENDING": # If payment is pending.
return PaymentPendingStatus() # Return pending payment status.
return PaymentStatus(False) # Return failed payment status.
@ -399,10 +456,10 @@ class StrikeWallet(Wallet):
Uses dynamic adjustment of polling frequency based on activity.
"""
MIN_POLL, MAX_POLL = 1, 15
min_poll, max_poll = 1, 15
# 1,000 requests / 10 minutes = ~100 requests/minute.
RATE_LIMIT = 100
sleep_s = MIN_POLL
rate_limit = 100
sleep_s = min_poll
# Main loop for polling invoices.
self._running = True
@ -410,7 +467,9 @@ class StrikeWallet(Wallet):
loop_start = time.time()
had_activity = False
req_budget = max(1, RATE_LIMIT * sleep_s // 60) # Calculate request budget based on sleep time.
req_budget = max(
1, rate_limit * sleep_s // 60
) # Calculate request budget based on sleep time.
processed = 0
for inv in list(self.pending_invoices): # Iterate through pending invoices.
@ -419,21 +478,27 @@ class StrikeWallet(Wallet):
status = await self.get_invoice_status(inv) # Get invoice status.
processed += 1 # Increment processed count.
if status.success or status.failed: # If invoice is either successful or failed.
self.pending_invoices.remove(inv) # Remove invoice from pending list.
if (
status.success or status.failed
): # If invoice is either successful or failed.
self.pending_invoices.remove(
inv
) # Remove invoice from pending list.
if status.success: # If invoice is successful.
had_activity = True # Set activity flag.
yield inv # Yield the invoice.
# Dynamic adjustment of polling frequency based on activity.
sleep_s = (
max(MIN_POLL, sleep_s // 2) if had_activity else min(MAX_POLL, sleep_s * 2)
max(min_poll, sleep_s // 2)
if had_activity
else min(max_poll, sleep_s * 2)
)
# Sleep to respect rate limits.
elapsed = time.time() - loop_start
# Ensure we respect the rate limit, even with dynamic adjustment.
min_sleep_for_rate = processed * 60 / RATE_LIMIT - elapsed
min_sleep_for_rate = processed * 60 / rate_limit - elapsed
await asyncio.sleep(max(sleep_s, min_sleep_for_rate, 0))
# ------------------------------------------------------------------ #
@ -442,22 +507,24 @@ class StrikeWallet(Wallet):
async def get_invoices(
self,
filter: Optional[str] = None,
filters: Optional[str] = None,
orderby: Optional[str] = None,
skip: Optional[int] = None,
top: Optional[int] = None,
) -> Dict[str, Any]:
try:
params: Dict[str, Any] = {}
if filter:
params["$filter"] = filter
if filters:
params["$filter"] = filters
if orderby:
params["$orderby"] = orderby
if skip is not None:
params["$skip"] = skip
if top is not None:
params["$top"] = top
r = await self._get("/invoices", params=params) # Get invoices from Strike API.
r = await self._get(
"/invoices", params=params
) # Get invoices from Strike API.
return r.json()
except Exception:
logger.exception("Error in get_invoices()")
@ -465,7 +532,9 @@ class StrikeWallet(Wallet):
async def cancel_invoice(self, invoice_id: str) -> Dict[str, Any]:
try:
r = await self._patch(f"/invoices/{invoice_id}/cancel") # Cancel invoice on Strike.
r = await self._patch(
f"/invoices/{invoice_id}/cancel"
) # Cancel invoice on Strike.
return r.json()
except Exception:
logger.exception("Error in cancel_invoice()")
@ -473,7 +542,9 @@ class StrikeWallet(Wallet):
async def get_account_profile_by_handle(self, handle: str) -> Dict[str, Any]:
try:
r = await self._get(f"/accounts/handle/{handle}") # Get account profile by handle from Strike API.
r = await self._get(
f"/accounts/handle/{handle}"
) # Get account profile by handle from Strike API.
return r.json()
except Exception:
logger.exception("Error in get_account_profile_by_handle()")