Enable non-admin credentials + add page for google drive (#84)

* Enable non-admin credentials + add page for google drive

* Return one indexing status entry for each connector / credential pair

* Remove some logs

* Small fixes

* Sort index status by source
This commit is contained in:
Chris Weaver
2023-06-04 11:26:50 -07:00
committed by GitHub
parent 8c9b3079aa
commit 7cc64efc3a
33 changed files with 1072 additions and 644 deletions

View File

@@ -61,9 +61,7 @@ def verify_csrf(credential_id: int, state: str) -> None:
)
def get_auth_url(
credential_id: int,
) -> str:
def get_auth_url(credential_id: int) -> str:
creds_str = str(get_dynamic_config_store().load(GOOGLE_DRIVE_CRED_KEY))
credential_json = json.loads(creds_str)
flow = InstalledAppFlow.from_client_config(

View File

@@ -47,7 +47,6 @@ def create_collection(
raise RuntimeError("Could not create Qdrant collection")
@log_function_time()
def get_document_whitelists(
doc_chunk_id: str, collection_name: str, q_client: QdrantClient
) -> tuple[int, list[str], list[str]]:
@@ -66,7 +65,6 @@ def get_document_whitelists(
return len(results), payload[ALLOWED_USERS], payload[ALLOWED_GROUPS]
@log_function_time()
def delete_doc_chunks(
document_id: str, collection_name: str, q_client: QdrantClient
) -> None:

View File

@@ -272,10 +272,12 @@ def fetch_latest_index_attempts_by_status(
subquery = (
db_session.query(
IndexAttempt.connector_id,
IndexAttempt.credential_id,
IndexAttempt.status,
func.max(IndexAttempt.time_updated).label("time_updated"),
)
.group_by(IndexAttempt.connector_id)
.group_by(IndexAttempt.credential_id)
.group_by(IndexAttempt.status)
.subquery()
)
@@ -286,6 +288,7 @@ def fetch_latest_index_attempts_by_status(
alias,
and_(
IndexAttempt.connector_id == alias.connector_id,
IndexAttempt.credential_id == alias.credential_id,
IndexAttempt.status == alias.status,
IndexAttempt.time_updated == alias.time_updated,
),

View File

@@ -96,9 +96,6 @@ def update_credential_json(
user: User,
db_session: Session,
) -> Credential | None:
logger.info("HIIII")
logger.info(credential_id)
logger.info(credential_json)
credential = fetch_credential_by_id(credential_id, user, db_session)
if credential is None:
return None

View File

@@ -12,9 +12,9 @@ from danswer.configs.app_configs import SECRET
from danswer.configs.app_configs import WEB_DOMAIN
from danswer.datastores.qdrant.indexing import list_collections
from danswer.db.credentials import create_initial_public_credential
from danswer.server.admin import router as admin_router
from danswer.server.event_loading import router as event_processing_router
from danswer.server.health import router as health_router
from danswer.server.manage import router as admin_router
from danswer.server.search_backend import router as backend_router
from danswer.utils.logging import setup_logger
from fastapi import FastAPI

View File

@@ -2,6 +2,7 @@ from collections import defaultdict
from typing import cast
from danswer.auth.users import current_admin_user
from danswer.auth.users import current_user
from danswer.configs.app_configs import MASK_CREDENTIAL_PREFIX
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import OPENAI_API_KEY_STORAGE_KEY
@@ -32,6 +33,7 @@ from danswer.db.credentials import mask_credential_dict
from danswer.db.credentials import update_credential
from danswer.db.engine import get_session
from danswer.db.index_attempt import create_index_attempt
from danswer.db.models import Connector
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
@@ -61,12 +63,16 @@ from fastapi import Request
from fastapi import Response
from sqlalchemy.orm import Session
router = APIRouter(prefix="/admin")
router = APIRouter(prefix="/manage")
logger = setup_logger()
_GOOGLE_DRIVE_CREDENTIAL_ID_COOKIE_NAME = "google_drive_credential_id"
@router.get("/connector/google-drive/app-credential")
"""Admin only API endpoints"""
@router.get("/admin/connector/google-drive/app-credential")
def check_google_app_credentials_exist(
_: User = Depends(current_admin_user),
) -> dict[str, str]:
@@ -76,7 +82,7 @@ def check_google_app_credentials_exist(
raise HTTPException(status_code=404, detail="Google App Credentials not found")
@router.put("/connector/google-drive/app-credential")
@router.put("/admin/connector/google-drive/app-credential")
def update_google_app_credentials(
app_credentials: GoogleAppCredentials, _: User = Depends(current_admin_user)
) -> StatusResponse:
@@ -90,7 +96,7 @@ def update_google_app_credentials(
)
@router.get("/connector/google-drive/check-auth/{credential_id}")
@router.get("/admin/connector/google-drive/check-auth/{credential_id}")
def check_drive_tokens(
credential_id: int,
user: User = Depends(current_admin_user),
@@ -109,11 +115,8 @@ def check_drive_tokens(
return AuthStatus(authenticated=True)
_GOOGLE_DRIVE_CREDENTIAL_ID_COOKIE_NAME = "google_drive_credential_id"
@router.get("/connector/google-drive/authorize/{credential_id}", response_model=AuthUrl)
def google_drive_auth(
@router.get("/admin/connector/google-drive/authorize/{credential_id}")
def admin_google_drive_auth(
response: Response, credential_id: str, _: User = Depends(current_admin_user)
) -> AuthUrl:
# set a cookie that we can read in the callback (used for `verify_csrf`)
@@ -123,35 +126,10 @@ def google_drive_auth(
httponly=True,
max_age=600,
)
return AuthUrl(auth_url=get_auth_url(int(credential_id)))
return AuthUrl(auth_url=get_auth_url(credential_id=int(credential_id)))
@router.get("/connector/google-drive/callback")
def google_drive_callback(
request: Request,
callback: GDriveCallback = Depends(),
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse:
credential_id_cookie = request.cookies.get(_GOOGLE_DRIVE_CREDENTIAL_ID_COOKIE_NAME)
if credential_id_cookie is None or not credential_id_cookie.isdigit():
raise HTTPException(
status_code=401, detail="Request did not pass CSRF verification."
)
credential_id = int(credential_id_cookie)
verify_csrf(credential_id, callback.state)
if (
update_credential_access_tokens(callback.code, credential_id, user, db_session)
is None
):
raise HTTPException(
status_code=500, detail="Unable to fetch Google Drive access tokens"
)
return StatusResponse(success=True, message="Updated Google Drive access tokens")
@router.get("/latest-index-attempt", response_model=list[IndexAttemptSnapshot])
@router.get("/admin/latest-index-attempt")
def list_all_index_attempts(
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
@@ -173,7 +151,7 @@ def list_all_index_attempts(
]
@router.get("/latest-index-attempt/{source}", response_model=list[IndexAttemptSnapshot])
@router.get("/admin/latest-index-attempt/{source}")
def list_index_attempts(
source: DocumentSource,
_: User = Depends(current_admin_user),
@@ -196,38 +174,39 @@ def list_index_attempts(
]
@router.get("/connector", response_model=list[ConnectorSnapshot])
def get_connectors(
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> list[ConnectorSnapshot]:
connectors = fetch_connectors(db_session)
return [
ConnectorSnapshot.from_connector_db_model(connector) for connector in connectors
]
@router.get("/connector/indexing-status")
@router.get("/admin/connector/indexing-status")
def get_connector_indexing_status(
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> list[ConnectorIndexingStatus]:
connector_id_to_connector = {
connector_id_to_connector: dict[int, Connector] = {
connector.id: connector for connector in fetch_connectors(db_session)
}
index_attempts = fetch_latest_index_attempts_by_status(db_session)
connector_to_index_attempts: dict[int, list[IndexAttempt]] = defaultdict(list)
connector_credential_pair_to_index_attempts: dict[
tuple[int, int], list[IndexAttempt]
] = defaultdict(list)
for index_attempt in index_attempts:
# don't consider index attempts where the connector has been deleted
if index_attempt.connector_id:
connector_to_index_attempts[index_attempt.connector_id].append(
index_attempt
)
# or the credential has been deleted
if index_attempt.connector_id and index_attempt.credential_id:
connector_credential_pair_to_index_attempts[
(index_attempt.connector_id, index_attempt.credential_id)
].append(index_attempt)
indexing_statuses: list[ConnectorIndexingStatus] = []
for connector_id, index_attempts in connector_to_index_attempts.items():
for (
connector_id,
credential_id,
), index_attempts in connector_credential_pair_to_index_attempts.items():
# NOTE: index_attempts is guaranteed to be length > 0
connector = connector_id_to_connector[connector_id]
credential = [
credential_association.credential
for credential_association in connector.credentials
if credential_association.credential_id == credential_id
][0]
index_attempts_sorted = sorted(
index_attempts, key=lambda x: x.time_updated, reverse=True
)
@@ -239,6 +218,8 @@ def get_connector_indexing_status(
indexing_statuses.append(
ConnectorIndexingStatus(
connector=ConnectorSnapshot.from_connector_db_model(connector),
public_doc=credential.public_doc,
owner=credential.user.email if credential.user else "",
last_status=index_attempts_sorted[0].status,
last_success=successful_index_attempts_sorted[0].time_updated
if successful_index_attempts_sorted
@@ -250,53 +231,28 @@ def get_connector_indexing_status(
),
)
# add in the connector that haven't started indexing yet
# add in the connectors that haven't started indexing yet
for connector in connector_id_to_connector.values():
if connector.id not in connector_to_index_attempts:
indexing_statuses.append(
ConnectorIndexingStatus(
connector=ConnectorSnapshot.from_connector_db_model(connector),
last_status=IndexingStatus.NOT_STARTED,
last_success=None,
docs_indexed=0,
),
)
for credential_association in connector.credentials:
if (
connector.id,
credential_association.credential_id,
) not in connector_credential_pair_to_index_attempts:
indexing_statuses.append(
ConnectorIndexingStatus(
connector=ConnectorSnapshot.from_connector_db_model(connector),
public_doc=credential_association.credential.public_doc,
owner=credential.user.email if credential.user else "",
last_status=IndexingStatus.NOT_STARTED,
last_success=None,
docs_indexed=0,
),
)
return indexing_statuses
@router.get(
"/connector/{connector_id}",
response_model=ConnectorSnapshot | StatusResponse[int],
)
def get_connector_by_id(
connector_id: int,
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> ConnectorSnapshot | StatusResponse[int]:
connector = fetch_connector_by_id(connector_id, db_session)
if connector is None:
raise HTTPException(
status_code=404, detail=f"Connector {connector_id} does not exist"
)
return ConnectorSnapshot(
id=connector.id,
name=connector.name,
source=connector.source,
input_type=connector.input_type,
connector_specific_config=connector.connector_specific_config,
refresh_freq=connector.refresh_freq,
credential_ids=[
association.credential.id for association in connector.credentials
],
time_created=connector.time_created,
time_updated=connector.time_updated,
disabled=connector.disabled,
)
@router.post("/connector", response_model=ObjectCreationIdResponse)
@router.post("/admin/connector")
def create_connector_from_model(
connector_info: ConnectorBase,
_: User = Depends(current_admin_user),
@@ -308,10 +264,7 @@ def create_connector_from_model(
raise HTTPException(status_code=400, detail=str(e))
@router.patch(
"/connector/{connector_id}",
response_model=ConnectorSnapshot | StatusResponse[int],
)
@router.patch("/admin/connector/{connector_id}")
def update_connector_from_model(
connector_id: int,
connector_data: ConnectorBase,
@@ -340,7 +293,7 @@ def update_connector_from_model(
)
@router.delete("/connector/{connector_id}", response_model=StatusResponse[int])
@router.delete("/admin/connector/{connector_id}", response_model=StatusResponse[int])
def delete_connector_by_id(
connector_id: int,
_: User = Depends(current_admin_user),
@@ -349,128 +302,7 @@ def delete_connector_by_id(
return delete_connector(connector_id, db_session)
@router.get("/credential", response_model=list[CredentialSnapshot])
def get_credentials(
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> list[CredentialSnapshot]:
credentials = fetch_credentials(user, db_session)
return [
CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
for credential in credentials
]
@router.get(
"/credential/{credential_id}",
response_model=CredentialSnapshot | StatusResponse[int],
)
def get_credential_by_id(
credential_id: int,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> CredentialSnapshot | StatusResponse[int]:
credential = fetch_credential_by_id(credential_id, user, db_session)
if credential is None:
raise HTTPException(
status_code=401,
detail=f"Credential {credential_id} does not exist or does not belong to user",
)
return CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
@router.post("/credential", response_model=ObjectCreationIdResponse)
def create_credential_from_model(
connector_info: CredentialBase,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> ObjectCreationIdResponse:
return create_credential(connector_info, user, db_session)
@router.patch(
"/credential/{credential_id}",
response_model=CredentialSnapshot | StatusResponse[int],
)
def update_credential_from_model(
credential_id: int,
credential_data: CredentialBase,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> CredentialSnapshot | StatusResponse[int]:
updated_credential = update_credential(
credential_id, credential_data, user, db_session
)
if updated_credential is None:
raise HTTPException(
status_code=401,
detail=f"Credential {credential_id} does not exist or does not belong to user",
)
return CredentialSnapshot(
id=updated_credential.id,
credential_json=updated_credential.credential_json,
user_id=updated_credential.user_id,
public_doc=updated_credential.public_doc,
time_created=updated_credential.time_created,
time_updated=updated_credential.time_updated,
)
@router.delete("/credential/{credential_id}", response_model=StatusResponse[int])
def delete_credential_by_id(
credential_id: int,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse:
delete_credential(credential_id, user, db_session)
return StatusResponse(
success=True, message="Credential deleted successfully", data=credential_id
)
@router.put("/connector/{connector_id}/credential/{credential_id}")
def associate_credential_to_connector(
connector_id: int,
credential_id: int,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[int]:
return add_credential_to_connector(connector_id, credential_id, user, db_session)
@router.delete("/connector/{connector_id}/credential/{credential_id}")
def dissociate_credential_from_connector(
connector_id: int,
credential_id: int,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[int]:
return remove_credential_from_connector(
connector_id, credential_id, user, db_session
)
@router.post("/connector/run-once")
@router.post("/admin/connector/run-once")
def connector_run_once(
run_info: RunConnectorRequest,
_: User = Depends(current_admin_user),
@@ -516,7 +348,7 @@ def connector_run_once(
)
@router.head("/openai-api-key/validate")
@router.head("/admin/openai-api-key/validate")
def validate_existing_openai_api_key(
_: User = Depends(current_admin_user),
) -> None:
@@ -532,7 +364,7 @@ def validate_existing_openai_api_key(
raise HTTPException(status_code=400, detail="Invalid API key provided")
@router.get("/openai-api-key", response_model=ApiKey)
@router.get("/admin/openai-api-key", response_model=ApiKey)
def get_openai_api_key_from_dynamic_config_store(
_: User = Depends(current_admin_user),
) -> ApiKey:
@@ -550,7 +382,7 @@ def get_openai_api_key_from_dynamic_config_store(
raise HTTPException(status_code=404, detail="Key not found")
@router.post("/openai-api-key")
@router.put("/admin/openai-api-key")
def store_openai_api_key(
request: ApiKey,
_: User = Depends(current_admin_user),
@@ -564,8 +396,204 @@ def store_openai_api_key(
raise HTTPException(400, str(e))
@router.delete("/openai-api-key")
@router.delete("/admin/openai-api-key")
def delete_openai_api_key(
_: User = Depends(current_admin_user),
) -> None:
get_dynamic_config_store().delete(OPENAI_API_KEY_STORAGE_KEY)
"""Endpoints for all!"""
@router.get("/connector/google-drive/authorize/{credential_id}")
def google_drive_auth(
response: Response, credential_id: str, _: User = Depends(current_user)
) -> AuthUrl:
# set a cookie that we can read in the callback (used for `verify_csrf`)
response.set_cookie(
key=_GOOGLE_DRIVE_CREDENTIAL_ID_COOKIE_NAME,
value=credential_id,
httponly=True,
max_age=600,
)
return AuthUrl(auth_url=get_auth_url(int(credential_id)))
@router.get("/connector/google-drive/callback")
def google_drive_callback(
request: Request,
callback: GDriveCallback = Depends(),
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> StatusResponse:
credential_id_cookie = request.cookies.get(_GOOGLE_DRIVE_CREDENTIAL_ID_COOKIE_NAME)
if credential_id_cookie is None or not credential_id_cookie.isdigit():
raise HTTPException(
status_code=401, detail="Request did not pass CSRF verification."
)
credential_id = int(credential_id_cookie)
verify_csrf(credential_id, callback.state)
if (
update_credential_access_tokens(callback.code, credential_id, user, db_session)
is None
):
raise HTTPException(
status_code=500, detail="Unable to fetch Google Drive access tokens"
)
return StatusResponse(success=True, message="Updated Google Drive access tokens")
@router.get("/connector")
def get_connectors(
_: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> list[ConnectorSnapshot]:
connectors = fetch_connectors(db_session)
return [
ConnectorSnapshot.from_connector_db_model(connector) for connector in connectors
]
@router.get("/connector/{connector_id}")
def get_connector_by_id(
connector_id: int,
_: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> ConnectorSnapshot | StatusResponse[int]:
connector = fetch_connector_by_id(connector_id, db_session)
if connector is None:
raise HTTPException(
status_code=404, detail=f"Connector {connector_id} does not exist"
)
return ConnectorSnapshot(
id=connector.id,
name=connector.name,
source=connector.source,
input_type=connector.input_type,
connector_specific_config=connector.connector_specific_config,
refresh_freq=connector.refresh_freq,
credential_ids=[
association.credential.id for association in connector.credentials
],
time_created=connector.time_created,
time_updated=connector.time_updated,
disabled=connector.disabled,
)
@router.get("/credential")
def get_credentials(
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> list[CredentialSnapshot]:
credentials = fetch_credentials(user, db_session)
return [
CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
for credential in credentials
]
@router.get("/credential/{credential_id}")
def get_credential_by_id(
credential_id: int,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> CredentialSnapshot | StatusResponse[int]:
credential = fetch_credential_by_id(credential_id, user, db_session)
if credential is None:
raise HTTPException(
status_code=401,
detail=f"Credential {credential_id} does not exist or does not belong to user",
)
return CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
@router.post("/credential")
def create_credential_from_model(
connector_info: CredentialBase,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> ObjectCreationIdResponse:
return create_credential(connector_info, user, db_session)
@router.patch("/credential/{credential_id}")
def update_credential_from_model(
credential_id: int,
credential_data: CredentialBase,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> CredentialSnapshot | StatusResponse[int]:
updated_credential = update_credential(
credential_id, credential_data, user, db_session
)
if updated_credential is None:
raise HTTPException(
status_code=401,
detail=f"Credential {credential_id} does not exist or does not belong to user",
)
return CredentialSnapshot(
id=updated_credential.id,
credential_json=updated_credential.credential_json,
user_id=updated_credential.user_id,
public_doc=updated_credential.public_doc,
time_created=updated_credential.time_created,
time_updated=updated_credential.time_updated,
)
@router.delete("/credential/{credential_id}")
def delete_credential_by_id(
credential_id: int,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> StatusResponse:
delete_credential(credential_id, user, db_session)
return StatusResponse(
success=True, message="Credential deleted successfully", data=credential_id
)
@router.put("/connector/{connector_id}/credential/{credential_id}")
def associate_credential_to_connector(
connector_id: int,
credential_id: int,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[int]:
return add_credential_to_connector(connector_id, credential_id, user, db_session)
@router.delete("/connector/{connector_id}/credential/{credential_id}")
def dissociate_credential_from_connector(
connector_id: int,
credential_id: int,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[int]:
return remove_credential_from_connector(
connector_id, credential_id, user, db_session
)

View File

@@ -137,6 +137,8 @@ class ConnectorIndexingStatus(BaseModel):
"""Represents the latest indexing status of a connector"""
connector: ConnectorSnapshot
owner: str
public_doc: bool
last_status: IndexingStatus
last_success: datetime | None
docs_indexed: int