mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-13 18:12:14 +02:00
Fix weird re-index state (#4439)
* Fix weird re-index state * Address rkuo's comments
This commit is contained in:
@@ -613,8 +613,19 @@ def fetch_connector_credential_pairs(
|
||||
|
||||
def resync_cc_pair(
|
||||
cc_pair: ConnectorCredentialPair,
|
||||
search_settings_id: int,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
"""
|
||||
Updates state stored in the connector_credential_pair table based on the
|
||||
latest index attempt for the given search settings.
|
||||
|
||||
Args:
|
||||
cc_pair: ConnectorCredentialPair to resync
|
||||
search_settings_id: SearchSettings to use for resync
|
||||
db_session: Database session
|
||||
"""
|
||||
|
||||
def find_latest_index_attempt(
|
||||
connector_id: int,
|
||||
credential_id: int,
|
||||
@@ -627,11 +638,10 @@ def resync_cc_pair(
|
||||
ConnectorCredentialPair,
|
||||
IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id,
|
||||
)
|
||||
.join(SearchSettings, IndexAttempt.search_settings_id == SearchSettings.id)
|
||||
.filter(
|
||||
ConnectorCredentialPair.connector_id == connector_id,
|
||||
ConnectorCredentialPair.credential_id == credential_id,
|
||||
SearchSettings.status == IndexModelStatus.PRESENT,
|
||||
IndexAttempt.search_settings_id == search_settings_id,
|
||||
)
|
||||
)
|
||||
|
||||
|
@@ -43,6 +43,8 @@ from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
ONE_HOUR_IN_SECONDS = 60 * 60
|
||||
|
||||
|
||||
def check_docs_exist(db_session: Session) -> bool:
|
||||
stmt = select(exists(DbDocument))
|
||||
@@ -607,6 +609,46 @@ def delete_documents_complete__no_commit(
|
||||
delete_documents__no_commit(db_session, document_ids)
|
||||
|
||||
|
||||
def delete_all_documents_for_connector_credential_pair(
|
||||
db_session: Session,
|
||||
connector_id: int,
|
||||
credential_id: int,
|
||||
timeout: int = ONE_HOUR_IN_SECONDS,
|
||||
) -> None:
|
||||
"""Delete all documents for a given connector credential pair.
|
||||
This will delete all documents and their associated data (chunks, feedback, tags, etc.)
|
||||
|
||||
NOTE: a bit inefficient, but it's not a big deal since this is done rarely - only during
|
||||
an index swap. If we wanted to make this more efficient, we could use a single delete
|
||||
statement + cascade.
|
||||
"""
|
||||
batch_size = 1000
|
||||
start_time = time.monotonic()
|
||||
|
||||
while True:
|
||||
# Get document IDs in batches
|
||||
stmt = (
|
||||
select(DocumentByConnectorCredentialPair.id)
|
||||
.where(
|
||||
DocumentByConnectorCredentialPair.connector_id == connector_id,
|
||||
DocumentByConnectorCredentialPair.credential_id == credential_id,
|
||||
)
|
||||
.limit(batch_size)
|
||||
)
|
||||
document_ids = db_session.scalars(stmt).all()
|
||||
|
||||
if not document_ids:
|
||||
break
|
||||
|
||||
delete_documents_complete__no_commit(
|
||||
db_session=db_session, document_ids=list(document_ids)
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
if time.monotonic() - start_time > timeout:
|
||||
raise RuntimeError("Timeout reached while deleting documents")
|
||||
|
||||
|
||||
def acquire_document_locks(db_session: Session, document_ids: list[str]) -> bool:
|
||||
"""Acquire locks for the specified documents. Ideally this shouldn't be
|
||||
called with large list of document_ids (an exception could be made if the
|
||||
|
@@ -710,6 +710,25 @@ def cancel_indexing_attempts_past_model(
|
||||
)
|
||||
|
||||
|
||||
def cancel_indexing_attempts_for_search_settings(
|
||||
search_settings_id: int,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
"""Stops all indexing attempts that are in progress or not started for
|
||||
the specified search settings."""
|
||||
|
||||
db_session.execute(
|
||||
update(IndexAttempt)
|
||||
.where(
|
||||
IndexAttempt.status.in_(
|
||||
[IndexingStatus.IN_PROGRESS, IndexingStatus.NOT_STARTED]
|
||||
),
|
||||
IndexAttempt.search_settings_id == search_settings_id,
|
||||
)
|
||||
.values(status=IndexingStatus.FAILED)
|
||||
)
|
||||
|
||||
|
||||
def count_unique_cc_pairs_with_successful_index_attempts(
|
||||
search_settings_id: int | None,
|
||||
db_session: Session,
|
||||
|
@@ -3,8 +3,9 @@ from sqlalchemy.orm import Session
|
||||
from onyx.configs.constants import KV_REINDEX_KEY
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pairs
|
||||
from onyx.db.connector_credential_pair import resync_cc_pair
|
||||
from onyx.db.document import delete_all_documents_for_connector_credential_pair
|
||||
from onyx.db.enums import IndexModelStatus
|
||||
from onyx.db.index_attempt import cancel_indexing_attempts_past_model
|
||||
from onyx.db.index_attempt import cancel_indexing_attempts_for_search_settings
|
||||
from onyx.db.index_attempt import (
|
||||
count_unique_cc_pairs_with_successful_index_attempts,
|
||||
)
|
||||
@@ -26,31 +27,49 @@ def _perform_index_swap(
|
||||
current_search_settings: SearchSettings,
|
||||
secondary_search_settings: SearchSettings,
|
||||
all_cc_pairs: list[ConnectorCredentialPair],
|
||||
cleanup_documents: bool = False,
|
||||
) -> None:
|
||||
"""Swap the indices and expire the old one."""
|
||||
current_search_settings = get_current_search_settings(db_session)
|
||||
update_search_settings_status(
|
||||
search_settings=current_search_settings,
|
||||
new_status=IndexModelStatus.PAST,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
update_search_settings_status(
|
||||
search_settings=secondary_search_settings,
|
||||
new_status=IndexModelStatus.PRESENT,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
if len(all_cc_pairs) > 0:
|
||||
kv_store = get_kv_store()
|
||||
kv_store.store(KV_REINDEX_KEY, False)
|
||||
|
||||
# Expire jobs for the now past index/embedding model
|
||||
cancel_indexing_attempts_past_model(db_session)
|
||||
cancel_indexing_attempts_for_search_settings(
|
||||
search_settings_id=current_search_settings.id,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# Recount aggregates
|
||||
for cc_pair in all_cc_pairs:
|
||||
resync_cc_pair(cc_pair, db_session=db_session)
|
||||
resync_cc_pair(
|
||||
cc_pair=cc_pair,
|
||||
# sync based on the new search settings
|
||||
search_settings_id=secondary_search_settings.id,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
if cleanup_documents:
|
||||
# clean up all DocumentByConnectorCredentialPair / Document rows, since we're
|
||||
# doing an instant swap and no documents will exist in the new index.
|
||||
for cc_pair in all_cc_pairs:
|
||||
delete_all_documents_for_connector_credential_pair(
|
||||
db_session=db_session,
|
||||
connector_id=cc_pair.connector_id,
|
||||
credential_id=cc_pair.credential_id,
|
||||
)
|
||||
|
||||
# swap over search settings
|
||||
update_search_settings_status(
|
||||
search_settings=current_search_settings,
|
||||
new_status=IndexModelStatus.PAST,
|
||||
db_session=db_session,
|
||||
)
|
||||
update_search_settings_status(
|
||||
search_settings=secondary_search_settings,
|
||||
new_status=IndexModelStatus.PRESENT,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# remove the old index from the vector db
|
||||
document_index = get_default_document_index(secondary_search_settings, None)
|
||||
@@ -88,6 +107,9 @@ def check_and_perform_index_swap(db_session: Session) -> SearchSettings | None:
|
||||
current_search_settings=current_search_settings,
|
||||
secondary_search_settings=secondary_search_settings,
|
||||
all_cc_pairs=all_cc_pairs,
|
||||
# clean up all DocumentByConnectorCredentialPair / Document rows, since we're
|
||||
# doing an instant swap.
|
||||
cleanup_documents=True,
|
||||
)
|
||||
return current_search_settings
|
||||
|
||||
|
@@ -117,7 +117,11 @@ def set_new_search_settings(
|
||||
search_settings_id=search_settings.id, db_session=db_session
|
||||
)
|
||||
for cc_pair in get_connector_credential_pairs(db_session):
|
||||
resync_cc_pair(cc_pair, db_session=db_session)
|
||||
resync_cc_pair(
|
||||
cc_pair=cc_pair,
|
||||
search_settings_id=new_search_settings.id,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
db_session.commit()
|
||||
return IdReturn(id=new_search_settings.id)
|
||||
|
@@ -96,7 +96,11 @@ def setup_onyx(
|
||||
)
|
||||
|
||||
for cc_pair in get_connector_credential_pairs(db_session):
|
||||
resync_cc_pair(cc_pair, db_session=db_session)
|
||||
resync_cc_pair(
|
||||
cc_pair=cc_pair,
|
||||
search_settings_id=search_settings.id,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# Expire all old embedding models indexing attempts, technically redundant
|
||||
cancel_indexing_attempts_past_model(db_session)
|
||||
|
@@ -487,11 +487,6 @@ export default function EmbeddingForm() {
|
||||
};
|
||||
|
||||
const handleReIndex = async () => {
|
||||
console.log("handleReIndex");
|
||||
console.log(selectedProvider);
|
||||
console.log(advancedEmbeddingDetails);
|
||||
console.log(rerankingDetails);
|
||||
console.log(reindexType);
|
||||
if (!selectedProvider) {
|
||||
return;
|
||||
}
|
||||
|
Reference in New Issue
Block a user