From 43efa9da9482a3db773e5872c874d176fbacb99b Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Thu, 31 Aug 2023 17:43:03 -0700 Subject: [PATCH] Mark incomplete Index Attempts as Failed on job restart (#371) --- backend/danswer/background/update.py | 7 +++++++ backend/danswer/db/connector_credential_pair.py | 15 +++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 54d4baf48..e2e036cbd 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -19,6 +19,7 @@ from danswer.datastores.indexing_pipeline import build_indexing_pipeline from danswer.db.connector import disable_connector from danswer.db.connector import fetch_connectors from danswer.db.connector_credential_pair import get_last_successful_attempt_time +from danswer.db.connector_credential_pair import mark_all_in_progress_cc_pairs_failed from danswer.db.connector_credential_pair import update_connector_credential_pair from danswer.db.credentials import backend_update_credential_json from danswer.db.engine import get_db_current_time @@ -410,6 +411,12 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non client = Client(cluster) existing_jobs: dict[int, Future] = {} engine = get_sqlalchemy_engine() + + with Session(engine) as db_session: + # Previous version did not always clean up cc-pairs well leaving some connectors undeleteable + # This ensures that bad states get cleaned up + mark_all_in_progress_cc_pairs_failed(db_session) + while True: start = time.time() start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py index 92c899069..ac6d5dcc0 100644 --- a/backend/danswer/db/connector_credential_pair.py +++ b/backend/danswer/db/connector_credential_pair.py @@ -3,6 +3,7 @@ from datetime import datetime from fastapi import HTTPException from sqlalchemy import delete from sqlalchemy import select +from sqlalchemy import update from sqlalchemy.orm import Session from danswer.db.connector import fetch_connector_by_id @@ -92,6 +93,20 @@ def delete_connector_credential_pair( db_session.execute(stmt) +def mark_all_in_progress_cc_pairs_failed( + db_session: Session, +) -> None: + stmt = ( + update(ConnectorCredentialPair) + .where( + ConnectorCredentialPair.last_attempt_status == IndexingStatus.IN_PROGRESS + ) + .values(last_attempt_status=IndexingStatus.FAILED) + ) + db_session.execute(stmt) + db_session.commit() + + def add_credential_to_connector( connector_id: int, credential_id: int,