Swap Index Early (#1353)

This commit is contained in:
Yuhong Sun 2024-04-19 10:38:15 -07:00 committed by GitHub
parent 82b9cb4cc1
commit 87f304dfd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 62 additions and 49 deletions

View File

@ -17,19 +17,12 @@ from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from danswer.configs.app_configs import NUM_INDEXING_WORKERS
from danswer.db.connector import fetch_connectors
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.connector_credential_pair import mark_all_in_progress_cc_pairs_failed
from danswer.db.connector_credential_pair import resync_cc_pair
from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.embedding_model import update_embedding_model_status
from danswer.db.engine import get_db_current_time
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import (
count_unique_cc_pairs_with_successful_index_attempts,
)
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import get_inprogress_index_attempts
@ -41,6 +34,7 @@ from danswer.db.models import EmbeddingModel
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import IndexModelStatus
from danswer.db.swap_index import check_index_swap
from danswer.search.search_nlp_models import warm_up_encoders
from danswer.utils.logger import setup_logger
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
@ -354,51 +348,10 @@ def kickoff_indexing_jobs(
return existing_jobs_copy
def check_index_swap(db_session: Session) -> None:
"""Get count of cc-pairs and count of successful index_attempts for the
new model grouped by connector + credential, if it's the same, then assume
new index is done building. If so, swap the indices and expire the old one."""
# Default CC-pair created for Ingestion API unused here
all_cc_pairs = get_connector_credential_pairs(db_session)
cc_pair_count = len(all_cc_pairs) - 1
embedding_model = get_secondary_db_embedding_model(db_session)
if not embedding_model:
return
unique_cc_indexings = count_unique_cc_pairs_with_successful_index_attempts(
embedding_model_id=embedding_model.id, db_session=db_session
)
if unique_cc_indexings > cc_pair_count:
raise RuntimeError("More unique indexings than cc pairs, should not occur")
if cc_pair_count == unique_cc_indexings:
# Swap indices
now_old_embedding_model = get_current_db_embedding_model(db_session)
update_embedding_model_status(
embedding_model=now_old_embedding_model,
new_status=IndexModelStatus.PAST,
db_session=db_session,
)
update_embedding_model_status(
embedding_model=embedding_model,
new_status=IndexModelStatus.PRESENT,
db_session=db_session,
)
# Expire jobs for the now past index/embedding model
cancel_indexing_attempts_past_model(db_session)
# Recount aggregates
for cc_pair in all_cc_pairs:
resync_cc_pair(cc_pair, db_session=db_session)
def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> None:
engine = get_sqlalchemy_engine()
with Session(engine) as db_session:
check_index_swap(db_session=db_session)
db_embedding_model = get_current_db_embedding_model(db_session)
# So that the first time users aren't surprised by really slow speed of first

View File

@ -0,0 +1,58 @@
from sqlalchemy.orm import Session
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.connector_credential_pair import resync_cc_pair
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.embedding_model import update_embedding_model_status
from danswer.db.enums import IndexModelStatus
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import (
count_unique_cc_pairs_with_successful_index_attempts,
)
from danswer.utils.logger import setup_logger
logger = setup_logger()
def check_index_swap(db_session: Session) -> None:
"""Get count of cc-pairs and count of successful index_attempts for the
new model grouped by connector + credential, if it's the same, then assume
new index is done building. If so, swap the indices and expire the old one."""
# Default CC-pair created for Ingestion API unused here
all_cc_pairs = get_connector_credential_pairs(db_session)
cc_pair_count = max(len(all_cc_pairs) - 1, 0)
embedding_model = get_secondary_db_embedding_model(db_session)
if not embedding_model:
return
unique_cc_indexings = count_unique_cc_pairs_with_successful_index_attempts(
embedding_model_id=embedding_model.id, db_session=db_session
)
if unique_cc_indexings > cc_pair_count:
raise RuntimeError("More unique indexings than cc pairs, should not occur")
if cc_pair_count == 0 or cc_pair_count == unique_cc_indexings:
# Swap indices
now_old_embedding_model = get_current_db_embedding_model(db_session)
update_embedding_model_status(
embedding_model=now_old_embedding_model,
new_status=IndexModelStatus.PAST,
db_session=db_session,
)
update_embedding_model_status(
embedding_model=embedding_model,
new_status=IndexModelStatus.PRESENT,
db_session=db_session,
)
if cc_pair_count > 0:
# Expire jobs for the now past index/embedding model
cancel_indexing_attempts_past_model(db_session)
# Recount aggregates
for cc_pair in all_cc_pairs:
resync_cc_pair(cc_pair, db_session=db_session)

View File

@ -46,6 +46,7 @@ from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import expire_index_attempts
from danswer.db.swap_index import check_index_swap
from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs.port_configs import port_filesystem_to_postgres
from danswer.llm.factory import get_default_llm
@ -180,6 +181,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
)
with Session(engine) as db_session:
check_index_swap(db_session=db_session)
db_embedding_model = get_current_db_embedding_model(db_session)
secondary_db_embedding_model = get_secondary_db_embedding_model(db_session)