Address bug with automatic document set cleanup on connector deletion

Author: Weves
Date: 2023-10-10 21:38:44 -07:00
Committer: Chris Weaver
Parent: 3e05c4fa67
Commit: 876c6fdaa6
6 changed files with 144 additions and 73 deletions

File: danswer/background/connector_deletion.py

@@ -10,6 +10,7 @@ are multiple connector / credential pairs that have indexed it
 connector / credential pair from the access list
 (6) delete all relevant entries from postgres
 """
+import time
 from collections.abc import Callable
 from typing import cast
@@ -23,9 +24,6 @@ from danswer.db.connector import fetch_connector_by_id
 from danswer.db.connector_credential_pair import (
     delete_connector_credential_pair__no_commit,
 )
-from danswer.db.connector_credential_pair import (
-    delete_document_set_relationships_for_cc_pair__no_commit,
-)
 from danswer.db.connector_credential_pair import get_connector_credential_pair
 from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
 from danswer.db.document import delete_document_by_connector_credential_pair
@@ -33,6 +31,10 @@ from danswer.db.document import delete_documents_complete
 from danswer.db.document import get_document_connector_cnts
 from danswer.db.document import get_documents_for_connector_credential_pair
 from danswer.db.document import prepare_to_modify_documents
+from danswer.db.document_set import get_document_sets_by_ids
+from danswer.db.document_set import (
+    mark_cc_pair__document_set_relationships_to_be_deleted__no_commit,
+)
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.index_attempt import delete_index_attempts
 from danswer.db.models import ConnectorCredentialPair
@@ -103,38 +105,46 @@ def _delete_connector_credential_pair_batch(
     db_session.commit()
 
 
-def postgres_cc_pair_cleanup__no_commit(
+def cleanup_synced_entities(
     cc_pair: ConnectorCredentialPair, db_session: Session
 ) -> None:
-    """Cleans up all rows in Postgres related to the specified
-    connector_credential_pair + deletes the connector itself if there are
-    no other credentials left for the connector
-    """
-    connector_id = cc_pair.connector_id
-    credential_id = cc_pair.credential_id
-
-    delete_index_attempts(
-        db_session=db_session,
-        connector_id=connector_id,
-        credential_id=credential_id,
-    )
-    delete_document_set_relationships_for_cc_pair__no_commit(
-        cc_pair_id=cc_pair.id,
-        db_session=db_session,
-    )
-    delete_connector_credential_pair__no_commit(
-        db_session=db_session,
-        connector_id=connector_id,
-        credential_id=credential_id,
-    )
-    # if there are no credentials left, delete the connector
-    connector = fetch_connector_by_id(
-        db_session=db_session,
-        connector_id=connector_id,
-    )
-    if not connector or not len(connector.credentials):
-        logger.debug("Found no credentials left for connector, deleting connector")
-        db_session.delete(connector)
+    """Updates the document sets associated with the connector / credential pair,
+    then relies on the document set sync script to kick off Celery jobs which will
+    sync these updates to Vespa.
+
+    Waits until the document sets are synced before returning."""
+    logger.info(f"Cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'")
+    document_sets_ids_to_sync = list(
+        mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
+            cc_pair_id=cc_pair.id,
+            db_session=db_session,
+        )
+    )
+    db_session.commit()
+
+    # wait till all document sets are synced before continuing
+    while True:
+        all_synced = True
+        document_sets = get_document_sets_by_ids(
+            db_session=db_session, document_set_ids=document_sets_ids_to_sync
+        )
+        for document_set in document_sets:
+            if not document_set.is_up_to_date:
+                all_synced = False
+
+        if all_synced:
+            break
+
+        # wait for 30 seconds before checking again
+        db_session.commit()  # end transaction
+        logger.info(
+            f"Document sets '{document_sets_ids_to_sync}' not synced yet, waiting 30s"
+        )
+        time.sleep(30)
+
+    logger.info(
+        f"Finished cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'"
+    )
 
 
 def _delete_connector_credential_pair(
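
A note on the wait loop introduced above: the `db_session.commit()` inside the loop ends the current transaction, so each `get_document_sets_by_ids` call reads a fresh snapshot rather than the one opened when polling began. The same pattern, sketched as a standalone helper with a timeout bound added (the commit's own loop intentionally waits indefinitely; the helper name and bound here are illustrative, not part of the commit):

    import time

    from sqlalchemy.orm import Session

    from danswer.db.document_set import get_document_sets_by_ids


    def wait_for_document_set_sync(
        db_session: Session,
        document_set_ids: list[int],
        poll_interval_s: int = 30,
        max_wait_s: int = 60 * 60,
    ) -> None:
        """Block until every given document set reports is_up_to_date."""
        waited = 0
        while waited < max_wait_s:
            document_sets = get_document_sets_by_ids(
                db_session=db_session, document_set_ids=document_set_ids
            )
            if all(ds.is_up_to_date for ds in document_sets):
                return
            db_session.commit()  # end transaction so the next read sees fresh rows
            time.sleep(poll_interval_s)
            waited += poll_interval_s
        raise TimeoutError(f"Document sets {document_set_ids} did not finish syncing")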
@@ -164,15 +174,36 @@ def _delete_connector_credential_pair(
     )
     num_docs_deleted += len(documents)
 
-    # cleanup everything else up
-    postgres_cleanup__no_commit = cast(
+    # Clean up document sets / access information from Postgres
+    # and sync these updates to Vespa
+    cleanup_synced_entities__versioned = cast(
         Callable[[ConnectorCredentialPair, Session], None],
         fetch_versioned_implementation(
             "danswer.background.connector_deletion",
-            "postgres_cc_pair_cleanup__no_commit",
+            "cleanup_synced_entities",
         ),
     )
-    postgres_cleanup__no_commit(cc_pair, db_session)
+    cleanup_synced_entities__versioned(cc_pair, db_session)
+
+    # clean up the rest of the related Postgres entities
+    delete_index_attempts(
+        db_session=db_session,
+        connector_id=connector_id,
+        credential_id=credential_id,
+    )
+    delete_connector_credential_pair__no_commit(
+        db_session=db_session,
+        connector_id=connector_id,
+        credential_id=credential_id,
+    )
+    # if there are no credentials left, delete the connector
+    connector = fetch_connector_by_id(
+        db_session=db_session,
+        connector_id=connector_id,
+    )
+    if not connector or not len(connector.credentials):
+        logger.debug("Found no credentials left for connector, deleting connector")
+        db_session.delete(connector)
 
     db_session.commit()
     logger.info(
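
`fetch_versioned_implementation` is not defined in this diff; it is used so that a different build of Danswer can substitute its own `cleanup_synced_entities`. A minimal sketch of that kind of lookup, assuming it is essentially a dynamic import (the real helper may add version-specific module resolution):

    import importlib
    from typing import Any


    def fetch_versioned_implementation_sketch(module: str, attribute: str) -> Any:
        # Resolve a callable by dotted module path and attribute name at
        # runtime, letting an overriding distribution shadow the default module.
        return getattr(importlib.import_module(module), attribute)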

File: document set sync job (_document_sync_loop)

@@ -29,7 +29,9 @@ def _document_sync_loop() -> None:
     # kick off new tasks
     with Session(get_sqlalchemy_engine()) as db_session:
         # check if any document sets are not synced
-        document_set_info = fetch_document_sets(db_session=db_session)
+        document_set_info = fetch_document_sets(
+            db_session=db_session, include_outdated=True
+        )
         for document_set, _ in document_set_info:
             if not document_set.is_up_to_date:
                 if document_set.id in _ExistingTaskCache:
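
This one-line change is load-bearing: once a CC pair's relationships are marked not-current, the affected document set no longer matches the default `is_current` filter in `fetch_document_sets`, so without `include_outdated=True` the loop would never schedule the sync that the deletion is waiting on. A hedged reconstruction of the loop's shape from the context lines above (`kick_off_document_set_sync` is hypothetical; the real code schedules a Celery job):

    def _document_sync_loop_sketch() -> None:
        with Session(get_sqlalchemy_engine()) as db_session:
            # must include outdated document sets: a set mid-deletion is
            # exactly the kind of "not synced" set that needs a job
            document_set_info = fetch_document_sets(
                db_session=db_session, include_outdated=True
            )
            for document_set, _ in document_set_info:
                if not document_set.is_up_to_date:
                    if document_set.id in _ExistingTaskCache:
                        continue  # a sync task for this set is already in flight
                    # hypothetical dispatch helper standing in for the real
                    # Celery kickoff that pushes the update to Vespa
                    _ExistingTaskCache[document_set.id] = kick_off_document_set_sync(
                        document_set.id
                    )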

File: danswer/db/connector_credential_pair.py

@@ -9,7 +9,6 @@ from sqlalchemy.orm import Session
 from danswer.db.connector import fetch_connector_by_id
 from danswer.db.credentials import fetch_credential_by_id
 from danswer.db.models import ConnectorCredentialPair
-from danswer.db.models import DocumentSet__ConnectorCredentialPair
 from danswer.db.models import IndexingStatus
 from danswer.db.models import User
 from danswer.server.models import StatusResponse
@@ -82,16 +81,6 @@ def update_connector_credential_pair(
     db_session.commit()
 
 
-def delete_document_set_relationships_for_cc_pair__no_commit(
-    cc_pair_id: int, db_session: Session
-) -> None:
-    """NOTE: does not commit transaction, this must be done by the caller"""
-    stmt = delete(DocumentSet__ConnectorCredentialPair).where(
-        DocumentSet__ConnectorCredentialPair.connector_credential_pair_id == cc_pair_id
-    )
-    db_session.execute(stmt)
-
-
 def delete_connector_credential_pair__no_commit(
     db_session: Session,
     connector_id: int,
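
This removal is the core of the fix: the old helper hard-deleted the `DocumentSet__ConnectorCredentialPair` rows, so Postgres forgot the relationship while the documents in Vespa kept their stale document-set tags, with nothing left to trigger a sync. Roughly, the semantics move from a hard delete to a soft mark (a sketch of the before/after, not the literal code on either side):

    from sqlalchemy import delete

    # before: join rows vanish immediately and Vespa is never reconciled
    db_session.execute(
        delete(DocumentSet__ConnectorCredentialPair).where(
            DocumentSet__ConnectorCredentialPair.connector_credential_pair_id
            == cc_pair_id
        )
    )

    # after: rows are only flagged, so the background sync pushes the change
    # to Vespa before anything is actually removed
    for relationship in document_set__cc_pair_relationships:
        relationship.is_current = False
        relationship.document_set.is_up_to_date = False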

File: danswer/db/document_set.py

@@ -49,6 +49,14 @@ def get_document_set_by_id(
     )
 
 
+def get_document_sets_by_ids(
+    db_session: Session, document_set_ids: list[int]
+) -> Sequence[DocumentSetDBModel]:
+    return db_session.scalars(
+        select(DocumentSetDBModel).where(DocumentSetDBModel.id.in_(document_set_ids))
+    ).all()
+
+
 def insert_document_set(
     document_set_creation_request: DocumentSetCreationRequest,
     user_id: UUID | None,
@@ -196,15 +204,46 @@ def mark_document_set_as_to_be_deleted(
         raise
 
 
+def mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
+    cc_pair_id: int, db_session: Session
+) -> set[int]:
+    """Marks all CC Pair -> Document Set relationships for the specified
+    `cc_pair_id` as not current and returns the list of all document set IDs
+    affected.
+
+    NOTE: raises a `ValueError` if any of the document sets are currently syncing
+    to avoid getting into a bad state."""
+    document_set__cc_pair_relationships = db_session.scalars(
+        select(DocumentSet__ConnectorCredentialPair).where(
+            DocumentSet__ConnectorCredentialPair.connector_credential_pair_id
+            == cc_pair_id
+        )
+    ).all()
+
+    document_set_ids_touched: set[int] = set()
+    for document_set__cc_pair_relationship in document_set__cc_pair_relationships:
+        document_set__cc_pair_relationship.is_current = False
+
+        if not document_set__cc_pair_relationship.document_set.is_up_to_date:
+            raise ValueError(
+                "Cannot delete CC pair while it is attached to a document set "
+                "that is syncing. Please wait for the document set to finish "
+                "syncing, and then try again."
+            )
+        document_set__cc_pair_relationship.document_set.is_up_to_date = False
+
+        document_set_ids_touched.add(document_set__cc_pair_relationship.document_set_id)
+
+    return document_set_ids_touched
+
+
 def fetch_document_sets(
-    db_session: Session,
+    db_session: Session, include_outdated: bool = False
 ) -> list[tuple[DocumentSetDBModel, list[ConnectorCredentialPair]]]:
     """Return is a list where each element contains a tuple of:
     1. The document set itself
     2. All CC pairs associated with the document set"""
-    results = cast(
-        list[tuple[DocumentSetDBModel, ConnectorCredentialPair | None]],
-        db_session.execute(
+    stmt = (
         select(DocumentSetDBModel, ConnectorCredentialPair)
         .join(
             DocumentSet__ConnectorCredentialPair,
@@ -218,14 +257,19 @@ def fetch_document_sets(
             == DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
             isouter=True,  # outer join is needed to also fetch document sets with no cc pairs
         )
-        .where(
+    )
+    if not include_outdated:
+        stmt = stmt.where(
             or_(
-                DocumentSet__ConnectorCredentialPair.is_current
-                == True,  # noqa: E712
+                DocumentSet__ConnectorCredentialPair.is_current == True,  # noqa: E712
+                # `None` handles case where no CC Pairs exist for a Document Set
                 DocumentSet__ConnectorCredentialPair.is_current.is_(None),
             )
         )
-    ).all(),
+
+    results = cast(
+        list[tuple[DocumentSetDBModel, ConnectorCredentialPair | None]],
+        db_session.execute(stmt).all(),
     )
 
     aggregated_results: dict[
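
Putting the new helpers together, the intended calling pattern (the one `cleanup_synced_entities` in the first file follows) is a two-phase mark-then-poll. A condensed usage sketch, with `cc_pair` assumed to be in scope:

    import time

    from sqlalchemy.orm import Session

    from danswer.db.document_set import get_document_sets_by_ids
    from danswer.db.document_set import (
        mark_cc_pair__document_set_relationships_to_be_deleted__no_commit,
    )
    from danswer.db.engine import get_sqlalchemy_engine

    with Session(get_sqlalchemy_engine()) as db_session:
        # phase 1: mark relationships (raises ValueError if a set is mid-sync)
        ids_to_sync = list(
            mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
                cc_pair_id=cc_pair.id, db_session=db_session
            )
        )
        db_session.commit()

        # phase 2: wait for the sync job to mirror the change into Vespa
        while not all(
            ds.is_up_to_date
            for ds in get_document_sets_by_ids(
                db_session=db_session, document_set_ids=ids_to_sync
            )
        ):
            db_session.commit()  # end transaction so the next read is fresh
            time.sleep(30)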

File: danswer/db/models.py

@@ -119,6 +119,8 @@ class DocumentSet__ConnectorCredentialPair(Base):
         primary_key=True,
     )
 
+    document_set: Mapped["DocumentSet"] = relationship("DocumentSet")
+
 
 class ConnectorCredentialPair(Base):
     """Connectors and Credentials can have a many-to-many relationship

File: DocumentSetTable component (web UI)

@@ -129,7 +129,6 @@ const DocumentSetTable = ({
       },
     ]}
     data={documentSets
-      .filter((documentSet) => documentSet.cc_pair_descriptors.length > 0)
       .slice((page - 1) * numToDisplay, page * numToDisplay)
       .map((documentSet) => {
         return {
@@ -170,10 +169,14 @@ const DocumentSetTable = ({
             ),
             status: documentSet.is_up_to_date ? (
               <div className="text-emerald-600">Up to date!</div>
-            ) : (
+            ) : documentSet.cc_pair_descriptors.length > 0 ? (
               <div className="text-gray-300 w-10">
                 <LoadingAnimation text="Syncing" />
               </div>
+            ) : (
+              <div className="text-red-500 w-10">
+                <LoadingAnimation text="Deleting" />
+              </div>
             ),
             delete: (
               <div
@@ -182,13 +185,13 @@ const DocumentSetTable = ({
                 const response = await deleteDocumentSet(documentSet.id);
                 if (response.ok) {
                   setPopup({
-                    message: `Document set "${documentSet.name}" deleted`,
+                    message: `Document set "${documentSet.name}" scheduled for deletion`,
                     type: "success",
                   });
                 } else {
                   const errorMsg = (await response.json()).detail;
                   setPopup({
-                    message: `Failed to delete document set - ${errorMsg}`,
+                    message: `Failed to schedule document set for deletion - ${errorMsg}`,
                     type: "error",
                   });
                 }