fix: better differentiation between UNAUTHORIZED and FORBIDDEN (#3139)

This commit is contained in:
Vlad Stan 2025-05-05 11:55:58 +03:00 committed by GitHub
parent 6a9089fd98
commit 3529f9152f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 39 additions and 45 deletions

View File

@ -74,7 +74,7 @@ async def get_auth_user(user: User = Depends(check_user_exists)) -> User:
async def login(data: LoginUsernamePassword) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.username_and_password):
raise HTTPException(
HTTPStatus.UNAUTHORIZED, "Login by 'Username and Password' not allowed."
HTTPStatus.FORBIDDEN, "Login by 'Username and Password' not allowed."
)
account = await get_account_by_username_or_email(data.username)
if not account or not account.verify_password(data.password):
@ -85,9 +85,7 @@ async def login(data: LoginUsernamePassword) -> JSONResponse:
@auth_router.post("/nostr", description="Login via Nostr")
async def nostr_login(request: Request) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.nostr_auth_nip98):
raise HTTPException(
HTTPStatus.UNAUTHORIZED, "Login with Nostr Auth not allowed."
)
raise HTTPException(HTTPStatus.FORBIDDEN, "Login with Nostr Auth not allowed.")
event = _nostr_nip98_event(request)
account = await get_account_by_pubkey(event["pubkey"])
if not account:
@ -104,7 +102,7 @@ async def nostr_login(request: Request) -> JSONResponse:
async def login_usr(data: LoginUsr) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.user_id_only):
raise HTTPException(
HTTPStatus.UNAUTHORIZED,
HTTPStatus.FORBIDDEN,
"Login by 'User ID' not allowed.",
)
account = await get_account(data.usr)
@ -112,7 +110,7 @@ async def login_usr(data: LoginUsr) -> JSONResponse:
raise HTTPException(HTTPStatus.UNAUTHORIZED, "User ID does not exist.")
if account.is_admin:
raise HTTPException(
HTTPStatus.UNAUTHORIZED, "Admin users cannot login with user id only."
HTTPStatus.FORBIDDEN, "Admin users cannot login with user id only."
)
return _auth_success_response(account.username, account.id, account.email)
@ -242,7 +240,7 @@ async def login_with_sso_provider(
provider_sso = _new_sso(provider)
if not provider_sso:
raise HTTPException(
HTTPStatus.UNAUTHORIZED,
HTTPStatus.FORBIDDEN,
f"Login by '{provider}' not allowed.",
)
@ -257,7 +255,7 @@ async def handle_oauth_token(request: Request, provider: str) -> RedirectRespons
provider_sso = _new_sso(provider)
if not provider_sso:
raise HTTPException(
HTTPStatus.UNAUTHORIZED,
HTTPStatus.FORBIDDEN,
f"Login by '{provider}' not allowed.",
)
@ -285,7 +283,7 @@ async def logout() -> JSONResponse:
async def register(data: RegisterUser) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.username_and_password):
raise HTTPException(
HTTPStatus.UNAUTHORIZED,
HTTPStatus.FORBIDDEN,
"Register by 'Username and Password' not allowed.",
)
@ -375,7 +373,7 @@ async def update_password(
async def reset_password(data: ResetUserPassword) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.username_and_password):
raise HTTPException(
HTTPStatus.UNAUTHORIZED, "Auth by 'Username and Password' not allowed."
HTTPStatus.FORBIDDEN, "Auth by 'Username and Password' not allowed."
)
assert data.password == data.password_repeat, "Passwords do not match."
@ -449,7 +447,7 @@ async def update(
@auth_router.put("/first_install")
async def first_install(data: UpdateSuperuserPassword) -> JSONResponse:
if not settings.first_install:
raise HTTPException(HTTPStatus.UNAUTHORIZED, "This is not your first install")
raise HTTPException(HTTPStatus.FORBIDDEN, "This is not your first install")
account = await get_account(settings.super_user)
if not account:
raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR, "Superuser not found.")
@ -472,10 +470,10 @@ async def _handle_sso_login(userinfo: OpenID, verified_user_id: Optional[str] =
if verified_user_id:
if account:
raise HTTPException(HTTPStatus.UNAUTHORIZED, "Email already used.")
raise HTTPException(HTTPStatus.FORBIDDEN, "Email already used.")
account = await get_account(verified_user_id)
if not account:
raise HTTPException(HTTPStatus.UNAUTHORIZED, "Cannot verify user email.")
raise HTTPException(HTTPStatus.FORBIDDEN, "Cannot verify user email.")
redirect_path = "/account"
if account:
@ -580,10 +578,10 @@ def _find_auth_provider_class(provider: str) -> Callable:
def _nostr_nip98_event(request: Request) -> dict:
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(HTTPStatus.UNAUTHORIZED, "Nostr Auth header missing.")
raise HTTPException(HTTPStatus.BAD_REQUEST, "Nostr Auth header missing.")
scheme, token = auth_header.split()
if scheme.lower() != "nostr":
raise HTTPException(HTTPStatus.UNAUTHORIZED, "Invalid Authorization scheme.")
raise HTTPException(HTTPStatus.BAD_REQUEST, "Invalid Authorization scheme.")
event = None
try:
event_json = base64.b64decode(token.encode("ascii"))

View File

@ -155,7 +155,7 @@ async def api_payments_daily_stats(
if not user.admin:
exc = HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
status_code=HTTPStatus.FORBIDDEN,
detail="Missing wallet id.",
)
wallet_filter = next(
@ -322,7 +322,7 @@ async def api_payments_create(
return await _api_payments_create_invoice(invoice_data, wallet.wallet)
else:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
status_code=HTTPStatus.FORBIDDEN,
detail="Invoice (or Admin) key required.",
)

View File

@ -102,7 +102,7 @@ class KeyChecker(SecurityBase):
request.scope["user_id"] = wallet.user
if self.expected_key_type is KeyType.admin and wallet.adminkey != key_value:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
status_code=HTTPStatus.FORBIDDEN,
detail="Invalid adminkey.",
)
@ -165,7 +165,7 @@ async def check_user_exists(
r.scope["user_id"] = account.id
if not settings.is_user_allowed(account.id):
raise HTTPException(HTTPStatus.UNAUTHORIZED, "User not allowed.")
raise HTTPException(HTTPStatus.FORBIDDEN, "User not allowed.")
user = await get_user_from_account(account)
if not user:
@ -201,7 +201,7 @@ async def access_token_payload(
async def check_admin(user: Annotated[User, Depends(check_user_exists)]) -> User:
if user.id != settings.super_user and user.id not in settings.lnbits_admin_users:
raise HTTPException(
HTTPStatus.UNAUTHORIZED, "User not authorized. No admin privileges."
HTTPStatus.FORBIDDEN, "User not authorized. No admin privileges."
)
if not user.has_password:
raise HTTPException(
@ -214,7 +214,7 @@ async def check_admin(user: Annotated[User, Depends(check_user_exists)]) -> User
async def check_super_user(user: Annotated[User, Depends(check_user_exists)]) -> User:
if user.id != settings.super_user:
raise HTTPException(
HTTPStatus.UNAUTHORIZED, "User not authorized. No super user privileges."
HTTPStatus.FORBIDDEN, "User not authorized. No super user privileges."
)
if not user.has_password:
raise HTTPException(

View File

@ -65,19 +65,15 @@
goToExtension() {
window.location.href = `/extensions#${this.extension}`
},
logOut() {
LNbits.utils
.confirmDialog(
'Do you really want to logout?'
)
.onOk( async () => {
try {
await LNbits.api.logout()
window.location = '/'
} catch (e) {
LNbits.utils.notifyApiError(e)
}
})
async logOut() {
try {
await LNbits.api.logout()
window.location = '/'
} catch (e) {
LNbits.utils.notifyApiError(e)
}
},
},
computed: {
@ -86,13 +82,13 @@
if (this.message.startsWith('Extension ')) return true
}
},
created() {
async created() {
this.err = '{{ err }}'
const statusCode = '{{ status_code }}' || 404
this.message = String({{ message | tojson }}) || 'Page not found'
if (statusCode == 401) {
console.warn(`Unauthorized: ${this.message}`)
this.goHome()
this.logOut()
return
}
this.statusCode = statusCode

View File

@ -7,7 +7,7 @@ from lnbits.settings import Settings
@pytest.mark.anyio
async def test_admin_get_settings_permission_denied(client, from_user):
response = await client.get(f"/admin/api/v1/settings?usr={from_user.id}")
assert response.status_code == 401
assert response.status_code == 403
@pytest.mark.anyio

View File

@ -102,7 +102,7 @@ async def test_login_usr_not_allowed_for_admin_without_credentials(
# Attempt to login with user ID for admin
response = await http_client.post("/api/v1/auth/usr", json=login_data.dict())
assert response.status_code == 401
assert response.status_code == 403
assert (
response.json().get("detail") == "Admin users cannot login with user id only."
)
@ -135,7 +135,7 @@ async def test_login_usr_not_allowed(
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
assert response.status_code == 401, "Login method not allowed."
assert response.status_code == 403, "Login method not allowed."
assert response.json().get("detail") == "Login by 'User ID' not allowed."
settings.auth_allowed_methods = AuthMethods.all()
@ -217,7 +217,7 @@ async def test_login_username_password_not_allowed(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
)
assert response.status_code == 401, "Login method not allowed."
assert response.status_code == 403, "Login method not allowed."
assert (
response.json().get("detail") == "Login by 'Username and Password' not allowed."
)
@ -597,7 +597,7 @@ async def test_register_nostr_not_allowed(http_client: AsyncClient, settings: Se
json={},
)
assert response.status_code == 401, "User not authenticated."
assert response.status_code == 403, "User not authenticated."
assert response.json().get("detail") == "Login with Nostr Auth not allowed."
settings.auth_allowed_methods = AuthMethods.all()
@ -607,7 +607,7 @@ async def test_register_nostr_not_allowed(http_client: AsyncClient, settings: Se
async def test_register_nostr_bad_header(http_client: AsyncClient):
response = await http_client.post("/api/v1/auth/nostr")
assert response.status_code == 401, "Missing header."
assert response.status_code == 400, "Missing header."
assert response.json().get("detail") == "Nostr Auth header missing."
response = await http_client.post(
@ -615,7 +615,7 @@ async def test_register_nostr_bad_header(http_client: AsyncClient):
headers={"Authorization": "Bearer xyz"},
)
assert response.status_code == 401, "Non nostr header."
assert response.status_code == 400, "Non nostr header."
assert response.json().get("detail") == "Invalid Authorization scheme."
response = await http_client.post(
@ -1028,7 +1028,7 @@ async def test_reset_username_password_not_allowed(
)
settings.auth_allowed_methods = AuthMethods.all()
assert response.status_code == 401, "Login method not allowed."
assert response.status_code == 403, "Login method not allowed."
assert (
response.json().get("detail") == "Auth by 'Username and Password' not allowed."
)

View File

@ -76,7 +76,7 @@ async def test_check_user_exists_with_user_not_allowed(user_alan: User):
settings.lnbits_allowed_users = ["only_this_user_id"]
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token=None, usr=UUID4(user_alan.id))
assert exc_info.value.status_code == 401
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "User not allowed."