From 87f304dfd0fa9157043869a12bd51b4e60acea13 Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Fri, 19 Apr 2024 10:38:15 -0700 Subject: [PATCH] Swap Index Early (#1353) --- backend/danswer/background/update.py | 51 +----------------------- backend/danswer/db/swap_index.py | 58 ++++++++++++++++++++++++++++ backend/danswer/main.py | 2 + 3 files changed, 62 insertions(+), 49 deletions(-) create mode 100644 backend/danswer/db/swap_index.py diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 6042e02b1..ae6296965 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -17,19 +17,12 @@ from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP from danswer.configs.app_configs import NUM_INDEXING_WORKERS from danswer.db.connector import fetch_connectors -from danswer.db.connector_credential_pair import get_connector_credential_pairs from danswer.db.connector_credential_pair import mark_all_in_progress_cc_pairs_failed -from danswer.db.connector_credential_pair import resync_cc_pair from danswer.db.connector_credential_pair import update_connector_credential_pair from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.embedding_model import get_secondary_db_embedding_model -from danswer.db.embedding_model import update_embedding_model_status from danswer.db.engine import get_db_current_time from danswer.db.engine import get_sqlalchemy_engine -from danswer.db.index_attempt import cancel_indexing_attempts_past_model -from danswer.db.index_attempt import ( - count_unique_cc_pairs_with_successful_index_attempts, -) from danswer.db.index_attempt import create_index_attempt from danswer.db.index_attempt import get_index_attempt from danswer.db.index_attempt import get_inprogress_index_attempts @@ -41,6 +34,7 @@ from danswer.db.models import EmbeddingModel from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus from danswer.db.models import IndexModelStatus +from danswer.db.swap_index import check_index_swap from danswer.search.search_nlp_models import warm_up_encoders from danswer.utils.logger import setup_logger from shared_configs.configs import INDEXING_MODEL_SERVER_HOST @@ -354,51 +348,10 @@ def kickoff_indexing_jobs( return existing_jobs_copy -def check_index_swap(db_session: Session) -> None: - """Get count of cc-pairs and count of successful index_attempts for the - new model grouped by connector + credential, if it's the same, then assume - new index is done building. If so, swap the indices and expire the old one.""" - # Default CC-pair created for Ingestion API unused here - all_cc_pairs = get_connector_credential_pairs(db_session) - cc_pair_count = len(all_cc_pairs) - 1 - embedding_model = get_secondary_db_embedding_model(db_session) - - if not embedding_model: - return - - unique_cc_indexings = count_unique_cc_pairs_with_successful_index_attempts( - embedding_model_id=embedding_model.id, db_session=db_session - ) - - if unique_cc_indexings > cc_pair_count: - raise RuntimeError("More unique indexings than cc pairs, should not occur") - - if cc_pair_count == unique_cc_indexings: - # Swap indices - now_old_embedding_model = get_current_db_embedding_model(db_session) - update_embedding_model_status( - embedding_model=now_old_embedding_model, - new_status=IndexModelStatus.PAST, - db_session=db_session, - ) - - update_embedding_model_status( - embedding_model=embedding_model, - new_status=IndexModelStatus.PRESENT, - db_session=db_session, - ) - - # Expire jobs for the now past index/embedding model - cancel_indexing_attempts_past_model(db_session) - - # Recount aggregates - for cc_pair in all_cc_pairs: - resync_cc_pair(cc_pair, db_session=db_session) - - def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> None: engine = get_sqlalchemy_engine() with Session(engine) as db_session: + check_index_swap(db_session=db_session) db_embedding_model = get_current_db_embedding_model(db_session) # So that the first time users aren't surprised by really slow speed of first diff --git a/backend/danswer/db/swap_index.py b/backend/danswer/db/swap_index.py new file mode 100644 index 000000000..93eb4714a --- /dev/null +++ b/backend/danswer/db/swap_index.py @@ -0,0 +1,58 @@ +from sqlalchemy.orm import Session + +from danswer.db.connector_credential_pair import get_connector_credential_pairs +from danswer.db.connector_credential_pair import resync_cc_pair +from danswer.db.embedding_model import get_current_db_embedding_model +from danswer.db.embedding_model import get_secondary_db_embedding_model +from danswer.db.embedding_model import update_embedding_model_status +from danswer.db.enums import IndexModelStatus +from danswer.db.index_attempt import cancel_indexing_attempts_past_model +from danswer.db.index_attempt import ( + count_unique_cc_pairs_with_successful_index_attempts, +) +from danswer.utils.logger import setup_logger + +logger = setup_logger() + + +def check_index_swap(db_session: Session) -> None: + """Get count of cc-pairs and count of successful index_attempts for the + new model grouped by connector + credential, if it's the same, then assume + new index is done building. If so, swap the indices and expire the old one.""" + # Default CC-pair created for Ingestion API unused here + all_cc_pairs = get_connector_credential_pairs(db_session) + cc_pair_count = max(len(all_cc_pairs) - 1, 0) + embedding_model = get_secondary_db_embedding_model(db_session) + + if not embedding_model: + return + + unique_cc_indexings = count_unique_cc_pairs_with_successful_index_attempts( + embedding_model_id=embedding_model.id, db_session=db_session + ) + + if unique_cc_indexings > cc_pair_count: + raise RuntimeError("More unique indexings than cc pairs, should not occur") + + if cc_pair_count == 0 or cc_pair_count == unique_cc_indexings: + # Swap indices + now_old_embedding_model = get_current_db_embedding_model(db_session) + update_embedding_model_status( + embedding_model=now_old_embedding_model, + new_status=IndexModelStatus.PAST, + db_session=db_session, + ) + + update_embedding_model_status( + embedding_model=embedding_model, + new_status=IndexModelStatus.PRESENT, + db_session=db_session, + ) + + if cc_pair_count > 0: + # Expire jobs for the now past index/embedding model + cancel_indexing_attempts_past_model(db_session) + + # Recount aggregates + for cc_pair in all_cc_pairs: + resync_cc_pair(cc_pair, db_session=db_session) diff --git a/backend/danswer/main.py b/backend/danswer/main.py index e8afa0838..8e27482ae 100644 --- a/backend/danswer/main.py +++ b/backend/danswer/main.py @@ -46,6 +46,7 @@ from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import cancel_indexing_attempts_past_model from danswer.db.index_attempt import expire_index_attempts +from danswer.db.swap_index import check_index_swap from danswer.document_index.factory import get_default_document_index from danswer.dynamic_configs.port_configs import port_filesystem_to_postgres from danswer.llm.factory import get_default_llm @@ -180,6 +181,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator: ) with Session(engine) as db_session: + check_index_swap(db_session=db_session) db_embedding_model = get_current_db_embedding_model(db_session) secondary_db_embedding_model = get_secondary_db_embedding_model(db_session)