refactor: generalize SSO auth (#2263)

* refactor: first extraction of providers
* refactor: remove unused property
* refactor: extract `_find_auth_provider_class`
* fix: return type
* feat: prepare for `keycloak`
This commit is contained in:
Vlad Stan 2024-02-14 08:57:50 +02:00 committed by GitHub
parent 293b6267be
commit b8d295a5b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 68 additions and 94 deletions

View File

@ -1097,6 +1097,8 @@ async def get_admin_settings(is_super_user: bool = False) -> Optional[AdminSetti
return None
row_dict = dict(sets)
row_dict.pop("super_user")
row_dict.pop("auth_all_methods")
admin_settings = AdminSettings(
is_super_user=is_super_user,
lnbits_allowed_funding_sources=settings.lnbits_allowed_funding_sources,

View File

@ -1,10 +1,9 @@
from typing import Optional
import importlib
from typing import Callable, Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi_sso.sso.base import OpenID
from fastapi_sso.sso.github import GithubSSO
from fastapi_sso.sso.google import GoogleSSO
from fastapi_sso.sso.base import OpenID, SSOBase
from loguru import logger
from starlette.status import (
HTTP_400_BAD_REQUEST,
@ -94,72 +93,37 @@ async def login_usr(data: LoginUsr) -> JSONResponse:
raise HTTPException(HTTP_500_INTERNAL_SERVER_ERROR, "Cannot login.")
@auth_router.get("/api/v1/auth/google", description="Google SSO")
async def login_with_google(request: Request, user_id: Optional[str] = None):
google_sso = _new_google_sso()
if not google_sso:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Login by 'Google' not allowed.")
google_sso.redirect_uri = str(request.base_url) + "api/v1/auth/google/token"
with google_sso:
state = encrypt_internal_message(user_id)
return await google_sso.get_login_redirect(state=state)
@auth_router.get("/api/v1/auth/github", description="Github SSO")
async def login_with_github(request: Request, user_id: Optional[str] = None):
github_sso = _new_github_sso()
if not github_sso:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Login by 'GitHub' not allowed.")
github_sso.redirect_uri = str(request.base_url) + "api/v1/auth/github/token"
with github_sso:
state = decrypt_internal_message(user_id)
return await github_sso.get_login_redirect(state=state)
@auth_router.get(
"/api/v1/auth/google/token", description="Handle Google OAuth callback"
)
async def handle_google_token(request: Request) -> RedirectResponse:
google_sso = _new_google_sso()
if not google_sso:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Login by 'Google' not allowed.")
try:
with google_sso:
userinfo = await google_sso.verify_and_process(request)
assert userinfo is not None
user_id = decrypt_internal_message(google_sso.state)
request.session.pop("user", None)
return await _handle_sso_login(userinfo, user_id)
except HTTPException as e:
raise e
except ValueError as e:
raise HTTPException(HTTP_403_FORBIDDEN, str(e))
except Exception as e:
logger.debug(e)
@auth_router.get("/api/v1/auth/{provider}", description="SSO Provider")
async def login_with_sso_provider(
request: Request, provider: str, user_id: Optional[str] = None
):
provider_sso = _new_sso(provider)
if not provider_sso:
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot authenticate user with Google Auth."
HTTP_401_UNAUTHORIZED, f"Login by '{provider}' not allowed."
)
provider_sso.redirect_uri = str(request.base_url) + f"api/v1/auth/{provider}/token"
with provider_sso:
state = encrypt_internal_message(user_id)
return await provider_sso.get_login_redirect(state=state)
@auth_router.get(
"/api/v1/auth/github/token", description="Handle Github OAuth callback"
)
async def handle_github_token(request: Request) -> RedirectResponse:
github_sso = _new_github_sso()
if not github_sso:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Login by 'GitHub' not allowed.")
@auth_router.get("/api/v1/auth/{provider}/token", description="Handle OAuth callback")
async def handle_oauth_token(request: Request, provider: str) -> RedirectResponse:
provider_sso = _new_sso(provider)
if not provider_sso:
raise HTTPException(
HTTP_401_UNAUTHORIZED, f"Login by '{provider}' not allowed."
)
try:
with github_sso:
userinfo = await github_sso.verify_and_process(request)
with provider_sso:
userinfo = await provider_sso.verify_and_process(request)
assert userinfo is not None
user_id = decrypt_internal_message(github_sso.state)
user_id = decrypt_internal_message(provider_sso.state)
request.session.pop("user", None)
return await _handle_sso_login(userinfo, user_id)
except HTTPException as e:
raise e
except ValueError as e:
@ -167,7 +131,8 @@ async def handle_github_token(request: Request) -> RedirectResponse:
except Exception as e:
logger.debug(e)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot authenticate user with GitHub Auth."
HTTP_500_INTERNAL_SERVER_ERROR,
f"Cannot authenticate user with {provider} Auth.",
)
@ -340,29 +305,44 @@ def _auth_redirect_response(path: str, email: str) -> RedirectResponse:
return response
def _new_google_sso() -> Optional[GoogleSSO]:
if not settings.is_auth_method_allowed(AuthMethods.google_auth):
return None
if not settings.is_google_auth_configured:
logger.warning("Google Auth allowed but not configured.")
return None
return GoogleSSO(
settings.google_client_id,
settings.google_client_secret,
None,
allow_insecure_http=True,
)
def _new_sso(provider: str) -> Optional[SSOBase]:
try:
if not settings.is_auth_method_allowed(AuthMethods(f"{provider}-auth")):
return None
client_id = getattr(settings, f"{provider}_client_id", None)
client_secret = getattr(settings, f"{provider}_client_secret", None)
discovery_url = getattr(settings, f"{provider}_discovery_url", None)
if not client_id or not client_secret:
logger.warning(f"{provider} auth allowed but not configured.")
return None
SSOProviderClass = _find_auth_provider_class(provider)
ssoProvider = SSOProviderClass(
client_id, client_secret, None, allow_insecure_http=True
)
if (
discovery_url
and getattr(ssoProvider, "discovery_url", discovery_url) != discovery_url
):
ssoProvider.discovery_url = discovery_url
return ssoProvider
except Exception as e:
logger.warning(e)
return None
def _new_github_sso() -> Optional[GithubSSO]:
if not settings.is_auth_method_allowed(AuthMethods.github_auth):
return None
if not settings.is_github_auth_configured:
logger.warning("Github Auth allowed but not configured.")
return None
return GithubSSO(
settings.github_client_id,
settings.github_client_secret,
None,
allow_insecure_http=True,
)
def _find_auth_provider_class(provider: str) -> Callable:
sso_modules = ["lnbits.core.sso", "fastapi_sso.sso"]
for module in sso_modules:
try:
provider_module = importlib.import_module(f"{module}.{provider}")
ProviderClass = getattr(provider_module, f"{provider.title()}SSO")
if ProviderClass:
return ProviderClass
except Exception:
pass
raise ValueError(f"No SSO provider found for '{provider}'.")

View File

@ -282,19 +282,11 @@ class GoogleAuthSettings(LNbitsSettings):
google_client_id: str = Field(default="")
google_client_secret: str = Field(default="")
@property
def is_google_auth_configured(self):
return self.google_client_id != "" and self.google_client_secret != ""
class GitHubAuthSettings(LNbitsSettings):
github_client_id: str = Field(default="")
github_client_secret: str = Field(default="")
@property
def is_github_auth_configured(self):
return self.github_client_id != "" and self.github_client_secret != ""
class EditableSettings(
UsersSettings,