From a2156dd83653d9c4c6852ca2b9497257d14c5850 Mon Sep 17 00:00:00 2001 From: Weves Date: Sat, 27 Apr 2024 15:42:01 -0700 Subject: [PATCH] Cancel scheduled indexing attempts on deletion request --- backend/danswer/background/celery/celery.py | 13 ++-- .../background/indexing/run_indexing.py | 63 ++++++++++++------- backend/danswer/db/deletion_attempt.py | 30 ++++++--- backend/danswer/db/index_attempt.py | 5 +- backend/danswer/server/documents/connector.py | 7 ++- .../danswer/server/manage/administrative.py | 14 +++-- 6 files changed, 86 insertions(+), 46 deletions(-) diff --git a/backend/danswer/background/celery/celery.py b/backend/danswer/background/celery/celery.py index dd5bc3080..2d9df4a58 100644 --- a/backend/danswer/background/celery/celery.py +++ b/backend/danswer/background/celery/celery.py @@ -67,15 +67,16 @@ def cleanup_connector_credential_pair_task( connector_id=connector_id, credential_id=credential_id, ) - if not cc_pair or not check_deletion_attempt_is_allowed( - connector_credential_pair=cc_pair - ): + if not cc_pair: raise ValueError( - "Cannot run deletion attempt - connector_credential_pair is not deletable. " - "This is likely because there is an ongoing / planned indexing attempt OR the " - "connector is not disabled." + f"Cannot run deletion attempt - connector_credential_pair with Connector ID: " + f"{connector_id} and Credential ID: {credential_id} does not exist." ) + deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair) + if deletion_attempt_disallowed_reason: + raise ValueError(deletion_attempt_disallowed_reason) + try: # The bulk of the work is in here, updates Postgres and Vespa curr_ind_name, sec_ind_name = get_both_index_names(db_session) diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 4f7241832..e78d5558d 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -26,7 +26,7 @@ from danswer.db.document import get_documents_for_connector_credential_pair from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import get_index_attempt from danswer.db.index_attempt import mark_attempt_failed -from danswer.db.index_attempt import mark_attempt_in_progress +from danswer.db.index_attempt import mark_attempt_in_progress__no_commit from danswer.db.index_attempt import mark_attempt_succeeded from danswer.db.index_attempt import update_docs_indexed from danswer.db.models import IndexAttempt @@ -115,16 +115,6 @@ def _run_indexing( # Secondary index syncs at the end when swapping is_primary = index_attempt.embedding_model.status == IndexModelStatus.PRESENT - # Mark as started - mark_attempt_in_progress(index_attempt, db_session) - if is_primary: - update_connector_credential_pair( - db_session=db_session, - connector_id=index_attempt.connector.id, - credential_id=index_attempt.credential.id, - attempt_status=IndexingStatus.IN_PROGRESS, - ) - # Indexing is only done into one index at a time document_index = get_default_document_index( primary_index_name=index_name, secondary_index_name=None @@ -331,6 +321,42 @@ def _run_indexing( ) +def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexAttempt: + # make sure that the index attempt can't change in between checking the + # status and marking it as in_progress. This setting will be discarded + # after the next commit: + # https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-isolation-for-individual-transactions + db_session.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # type: ignore + + attempt = get_index_attempt( + db_session=db_session, + index_attempt_id=index_attempt_id, + ) + if attempt is None: + raise RuntimeError(f"Unable to find IndexAttempt for ID '{index_attempt_id}'") + + if attempt.status != IndexingStatus.NOT_STARTED: + raise RuntimeError( + f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. " + f"Current status is '{attempt.status}'." + ) + + # only commit once, to make sure this all happens in a single transaction + mark_attempt_in_progress__no_commit(attempt) + is_primary = attempt.embedding_model.status == IndexModelStatus.PRESENT + if is_primary: + update_connector_credential_pair( + db_session=db_session, + connector_id=attempt.connector.id, + credential_id=attempt.credential.id, + attempt_status=IndexingStatus.IN_PROGRESS, + ) + else: + db_session.commit() + + return attempt + + def run_indexing_entrypoint(index_attempt_id: int) -> None: """Entrypoint for indexing run when using dask distributed. Wraps the actual logic in a `try` block so that we can catch any exceptions @@ -341,13 +367,9 @@ def run_indexing_entrypoint(index_attempt_id: int) -> None: IndexAttemptSingleton.set_index_attempt_id(index_attempt_id) with Session(get_sqlalchemy_engine()) as db_session: - attempt = get_index_attempt( - db_session=db_session, index_attempt_id=index_attempt_id - ) - if attempt is None: - raise RuntimeError( - f"Unable to find IndexAttempt for ID '{index_attempt_id}'" - ) + # make sure that it is valid to run this indexing attempt + mark it + # as in progress + attempt = _prepare_index_attempt(db_session, index_attempt_id) logger.info( f"Running indexing attempt for connector: '{attempt.connector.name}', " @@ -355,10 +377,7 @@ def run_indexing_entrypoint(index_attempt_id: int) -> None: f"with credentials: '{attempt.credential_id}'" ) - _run_indexing( - db_session=db_session, - index_attempt=attempt, - ) + _run_indexing(db_session, attempt) logger.info( f"Completed indexing attempt for connector: '{attempt.connector.name}', " diff --git a/backend/danswer/db/deletion_attempt.py b/backend/danswer/db/deletion_attempt.py index 00ae13d5b..e324c1c27 100644 --- a/backend/danswer/db/deletion_attempt.py +++ b/backend/danswer/db/deletion_attempt.py @@ -4,17 +4,31 @@ from danswer.db.models import IndexingStatus def check_deletion_attempt_is_allowed( connector_credential_pair: ConnectorCredentialPair, -) -> bool: + allow_scheduled: bool = False, +) -> str | None: """ To be deletable: (1) connector should be disabled (2) there should be no in-progress/planned index attempts + + Returns an error message if the deletion attempt is not allowed, otherwise None. """ - return bool( - connector_credential_pair.connector.disabled - and ( - connector_credential_pair.last_attempt_status != IndexingStatus.IN_PROGRESS - and connector_credential_pair.last_attempt_status - != IndexingStatus.NOT_STARTED - ) + base_error_msg = ( + f"Connector with ID '{connector_credential_pair.connector_id}' and credential ID " + f"'{connector_credential_pair.credential_id}' is not deletable." ) + + if not connector_credential_pair.connector.disabled: + return base_error_msg + " Connector must be paused." + + if connector_credential_pair.last_attempt_status == IndexingStatus.IN_PROGRESS or ( + connector_credential_pair.last_attempt_status == IndexingStatus.NOT_STARTED + and not allow_scheduled + ): + return ( + base_error_msg + + " There is an ongoing / planned indexing attempt. " + + "The indexing attempt must be completed or cancelled before deletion." + ) + + return None diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index df42d869d..51c41c719 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -75,14 +75,11 @@ def get_not_started_index_attempts(db_session: Session) -> list[IndexAttempt]: return list(new_attempts.all()) -def mark_attempt_in_progress( +def mark_attempt_in_progress__no_commit( index_attempt: IndexAttempt, - db_session: Session, ) -> None: index_attempt.status = IndexingStatus.IN_PROGRESS index_attempt.time_started = index_attempt.time_started or func.now() # type: ignore - db_session.add(index_attempt) - db_session.commit() def mark_attempt_succeeded( diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index 7bc8593d9..0f51fe39c 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -430,8 +430,11 @@ def get_connector_indexing_status( db_session=db_session, ), is_deletable=check_deletion_attempt_is_allowed( - connector_credential_pair=cc_pair - ), + connector_credential_pair=cc_pair, + # allow scheduled indexing attempts here, since on deletion request we will cancel them + allow_scheduled=True, + ) + is None, ) ) diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py index 9b3f8df21..6949649ee 100644 --- a/backend/danswer/server/manage/administrative.py +++ b/backend/danswer/server/manage/administrative.py @@ -25,6 +25,7 @@ from danswer.db.feedback import fetch_docs_ranked_by_boost from danswer.db.feedback import update_document_boost from danswer.db.feedback import update_document_hidden from danswer.db.file_store import get_default_file_store +from danswer.db.index_attempt import cancel_indexing_attempts_for_connector from danswer.db.models import User from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index @@ -168,12 +169,17 @@ def create_deletion_attempt_for_connector_id( f"'{credential_id}' does not exist. Has it already been deleted?", ) - if not check_deletion_attempt_is_allowed(connector_credential_pair=cc_pair): + # Cancel any scheduled indexing attempts + cancel_indexing_attempts_for_connector( + connector_id=connector_id, db_session=db_session, include_secondary_index=True + ) + + # Check if the deletion attempt should be allowed + deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair) + if deletion_attempt_disallowed_reason: raise HTTPException( status_code=400, - detail=f"Connector with ID '{connector_id}' and credential ID " - f"'{credential_id}' is not deletable. It must be both disabled AND have " - "no ongoing / planned indexing attempts.", + detail=deletion_attempt_disallowed_reason, ) cleanup_connector_credential_pair_task.apply_async(