mirror of https://github.com/danswer-ai/danswer.git

Fix deletion in the overlapping connector case

parent 5800c7158e
commit 6147a58211
@@ -17,6 +17,7 @@ from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
 from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
 from danswer.configs.app_configs import NUM_INDEXING_WORKERS
 from danswer.configs.app_configs import NUM_SECONDARY_INDEXING_WORKERS
+from danswer.configs.constants import DocumentSource
 from danswer.configs.constants import POSTGRES_INDEXER_APP_NAME
 from danswer.db.connector import fetch_connectors
 from danswer.db.connector_credential_pair import fetch_connector_credential_pairs
@@ -67,6 +68,10 @@ def _should_create_new_indexing(
 ) -> bool:
     connector = cc_pair.connector
+
+    # don't kick off indexing for `NOT_APPLICABLE`
+    if connector.source == DocumentSource.NOT_APPLICABLE:
+        return False
 
     # User can still manually create single indexing attempts via the UI for the
     # currently in use index
     if DISABLE_INDEX_UPDATE_ON_SWAP:
@@ -525,19 +525,18 @@ def fetch_document_sets_for_documents(
 ) -> Sequence[tuple[str, list[str]]]:
     """Gives back a list of (document_id, list[document_set_names]) tuples"""
     stmt = (
-        select(Document.id, func.array_agg(DocumentSetDBModel.name))
-        .join(
-            DocumentSet__ConnectorCredentialPair,
-            DocumentSetDBModel.id
-            == DocumentSet__ConnectorCredentialPair.document_set_id,
+        select(
+            Document.id,
+            func.coalesce(
+                func.array_remove(func.array_agg(DocumentSetDBModel.name), None), []
+            ).label("document_set_names"),
         )
-        .join(
-            ConnectorCredentialPair,
-            ConnectorCredentialPair.id
-            == DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
-        )
-        .join(
+        .outerjoin(
             DocumentByConnectorCredentialPair,
+            Document.id == DocumentByConnectorCredentialPair.id,
+        )
+        .outerjoin(
+            ConnectorCredentialPair,
             and_(
                 DocumentByConnectorCredentialPair.connector_id
                 == ConnectorCredentialPair.connector_id,
@@ -545,16 +544,33 @@ def fetch_document_sets_for_documents(
                 == ConnectorCredentialPair.credential_id,
             ),
         )
-        .join(
-            Document,
-            Document.id == DocumentByConnectorCredentialPair.id,
+        .outerjoin(
+            DocumentSet__ConnectorCredentialPair,
+            ConnectorCredentialPair.id
+            == DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
         )
+        .outerjoin(
+            DocumentSetDBModel,
+            DocumentSetDBModel.id
+            == DocumentSet__ConnectorCredentialPair.document_set_id,
+        )
         .where(Document.id.in_(document_ids))
         # don't include CC pairs that are being deleted
         # NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
         # as we can assume their document sets are no longer relevant
-        .where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING)
-        .where(DocumentSet__ConnectorCredentialPair.is_current == True)  # noqa: E712
+        .where(
+            or_(
+                ConnectorCredentialPair.status.is_(None),
+                ConnectorCredentialPair.status
+                != ConnectorCredentialPairStatus.DELETING,
+            )
+        )
+        .where(
+            or_(
+                DocumentSet__ConnectorCredentialPair.is_current.is_(None),
+                DocumentSet__ConnectorCredentialPair.is_current == True,  # noqa: E712
+            )
+        )
         .group_by(Document.id)
     )
     return db_session.execute(stmt).all()  # type: ignore
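Why the joins flipped from inner to outer: with plain .join(...), a document whose remaining CC pairs belong to no current document set produces no rows at all, so it silently drops out of the result and its stale document_sets value is never cleared downstream. With .outerjoin(...), such documents survive the join chain with NULL set names, which func.array_remove(..., None) strips and func.coalesce(..., []) normalizes to an empty list; the or_(... .is_(None), ...) filters keep those NULL rows from being discarded by the status / is_current checks. A rough pure-Python sketch of the aggregation semantics (illustration only; the row data below is made up):

# Pure-Python sketch of the new aggregation semantics (illustration only, not
# the real SQL). Each row mimics the output of the outer-join chain:
# (document_id, document_set_name), where the name is None when a document has
# no current document set -- rows the old inner joins would have dropped.
from collections import defaultdict

rows: list[tuple[str, str | None]] = [
    ("doc-in-set", "Test Document Set"),
    ("doc-no-set", None),  # kept by the outer join instead of vanishing
]

agg: defaultdict[str, list[str | None]] = defaultdict(list)
for doc_id, set_name in rows:  # GROUP BY Document.id
    agg[doc_id].append(set_name)  # func.array_agg(DocumentSetDBModel.name)

document_set_names = {
    # array_remove(..., None) strips the NULLs the outer joins introduce;
    # coalesce(..., []) is the empty-list fallback
    doc_id: [name for name in names if name is not None]
    for doc_id, names in agg.items()
}
assert document_set_names == {
    "doc-in-set": ["Test Document Set"],
    "doc-no-set": [],  # reported with an empty list, so stale sets get cleared
}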
@@ -166,10 +166,14 @@ def create_deletion_attempt_for_connector_id(
         get_editable=True,
     )
     if cc_pair is None:
+        error = (
+            f"Connector with ID '{connector_id}' and credential ID "
+            f"'{credential_id}' does not exist. Has it already been deleted?"
+        )
+        logger.error(error)
         raise HTTPException(
             status_code=404,
-            detail=f"Connector with ID '{connector_id}' and credential ID "
-            f"'{credential_id}' does not exist. Has it already been deleted?",
+            detail=error,
         )
 
     # Cancel any scheduled indexing attempts
@@ -182,6 +186,7 @@ def create_deletion_attempt_for_connector_id(
         connector_credential_pair=cc_pair, db_session=db_session
     )
     if deletion_attempt_disallowed_reason:
+        logger.error(deletion_attempt_disallowed_reason)
         raise HTTPException(
             status_code=400,
             detail=deletion_attempt_disallowed_reason,
@@ -18,7 +18,8 @@ class ConnectorCreationDetails(BaseModel):
 class ConnectorClient:
     @staticmethod
     def create_connector(
-        name_prefix: str = "test_connector", credential_id: int | None = None
+        name_prefix: str = "test_connector",
+        credential_id: int | None = None,
     ) -> ConnectorCreationDetails:
         unique_id = uuid.uuid4()
 
@@ -21,16 +21,23 @@ class SeedDocumentResponse(BaseModel):
 class TestDocumentClient:
     @staticmethod
     def seed_documents(
-        num_docs: int = 5, cc_pair_id: int | None = None
+        num_docs: int = 5,
+        cc_pair_id: int | None = None,
+        document_ids: list[str] | None = None,
     ) -> SeedDocumentResponse:
         if not cc_pair_id:
             connector_details = ConnectorClient.create_connector()
             cc_pair_id = connector_details.cc_pair_id
 
+        # Use provided document_ids if available, otherwise generate random UUIDs
+        if document_ids is None:
+            document_ids = [f"test-doc-{uuid.uuid4()}" for _ in range(num_docs)]
+        else:
+            num_docs = len(document_ids)
+
         # Create and ingest some documents
         documents: list[dict] = []
-        for _ in range(num_docs):
-            document_id = f"test-doc-{uuid.uuid4()}"
+        for document_id in document_ids:
             document = {
                 "document": {
                     "id": document_id,
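The new document_ids parameter is what the overlapping-connector test below relies on: passing the same IDs to two different CC pairs makes both connectors own the same document. A minimal usage sketch (the cc_pair_id values here are hypothetical):

# Minimal usage sketch for the new document_ids parameter; the cc_pair_id
# values are hypothetical placeholders.
import uuid

shared_doc_ids = [f"test-doc-{uuid.uuid4()}"]

# Seed the same document through two different CC pairs to create an overlap.
c1_res = TestDocumentClient.seed_documents(cc_pair_id=1, document_ids=shared_doc_ids)
c2_res = TestDocumentClient.seed_documents(cc_pair_id=2, document_ids=shared_doc_ids)

# Both responses reference the one shared document.
assert [doc.id for doc in c1_res.documents] == [doc.id for doc in c2_res.documents]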
@@ -1,4 +1,5 @@
 import time
+from uuid import uuid4
 
 from danswer.db.enums import ConnectorCredentialPairStatus
 from danswer.server.features.document_set.models import DocumentSetCreationRequest
@@ -188,3 +189,123 @@ def test_connector_deletion(reset: None, vespa_client: TestVespaClient) -> None:
 
     assert user_group_1_found
     assert user_group_2_found
+
+
+def test_connector_deletion_for_overlapping_connectors(
+    reset: None, vespa_client: TestVespaClient
+) -> None:
+    """Checks to make sure that connectors with overlapping documents work properly. Specifically, that the overlapping
+    document (1) still exists and (2) has the right document set / group post-deletion of one of the connectors.
+    """
+
+    # create connectors
+    c1_details = ConnectorClient.create_connector(name_prefix="tc1")
+    c2_details = ConnectorClient.create_connector(name_prefix="tc2")
+
+    doc_ids = [str(uuid4())]
+    c1_seed_res = TestDocumentClient.seed_documents(
+        cc_pair_id=c1_details.cc_pair_id, document_ids=doc_ids
+    )
+    TestDocumentClient.seed_documents(
+        cc_pair_id=c2_details.cc_pair_id, document_ids=doc_ids
+    )
+
+    # create document set
+    doc_set_id = DocumentSetClient.create_document_set(
+        DocumentSetCreationRequest(
+            name="Test Document Set",
+            description="Test",
+            cc_pair_ids=[c1_details.cc_pair_id],
+            is_public=True,
+            users=[],
+            groups=[],
+        )
+    )
+
+    # wait for document sets to be synced
+    start = time.time()
+    while True:
+        doc_sets = DocumentSetClient.fetch_document_sets()
+        doc_set_1 = next(
+            (doc_set for doc_set in doc_sets if doc_set.id == doc_set_id), None
+        )
+
+        if not doc_set_1:
+            raise RuntimeError("Document set not found")
+
+        if doc_set_1.is_up_to_date:
+            break
+
+        if time.time() - start > MAX_DELAY:
+            raise TimeoutError("Document sets were not synced within the max delay")
+
+        time.sleep(2)
+
+    print("Document sets created and synced")
+
+    # create a user group and attach it to connector 1
+    user_group_id = UserGroupClient.create_user_group(
+        UserGroupCreate(
+            name="Test User Group", user_ids=[], cc_pair_ids=[c1_details.cc_pair_id]
+        )
+    )
+
+    # wait for user group to be available
+    start = time.time()
+    while True:
+        user_groups = {ug.id: ug for ug in UserGroupClient.fetch_user_groups()}
+
+        if user_group_id not in user_groups:
+            raise RuntimeError("User group not found")
+
+        if user_groups[user_group_id].is_up_to_date:
+            break
+
+        if time.time() - start > MAX_DELAY:
+            raise TimeoutError("User group was not synced within the max delay")
+
+        time.sleep(2)
+
+    print("User group created and synced")
+
+    # delete connector 1
+    ConnectorClient.update_connector_status(
+        cc_pair_id=c1_details.cc_pair_id, status=ConnectorCredentialPairStatus.PAUSED
+    )
+    ConnectorClient.delete_connector(
+        connector_id=c1_details.connector_id, credential_id=c1_details.credential_id
+    )
+
+    # wait for deletion to finish
+    start = time.time()
+    while True:
+        connectors = ConnectorClient.get_connectors()
+        if c1_details.connector_id not in [c["id"] for c in connectors]:
+            break
+
+        if time.time() - start > MAX_DELAY:
+            raise TimeoutError("Connector 1 was not deleted within the max delay")
+
+        time.sleep(2)
+
+    print("Connector 1 deleted")
+
+    # check that only connector 2 remains
+    # TODO: check for the CC pair rather than the connector once the refactor is done
+    all_connectors = ConnectorClient.get_connectors()
+    assert len(all_connectors) == 1
+    assert all_connectors[0]["id"] == c2_details.connector_id
+
+    # validate vespa documents
+    c1_vespa_docs = vespa_client.get_documents_by_id(
+        [doc.id for doc in c1_seed_res.documents]
+    )["documents"]
+
+    assert len(c1_vespa_docs) == 1
+
+    for doc in c1_vespa_docs:
+        assert doc["fields"]["access_control_list"] == {
+            "PUBLIC": 1,
+        }
+        # no document_sets in "fields" means it's unset, i.e. no document sets
+        assert "document_sets" not in doc["fields"]