mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-04-11 05:19:52 +02:00
Connector deletion fix (#2293)
--------- Co-authored-by: Weves <chrisweaver101@gmail.com>
This commit is contained in:
parent
c122be2f6a
commit
aa84846298
@ -17,6 +17,7 @@ from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
|
||||
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
|
||||
from danswer.configs.app_configs import NUM_INDEXING_WORKERS
|
||||
from danswer.configs.app_configs import NUM_SECONDARY_INDEXING_WORKERS
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.configs.constants import POSTGRES_INDEXER_APP_NAME
|
||||
from danswer.db.connector import fetch_connectors
|
||||
from danswer.db.connector_credential_pair import fetch_connector_credential_pairs
|
||||
@ -46,7 +47,6 @@ from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
|
||||
from shared_configs.configs import LOG_LEVEL
|
||||
from shared_configs.configs import MODEL_SERVER_PORT
|
||||
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
# If the indexing dies, it's most likely due to resource constraints,
|
||||
@ -67,6 +67,10 @@ def _should_create_new_indexing(
|
||||
) -> bool:
|
||||
connector = cc_pair.connector
|
||||
|
||||
# don't kick off indexing for `NOT_APPLICABLE` sources
|
||||
if connector.source == DocumentSource.NOT_APPLICABLE:
|
||||
return False
|
||||
|
||||
# User can still manually create single indexing attempts via the UI for the
|
||||
# currently in use index
|
||||
if DISABLE_INDEX_UPDATE_ON_SWAP:
|
||||
|
@ -524,37 +524,66 @@ def fetch_document_sets_for_documents(
|
||||
db_session: Session,
|
||||
) -> Sequence[tuple[str, list[str]]]:
|
||||
"""Gives back a list of (document_id, list[document_set_names]) tuples"""
|
||||
|
||||
"""Building subqueries"""
|
||||
# NOTE: have to build these subqueries first in order to guarantee that we get one
|
||||
# returned row for each specified document_id. Basically, we want to do the filters first,
|
||||
# then the outer joins.
|
||||
|
||||
# don't include CC pairs that are being deleted
|
||||
# NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
|
||||
# as we can assume their document sets are no longer relevant
|
||||
valid_cc_pairs_subquery = aliased(
|
||||
ConnectorCredentialPair,
|
||||
select(ConnectorCredentialPair)
|
||||
.where(
|
||||
ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING
|
||||
) # noqa: E712
|
||||
.subquery(),
|
||||
)
|
||||
|
||||
valid_document_set__cc_pairs_subquery = aliased(
|
||||
DocumentSet__ConnectorCredentialPair,
|
||||
select(DocumentSet__ConnectorCredentialPair)
|
||||
.where(DocumentSet__ConnectorCredentialPair.is_current == True) # noqa: E712
|
||||
.subquery(),
|
||||
)
|
||||
"""End building subqueries"""
|
||||
|
||||
stmt = (
|
||||
select(Document.id, func.array_agg(DocumentSetDBModel.name))
|
||||
.join(
|
||||
DocumentSet__ConnectorCredentialPair,
|
||||
DocumentSetDBModel.id
|
||||
== DocumentSet__ConnectorCredentialPair.document_set_id,
|
||||
select(
|
||||
Document.id,
|
||||
func.coalesce(
|
||||
func.array_remove(func.array_agg(DocumentSetDBModel.name), None), []
|
||||
).label("document_set_names"),
|
||||
)
|
||||
.join(
|
||||
ConnectorCredentialPair,
|
||||
ConnectorCredentialPair.id
|
||||
== DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
|
||||
)
|
||||
.join(
|
||||
# Here we select document sets by relation:
|
||||
# Document -> DocumentByConnectorCredentialPair -> ConnectorCredentialPair ->
|
||||
# DocumentSet__ConnectorCredentialPair -> DocumentSet
|
||||
.outerjoin(
|
||||
DocumentByConnectorCredentialPair,
|
||||
and_(
|
||||
DocumentByConnectorCredentialPair.connector_id
|
||||
== ConnectorCredentialPair.connector_id,
|
||||
DocumentByConnectorCredentialPair.credential_id
|
||||
== ConnectorCredentialPair.credential_id,
|
||||
),
|
||||
)
|
||||
.join(
|
||||
Document,
|
||||
Document.id == DocumentByConnectorCredentialPair.id,
|
||||
)
|
||||
.outerjoin(
|
||||
valid_cc_pairs_subquery,
|
||||
and_(
|
||||
DocumentByConnectorCredentialPair.connector_id
|
||||
== valid_cc_pairs_subquery.connector_id,
|
||||
DocumentByConnectorCredentialPair.credential_id
|
||||
== valid_cc_pairs_subquery.credential_id,
|
||||
),
|
||||
)
|
||||
.outerjoin(
|
||||
valid_document_set__cc_pairs_subquery,
|
||||
valid_cc_pairs_subquery.id
|
||||
== valid_document_set__cc_pairs_subquery.connector_credential_pair_id,
|
||||
)
|
||||
.outerjoin(
|
||||
DocumentSetDBModel,
|
||||
DocumentSetDBModel.id
|
||||
== valid_document_set__cc_pairs_subquery.document_set_id,
|
||||
)
|
||||
.where(Document.id.in_(document_ids))
|
||||
# don't include CC pairs that are being deleted
|
||||
# NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
|
||||
# as we can assume their document sets are no longer relevant
|
||||
.where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING)
|
||||
.where(DocumentSet__ConnectorCredentialPair.is_current == True) # noqa: E712
|
||||
.group_by(Document.id)
|
||||
)
|
||||
return db_session.execute(stmt).all() # type: ignore
|
||||
|
@ -166,10 +166,14 @@ def create_deletion_attempt_for_connector_id(
|
||||
get_editable=True,
|
||||
)
|
||||
if cc_pair is None:
|
||||
error = (
|
||||
f"Connector with ID '{connector_id}' and credential ID "
|
||||
f"'{credential_id}' does not exist. Has it already been deleted?"
|
||||
)
|
||||
logger.error(error)
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"Connector with ID '{connector_id}' and credential ID "
|
||||
f"'{credential_id}' does not exist. Has it already been deleted?",
|
||||
detail=error,
|
||||
)
|
||||
|
||||
# Cancel any scheduled indexing attempts
|
||||
|
@ -264,6 +264,8 @@ def test_connector_deletion_for_overlapping_connectors(
|
||||
doc_creating_user=admin_user,
|
||||
)
|
||||
|
||||
# EVERYTHING BELOW HERE IS CURRENTLY BROKEN AND NEEDS TO BE FIXED SERVER SIDE
|
||||
|
||||
# delete connector 1
|
||||
CCPairManager.pause_cc_pair(
|
||||
cc_pair=cc_pair_1,
|
||||
@ -274,32 +276,30 @@ def test_connector_deletion_for_overlapping_connectors(
|
||||
user_performing_action=admin_user,
|
||||
)
|
||||
|
||||
# EVERYTHING BELOW HERE IS CURRENTLY BROKEN AND NEEDS TO BE FIXED SERVER SIDE
|
||||
|
||||
# wait for deletion to finish
|
||||
# CCPairManager.wait_for_deletion_completion(user_performing_action=admin_user)
|
||||
CCPairManager.wait_for_deletion_completion(user_performing_action=admin_user)
|
||||
|
||||
# print("Connector 1 deleted")
|
||||
print("Connector 1 deleted")
|
||||
|
||||
# check that only connector 1 is deleted
|
||||
# TODO: check for the CC pair rather than the connector once the refactor is done
|
||||
# CCPairManager.verify(
|
||||
# cc_pair=cc_pair_1,
|
||||
# verify_deleted=True,
|
||||
# user_performing_action=admin_user,
|
||||
# )
|
||||
# CCPairManager.verify(
|
||||
# cc_pair=cc_pair_2,
|
||||
# user_performing_action=admin_user,
|
||||
# )
|
||||
CCPairManager.verify(
|
||||
cc_pair=cc_pair_1,
|
||||
verify_deleted=True,
|
||||
user_performing_action=admin_user,
|
||||
)
|
||||
CCPairManager.verify(
|
||||
cc_pair=cc_pair_2,
|
||||
user_performing_action=admin_user,
|
||||
)
|
||||
|
||||
# verify the document is not in any document sets
|
||||
# verify the document is only in user group 2
|
||||
# DocumentManager.verify(
|
||||
# vespa_client=vespa_client,
|
||||
# cc_pair=cc_pair_2,
|
||||
# doc_set_names=[],
|
||||
# group_names=[user_group_2.name],
|
||||
# doc_creating_user=admin_user,
|
||||
# verify_deleted=False,
|
||||
# )
|
||||
DocumentManager.verify(
|
||||
vespa_client=vespa_client,
|
||||
cc_pair=cc_pair_2,
|
||||
doc_set_names=[],
|
||||
group_names=[user_group_2.name],
|
||||
doc_creating_user=admin_user,
|
||||
verify_deleted=False,
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user