diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 2399facf1..70d64d880 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -185,6 +185,7 @@ def _run_indexing( db_session.refresh(index_attempt) if index_attempt.status != IndexingStatus.IN_PROGRESS: + # Likely due to user manually disabling it or model swap raise RuntimeError("Index Attempt was canceled") logger.debug( diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 7c28e8d4c..b77ddee85 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -14,6 +14,7 @@ from danswer.background.indexing.job_client import SimpleJobClient from danswer.background.indexing.run_indexing import run_indexing_entrypoint from danswer.configs.app_configs import CLEANUP_INDEXING_JOBS_TIMEOUT from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED +from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP from danswer.configs.app_configs import LOG_LEVEL from danswer.configs.app_configs import NUM_INDEXING_WORKERS from danswer.configs.model_configs import MIN_THREADS_ML_MODELS @@ -69,8 +70,15 @@ def _should_create_new_indexing( connector: Connector, last_index: IndexAttempt | None, model: EmbeddingModel, + secondary_index_building: bool, db_session: Session, ) -> bool: + # User can still manually create single indexing attempts via the UI for the + # currently in use index + if DISABLE_INDEX_UPDATE_ON_SWAP: + if model.status == IndexModelStatus.PRESENT and secondary_index_building: + return False + # When switching over models, always index at least once if model.status == IndexModelStatus.FUTURE and not last_index: if connector.id == 0: # Ingestion API @@ -186,7 +194,11 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None: connector.id, credential.id, model.id, db_session ) if not _should_create_new_indexing( - connector, last_attempt, model, db_session + connector=connector, + last_index=last_attempt, + model=model, + secondary_index_building=len(embedding_models) > 1, + db_session=db_session, ): continue @@ -255,6 +267,9 @@ def cleanup_indexing_jobs( ) for index_attempt in in_progress_indexing_attempts: if index_attempt.id in existing_jobs: + # If index attempt is canceled, stop the run + if index_attempt.status == IndexingStatus.FAILED: + existing_jobs[index_attempt.id].cancel() # check to see if the job has been updated in last `timeout_hours` hours, if not # assume it to frozen in some bad state and just mark it as failed. Note: this relies # on the fact that the `time_updated` field is constantly updated every diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index d0159c432..7a9ef380f 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -177,6 +177,11 @@ EXPERIMENTAL_CHECKPOINTING_ENABLED = ( CONTINUE_ON_CONNECTOR_FAILURE = os.environ.get( "CONTINUE_ON_CONNECTOR_FAILURE", "" ).lower() not in ["false", ""] +# When swapping to a new embedding model, a secondary index is created in the background, to conserve +# resources, we pause updates on the primary index by default while the secondary index is created +DISABLE_INDEX_UPDATE_ON_SWAP = ( + os.environ.get("DISABLE_INDEX_UPDATE_ON_SWAP", "").lower() == "true" +) # Controls how many worker processes we spin up to index documents in the # background. This is useful for speeding up indexing, but does require a # fairly large amount of memory in order to increase substantially, since diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index 7d8b38959..05a23395d 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -229,13 +229,24 @@ def expire_index_attempts( embedding_model_id: int, db_session: Session, ) -> None: + delete_query = ( + delete(IndexAttempt) + .where(IndexAttempt.embedding_model_id == embedding_model_id) + .where(IndexAttempt.status == IndexingStatus.NOT_STARTED) + ) + db_session.execute(delete_query) + update_query = ( update(IndexAttempt) .where(IndexAttempt.embedding_model_id == embedding_model_id) .where(IndexAttempt.status != IndexingStatus.SUCCESS) - .values(status=IndexingStatus.FAILED, error_msg="Embedding model swapped") + .values( + status=IndexingStatus.FAILED, + error_msg="Canceled due to embedding model swap", + ) ) db_session.execute(update_query) + db_session.commit() diff --git a/backend/danswer/main.py b/backend/danswer/main.py index cd5addac6..e7b6e8491 100644 --- a/backend/danswer/main.py +++ b/backend/danswer/main.py @@ -25,6 +25,7 @@ from danswer.configs.app_configs import APP_HOST from danswer.configs.app_configs import APP_PORT from danswer.configs.app_configs import AUTH_TYPE from danswer.configs.app_configs import DISABLE_GENERATIVE_AI +from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP from danswer.configs.app_configs import MODEL_SERVER_HOST from danswer.configs.app_configs import MODEL_SERVER_PORT from danswer.configs.app_configs import OAUTH_CLIENT_ID @@ -41,12 +42,15 @@ from danswer.configs.model_configs import GEN_AI_MODEL_VERSION from danswer.db.chat import delete_old_default_personas from danswer.db.connector import create_initial_default_connector from danswer.db.connector_credential_pair import associate_default_cc_pair +from danswer.db.connector_credential_pair import get_connector_credential_pairs +from danswer.db.connector_credential_pair import resync_cc_pair from danswer.db.credentials import create_initial_public_credential from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.embedding_model import insert_initial_embedding_models from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import cancel_indexing_attempts_past_model +from danswer.db.index_attempt import expire_index_attempts from danswer.document_index.factory import get_default_document_index from danswer.llm.factory import get_default_llm from danswer.search.search_nlp_models import warm_up_models @@ -257,7 +261,16 @@ def get_application() -> FastAPI: secondary_db_embedding_model = get_secondary_db_embedding_model(db_session) - # cleanup "NOT_STARTED" indexing attempts for embedding models that are no longer used + # Break bad state for thrashing indexes + if secondary_db_embedding_model and DISABLE_INDEX_UPDATE_ON_SWAP: + expire_index_attempts( + embedding_model_id=db_embedding_model.id, db_session=db_session + ) + + for cc_pair in get_connector_credential_pairs(db_session): + resync_cc_pair(cc_pair, db_session=db_session) + + # Expire all old embedding models indexing attempts, technically redundant cancel_indexing_attempts_past_model(db_session) logger.info(f'Using Embedding model: "{db_embedding_model.model_name}"') diff --git a/backend/danswer/server/manage/secondary_index.py b/backend/danswer/server/manage/secondary_index.py index 2013cfc5b..c4c51c0e3 100644 --- a/backend/danswer/server/manage/secondary_index.py +++ b/backend/danswer/server/manage/secondary_index.py @@ -6,6 +6,9 @@ from sqlalchemy.orm import Session from danswer.auth.users import current_admin_user from danswer.auth.users import current_user +from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP +from danswer.db.connector_credential_pair import get_connector_credential_pairs +from danswer.db.connector_credential_pair import resync_cc_pair from danswer.db.embedding_model import create_embedding_model from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.embedding_model import get_secondary_db_embedding_model @@ -78,6 +81,14 @@ def set_new_embedding_model( secondary_index_embedding_dim=new_model.model_dim, ) + # Pause index attempts for the currently in use index to preserve resources + if DISABLE_INDEX_UPDATE_ON_SWAP: + expire_index_attempts( + embedding_model_id=current_model.id, db_session=db_session + ) + for cc_pair in get_connector_credential_pairs(db_session): + resync_cc_pair(cc_pair, db_session=db_session) + return IdReturn(id=new_model.id) diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index 3f638772e..347e03aa9 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -132,6 +132,7 @@ services: - MIN_THREADS_ML_MODELS=${MIN_THREADS_ML_MODELS:-} # Indexing Configs - NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-} + - DISABLE_INDEX_UPDATE_ON_SWAP=${DISABLE_INDEX_UPDATE_ON_SWAP:-} - DASK_JOB_CLIENT_ENABLED=${DASK_JOB_CLIENT_ENABLED:-} - CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-} - EXPERIMENTAL_CHECKPOINTING_ENABLED=${EXPERIMENTAL_CHECKPOINTING_ENABLED:-} diff --git a/deployment/kubernetes/env-configmap.yaml b/deployment/kubernetes/env-configmap.yaml index 31e65955e..4ecfeeb7a 100644 --- a/deployment/kubernetes/env-configmap.yaml +++ b/deployment/kubernetes/env-configmap.yaml @@ -48,6 +48,7 @@ data: MIN_THREADS_ML_MODELS: "" # Indexing Configs NUM_INDEXING_WORKERS: "" + DISABLE_INDEX_UPDATE_ON_SWAP: "" DASK_JOB_CLIENT_ENABLED: "" CONTINUE_ON_CONNECTOR_FAILURE: "" EXPERIMENTAL_CHECKPOINTING_ENABLED: ""