mirror of
https://github.com/lnbits/lnbits.git
synced 2025-10-10 20:42:32 +02:00
added a helper to get the subscription id
if returned that can be used for the checking id
This commit is contained in:
@@ -26,6 +26,22 @@ from .base import (
|
|||||||
|
|
||||||
FiatMethod = Literal["checkout", "terminal"]
|
FiatMethod = Literal["checkout", "terminal"]
|
||||||
|
|
||||||
|
# ---- NEW: normalized subscription status type ----
|
||||||
|
StripeStatus = Literal[
|
||||||
|
"active",
|
||||||
|
"trialing",
|
||||||
|
"past_due",
|
||||||
|
"unpaid",
|
||||||
|
"canceled",
|
||||||
|
"incomplete",
|
||||||
|
"incomplete_expired",
|
||||||
|
"paused",
|
||||||
|
"not_found",
|
||||||
|
"pending",
|
||||||
|
"error",
|
||||||
|
"unknown",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class StripeTerminalOptions(BaseModel):
|
class StripeTerminalOptions(BaseModel):
|
||||||
class Config:
|
class Config:
|
||||||
@@ -365,6 +381,104 @@ class StripeWallet(FiatProvider):
|
|||||||
ok=False, error_message=f"Unable to connect to {self.endpoint}."
|
ok=False, error_message=f"Unable to connect to {self.endpoint}."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ---------- Subscription status helpers (NEW) ----------
|
||||||
|
async def get_subscription_status(self, sub_or_session_id: str) -> StripeStatus:
|
||||||
|
"""
|
||||||
|
Accepts either a 'sub_...' or 'cs_...' id. If it's a 'cs_...',
|
||||||
|
returns 'pending' until the subscription exists; once it does,
|
||||||
|
returns the mapped subscription status.
|
||||||
|
"""
|
||||||
|
sid = self._normalize_stripe_id(sub_or_session_id)
|
||||||
|
try:
|
||||||
|
if sid.startswith("sub_"):
|
||||||
|
r = await self.client.get(f"/v1/subscriptions/{sid}")
|
||||||
|
if r.status_code == 404:
|
||||||
|
return "not_found"
|
||||||
|
r.raise_for_status()
|
||||||
|
return self._status_from_subscription(r.json())
|
||||||
|
|
||||||
|
if sid.startswith("cs_"):
|
||||||
|
r = await self.client.get(f"/v1/checkout/sessions/{sid}")
|
||||||
|
if r.status_code == 404:
|
||||||
|
return "not_found"
|
||||||
|
r.raise_for_status()
|
||||||
|
data = r.json()
|
||||||
|
subscription_id = data.get("subscription")
|
||||||
|
if not subscription_id:
|
||||||
|
return "pending"
|
||||||
|
r2 = await self.client.get(f"/v1/subscriptions/{subscription_id}")
|
||||||
|
if r2.status_code == 404:
|
||||||
|
return "not_found"
|
||||||
|
r2.raise_for_status()
|
||||||
|
return self._status_from_subscription(r2.json())
|
||||||
|
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
except httpx.HTTPStatusError:
|
||||||
|
return "error"
|
||||||
|
except Exception:
|
||||||
|
return "error"
|
||||||
|
|
||||||
|
async def get_subscription_status_and_promote(
|
||||||
|
self, sub_or_session_id: str
|
||||||
|
) -> tuple[StripeStatus, str]:
|
||||||
|
"""
|
||||||
|
Returns (status, effective_id). If given a 'cs_...' and the Checkout
|
||||||
|
Session has created a subscription, returns the subscription status
|
||||||
|
AND the promoted 'sub_...' id so you can persist it. If given a 'sub_...',
|
||||||
|
returns its status and the same id.
|
||||||
|
"""
|
||||||
|
sid = self._normalize_stripe_id(sub_or_session_id)
|
||||||
|
try:
|
||||||
|
if sid.startswith("sub_"):
|
||||||
|
r = await self.client.get(f"/v1/subscriptions/{sid}")
|
||||||
|
if r.status_code == 404:
|
||||||
|
return ("not_found", sid)
|
||||||
|
r.raise_for_status()
|
||||||
|
return (self._status_from_subscription(r.json()), sid)
|
||||||
|
|
||||||
|
if sid.startswith("cs_"):
|
||||||
|
r = await self.client.get(f"/v1/checkout/sessions/{sid}")
|
||||||
|
if r.status_code == 404:
|
||||||
|
return ("not_found", sid)
|
||||||
|
r.raise_for_status()
|
||||||
|
data = r.json()
|
||||||
|
subscription_id = data.get("subscription")
|
||||||
|
if not subscription_id:
|
||||||
|
return ("pending", sid)
|
||||||
|
|
||||||
|
# Promote to the subscription id
|
||||||
|
r2 = await self.client.get(f"/v1/subscriptions/{subscription_id}")
|
||||||
|
if r2.status_code == 404:
|
||||||
|
return ("not_found", subscription_id)
|
||||||
|
r2.raise_for_status()
|
||||||
|
return (self._status_from_subscription(r2.json()), subscription_id)
|
||||||
|
|
||||||
|
return ("unknown", sid)
|
||||||
|
|
||||||
|
except httpx.HTTPStatusError:
|
||||||
|
return ("error", sid)
|
||||||
|
except Exception:
|
||||||
|
return ("error", sid)
|
||||||
|
|
||||||
|
def _status_from_subscription(self, sub: dict) -> StripeStatus:
|
||||||
|
status = (sub or {}).get("status")
|
||||||
|
if not status:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
status = str(status).lower().strip()
|
||||||
|
known: set[str] = {
|
||||||
|
"active",
|
||||||
|
"trialing",
|
||||||
|
"past_due",
|
||||||
|
"unpaid",
|
||||||
|
"canceled",
|
||||||
|
"incomplete",
|
||||||
|
"incomplete_expired",
|
||||||
|
"paused",
|
||||||
|
}
|
||||||
|
return status if status in known else "unknown"
|
||||||
|
|
||||||
# ---------- Helpers ----------
|
# ---------- Helpers ----------
|
||||||
async def _get_price_id_by_lookup_key(self, lookup_key: str) -> str | None:
|
async def _get_price_id_by_lookup_key(self, lookup_key: str) -> str | None:
|
||||||
params = {"active": "true", "expand[]": "data.product", "limit": "1"}
|
params = {"active": "true", "expand[]": "data.product", "limit": "1"}
|
||||||
|
Reference in New Issue
Block a user