better handling around index attempts that don't exist and remove unn… (#3417)

* better handling around index attempts that don't exist and remove unnecessary index attempt deletions

* don't delete index attempts, just update them

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
rkuo-danswer
2024-12-11 11:32:04 -08:00
committed by GitHub
parent b88cb388b7
commit 71421bb782
4 changed files with 36 additions and 21 deletions

View File

@@ -598,7 +598,7 @@ def connector_indexing_proxy_task(
db_session, db_session,
"Connector termination signal detected", "Connector termination signal detected",
) )
finally: except Exception:
# if the DB exceptions, we'll just get an unfriendly failure message # if the DB exceptions, we'll just get an unfriendly failure message
# in the UI instead of the cancellation message # in the UI instead of the cancellation message
logger.exception( logger.exception(

View File

@@ -680,17 +680,28 @@ def monitor_ccpair_indexing_taskset(
) )
task_logger.warning(msg) task_logger.warning(msg)
index_attempt = get_index_attempt(db_session, payload.index_attempt_id) try:
if index_attempt: index_attempt = get_index_attempt(
if ( db_session, payload.index_attempt_id
index_attempt.status != IndexingStatus.CANCELED )
and index_attempt.status != IndexingStatus.FAILED if index_attempt:
): if (
mark_attempt_failed( index_attempt.status != IndexingStatus.CANCELED
index_attempt_id=payload.index_attempt_id, and index_attempt.status != IndexingStatus.FAILED
db_session=db_session, ):
failure_reason=msg, mark_attempt_failed(
) index_attempt_id=payload.index_attempt_id,
db_session=db_session,
failure_reason=msg,
)
except Exception:
task_logger.exception(
"monitor_ccpair_indexing_taskset - transient exception marking index attempt as failed: "
f"attempt={payload.index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
redis_connector_index.reset() redis_connector_index.reset()
return return

View File

@@ -522,12 +522,16 @@ def expire_index_attempts(
search_settings_id: int, search_settings_id: int,
db_session: Session, db_session: Session,
) -> None: ) -> None:
delete_query = ( not_started_query = (
delete(IndexAttempt) update(IndexAttempt)
.where(IndexAttempt.search_settings_id == search_settings_id) .where(IndexAttempt.search_settings_id == search_settings_id)
.where(IndexAttempt.status == IndexingStatus.NOT_STARTED) .where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
.values(
status=IndexingStatus.CANCELED,
error_msg="Canceled, likely due to model swap",
)
) )
db_session.execute(delete_query) db_session.execute(not_started_query)
update_query = ( update_query = (
update(IndexAttempt) update(IndexAttempt)
@@ -549,9 +553,14 @@ def cancel_indexing_attempts_for_ccpair(
include_secondary_index: bool = False, include_secondary_index: bool = False,
) -> None: ) -> None:
stmt = ( stmt = (
delete(IndexAttempt) update(IndexAttempt)
.where(IndexAttempt.connector_credential_pair_id == cc_pair_id) .where(IndexAttempt.connector_credential_pair_id == cc_pair_id)
.where(IndexAttempt.status == IndexingStatus.NOT_STARTED) .where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
.values(
status=IndexingStatus.CANCELED,
error_msg="Canceled by user",
time_started=datetime.now(timezone.utc),
)
) )
if not include_secondary_index: if not include_secondary_index:

View File

@@ -33,8 +33,6 @@ from danswer.db.engine import get_current_tenant_id
from danswer.db.engine import get_session from danswer.db.engine import get_session
from danswer.db.enums import AccessType from danswer.db.enums import AccessType
from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import count_index_attempts_for_connector from danswer.db.index_attempt import count_index_attempts_for_connector
from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id
@@ -193,9 +191,6 @@ def update_cc_pair_status(
db_session db_session
) )
cancel_indexing_attempts_for_ccpair(cc_pair_id, db_session)
cancel_indexing_attempts_past_model(db_session)
redis_connector = RedisConnector(tenant_id, cc_pair_id) redis_connector = RedisConnector(tenant_id, cc_pair_id)
try: try: