From ce12dd4a5a25472bdb17636cf76facd9276caed2 Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Mon, 29 Jan 2024 19:34:25 -0800 Subject: [PATCH] Fix Secondary Index Polling (#1020) --- .../background/indexing/run_indexing.py | 1 + .../danswer/db/connector_credential_pair.py | 38 ++++++++++++++----- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index ebf752b015..13ecb1d0d8 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -136,6 +136,7 @@ def _run_indexing( last_successful_index_time = get_last_successful_attempt_time( connector_id=db_connector.id, credential_id=db_credential.id, + embedding_model=index_attempt.embedding_model, db_session=db_session, ) diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py index dd8da7856e..25c646072e 100644 --- a/backend/danswer/db/connector_credential_pair.py +++ b/backend/danswer/db/connector_credential_pair.py @@ -56,20 +56,40 @@ def get_connector_credential_pair_from_id( def get_last_successful_attempt_time( connector_id: int, credential_id: int, + embedding_model: EmbeddingModel, db_session: Session, ) -> float: """Gets the timestamp of the last successful index run stored in the CC Pair row in the database""" - connector_credential_pair = get_connector_credential_pair( - connector_id, credential_id, db_session + if embedding_model.status == IndexModelStatus.PRESENT: + connector_credential_pair = get_connector_credential_pair( + connector_id, credential_id, db_session + ) + if ( + connector_credential_pair is None + or connector_credential_pair.last_successful_index_time is None + ): + return 0.0 + + return connector_credential_pair.last_successful_index_time.timestamp() + + # For Secondary Index we don't keep track of the latest success, so have to calculate it live + attempt = ( + db_session.query(IndexAttempt) + .filter( + IndexAttempt.connector_id == connector_id, + IndexAttempt.credential_id == credential_id, + IndexAttempt.embedding_model_id == embedding_model.id, + IndexAttempt.status == IndexingStatus.SUCCESS, + ) + .order_by(IndexAttempt.time_started.desc()) + .first() ) - if ( - connector_credential_pair is None - or connector_credential_pair.last_successful_index_time is None - ): + + if not attempt or not attempt.time_started: return 0.0 - return connector_credential_pair.last_successful_index_time.timestamp() + return attempt.time_started.timestamp() def update_connector_credential_pair( @@ -262,7 +282,7 @@ def resync_cc_pair( if only_include_success: query = query.filter(IndexAttempt.status == IndexingStatus.SUCCESS) - latest_index_attempt = query.order_by(desc(IndexAttempt.time_updated)).first() + latest_index_attempt = query.order_by(desc(IndexAttempt.time_started)).first() return latest_index_attempt @@ -274,7 +294,7 @@ def resync_cc_pair( ) cc_pair.last_successful_index_time = ( - last_success.time_updated if last_success else None + last_success.time_started if last_success else None ) last_run = find_latest_index_attempt(