Cancel scheduled indexing attempts on deletion request

This commit is contained in:
Weves 2024-04-27 15:42:01 -07:00 committed by Chris Weaver
parent a19290cb27
commit a2156dd836
6 changed files with 86 additions and 46 deletions

View File

@@ -67,15 +67,16 @@ def cleanup_connector_credential_pair_task(
connector_id=connector_id,
credential_id=credential_id,
)
if not cc_pair or not check_deletion_attempt_is_allowed(
connector_credential_pair=cc_pair
):
if not cc_pair:
raise ValueError(
"Cannot run deletion attempt - connector_credential_pair is not deletable. "
"This is likely because there is an ongoing / planned indexing attempt OR the "
"connector is not disabled."
f"Cannot run deletion attempt - connector_credential_pair with Connector ID: "
f"{connector_id} and Credential ID: {credential_id} does not exist."
)
deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair)
if deletion_attempt_disallowed_reason:
raise ValueError(deletion_attempt_disallowed_reason)
try:
# The bulk of the work is in here, updates Postgres and Vespa
curr_ind_name, sec_ind_name = get_both_index_names(db_session)

View File

@@ -26,7 +26,7 @@ from danswer.db.document import get_documents_for_connector_credential_pair
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import mark_attempt_failed
from danswer.db.index_attempt import mark_attempt_in_progress
from danswer.db.index_attempt import mark_attempt_in_progress__no_commit
from danswer.db.index_attempt import mark_attempt_succeeded
from danswer.db.index_attempt import update_docs_indexed
from danswer.db.models import IndexAttempt
@@ -115,16 +115,6 @@ def _run_indexing(
# Secondary index syncs at the end when swapping
is_primary = index_attempt.embedding_model.status == IndexModelStatus.PRESENT
# Mark as started
mark_attempt_in_progress(index_attempt, db_session)
if is_primary:
update_connector_credential_pair(
db_session=db_session,
connector_id=index_attempt.connector.id,
credential_id=index_attempt.credential.id,
attempt_status=IndexingStatus.IN_PROGRESS,
)
# Indexing is only done into one index at a time
document_index = get_default_document_index(
primary_index_name=index_name, secondary_index_name=None
@@ -331,6 +321,42 @@ def _run_indexing(
)
def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexAttempt:
    """Validate that the given index attempt is runnable and mark it IN_PROGRESS.

    Raises:
        RuntimeError: if no IndexAttempt exists for `index_attempt_id`, or if the
            attempt is not in NOT_STARTED status (e.g. it was already picked up
            or was cancelled before this worker got to it).

    Returns:
        The same IndexAttempt, now marked IN_PROGRESS.
    """
    # make sure that the index attempt can't change in between checking the
    # status and marking it as in_progress. This setting will be discarded
    # after the next commit:
    # https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-isolation-for-individual-transactions
    db_session.connection(execution_options={"isolation_level": "SERIALIZABLE"})  # type: ignore

    attempt = get_index_attempt(
        db_session=db_session,
        index_attempt_id=index_attempt_id,
    )

    if attempt is None:
        raise RuntimeError(f"Unable to find IndexAttempt for ID '{index_attempt_id}'")

    # reject anything that isn't freshly scheduled — this is what makes a
    # cancelled (deleted) attempt refuse to start
    if attempt.status != IndexingStatus.NOT_STARTED:
        raise RuntimeError(
            f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. "
            f"Current status is '{attempt.status}'."
        )

    # only commit once, to make sure this all happens in a single transaction
    mark_attempt_in_progress__no_commit(attempt)

    # only the primary (currently live) embedding model's attempts are mirrored
    # onto the connector/credential pair status
    is_primary = attempt.embedding_model.status == IndexModelStatus.PRESENT
    if is_primary:
        # NOTE(review): presumably update_connector_credential_pair commits the
        # session internally — confirm; otherwise the IN_PROGRESS mark above is
        # never committed on this branch
        update_connector_credential_pair(
            db_session=db_session,
            connector_id=attempt.connector.id,
            credential_id=attempt.credential.id,
            attempt_status=IndexingStatus.IN_PROGRESS,
        )
    else:
        db_session.commit()

    return attempt
def run_indexing_entrypoint(index_attempt_id: int) -> None:
"""Entrypoint for indexing run when using dask distributed.
Wraps the actual logic in a `try` block so that we can catch any exceptions
@@ -341,13 +367,9 @@ def run_indexing_entrypoint(index_attempt_id: int) -> None:
IndexAttemptSingleton.set_index_attempt_id(index_attempt_id)
with Session(get_sqlalchemy_engine()) as db_session:
attempt = get_index_attempt(
db_session=db_session, index_attempt_id=index_attempt_id
)
if attempt is None:
raise RuntimeError(
f"Unable to find IndexAttempt for ID '{index_attempt_id}'"
)
# make sure that it is valid to run this indexing attempt + mark it
# as in progress
attempt = _prepare_index_attempt(db_session, index_attempt_id)
logger.info(
f"Running indexing attempt for connector: '{attempt.connector.name}', "
@@ -355,10 +377,7 @@ def run_indexing_entrypoint(index_attempt_id: int) -> None:
f"with credentials: '{attempt.credential_id}'"
)
_run_indexing(
db_session=db_session,
index_attempt=attempt,
)
_run_indexing(db_session, attempt)
logger.info(
f"Completed indexing attempt for connector: '{attempt.connector.name}', "

View File

@@ -4,17 +4,31 @@ from danswer.db.models import IndexingStatus
def check_deletion_attempt_is_allowed(
connector_credential_pair: ConnectorCredentialPair,
) -> bool:
allow_scheduled: bool = False,
) -> str | None:
"""
To be deletable:
(1) connector should be disabled
(2) there should be no in-progress/planned index attempts
Returns an error message if the deletion attempt is not allowed, otherwise None.
"""
return bool(
connector_credential_pair.connector.disabled
and (
connector_credential_pair.last_attempt_status != IndexingStatus.IN_PROGRESS
and connector_credential_pair.last_attempt_status
!= IndexingStatus.NOT_STARTED
)
base_error_msg = (
f"Connector with ID '{connector_credential_pair.connector_id}' and credential ID "
f"'{connector_credential_pair.credential_id}' is not deletable."
)
if not connector_credential_pair.connector.disabled:
return base_error_msg + " Connector must be paused."
if connector_credential_pair.last_attempt_status == IndexingStatus.IN_PROGRESS or (
connector_credential_pair.last_attempt_status == IndexingStatus.NOT_STARTED
and not allow_scheduled
):
return (
base_error_msg
+ " There is an ongoing / planned indexing attempt. "
+ "The indexing attempt must be completed or cancelled before deletion."
)
return None

View File

@@ -75,14 +75,11 @@ def get_not_started_index_attempts(db_session: Session) -> list[IndexAttempt]:
return list(new_attempts.all())
def mark_attempt_in_progress(
def mark_attempt_in_progress__no_commit(
index_attempt: IndexAttempt,
db_session: Session,
) -> None:
index_attempt.status = IndexingStatus.IN_PROGRESS
index_attempt.time_started = index_attempt.time_started or func.now() # type: ignore
db_session.add(index_attempt)
db_session.commit()
def mark_attempt_succeeded(

View File

@@ -430,8 +430,11 @@ def get_connector_indexing_status(
db_session=db_session,
),
is_deletable=check_deletion_attempt_is_allowed(
connector_credential_pair=cc_pair
),
connector_credential_pair=cc_pair,
# allow scheduled indexing attempts here, since on deletion request we will cancel them
allow_scheduled=True,
)
is None,
)
)

View File

@@ -25,6 +25,7 @@ from danswer.db.feedback import fetch_docs_ranked_by_boost
from danswer.db.feedback import update_document_boost
from danswer.db.feedback import update_document_hidden
from danswer.db.file_store import get_default_file_store
from danswer.db.index_attempt import cancel_indexing_attempts_for_connector
from danswer.db.models import User
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
@@ -168,12 +169,17 @@ def create_deletion_attempt_for_connector_id(
f"'{credential_id}' does not exist. Has it already been deleted?",
)
if not check_deletion_attempt_is_allowed(connector_credential_pair=cc_pair):
# Cancel any scheduled indexing attempts
cancel_indexing_attempts_for_connector(
connector_id=connector_id, db_session=db_session, include_secondary_index=True
)
# Check if the deletion attempt should be allowed
deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair)
if deletion_attempt_disallowed_reason:
raise HTTPException(
status_code=400,
detail=f"Connector with ID '{connector_id}' and credential ID "
f"'{credential_id}' is not deletable. It must be both disabled AND have "
"no ongoing / planned indexing attempts.",
detail=deletion_attempt_disallowed_reason,
)
cleanup_connector_credential_pair_task.apply_async(