fix some tests

This commit is contained in:
dni ⚡ 2024-09-30 12:02:28 +02:00 committed by Vlad Stan
parent 3c67b0cf71
commit 4e76a62a7a
6 changed files with 114 additions and 103 deletions

View File

@ -788,9 +788,9 @@ async def create_payment(
f"""
INSERT INTO apipayments
(wallet_id, checking_id, bolt11, payment_hash, preimage,
amount, status, memo, fee, extra, webhook, expiry, pending)
amount, status, memo, fee, extra, webhook, expiry)
VALUES (:wallet_id, :checking_id, :bolt11, :hash, :preimage,
:amount, :status, :memo, :fee, :extra, :webhook, {expiry_ph}, :pending)
:amount, :status, :memo, :fee, :extra, :webhook, {expiry_ph})
""",
{
"wallet_id": data.wallet_id,
@ -809,7 +809,6 @@ async def create_payment(
),
"webhook": data.webhook,
"expiry": data.expiry if data.expiry else None,
"pending": False, # TODO: remove this in next release
},
)

View File

@ -5,7 +5,7 @@ from http import HTTPStatus
from typing import List
from fastapi import APIRouter, Depends
from starlette.exceptions import HTTPException
from fastapi.exceptions import HTTPException
from lnbits.core.crud import (
delete_account,
@ -48,24 +48,25 @@ async def api_get_users(
async def api_users_delete_user(
user_id: str, user: User = Depends(check_admin)
) -> None:
try:
wallets = await get_wallets(user_id)
if len(wallets) > 0:
raise Exception("Cannot delete user with wallets.")
if user_id == settings.super_user:
raise Exception("Cannot delete super user.")
if user_id in settings.lnbits_admin_users and not user.super_user:
raise Exception("Only super_user can delete admin user.")
await delete_account(user_id)
except Exception as exc:
wallets = await get_wallets(user_id)
if len(wallets) > 0:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"{exc!s}",
) from exc
status_code=HTTPStatus.BAD_REQUEST,
detail="Cannot delete user with wallets.",
)
if user_id == settings.super_user:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Cannot delete super user.",
)
if user_id in settings.lnbits_admin_users and not user.super_user:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Only super_user can delete admin user.",
)
await delete_account(user_id)
@users_router.put(
@ -88,66 +89,53 @@ async def api_users_reset_password(user_id: str) -> str:
@users_router.get("/user/{user_id}/admin", dependencies=[Depends(check_super_user)])
async def api_users_toggle_admin(user_id: str) -> None:
try:
if user_id == settings.super_user:
raise Exception("Cannot change super user.")
if user_id in settings.lnbits_admin_users:
settings.lnbits_admin_users.remove(user_id)
else:
settings.lnbits_admin_users.append(user_id)
update_settings = EditableSettings(
lnbits_admin_users=settings.lnbits_admin_users
)
await update_admin_settings(update_settings)
except Exception as exc:
if user_id == settings.super_user:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Could not update admin settings. {exc}",
) from exc
status_code=HTTPStatus.BAD_REQUEST,
detail="Cannot change super user.",
)
if user_id in settings.lnbits_admin_users:
settings.lnbits_admin_users.remove(user_id)
else:
settings.lnbits_admin_users.append(user_id)
update_settings = EditableSettings(lnbits_admin_users=settings.lnbits_admin_users)
await update_admin_settings(update_settings)
@users_router.get("/user/{user_id}/wallet")
async def api_users_get_user_wallet(user_id: str) -> List[Wallet]:
try:
return await get_wallets(user_id)
except Exception as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Could not fetch user wallets. {exc}",
) from exc
return await get_wallets(user_id)
@users_router.get("/user/{user_id}/wallet/{wallet}/undelete")
async def api_users_undelete_user_wallet(user_id: str, wallet: str) -> None:
try:
wal = await get_wallet(wallet)
if not wal:
raise Exception("Wallet does not exist.")
if user_id != wal.user:
raise Exception("Wallet does not belong to user.")
if wal.deleted:
await delete_wallet(user_id=user_id, wallet_id=wallet, deleted=False)
except Exception as exc:
wal = await get_wallet(wallet)
if not wal:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"{exc!s}",
) from exc
status_code=HTTPStatus.NOT_FOUND,
detail="Wallet does not exist.",
)
if user_id != wal.user:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="Wallet does not belong to user.",
)
if wal.deleted:
await delete_wallet(user_id=user_id, wallet_id=wallet, deleted=False)
@users_router.delete("/user/{user_id}/wallet/{wallet}")
async def api_users_delete_user_wallet(user_id: str, wallet: str) -> None:
try:
wal = await get_wallet(wallet)
if not wal:
raise Exception("Wallet does not exist.")
if wal.deleted:
await force_delete_wallet(wallet)
await delete_wallet(user_id=user_id, wallet_id=wallet)
except Exception as exc:
wal = await get_wallet(wallet)
if not wal:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"{exc!s}",
) from exc
status_code=HTTPStatus.NOT_FOUND,
detail="Wallet does not exist.",
)
if wal.deleted:
await force_delete_wallet(wallet)
await delete_wallet(user_id=user_id, wallet_id=wallet)
@users_router.put(
@ -157,14 +145,9 @@ async def api_users_delete_user_wallet(user_id: str, wallet: str) -> None:
dependencies=[Depends(check_super_user)],
)
async def api_topup_balance(data: CreateTopup) -> dict[str, str]:
try:
await get_wallet(data.id)
if settings.lnbits_backend_wallet_class == "VoidWallet":
raise Exception("VoidWallet active")
await get_wallet(data.id)
if settings.lnbits_backend_wallet_class == "VoidWallet":
raise Exception("VoidWallet active")
await update_wallet_balance(wallet_id=data.id, amount=int(data.amount))
return {"status": "Success"}
except Exception as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"{exc!s}"
) from exc
await update_wallet_balance(wallet_id=data.id, amount=int(data.amount))
return {"status": "Success"}

View File

@ -607,10 +607,14 @@ def model_to_dict(model: BaseModel) -> dict:
type_ = model.__fields__[key].type_
if type(type_) is type(BaseModel):
_dict[key] = json.dumps(value)
continue
_dict[key] = value
return _dict
def dict_to_submodel(model: type[TModel], value: Union[dict, str]) -> Optional[TModel]:
"""convert a dictionary or JSON string to a Pydantic model"""
if isinstance(value, str):
if value == "null":
return None
@ -635,10 +639,11 @@ def dict_to_model(_row: dict, model: type[TModel]) -> TModel:
if key not in model.__fields__:
logger.warning(f"Converting {key} to model `{model}`.")
continue
if not value:
continue
type_ = model.__fields__[key].type_
if issubclass(type_, BaseModel):
if issubclass(type_, bool):
_dict[key] = bool(value)
continue
if issubclass(type_, BaseModel) and value:
_dict[key] = dict_to_submodel(type_, value)
continue
_dict[key] = value

View File

@ -1,4 +1,5 @@
import hashlib
import json
import pytest
@ -144,7 +145,7 @@ async def test_create_invoice_fiat_amount(client, inkey_headers_to):
)
assert response.is_success
res_data = response.json()
extra = res_data["details"]["extra"]
extra = json.loads(res_data["details"]["extra"])
assert extra["fiat_amount"] == data["amount"]
assert extra["fiat_currency"] == data["unit"]
assert extra["fiat_rate"]
@ -501,13 +502,15 @@ async def test_fiat_tracking(client, adminkey_headers_from):
settings.lnbits_default_accounting_currency = "USD"
payment = await create_invoice()
assert payment["extra"]["wallet_fiat_currency"] == "USD"
assert payment["extra"]["wallet_fiat_amount"] != payment["amount"]
assert payment["extra"]["wallet_fiat_rate"]
extra = json.loads(payment["extra"])
assert extra["wallet_fiat_currency"] == "USD"
assert extra["wallet_fiat_amount"] != payment["amount"]
assert extra["wallet_fiat_rate"]
await update_currency("EUR")
payment = await create_invoice()
assert payment["extra"]["wallet_fiat_currency"] == "EUR"
assert payment["extra"]["wallet_fiat_amount"] != payment["amount"]
assert payment["extra"]["wallet_fiat_rate"]
extra = json.loads(payment["extra"])
assert extra["wallet_fiat_currency"] == "EUR"
assert extra["wallet_fiat_amount"] != payment["amount"]
assert extra["wallet_fiat_rate"]

View File

@ -24,6 +24,12 @@ class DbTestModel2(BaseModel):
child: DbTestModel
class DbTestModel3(BaseModel):
id: int
child: DbTestModel2
active: bool = False
def get_random_string(iterations: int = 10):
return "".join(
random.SystemRandom().choice(string.ascii_uppercase + string.digits)

View File

@ -8,48 +8,63 @@ from lnbits.db import (
model_to_dict,
update_query,
)
from tests.helpers import DbTestModel, DbTestModel2
from tests.helpers import DbTestModel, DbTestModel2, DbTestModel3
test_data = DbTestModel2(
test_data = DbTestModel3(
id=1,
label="test",
description="mydesc",
child=DbTestModel(id=2, name="myname", value="myvalue"),
child=DbTestModel2(
id=2,
label="test",
description="mydesc",
child=DbTestModel(id=3, name="myname", value="myvalue"),
),
active=True,
)
@pytest.mark.asyncio
async def test_helpers_insert_query():
q = insert_query("test_helpers_query", test_data)
assert (
q == "INSERT INTO test_helpers_query (id, label, description, child) "
"VALUES (:id, :label, :description, :child)"
assert q == (
"INSERT INTO test_helpers_query (id, child, active) "
"VALUES (:id, :child, :active)"
)
@pytest.mark.asyncio
async def test_helpers_update_query():
q = update_query("test_helpers_query", test_data)
assert (
q == "UPDATE test_helpers_query "
"SET id = :id, label = :label, description = :description, child = :child "
"WHERE id = :id"
assert q == (
"UPDATE test_helpers_query "
"SET id = :id, child = :child, active = :active WHERE id = :id"
)
child_dict = json.dumps({"id": 2, "name": "myname", "value": "myvalue"})
test_dict = {"id": 1, "label": "test", "description": "mydesc", "child": child_dict}
child_json = json.dumps(
{
"id": 2,
"label": "test",
"description": "mydesc",
"child": {"id": 3, "name": "myname", "value": "myvalue"},
}
)
test_dict = {"id": 1, "child": child_json, "active": True}
@pytest.mark.asyncio
async def test_helpers_model_to_dict():
d = model_to_dict(test_data)
assert d.get("id") == test_data.id
assert d.get("active") == test_data.active
assert d.get("child") == child_json
assert d == test_dict
@pytest.mark.asyncio
async def test_helpers_dict_to_model():
m = dict_to_model(test_dict, DbTestModel2)
m = dict_to_model(test_dict, DbTestModel3)
assert m == test_data
assert type(m) is DbTestModel2
assert type(m.child) is DbTestModel
assert type(m) is DbTestModel3
assert m.active is True
assert type(m.child) is DbTestModel2
assert type(m.child.child) is DbTestModel