fix: settings

This commit is contained in:
Vlad Stan 2024-10-02 19:08:27 +03:00 committed by dni ⚡
parent 979ed64682
commit ce3c260e57
No known key found for this signature in database
GPG Key ID: D1F416F29AD26E87
4 changed files with 78 additions and 63 deletions

View File

@ -198,20 +198,19 @@ async def update_pubkey(
) -> Optional[User]:
if data.user_id != user.id:
raise HTTPException(HTTPStatus.BAD_REQUEST, "Invalid user ID.")
_validate_auth_timeout(payload.auth_time)
if (
data.pubkey
and data.pubkey != user.pubkey
and await get_account_by_pubkey(data.pubkey)
):
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Public key already in use."
)
raise HTTPException(HTTPStatus.BAD_REQUEST, "Public key already in use.")
account = await get_account(user.id)
if not account:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Account not found."
)
_validate_auth_timeout(payload.auth_time)
raise HTTPException(HTTPStatus.NOT_FOUND, "Account not found.")
account.pubkey = normalize_public_key(data.pubkey)
await update_account(account)
return await get_user(account)
@ -223,9 +222,8 @@ async def update_password(
user: User = Depends(check_user_exists),
payload: AccessTokenPayload = Depends(access_token_payload),
) -> Optional[User]:
if data.user_id != user.id:
raise HTTPException(HTTPStatus.BAD_REQUEST, "Invalid user ID.")
_validate_auth_timeout(payload.auth_time)
assert data.user_id == user.id, "Invalid user ID."
if (
data.username
and user.username != data.username

View File

@ -6,7 +6,7 @@ import pytest
from lnbits import bolt11
from lnbits.core.models import CreateInvoice, Payment
from lnbits.core.views.payment_api import api_payment
from lnbits.settings import settings
from lnbits.settings import Settings
from ..helpers import (
get_random_invoice_data,
@ -15,7 +15,7 @@ from ..helpers import (
# create account POST /api/v1/account
@pytest.mark.asyncio
async def test_create_account(client):
async def test_create_account(client, settings: Settings):
settings.lnbits_allow_new_accounts = False
response = await client.post("/api/v1/account", json={"name": "test"})
assert response.status_code == 403
@ -476,7 +476,7 @@ async def test_update_wallet(client, adminkey_headers_from):
@pytest.mark.asyncio
async def test_fiat_tracking(client, adminkey_headers_from):
async def test_fiat_tracking(client, adminkey_headers_from, settings: Settings):
async def create_invoice():
data = await get_random_invoice_data()
response = await client.post(

View File

@ -11,7 +11,7 @@ from httpx import AsyncClient
from lnbits.core.models import AccessTokenPayload, User
from lnbits.core.views.user_api import api_users_reset_password
from lnbits.settings import AuthMethods, settings
from lnbits.settings import AuthMethods, Settings
from lnbits.utils.nostr import hex_to_npub, sign_event
nostr_event = {
@ -29,8 +29,6 @@ private_key = secp256k1.PrivateKey(
)
pubkey_hex = private_key.pubkey.serialize().hex()[2:]
settings.auth_allowed_methods = AuthMethods.all()
################################ LOGIN ################################
@pytest.mark.asyncio
@ -63,16 +61,18 @@ async def test_login_alan_usr(user_alan: User, http_client: AsyncClient):
@pytest.mark.asyncio
async def test_login_usr_not_allowed(user_alan: User, http_client: AsyncClient):
async def test_login_usr_not_allowed(
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
# exclude 'user_id_only'
settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
lnbits_settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
assert response.status_code == 401, "Login method not allowed."
assert response.json().get("detail") == "Login by 'User ID' not allowed."
settings.auth_allowed_methods = AuthMethods.all()
lnbits_settings.auth_allowed_methods = AuthMethods.all()
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
assert response.status_code == 200, "Login with 'usr' allowed."
@ -83,7 +83,7 @@ async def test_login_usr_not_allowed(user_alan: User, http_client: AsyncClient):
@pytest.mark.asyncio
async def test_login_alan_username_password_ok(
user_alan: User, http_client: AsyncClient
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
@ -93,7 +93,7 @@ async def test_login_alan_username_password_ok(
access_token = response.json().get("access_token")
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
payload: dict = jwt.decode(access_token, lnbits_settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
assert access_token_payload.sub == "alan", "Subject is Alan."
@ -142,10 +142,10 @@ async def test_login_alan_password_nok(user_alan: User, http_client: AsyncClient
@pytest.mark.asyncio
async def test_login_username_password_not_allowed(
user_alan: User, http_client: AsyncClient
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
# exclude 'username_password'
settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
lnbits_settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
@ -156,7 +156,7 @@ async def test_login_username_password_not_allowed(
response.json().get("detail") == "Login by 'Username and Password' not allowed."
)
settings.auth_allowed_methods = AuthMethods.all()
lnbits_settings.auth_allowed_methods = AuthMethods.all()
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
@ -167,7 +167,7 @@ async def test_login_username_password_not_allowed(
@pytest.mark.asyncio
async def test_login_alan_change_auth_secret_key(
user_alan: User, http_client: AsyncClient
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
@ -177,9 +177,9 @@ async def test_login_alan_change_auth_secret_key(
access_token = response.json().get("access_token")
assert access_token is not None
initial_auth_secret_key = settings.auth_secret_key
initial_auth_secret_key = lnbits_settings.auth_secret_key
settings.auth_secret_key = shortuuid.uuid()
lnbits_settings.auth_secret_key = shortuuid.uuid()
response = await http_client.get(
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
@ -187,7 +187,7 @@ async def test_login_alan_change_auth_secret_key(
assert response.status_code == 401, "Access token not valid anymore."
assert response.json().get("detail") == "Invalid access token."
settings.auth_secret_key = initial_auth_secret_key
lnbits_settings.auth_secret_key = initial_auth_secret_key
response = await http_client.get(
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
@ -326,7 +326,7 @@ async def test_register_bad_email(http_client: AsyncClient):
################################ CHANGE PASSWORD ################################
@pytest.mark.asyncio
async def test_change_password_ok(http_client: AsyncClient):
async def test_change_password_ok(http_client: AsyncClient, lnbits_settings: Settings):
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
@ -342,7 +342,7 @@ async def test_change_password_ok(http_client: AsyncClient):
access_token = response.json().get("access_token")
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
payload: dict = jwt.decode(access_token, lnbits_settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
response = await http_client.put(
@ -447,7 +447,7 @@ async def test_alan_change_password_different_user(
@pytest.mark.asyncio
async def test_alan_change_password_auth_threshold_expired(
user_alan: User, http_client: AsyncClient
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
@ -456,7 +456,7 @@ async def test_alan_change_password_auth_threshold_expired(
access_token = response.json().get("access_token")
assert access_token is not None
settings.auth_credetials_update_threshold = 1
lnbits_settings.auth_credetials_update_threshold = 1
time.sleep(1.1)
response = await http_client.put(
"/api/v1/auth/password",
@ -482,7 +482,7 @@ async def test_alan_change_password_auth_threshold_expired(
@pytest.mark.asyncio
async def test_register_nostr_ok(http_client: AsyncClient):
async def test_register_nostr_ok(http_client: AsyncClient, lnbits_settings: Settings):
event = {**nostr_event}
event["created_at"] = int(time.time())
@ -498,7 +498,7 @@ async def test_register_nostr_ok(http_client: AsyncClient):
access_token = response.json().get("access_token")
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
payload: dict = jwt.decode(access_token, lnbits_settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
assert access_token_payload.auth_time, "Auth time should be set by server."
assert (
@ -522,9 +522,11 @@ async def test_register_nostr_ok(http_client: AsyncClient):
@pytest.mark.asyncio
async def test_register_nostr_not_allowed(http_client: AsyncClient):
async def test_register_nostr_not_allowed(
http_client: AsyncClient, lnbits_settings: Settings
):
# exclude 'nostr_auth_nip98'
settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
lnbits_settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
response = await http_client.post(
"/api/v1/auth/nostr",
json={},
@ -533,7 +535,7 @@ async def test_register_nostr_not_allowed(http_client: AsyncClient):
assert response.status_code == 401, "User not authenticated."
assert response.json().get("detail") == "Login with Nostr Auth not allowed."
settings.auth_allowed_methods = AuthMethods.all()
lnbits_settings.auth_allowed_methods = AuthMethods.all()
@pytest.mark.asyncio
@ -560,8 +562,10 @@ async def test_register_nostr_bad_header(http_client: AsyncClient):
@pytest.mark.asyncio
async def test_register_nostr_bad_event(http_client: AsyncClient):
settings.auth_allowed_methods = AuthMethods.all()
async def test_register_nostr_bad_event(
http_client: AsyncClient, lnbits_settings: Settings
):
lnbits_settings.auth_allowed_methods = AuthMethods.all()
base64_event = base64.b64encode(json.dumps(nostr_event).encode()).decode("ascii")
response = await http_client.post(
"/api/v1/auth/nostr",
@ -570,7 +574,7 @@ async def test_register_nostr_bad_event(http_client: AsyncClient):
assert response.status_code == 400, "Nostr event expired."
assert (
response.json().get("detail")
== f"More than {settings.auth_credetials_update_threshold}"
== f"More than {lnbits_settings.auth_credetials_update_threshold}"
" seconds have passed since the event was signed."
)
@ -672,7 +676,9 @@ async def test_register_nostr_bad_event_tag_menthod(http_client: AsyncClient):
################################ CHANGE PUBLIC KEY ################################
async def test_change_pubkey_npub_ok(http_client: AsyncClient):
async def test_change_pubkey_npub_ok(
http_client: AsyncClient, lnbits_settings: Settings
):
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
@ -688,7 +694,7 @@ async def test_change_pubkey_npub_ok(http_client: AsyncClient):
access_token = response.json().get("access_token")
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
payload: dict = jwt.decode(access_token, lnbits_settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex()))
@ -712,7 +718,9 @@ async def test_change_pubkey_npub_ok(http_client: AsyncClient):
@pytest.mark.asyncio
async def test_change_pubkey_ok(http_client: AsyncClient, user_alan: User):
async def test_change_pubkey_ok(
http_client: AsyncClient, user_alan: User, lnbits_settings: Settings
):
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
@ -728,7 +736,7 @@ async def test_change_pubkey_ok(http_client: AsyncClient, user_alan: User):
access_token = response.json().get("access_token")
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
payload: dict = jwt.decode(access_token, lnbits_settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex()))
@ -834,7 +842,7 @@ async def test_change_pubkey_other_user(http_client: AsyncClient, user_alan: Use
@pytest.mark.asyncio
async def test_alan_change_pubkey_auth_threshold_expired(
user_alan: User, http_client: AsyncClient
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
@ -843,8 +851,8 @@ async def test_alan_change_pubkey_auth_threshold_expired(
access_token = response.json().get("access_token")
assert access_token is not None
settings.auth_credetials_update_threshold = 1
time.sleep(1.1)
lnbits_settings.auth_credetials_update_threshold = 1
time.sleep(2.1)
response = await http_client.put(
"/api/v1/auth/pubkey",
headers={"Authorization": f"Bearer {access_token}"},
@ -864,7 +872,9 @@ async def test_alan_change_pubkey_auth_threshold_expired(
################################ RESET PASSWORD ################################
@pytest.mark.asyncio
async def test_request_reset_key_ok(http_client: AsyncClient):
async def test_request_reset_key_ok(
http_client: AsyncClient, lnbits_settings: Settings
):
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
@ -880,7 +890,7 @@ async def test_request_reset_key_ok(http_client: AsyncClient):
access_token = response.json().get("access_token")
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
payload: dict = jwt.decode(access_token, lnbits_settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
assert access_token_payload.usr, "User id set."
@ -936,9 +946,11 @@ async def test_request_reset_key_user_not_found(http_client: AsyncClient):
@pytest.mark.asyncio
async def test_reset_username_password_not_allowed(http_client: AsyncClient):
async def test_reset_username_password_not_allowed(
http_client: AsyncClient, lnbits_settings: Settings
):
# exclude 'username_password'
settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
lnbits_settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
user_id = "926abb2ab59a48ebb2485bcceb58d05e"
reset_key = await api_users_reset_password(user_id)
@ -952,7 +964,7 @@ async def test_reset_username_password_not_allowed(http_client: AsyncClient):
"password_repeat": "secret0000",
},
)
settings.auth_allowed_methods = AuthMethods.all()
lnbits_settings.auth_allowed_methods = AuthMethods.all()
assert response.status_code == 401, "Login method not allowed."
assert (
@ -998,13 +1010,13 @@ async def test_reset_username_password_bad_key(http_client: AsyncClient):
@pytest.mark.asyncio
async def test_reset_password_auth_threshold_expired(
user_alan: User, http_client: AsyncClient
user_alan: User, http_client: AsyncClient, lnbits_settings: Settings
):
reset_key = await api_users_reset_password(user_alan.id)
assert reset_key, "Reset key created."
settings.auth_credetials_update_threshold = 1
lnbits_settings.auth_credetials_update_threshold = 1
time.sleep(1.1)
response = await http_client.put(
"/api/v1/auth/reset",

View File

@ -27,7 +27,7 @@ from lnbits.core.models import Account, CreateInvoice, PaymentState
from lnbits.core.services import create_user_account, update_wallet_balance
from lnbits.core.views.payment_api import api_payments_create_invoice
from lnbits.db import DB_TYPE, SQLITE, Database
from lnbits.settings import AuthMethods, settings
from lnbits.settings import AuthMethods, Settings, settings
from tests.helpers import (
get_random_invoice_data,
)
@ -40,19 +40,24 @@ settings.lnbits_extensions_default_install = []
settings.lnbits_extensions_deactivate_all = True
@pytest_asyncio.fixture(scope="session")
def lnbits_settings():
return settings
@pytest.fixture(autouse=True)
def run_before_and_after_tests():
def run_before_and_after_tests(lnbits_settings: Settings):
"""Fixture to execute asserts before and after a test is run"""
##### BEFORE TEST RUN #####
settings.lnbits_allow_new_accounts = True
settings.lnbits_allowed_users = []
settings.auth_allowed_methods = AuthMethods.all()
settings.auth_credetials_update_threshold = 120
settings.lnbits_reserve_fee_percent = 1
settings.lnbits_reserve_fee_min = 2000
settings.lnbits_service_fee = 0
settings.lnbits_wallet_limit_daily_max_withdraw = 0
lnbits_settings.lnbits_allow_new_accounts = True
lnbits_settings.lnbits_allowed_users = []
lnbits_settings.auth_allowed_methods = AuthMethods.all()
lnbits_settings.auth_credetials_update_threshold = 120
lnbits_settings.lnbits_reserve_fee_percent = 1
lnbits_settings.lnbits_reserve_fee_min = 2000
lnbits_settings.lnbits_service_fee = 0
lnbits_settings.lnbits_wallet_limit_daily_max_withdraw = 0
yield # this is where the testing happens