diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py
index 8285d72e2..f61553bb6 100644
--- a/backend/danswer/background/connector_deletion.py
+++ b/backend/danswer/background/connector_deletion.py
@@ -10,6 +10,7 @@ are multiple connector / credential pairs that have indexed it
 connector / credential pair from the access list
 (6) delete all relevant entries from postgres
 """
+import time
 from collections.abc import Callable
 from typing import cast
 
@@ -23,9 +24,6 @@ from danswer.db.connector import fetch_connector_by_id
 from danswer.db.connector_credential_pair import (
     delete_connector_credential_pair__no_commit,
 )
-from danswer.db.connector_credential_pair import (
-    delete_document_set_relationships_for_cc_pair__no_commit,
-)
 from danswer.db.connector_credential_pair import get_connector_credential_pair
 from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
 from danswer.db.document import delete_document_by_connector_credential_pair
@@ -33,6 +31,10 @@ from danswer.db.document import delete_documents_complete
 from danswer.db.document import get_document_connector_cnts
 from danswer.db.document import get_documents_for_connector_credential_pair
 from danswer.db.document import prepare_to_modify_documents
+from danswer.db.document_set import get_document_sets_by_ids
+from danswer.db.document_set import (
+    mark_cc_pair__document_set_relationships_to_be_deleted__no_commit,
+)
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.index_attempt import delete_index_attempts
 from danswer.db.models import ConnectorCredentialPair
@@ -103,38 +105,46 @@ def _delete_connector_credential_pair_batch(
     db_session.commit()
 
 
-def postgres_cc_pair_cleanup__no_commit(
+def cleanup_synced_entities(
     cc_pair: ConnectorCredentialPair, db_session: Session
 ) -> None:
-    """Cleans up all rows in Postgres related to the specified
-    connector_credential_pair + deletes the connector itself if there are
-    no other credentials left for the connector
-    """
-    connector_id = cc_pair.connector_id
-    credential_id = cc_pair.credential_id
+    """Updates the document sets associated with the connector / credential pair,
+    then relies on the document set sync script to kick off Celery jobs which will
+    sync these updates to Vespa.
 
-    delete_index_attempts(
-        db_session=db_session,
-        connector_id=connector_id,
-        credential_id=credential_id,
+    Waits until the document sets are synced before returning."""
+    logger.info(f"Cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'")
+    document_sets_ids_to_sync = list(
+        mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
+            cc_pair_id=cc_pair.id,
+            db_session=db_session,
+        )
     )
-    delete_document_set_relationships_for_cc_pair__no_commit(
-        cc_pair_id=cc_pair.id,
-        db_session=db_session,
+    db_session.commit()
+
+    # wait till all document sets are synced before continuing
+    while True:
+        all_synced = True
+        document_sets = get_document_sets_by_ids(
+            db_session=db_session, document_set_ids=document_sets_ids_to_sync
+        )
+        for document_set in document_sets:
+            if not document_set.is_up_to_date:
+                all_synced = False
+
+        if all_synced:
+            break
+
+        # wait for 30 seconds before checking again
+        db_session.commit()  # end transaction
+        logger.info(
+            f"Document sets '{document_sets_ids_to_sync}' not synced yet, waiting 30s"
+        )
+        time.sleep(30)
+
+    logger.info(
+        f"Finished cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'"
     )
-    delete_connector_credential_pair__no_commit(
-        db_session=db_session,
-        connector_id=connector_id,
-        credential_id=credential_id,
-    )
-    # if there are no credentials left, delete the connector
-    connector = fetch_connector_by_id(
-        db_session=db_session,
-        connector_id=connector_id,
-    )
-    if not connector or not len(connector.credentials):
-        logger.debug("Found no credentials left for connector, deleting connector")
-        db_session.delete(connector)
 
 
 def _delete_connector_credential_pair(
@@ -164,15 +174,36 @@ def _delete_connector_credential_pair(
         )
         num_docs_deleted += len(documents)
 
-    # cleanup everything else up
-    postgres_cleanup__no_commit = cast(
+    # Clean up document sets / access information from Postgres
+    # and sync these updates to Vespa
+    cleanup_synced_entities__versioned = cast(
         Callable[[ConnectorCredentialPair, Session], None],
         fetch_versioned_implementation(
            "danswer.background.connector_deletion",
-            "postgres_cc_pair_cleanup__no_commit",
+            "cleanup_synced_entities",
         ),
     )
-    postgres_cleanup__no_commit(cc_pair, db_session)
+    cleanup_synced_entities__versioned(cc_pair, db_session)
+
+    # clean up the rest of the related Postgres entities
+    delete_index_attempts(
+        db_session=db_session,
+        connector_id=connector_id,
+        credential_id=credential_id,
+    )
+    delete_connector_credential_pair__no_commit(
+        db_session=db_session,
+        connector_id=connector_id,
+        credential_id=credential_id,
+    )
+    # if there are no credentials left, delete the connector
+    connector = fetch_connector_by_id(
+        db_session=db_session,
+        connector_id=connector_id,
+    )
+    if not connector or not len(connector.credentials):
+        logger.debug("Found no credentials left for connector, deleting connector")
+        db_session.delete(connector)
 
     db_session.commit()
     logger.info(
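A note on the new `cleanup_synced_entities` above: it commits inside the wait loop, which (with SQLAlchemy's default `expire_on_commit` behavior) ends the current transaction and expires the session's cached objects, so each poll re-reads `is_up_to_date` from Postgres instead of reusing stale in-memory values while the Celery sync jobs run. A minimal sketch of just that polling pattern, assuming the `get_document_sets_by_ids` accessor added in this diff (the helper name `wait_for_document_set_sync` is made up for illustration):

```python
import time

from sqlalchemy.orm import Session

from danswer.db.document_set import get_document_sets_by_ids


def wait_for_document_set_sync(
    db_session: Session, document_set_ids: list[int]
) -> None:
    """Illustrative helper: block until the given document sets are synced to Vespa."""
    while True:
        document_sets = get_document_sets_by_ids(
            db_session=db_session, document_set_ids=document_set_ids
        )
        if all(document_set.is_up_to_date for document_set in document_sets):
            return

        # Commit to end the transaction and expire cached ORM objects, so the
        # next poll re-reads is_up_to_date values written by the Celery sync jobs.
        db_session.commit()
        time.sleep(30)
```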
diff --git a/backend/danswer/background/document_set_sync_script.py b/backend/danswer/background/document_set_sync_script.py
index 37ce3be05..7761f1f5f 100644
--- a/backend/danswer/background/document_set_sync_script.py
+++ b/backend/danswer/background/document_set_sync_script.py
@@ -29,7 +29,9 @@ def _document_sync_loop() -> None:
     # kick off new tasks
     with Session(get_sqlalchemy_engine()) as db_session:
         # check if any document sets are not synced
-        document_set_info = fetch_document_sets(db_session=db_session)
+        document_set_info = fetch_document_sets(
+            db_session=db_session, include_outdated=True
+        )
         for document_set, _ in document_set_info:
             if not document_set.is_up_to_date:
                 if document_set.id in _ExistingTaskCache:
diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py
index 35a9c7028..076e098bf 100644
--- a/backend/danswer/db/connector_credential_pair.py
+++ b/backend/danswer/db/connector_credential_pair.py
@@ -9,7 +9,6 @@ from sqlalchemy.orm import Session
 from danswer.db.connector import fetch_connector_by_id
 from danswer.db.credentials import fetch_credential_by_id
 from danswer.db.models import ConnectorCredentialPair
-from danswer.db.models import DocumentSet__ConnectorCredentialPair
 from danswer.db.models import IndexingStatus
 from danswer.db.models import User
 from danswer.server.models import StatusResponse
@@ -82,16 +81,6 @@ def update_connector_credential_pair(
     db_session.commit()
 
 
-def delete_document_set_relationships_for_cc_pair__no_commit(
-    cc_pair_id: int, db_session: Session
-) -> None:
-    """NOTE: does not commit transaction, this must be done by the caller"""
-    stmt = delete(DocumentSet__ConnectorCredentialPair).where(
-        DocumentSet__ConnectorCredentialPair.connector_credential_pair_id == cc_pair_id
-    )
-    db_session.execute(stmt)
-
-
 def delete_connector_credential_pair__no_commit(
     db_session: Session,
     connector_id: int,
diff --git a/backend/danswer/db/document_set.py b/backend/danswer/db/document_set.py
index 10ee5aa88..3a1522602 100644
--- a/backend/danswer/db/document_set.py
+++ b/backend/danswer/db/document_set.py
@@ -49,6 +49,14 @@ def get_document_set_by_id(
     )
 
 
+def get_document_sets_by_ids(
+    db_session: Session, document_set_ids: list[int]
+) -> Sequence[DocumentSetDBModel]:
+    return db_session.scalars(
+        select(DocumentSetDBModel).where(DocumentSetDBModel.id.in_(document_set_ids))
+    ).all()
+
+
 def insert_document_set(
     document_set_creation_request: DocumentSetCreationRequest,
     user_id: UUID | None,
@@ -196,36 +204,72 @@ def mark_document_set_as_to_be_deleted(
         raise
 
 
+def mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
+    cc_pair_id: int, db_session: Session
+) -> set[int]:
+    """Marks all CC Pair -> Document Set relationships for the specified
+    `cc_pair_id` as not current and returns the list of all document set IDs
+    affected.
+
+    NOTE: raises a `ValueError` if any of the document sets are currently syncing
+    to avoid getting into a bad state."""
+    document_set__cc_pair_relationships = db_session.scalars(
+        select(DocumentSet__ConnectorCredentialPair).where(
+            DocumentSet__ConnectorCredentialPair.connector_credential_pair_id
+            == cc_pair_id
+        )
+    ).all()
+
+    document_set_ids_touched: set[int] = set()
+    for document_set__cc_pair_relationship in document_set__cc_pair_relationships:
+        document_set__cc_pair_relationship.is_current = False
+
+        if not document_set__cc_pair_relationship.document_set.is_up_to_date:
+            raise ValueError(
+                "Cannot delete CC pair while it is attached to a document set "
+                "that is syncing. Please wait for the document set to finish "
+                "syncing, and then try again."
+            )
+
+        document_set__cc_pair_relationship.document_set.is_up_to_date = False
+        document_set_ids_touched.add(document_set__cc_pair_relationship.document_set_id)
+
+    return document_set_ids_touched
+
+
 def fetch_document_sets(
-    db_session: Session,
+    db_session: Session, include_outdated: bool = False
 ) -> list[tuple[DocumentSetDBModel, list[ConnectorCredentialPair]]]:
     """Return is a list where each element contains a tuple of:
     1. The document set itself
     2. All CC pairs associated with the document set"""
+    stmt = (
+        select(DocumentSetDBModel, ConnectorCredentialPair)
+        .join(
+            DocumentSet__ConnectorCredentialPair,
+            DocumentSetDBModel.id
+            == DocumentSet__ConnectorCredentialPair.document_set_id,
+            isouter=True,  # outer join is needed to also fetch document sets with no cc pairs
+        )
+        .join(
+            ConnectorCredentialPair,
+            ConnectorCredentialPair.id
+            == DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
+            isouter=True,  # outer join is needed to also fetch document sets with no cc pairs
+        )
+    )
+    if not include_outdated:
+        stmt = stmt.where(
+            or_(
+                DocumentSet__ConnectorCredentialPair.is_current == True,  # noqa: E712
+                # `None` handles case where no CC Pairs exist for a Document Set
+                DocumentSet__ConnectorCredentialPair.is_current.is_(None),
+            )
+        )
+
     results = cast(
         list[tuple[DocumentSetDBModel, ConnectorCredentialPair | None]],
-        db_session.execute(
-            select(DocumentSetDBModel, ConnectorCredentialPair)
-            .join(
-                DocumentSet__ConnectorCredentialPair,
-                DocumentSetDBModel.id
-                == DocumentSet__ConnectorCredentialPair.document_set_id,
-                isouter=True,  # outer join is needed to also fetch document sets with no cc pairs
-            )
-            .join(
-                ConnectorCredentialPair,
-                ConnectorCredentialPair.id
-                == DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
-                isouter=True,  # outer join is needed to also fetch document sets with no cc pairs
-            )
-            .where(
-                or_(
-                    DocumentSet__ConnectorCredentialPair.is_current
-                    == True,  # noqa: E712
-                    DocumentSet__ConnectorCredentialPair.is_current.is_(None),
-                )
-            )
-        ).all(),
+        db_session.execute(stmt).all(),
     )
 
     aggregated_results: dict[
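For context on the new `include_outdated` flag: with the default `False`, `fetch_document_sets` keeps its old behavior and only returns relationships where `is_current` is true (or `NULL`, for document sets with no CC pairs at all); passing `True` also surfaces document sets whose relationships were just marked `is_current=False`, which is what lets the sync loop schedule Celery jobs for a pending connector deletion. A rough usage sketch, assuming the session setup shown elsewhere in this diff:

```python
from sqlalchemy.orm import Session

from danswer.db.document_set import fetch_document_sets
from danswer.db.engine import get_sqlalchemy_engine

with Session(get_sqlalchemy_engine()) as db_session:
    # Default behavior: only document set <-> CC pair relationships that are
    # currently reflected in Vespa (is_current == True, or no CC pairs at all).
    current_only = fetch_document_sets(db_session=db_session)

    # include_outdated=True also returns document sets whose relationships were
    # just marked is_current=False (e.g. by a pending connector deletion), so
    # the sync loop can detect them via is_up_to_date and kick off Celery jobs.
    needs_sync = fetch_document_sets(db_session=db_session, include_outdated=True)
```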
diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py
index 07a2fb1de..1f27af166 100644
--- a/backend/danswer/db/models.py
+++ b/backend/danswer/db/models.py
@@ -119,6 +119,8 @@ class DocumentSet__ConnectorCredentialPair(Base):
         primary_key=True,
     )
 
+    document_set: Mapped["DocumentSet"] = relationship("DocumentSet")
+
 
 class ConnectorCredentialPair(Base):
     """Connectors and Credentials can have a many-to-many relationship
diff --git a/web/src/app/admin/documents/sets/page.tsx b/web/src/app/admin/documents/sets/page.tsx
index 2a6a63f60..4c8f395e5 100644
--- a/web/src/app/admin/documents/sets/page.tsx
+++ b/web/src/app/admin/documents/sets/page.tsx
@@ -129,7 +129,6 @@ const DocumentSetTable = ({
           },
         ]}
         data={documentSets
-          .filter((documentSet) => documentSet.cc_pair_descriptors.length > 0)
          .slice((page - 1) * numToDisplay, page * numToDisplay)
          .map((documentSet) => {
            return {
@@ -170,10 +169,14 @@
              ),
              status: documentSet.is_up_to_date ? (