Option to stop sync-ing primary index when building secondary one (#1096)

This commit is contained in:
Yuhong Sun 2024-02-18 22:53:26 -08:00 committed by GitHub
parent 15335dcd7d
commit c1d1651b43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 61 additions and 3 deletions

View File

@ -185,6 +185,7 @@ def _run_indexing(
db_session.refresh(index_attempt)
if index_attempt.status != IndexingStatus.IN_PROGRESS:
# Likely due to user manually disabling it or model swap
raise RuntimeError("Index Attempt was canceled")
logger.debug(

View File

@ -14,6 +14,7 @@ from danswer.background.indexing.job_client import SimpleJobClient
from danswer.background.indexing.run_indexing import run_indexing_entrypoint
from danswer.configs.app_configs import CLEANUP_INDEXING_JOBS_TIMEOUT
from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from danswer.configs.app_configs import LOG_LEVEL
from danswer.configs.app_configs import NUM_INDEXING_WORKERS
from danswer.configs.model_configs import MIN_THREADS_ML_MODELS
@ -69,8 +70,15 @@ def _should_create_new_indexing(
connector: Connector,
last_index: IndexAttempt | None,
model: EmbeddingModel,
secondary_index_building: bool,
db_session: Session,
) -> bool:
# User can still manually create single indexing attempts via the UI for the
# currently in use index
if DISABLE_INDEX_UPDATE_ON_SWAP:
if model.status == IndexModelStatus.PRESENT and secondary_index_building:
return False
# When switching over models, always index at least once
if model.status == IndexModelStatus.FUTURE and not last_index:
if connector.id == 0: # Ingestion API
@ -186,7 +194,11 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None:
connector.id, credential.id, model.id, db_session
)
if not _should_create_new_indexing(
connector, last_attempt, model, db_session
connector=connector,
last_index=last_attempt,
model=model,
secondary_index_building=len(embedding_models) > 1,
db_session=db_session,
):
continue
@ -255,6 +267,9 @@ def cleanup_indexing_jobs(
)
for index_attempt in in_progress_indexing_attempts:
if index_attempt.id in existing_jobs:
# If index attempt is canceled, stop the run
if index_attempt.status == IndexingStatus.FAILED:
existing_jobs[index_attempt.id].cancel()
# check to see if the job has been updated in last `timeout_hours` hours, if not
# assume it to be frozen in some bad state and just mark it as failed. Note: this relies
# on the fact that the `time_updated` field is constantly updated every

View File

@ -177,6 +177,11 @@ EXPERIMENTAL_CHECKPOINTING_ENABLED = (
CONTINUE_ON_CONNECTOR_FAILURE = os.environ.get(
"CONTINUE_ON_CONNECTOR_FAILURE", ""
).lower() not in ["false", ""]
# When swapping to a new embedding model, a secondary index is built in the background. To conserve
# resources, updates to the primary index can be paused while the secondary index is built. This is
# opt-in: it is only enabled when the env variable is set to "true" (case-insensitive)
DISABLE_INDEX_UPDATE_ON_SWAP = (
os.environ.get("DISABLE_INDEX_UPDATE_ON_SWAP", "").lower() == "true"
)
# Controls how many worker processes we spin up to index documents in the
# background. This is useful for speeding up indexing, but does require a
# fairly large amount of memory in order to increase substantially, since

View File

@ -229,13 +229,24 @@ def expire_index_attempts(
embedding_model_id: int,
db_session: Session,
) -> None:
delete_query = (
delete(IndexAttempt)
.where(IndexAttempt.embedding_model_id == embedding_model_id)
.where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
)
db_session.execute(delete_query)
update_query = (
update(IndexAttempt)
.where(IndexAttempt.embedding_model_id == embedding_model_id)
.where(IndexAttempt.status != IndexingStatus.SUCCESS)
.values(status=IndexingStatus.FAILED, error_msg="Embedding model swapped")
.values(
status=IndexingStatus.FAILED,
error_msg="Canceled due to embedding model swap",
)
)
db_session.execute(update_query)
db_session.commit()

View File

@ -25,6 +25,7 @@ from danswer.configs.app_configs import APP_HOST
from danswer.configs.app_configs import APP_PORT
from danswer.configs.app_configs import AUTH_TYPE
from danswer.configs.app_configs import DISABLE_GENERATIVE_AI
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from danswer.configs.app_configs import MODEL_SERVER_HOST
from danswer.configs.app_configs import MODEL_SERVER_PORT
from danswer.configs.app_configs import OAUTH_CLIENT_ID
@ -41,12 +42,15 @@ from danswer.configs.model_configs import GEN_AI_MODEL_VERSION
from danswer.db.chat import delete_old_default_personas
from danswer.db.connector import create_initial_default_connector
from danswer.db.connector_credential_pair import associate_default_cc_pair
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.connector_credential_pair import resync_cc_pair
from danswer.db.credentials import create_initial_public_credential
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.embedding_model import insert_initial_embedding_models
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import expire_index_attempts
from danswer.document_index.factory import get_default_document_index
from danswer.llm.factory import get_default_llm
from danswer.search.search_nlp_models import warm_up_models
@ -257,7 +261,16 @@ def get_application() -> FastAPI:
secondary_db_embedding_model = get_secondary_db_embedding_model(db_session)
# cleanup "NOT_STARTED" indexing attempts for embedding models that are no longer used
# Break bad state for thrashing indexes
if secondary_db_embedding_model and DISABLE_INDEX_UPDATE_ON_SWAP:
expire_index_attempts(
embedding_model_id=db_embedding_model.id, db_session=db_session
)
for cc_pair in get_connector_credential_pairs(db_session):
resync_cc_pair(cc_pair, db_session=db_session)
# Expire all old embedding models indexing attempts, technically redundant
cancel_indexing_attempts_past_model(db_session)
logger.info(f'Using Embedding model: "{db_embedding_model.model_name}"')

View File

@ -6,6 +6,9 @@ from sqlalchemy.orm import Session
from danswer.auth.users import current_admin_user
from danswer.auth.users import current_user
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.connector_credential_pair import resync_cc_pair
from danswer.db.embedding_model import create_embedding_model
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
@ -78,6 +81,14 @@ def set_new_embedding_model(
secondary_index_embedding_dim=new_model.model_dim,
)
# Pause index attempts for the currently in use index to preserve resources
if DISABLE_INDEX_UPDATE_ON_SWAP:
expire_index_attempts(
embedding_model_id=current_model.id, db_session=db_session
)
for cc_pair in get_connector_credential_pairs(db_session):
resync_cc_pair(cc_pair, db_session=db_session)
return IdReturn(id=new_model.id)

View File

@ -132,6 +132,7 @@ services:
- MIN_THREADS_ML_MODELS=${MIN_THREADS_ML_MODELS:-}
# Indexing Configs
- NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-}
- DISABLE_INDEX_UPDATE_ON_SWAP=${DISABLE_INDEX_UPDATE_ON_SWAP:-}
- DASK_JOB_CLIENT_ENABLED=${DASK_JOB_CLIENT_ENABLED:-}
- CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-}
- EXPERIMENTAL_CHECKPOINTING_ENABLED=${EXPERIMENTAL_CHECKPOINTING_ENABLED:-}

View File

@ -48,6 +48,7 @@ data:
MIN_THREADS_ML_MODELS: ""
# Indexing Configs
NUM_INDEXING_WORKERS: ""
DISABLE_INDEX_UPDATE_ON_SWAP: ""
DASK_JOB_CLIENT_ENABLED: ""
CONTINUE_ON_CONNECTOR_FAILURE: ""
EXPERIMENTAL_CHECKPOINTING_ENABLED: ""