From c7e5b11c63482e199c7179634b487afdb1dfed16 Mon Sep 17 00:00:00 2001 From: Nathan Schwerdfeger <35745818+nschwerd@users.noreply.github.com> Date: Sun, 11 Aug 2024 20:33:07 -0700 Subject: [PATCH] EE Connector Deletion Bugfix + Refactor (#2042) --------- Co-authored-by: Weves --- ...1_moved_status_to_connector_credential_.py | 80 ++++++ backend/danswer/access/access.py | 6 +- .../danswer/background/celery/celery_app.py | 27 ++ .../danswer/background/celery/celery_utils.py | 42 ++- .../danswer/background/connector_deletion.py | 104 ++++---- .../background/indexing/run_indexing.py | 20 +- backend/danswer/background/update.py | 13 +- backend/danswer/db/connector.py | 35 +-- .../danswer/db/connector_credential_pair.py | 69 ++++- backend/danswer/db/deletion_attempt.py | 8 +- backend/danswer/db/document.py | 35 +-- backend/danswer/db/document_set.py | 50 ++-- backend/danswer/db/enums.py | 6 + backend/danswer/db/index_attempt.py | 9 +- backend/danswer/db/models.py | 5 +- backend/danswer/server/documents/cc_pair.py | 45 +++- backend/danswer/server/documents/connector.py | 15 +- backend/danswer/server/documents/models.py | 6 +- .../danswer/server/manage/administrative.py | 17 +- .../danswer/utils/variable_functionality.py | 4 + backend/ee/danswer/access/access.py | 4 - .../danswer/db/connector_credential_pair.py | 23 ++ backend/ee/danswer/db/user_group.py | 40 ++- .../scripts/force_delete_connector_by_id.py | 14 +- backend/tests/integration/common/chat.py | 66 +++++ .../tests/integration/common/connectors.py | 114 ++++++++ backend/tests/integration/common/constants.py | 1 + .../tests/integration/common/document_sets.py | 30 +++ backend/tests/integration/common/reset.py | 4 + .../integration/common/seed_documents.py | 36 +-- .../tests/integration/common/user_groups.py | 24 ++ .../integration/connector/test_deletion.py | 190 +++++++++++++ .../regression/answer_quality/api_utils.py | 1 - .../connector/[ccPairId]/DeletionButton.tsx | 8 +- 
.../[ccPairId]/ModifyStatusButtonCluster.tsx | 20 +- .../app/admin/connector/[ccPairId]/page.tsx | 15 +- .../app/admin/connector/[ccPairId]/types.ts | 7 + .../[connector]/AddConnectorPage.tsx | 1 - .../[connector]/pages/utils/files.ts | 1 - .../[connector]/pages/utils/google_site.ts | 1 - .../status/CCPairIndexingStatusTable.tsx | 43 +-- .../admin/connectors/ConnectorForm.tsx | 2 - .../connectors/table/ConnectorsTable.tsx | 251 ------------------ .../admin/connectors/table/DeleteColumn.tsx | 60 ----- .../table/SingleUseConnectorsTable.tsx | 174 ------------ web/src/lib/ccPair.ts | 49 ++++ web/src/lib/connector.ts | 20 -- web/src/lib/connectors/connectors.ts | 1 - web/src/lib/types.ts | 2 + 49 files changed, 998 insertions(+), 800 deletions(-) create mode 100644 backend/alembic/versions/4a951134c801_moved_status_to_connector_credential_.py create mode 100644 backend/tests/integration/common/chat.py create mode 100644 backend/tests/integration/common/connectors.py create mode 100644 backend/tests/integration/common/document_sets.py create mode 100644 backend/tests/integration/common/user_groups.py create mode 100644 backend/tests/integration/connector/test_deletion.py delete mode 100644 web/src/components/admin/connectors/table/ConnectorsTable.tsx delete mode 100644 web/src/components/admin/connectors/table/DeleteColumn.tsx delete mode 100644 web/src/components/admin/connectors/table/SingleUseConnectorsTable.tsx create mode 100644 web/src/lib/ccPair.ts diff --git a/backend/alembic/versions/4a951134c801_moved_status_to_connector_credential_.py b/backend/alembic/versions/4a951134c801_moved_status_to_connector_credential_.py new file mode 100644 index 000000000..c79cdb55c --- /dev/null +++ b/backend/alembic/versions/4a951134c801_moved_status_to_connector_credential_.py @@ -0,0 +1,80 @@ +"""Moved status to connector credential pair + +Revision ID: 4a951134c801 +Revises: 7477a5f5d728 +Create Date: 2024-08-10 19:20:34.527559 + +""" +from alembic import op +import 
sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "4a951134c801" +down_revision = "7477a5f5d728" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "connector_credential_pair", + sa.Column( + "status", + sa.Enum( + "ACTIVE", + "PAUSED", + "DELETING", + name="connectorcredentialpairstatus", + native_enum=False, + ), + nullable=True, + ), + ) + + # Update status of connector_credential_pair based on connector's disabled status + op.execute( + """ + UPDATE connector_credential_pair + SET status = CASE + WHEN ( + SELECT disabled + FROM connector + WHERE connector.id = connector_credential_pair.connector_id + ) = FALSE THEN 'ACTIVE' + ELSE 'PAUSED' + END + """ + ) + + # Make the status column not nullable after setting values + op.alter_column("connector_credential_pair", "status", nullable=False) + + op.drop_column("connector", "disabled") + + +def downgrade() -> None: + op.add_column( + "connector", + sa.Column("disabled", sa.BOOLEAN(), autoincrement=False, nullable=True), + ) + + # Update disabled status of connector based on connector_credential_pair's status + op.execute( + """ + UPDATE connector + SET disabled = CASE + WHEN EXISTS ( + SELECT 1 + FROM connector_credential_pair + WHERE connector_credential_pair.connector_id = connector.id + AND connector_credential_pair.status = 'ACTIVE' + ) THEN FALSE + ELSE TRUE + END + """ + ) + + # Make the disabled column not nullable after setting values + op.alter_column("connector", "disabled", nullable=False) + + op.drop_column("connector_credential_pair", "status") diff --git a/backend/danswer/access/access.py b/backend/danswer/access/access.py index 51f5a300c..5501980ab 100644 --- a/backend/danswer/access/access.py +++ b/backend/danswer/access/access.py @@ -5,19 +5,16 @@ from danswer.access.utils import prefix_user from danswer.configs.constants import PUBLIC_DOC_PAT from danswer.db.document import get_acccess_info_for_documents from danswer.db.models import User 
-from danswer.server.documents.models import ConnectorCredentialPairIdentifier from danswer.utils.variable_functionality import fetch_versioned_implementation def _get_access_for_documents( document_ids: list[str], db_session: Session, - cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None, ) -> dict[str, DocumentAccess]: document_access_info = get_acccess_info_for_documents( db_session=db_session, document_ids=document_ids, - cc_pair_to_delete=cc_pair_to_delete, ) return { document_id: DocumentAccess.build(user_ids, [], is_public) @@ -28,14 +25,13 @@ def _get_access_for_documents( def get_access_for_documents( document_ids: list[str], db_session: Session, - cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None, ) -> dict[str, DocumentAccess]: """Fetches all access information for the given documents.""" versioned_get_access_for_documents_fn = fetch_versioned_implementation( "danswer.access.access", "_get_access_for_documents" ) return versioned_get_access_for_documents_fn( - document_ids, db_session, cc_pair_to_delete + document_ids, db_session ) # type: ignore diff --git a/backend/danswer/background/celery/celery_app.py b/backend/danswer/background/celery/celery_app.py index d7bce6010..e6507a8ba 100644 --- a/backend/danswer/background/celery/celery_app.py +++ b/backend/danswer/background/celery/celery_app.py @@ -5,6 +5,7 @@ from celery import Celery # type: ignore from sqlalchemy.orm import Session from danswer.background.celery.celery_utils import extract_ids_from_runnable_connector +from danswer.background.celery.celery_utils import should_kick_off_deletion_of_cc_pair from danswer.background.celery.celery_utils import should_prune_cc_pair from danswer.background.celery.celery_utils import should_sync_doc_set from danswer.background.connector_deletion import delete_connector_credential_pair @@ -270,6 +271,26 @@ def check_for_document_sets_sync_task() -> None: ) +@celery_app.task( + name="check_for_cc_pair_deletion_task", + 
soft_time_limit=JOB_TIMEOUT, +) +def check_for_cc_pair_deletion_task() -> None: + """Runs periodically to check if any deletion tasks should be run""" + with Session(get_sqlalchemy_engine()) as db_session: + # check if any document sets are not synced + cc_pairs = get_connector_credential_pairs(db_session) + for cc_pair in cc_pairs: + if should_kick_off_deletion_of_cc_pair(cc_pair, db_session): + logger.info(f"Deleting the {cc_pair.name} connector credential pair") + cleanup_connector_credential_pair_task.apply_async( + kwargs=dict( + connector_id=cc_pair.connector.id, + credential_id=cc_pair.credential.id, + ), + ) + + @celery_app.task( name="check_for_prune_task", soft_time_limit=JOB_TIMEOUT, @@ -305,6 +326,12 @@ celery_app.conf.beat_schedule = { "task": "check_for_document_sets_sync_task", "schedule": timedelta(seconds=5), }, + "check-for-cc-pair-deletion": { + "task": "check_for_cc_pair_deletion_task", + # don't need to check too often, since we kick off a deletion initially + # during the API call that actually marks the CC pair for deletion + "schedule": timedelta(minutes=1), + }, } celery_app.conf.beat_schedule.update( { diff --git a/backend/danswer/background/celery/celery_utils.py b/backend/danswer/background/celery/celery_utils.py index eb9943b41..772862724 100644 --- a/backend/danswer/background/celery/celery_utils.py +++ b/backend/danswer/background/celery/celery_utils.py @@ -16,10 +16,14 @@ from danswer.connectors.interfaces import IdConnector from danswer.connectors.interfaces import LoadConnector from danswer.connectors.interfaces import PollConnector from danswer.connectors.models import Document +from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed from danswer.db.engine import get_db_current_time +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.models import Connector +from danswer.db.models import ConnectorCredentialPair from danswer.db.models import Credential from danswer.db.models import 
DocumentSet +from danswer.db.models import TaskQueueState from danswer.db.tasks import check_task_is_live_and_not_timed_out from danswer.db.tasks import get_latest_task from danswer.db.tasks import get_latest_task_by_type @@ -31,22 +35,52 @@ logger = setup_logger() def get_deletion_status( connector_id: int, credential_id: int, db_session: Session -) -> DeletionAttemptSnapshot | None: +) -> TaskQueueState | None: cleanup_task_name = name_cc_cleanup_task( connector_id=connector_id, credential_id=credential_id ) - task_state = get_latest_task(task_name=cleanup_task_name, db_session=db_session) + return get_latest_task(task_name=cleanup_task_name, db_session=db_session) - if not task_state: + +def get_deletion_attempt_snapshot( + connector_id: int, credential_id: int, db_session: Session +) -> DeletionAttemptSnapshot | None: + deletion_task = get_deletion_status(connector_id, credential_id, db_session) + if not deletion_task: return None return DeletionAttemptSnapshot( connector_id=connector_id, credential_id=credential_id, - status=task_state.status, + status=deletion_task.status, ) +def should_kick_off_deletion_of_cc_pair( + cc_pair: ConnectorCredentialPair, db_session: Session +) -> bool: + if cc_pair.status != ConnectorCredentialPairStatus.DELETING: + return False + + if check_deletion_attempt_is_allowed(cc_pair, db_session): + return False + + deletion_task = get_deletion_status( + connector_id=cc_pair.connector_id, + credential_id=cc_pair.credential_id, + db_session=db_session, + ) + if deletion_task and check_task_is_live_and_not_timed_out( + deletion_task, + db_session, + # 1 hour timeout + timeout=60 * 60, + ): + return False + + return True + + def should_sync_doc_set(document_set: DocumentSet, db_session: Session) -> bool: if document_set.is_up_to_date: return False diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py index 28c58f02d..9c653a87f 100644 --- 
a/backend/danswer/background/connector_deletion.py +++ b/backend/danswer/background/connector_deletion.py @@ -10,8 +10,6 @@ are multiple connector / credential pairs that have indexed it connector / credential pair from the access list (6) delete all relevant entries from postgres """ -import time - from sqlalchemy.orm import Session from danswer.access.access import get_access_for_documents @@ -24,10 +22,8 @@ from danswer.db.document import delete_documents_complete__no_commit from danswer.db.document import get_document_connector_cnts from danswer.db.document import get_documents_for_connector_credential_pair from danswer.db.document import prepare_to_modify_documents -from danswer.db.document_set import get_document_sets_by_ids -from danswer.db.document_set import ( - mark_cc_pair__document_set_relationships_to_be_deleted__no_commit, -) +from danswer.db.document_set import delete_document_set_cc_pair_relationship__no_commit +from danswer.db.document_set import fetch_document_sets_for_documents from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import delete_index_attempts from danswer.db.models import ConnectorCredentialPair @@ -35,6 +31,10 @@ from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import UpdateRequest from danswer.server.documents.models import ConnectorCredentialPairIdentifier from danswer.utils.logger import setup_logger +from danswer.utils.variable_functionality import ( + fetch_versioned_implementation_with_fallback, +) +from danswer.utils.variable_functionality import noop_fallback logger = setup_logger() @@ -78,25 +78,37 @@ def delete_connector_credential_pair_batch( document_ids_to_update = [ document_id for document_id, cnt in document_connector_cnts if cnt > 1 ] + + # maps document id to list of document set names + new_doc_sets_for_documents: dict[str, set[str]] = { + document_id_and_document_set_names_tuple[0]: set( + document_id_and_document_set_names_tuple[1] 
+ ) + for document_id_and_document_set_names_tuple in fetch_document_sets_for_documents( + db_session=db_session, + document_ids=document_ids_to_update, + ) + } + + # determine future ACLs for documents in batch access_for_documents = get_access_for_documents( document_ids=document_ids_to_update, db_session=db_session, - cc_pair_to_delete=ConnectorCredentialPairIdentifier( - connector_id=connector_id, - credential_id=credential_id, - ), ) + + # update Vespa + logger.debug(f"Updating documents: {document_ids_to_update}") update_requests = [ UpdateRequest( document_ids=[document_id], access=access, + document_sets=new_doc_sets_for_documents[document_id], ) for document_id, access in access_for_documents.items() ] - logger.debug(f"Updating documents: {document_ids_to_update}") - document_index.update(update_requests=update_requests) + # clean up Postgres delete_document_by_connector_credential_pair__no_commit( db_session=db_session, document_ids=document_ids_to_update, @@ -108,48 +120,6 @@ def delete_connector_credential_pair_batch( db_session.commit() -def cleanup_synced_entities( - cc_pair: ConnectorCredentialPair, db_session: Session -) -> None: - """Updates the document sets associated with the connector / credential pair, - then relies on the document set sync script to kick off Celery jobs which will - sync these updates to Vespa. 
- - Waits until the document sets are synced before returning.""" - logger.info(f"Cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'") - document_sets_ids_to_sync = list( - mark_cc_pair__document_set_relationships_to_be_deleted__no_commit( - cc_pair_id=cc_pair.id, - db_session=db_session, - ) - ) - db_session.commit() - - # wait till all document sets are synced before continuing - while True: - all_synced = True - document_sets = get_document_sets_by_ids( - db_session=db_session, document_set_ids=document_sets_ids_to_sync - ) - for document_set in document_sets: - if not document_set.is_up_to_date: - all_synced = False - - if all_synced: - break - - # wait for 30 seconds before checking again - db_session.commit() # end transaction - logger.info( - f"Document sets '{document_sets_ids_to_sync}' not synced yet, waiting 30s" - ) - time.sleep(30) - - logger.info( - f"Finished cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'" - ) - - def delete_connector_credential_pair( db_session: Session, document_index: DocumentIndex, @@ -177,17 +147,33 @@ def delete_connector_credential_pair( ) num_docs_deleted += len(documents) - # Clean up document sets / access information from Postgres - # and sync these updates to Vespa - # TODO: add user group cleanup with `fetch_versioned_implementation` - cleanup_synced_entities(cc_pair, db_session) - # clean up the rest of the related Postgres entities + # index attempts delete_index_attempts( db_session=db_session, connector_id=connector_id, credential_id=credential_id, ) + + # document sets + delete_document_set_cc_pair_relationship__no_commit( + db_session=db_session, + connector_id=connector_id, + credential_id=credential_id, + ) + + # user groups + cleanup_user_groups = fetch_versioned_implementation_with_fallback( + "danswer.db.user_group", + "delete_user_group_cc_pair_relationship__no_commit", + noop_fallback, + ) + cleanup_user_groups( + cc_pair_id=cc_pair.id, + db_session=db_session, + ) + + # finally, 
delete the cc-pair delete_connector_credential_pair__no_commit( db_session=db_session, connector_id=connector_id, diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 8f0e25bf0..b7359f19c 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -14,10 +14,10 @@ from danswer.connectors.interfaces import LoadConnector from danswer.connectors.interfaces import PollConnector from danswer.connectors.models import IndexAttemptMetadata from danswer.connectors.models import InputType -from danswer.db.connector import disable_connector from danswer.db.connector_credential_pair import get_last_successful_attempt_time from danswer.db.connector_credential_pair import update_connector_credential_pair from danswer.db.engine import get_sqlalchemy_engine +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.index_attempt import get_index_attempt from danswer.db.index_attempt import mark_attempt_failed from danswer.db.index_attempt import mark_attempt_in_progress @@ -61,7 +61,14 @@ def _get_document_generator( ) except Exception as e: logger.exception(f"Unable to instantiate connector due to {e}") - disable_connector(attempt.connector_credential_pair.connector.id, db_session) + # since we failed to even instantiate the connector, we pause the CCPair since + # it will never succeed + update_connector_credential_pair( + db_session=db_session, + connector_id=attempt.connector_credential_pair.connector.id, + credential_id=attempt.connector_credential_pair.credential.id, + status=ConnectorCredentialPairStatus.PAUSED, + ) raise e if task == InputType.LOAD_STATE: @@ -130,6 +137,7 @@ def _run_indexing( db_session=db_session, ) + db_cc_pair = index_attempt.connector_credential_pair db_connector = index_attempt.connector_credential_pair.connector db_credential = index_attempt.connector_credential_pair.credential @@ -181,7 +189,7 @@ 
def _run_indexing( # contents still need to be initially pulled. db_session.refresh(db_connector) if ( - db_connector.disabled + db_cc_pair.status == ConnectorCredentialPairStatus.PAUSED and db_embedding_model.status != IndexModelStatus.FUTURE ): # let the `except` block handle this @@ -246,7 +254,7 @@ def _run_indexing( # to give better clarity in the UI, as the next run will never happen. if ( ind == 0 - or db_connector.disabled + or db_cc_pair.status == ConnectorCredentialPairStatus.PAUSED or index_attempt.status != IndexingStatus.IN_PROGRESS ): mark_attempt_failed( @@ -258,8 +266,8 @@ def _run_indexing( if is_primary: update_connector_credential_pair( db_session=db_session, - connector_id=index_attempt.connector_credential_pair.connector.id, - credential_id=index_attempt.connector_credential_pair.credential.id, + connector_id=db_connector.id, + credential_id=db_credential.id, net_docs=net_doc_change, ) raise e diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index f192c196b..437a48bfc 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -25,13 +25,14 @@ from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.engine import get_db_current_time from danswer.db.engine import get_sqlalchemy_engine from danswer.db.engine import init_sqlalchemy_engine +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.index_attempt import create_index_attempt from danswer.db.index_attempt import get_index_attempt from danswer.db.index_attempt import get_inprogress_index_attempts from danswer.db.index_attempt import get_last_attempt_for_cc_pair from danswer.db.index_attempt import get_not_started_index_attempts from danswer.db.index_attempt import mark_attempt_failed -from danswer.db.models import Connector +from danswer.db.models import ConnectorCredentialPair from danswer.db.models import EmbeddingModel from danswer.db.models import IndexAttempt 
from danswer.db.models import IndexingStatus @@ -57,12 +58,14 @@ _UNEXPECTED_STATE_FAILURE_REASON = ( def _should_create_new_indexing( - connector: Connector, + cc_pair: ConnectorCredentialPair, last_index: IndexAttempt | None, model: EmbeddingModel, secondary_index_building: bool, db_session: Session, ) -> bool: + connector = cc_pair.connector + # User can still manually create single indexing attempts via the UI for the # currently in use index if DISABLE_INDEX_UPDATE_ON_SWAP: @@ -89,10 +92,10 @@ def _should_create_new_indexing( return False return True - # If the connector is disabled or is the ingestion API, don't index + # If the connector is paused or is the ingestion API, don't index # NOTE: during an embedding model switch over, the following logic # is bypassed by the above check for a future model - if connector.disabled or connector.id == 0: + if cc_pair.status == ConnectorCredentialPairStatus.PAUSED or connector.id == 0: return False if not last_index: @@ -187,7 +190,7 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None: cc_pair.id, model.id, db_session ) if not _should_create_new_indexing( - connector=cc_pair.connector, + cc_pair=cc_pair, last_index=last_attempt, model=model, secondary_index_building=len(embedding_models) > 1, diff --git a/backend/danswer/db/connector.py b/backend/danswer/db/connector.py index 4dd1f421f..3c06cd69e 100644 --- a/backend/danswer/db/connector.py +++ b/backend/danswer/db/connector.py @@ -1,6 +1,5 @@ from typing import cast -from fastapi import HTTPException from sqlalchemy import and_ from sqlalchemy import exists from sqlalchemy import func @@ -34,15 +33,12 @@ def fetch_connectors( db_session: Session, sources: list[DocumentSource] | None = None, input_types: list[InputType] | None = None, - disabled_status: bool | None = None, ) -> list[Connector]: stmt = select(Connector) if sources is not None: stmt = stmt.where(Connector.source.in_(sources)) if input_types is not None: stmt = 
stmt.where(Connector.input_type.in_(input_types)) - if disabled_status is not None: - stmt = stmt.where(Connector.disabled == disabled_status) results = db_session.scalars(stmt) return list(results.all()) @@ -97,7 +93,6 @@ def create_connector( refresh_freq=connector_data.refresh_freq, indexing_start=connector_data.indexing_start, prune_freq=connector_data.prune_freq, - disabled=connector_data.disabled, ) db_session.add(connector) db_session.commit() @@ -131,33 +126,18 @@ def update_connector( if connector_data.prune_freq is not None else DEFAULT_PRUNING_FREQ ) - connector.disabled = connector_data.disabled db_session.commit() return connector -def disable_connector( - connector_id: int, - db_session: Session, -) -> StatusResponse[int]: - connector = fetch_connector_by_id(connector_id, db_session) - if connector is None: - raise HTTPException(status_code=404, detail="Connector does not exist") - - connector.disabled = True - db_session.commit() - return StatusResponse( - success=True, message="Connector deleted successfully", data=connector_id - ) - - def delete_connector( connector_id: int, db_session: Session, ) -> StatusResponse[int]: - """Currently unused due to foreign key restriction from IndexAttempt - Use disable_connector instead""" + """Only used in special cases (e.g. a connector is in a bad state and we need to delete it). + Be VERY careful using this, as it could lead to a bad state if not used correctly. 
+ """ connector = fetch_connector_by_id(connector_id, db_session) if connector is None: return StatusResponse( @@ -188,11 +168,9 @@ def fetch_latest_index_attempt_by_connector( latest_index_attempts: list[IndexAttempt] = [] if source: - connectors = fetch_connectors( - db_session, sources=[source], disabled_status=False - ) + connectors = fetch_connectors(db_session, sources=[source]) else: - connectors = fetch_connectors(db_session, disabled_status=False) + connectors = fetch_connectors(db_session) if not connectors: return [] @@ -261,7 +239,6 @@ def create_initial_default_connector(db_session: Session) -> None: default_connector.source != DocumentSource.INGESTION_API or default_connector.input_type != InputType.LOAD_STATE or default_connector.refresh_freq is not None - or default_connector.disabled or default_connector.name != "Ingestion API" or default_connector.connector_specific_config != {} or default_connector.prune_freq is not None @@ -273,7 +250,6 @@ def create_initial_default_connector(db_session: Session) -> None: default_connector.source = DocumentSource.INGESTION_API default_connector.input_type = InputType.LOAD_STATE default_connector.refresh_freq = None - default_connector.disabled = False default_connector.name = "Ingestion API" default_connector.connector_specific_config = {} default_connector.prune_freq = None @@ -289,7 +265,6 @@ def create_initial_default_connector(db_session: Session) -> None: connector_specific_config={}, refresh_freq=None, prune_freq=None, - disabled=False, ) db_session.add(connector) db_session.commit() diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py index fd68f05d9..ed1742b2f 100644 --- a/backend/danswer/db/connector_credential_pair.py +++ b/backend/danswer/db/connector_credential_pair.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import Session from danswer.configs.constants import DocumentSource from danswer.db.connector import fetch_connector_by_id from 
danswer.db.credentials import fetch_credential_by_id +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.models import ConnectorCredentialPair from danswer.db.models import EmbeddingModel from danswer.db.models import IndexAttempt @@ -26,7 +27,9 @@ def get_connector_credential_pairs( ) -> list[ConnectorCredentialPair]: stmt = select(ConnectorCredentialPair) if not include_disabled: - stmt = stmt.where(ConnectorCredentialPair.connector.disabled == False) # noqa + stmt = stmt.where( + ConnectorCredentialPair.status == ConnectorCredentialPairStatus.ACTIVE + ) # noqa results = db_session.scalars(stmt) return list(results.all()) @@ -109,10 +112,56 @@ def get_last_successful_attempt_time( return attempt.time_started.timestamp() +"""Updates""" + + +def _update_connector_credential_pair( + db_session: Session, + cc_pair: ConnectorCredentialPair, + status: ConnectorCredentialPairStatus | None = None, + net_docs: int | None = None, + run_dt: datetime | None = None, +) -> None: + # simply don't update last_successful_index_time if run_dt is not specified + # at worst, this would result in re-indexing documents that were already indexed + if run_dt is not None: + cc_pair.last_successful_index_time = run_dt + if net_docs is not None: + cc_pair.total_docs_indexed += net_docs + if status is not None: + cc_pair.status = status + db_session.commit() + + +def update_connector_credential_pair_from_id( + db_session: Session, + cc_pair_id: int, + status: ConnectorCredentialPairStatus | None = None, + net_docs: int | None = None, + run_dt: datetime | None = None, +) -> None: + cc_pair = get_connector_credential_pair_from_id(cc_pair_id, db_session) + if not cc_pair: + logger.warning( + f"Attempted to update pair for Connector Credential Pair '{cc_pair_id}'" + f" but it does not exist" + ) + return + + _update_connector_credential_pair( + db_session=db_session, + cc_pair=cc_pair, + status=status, + net_docs=net_docs, + run_dt=run_dt, + ) + + def 
update_connector_credential_pair( db_session: Session, connector_id: int, credential_id: int, + status: ConnectorCredentialPairStatus | None = None, net_docs: int | None = None, run_dt: datetime | None = None, ) -> None: @@ -123,13 +172,14 @@ def update_connector_credential_pair( f"and credential id {credential_id}" ) return - # simply don't update last_successful_index_time if run_dt is not specified - # at worst, this would result in re-indexing documents that were already indexed - if run_dt is not None: - cc_pair.last_successful_index_time = run_dt - if net_docs is not None: - cc_pair.total_docs_indexed += net_docs - db_session.commit() + + _update_connector_credential_pair( + db_session=db_session, + cc_pair=cc_pair, + status=status, + net_docs=net_docs, + run_dt=run_dt, + ) def delete_connector_credential_pair__no_commit( @@ -160,6 +210,8 @@ def associate_default_cc_pair(db_session: Session) -> None: connector_id=0, credential_id=0, name="DefaultCCPair", + status=ConnectorCredentialPairStatus.ACTIVE, + is_public=True, ) db_session.add(association) db_session.commit() @@ -204,6 +256,7 @@ def add_credential_to_connector( connector_id=connector_id, credential_id=credential_id, name=cc_pair_name, + status=ConnectorCredentialPairStatus.ACTIVE, is_public=is_public, ) db_session.add(association) diff --git a/backend/danswer/db/deletion_attempt.py b/backend/danswer/db/deletion_attempt.py index b66e6f585..2ce94cd82 100644 --- a/backend/danswer/db/deletion_attempt.py +++ b/backend/danswer/db/deletion_attempt.py @@ -1,6 +1,7 @@ from sqlalchemy.orm import Session from danswer.db.embedding_model import get_current_db_embedding_model +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.index_attempt import get_last_attempt from danswer.db.models import ConnectorCredentialPair from danswer.db.models import IndexingStatus @@ -13,7 +14,7 @@ def check_deletion_attempt_is_allowed( ) -> str | None: """ To be deletable: - (1) connector should be disabled + 
(1) connector should be paused (2) there should be no in-progress/planned index attempts Returns an error message if the deletion attempt is not allowed, otherwise None. @@ -23,7 +24,10 @@ def check_deletion_attempt_is_allowed( f"'{connector_credential_pair.credential_id}' is not deletable." ) - if not connector_credential_pair.connector.disabled: + if ( + connector_credential_pair.status != ConnectorCredentialPairStatus.PAUSED + and connector_credential_pair.status != ConnectorCredentialPairStatus.DELETING + ): return base_error_msg + " Connector must be paused." connector_id = connector_credential_pair.connector_id diff --git a/backend/danswer/db/document.py b/backend/danswer/db/document.py index 9cf63720a..2ea77d072 100644 --- a/backend/danswer/db/document.py +++ b/backend/danswer/db/document.py @@ -17,6 +17,7 @@ from sqlalchemy.exc import OperationalError from sqlalchemy.orm import Session from danswer.configs.constants import DEFAULT_BOOST +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.feedback import delete_document_feedback_for_documents__no_commit from danswer.db.models import ConnectorCredentialPair from danswer.db.models import Credential @@ -110,36 +111,19 @@ def get_document_cnts_for_cc_pairs( def get_acccess_info_for_documents( db_session: Session, document_ids: list[str], - cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None, ) -> Sequence[tuple[str, list[UUID | None], bool]]: """Gets back all relevant access info for the given documents. This includes the user_ids for cc pairs that the document is associated with + whether any of the associated cc pairs are intending to make the document globally public. - - If `cc_pair_to_delete` is specified, gets the above access info as if that - pair had been deleted. This is needed since we want to delete from the Vespa - before deleting from Postgres to ensure that the state of Postgres never "loses" - documents that still exist in Vespa. 
""" - stmt = select( - DocumentByConnectorCredentialPair.id, - func.array_agg(Credential.user_id).label("user_ids"), - func.bool_or(ConnectorCredentialPair.is_public).label("public_doc"), - ).where(DocumentByConnectorCredentialPair.id.in_(document_ids)) - - # pretend that the specified cc pair doesn't exist - if cc_pair_to_delete: - stmt = stmt.where( - and_( - DocumentByConnectorCredentialPair.connector_id - != cc_pair_to_delete.connector_id, - DocumentByConnectorCredentialPair.credential_id - != cc_pair_to_delete.credential_id, - ) - ) - stmt = ( - stmt.join( + select( + DocumentByConnectorCredentialPair.id, + func.array_agg(Credential.user_id).label("user_ids"), + func.bool_or(ConnectorCredentialPair.is_public).label("public_doc"), + ) + .where(DocumentByConnectorCredentialPair.id.in_(document_ids)) + .join( Credential, DocumentByConnectorCredentialPair.credential_id == Credential.id, ) @@ -152,6 +136,9 @@ def get_acccess_info_for_documents( == ConnectorCredentialPair.credential_id, ), ) + # don't include CC pairs that are being deleted + # NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them + .where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING) .group_by(DocumentByConnectorCredentialPair.id) ) return db_session.execute(stmt).all() # type: ignore diff --git a/backend/danswer/db/document_set.py b/backend/danswer/db/document_set.py index 2cd563a60..3893ee202 100644 --- a/backend/danswer/db/document_set.py +++ b/backend/danswer/db/document_set.py @@ -9,6 +9,7 @@ from sqlalchemy import or_ from sqlalchemy import select from sqlalchemy.orm import Session +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.models import ConnectorCredentialPair from danswer.db.models import Document from danswer.db.models import DocumentByConnectorCredentialPair @@ -270,37 +271,20 @@ def mark_document_set_as_to_be_deleted( raise -def 
mark_cc_pair__document_set_relationships_to_be_deleted__no_commit( - cc_pair_id: int, db_session: Session -) -> set[int]: - """Marks all CC Pair -> Document Set relationships for the specified - `cc_pair_id` as not current and returns the list of all document set IDs - affected. - - NOTE: raises a `ValueError` if any of the document sets are currently syncing - to avoid getting into a bad state.""" - document_set__cc_pair_relationships = db_session.scalars( - select(DocumentSet__ConnectorCredentialPair).where( +def delete_document_set_cc_pair_relationship__no_commit( + connector_id: int, credential_id: int, db_session: Session +) -> None: + """Deletes all rows from DocumentSet__ConnectorCredentialPair where the + connector_credential_pair_id matches the given cc_pair_id.""" + delete_stmt = delete(DocumentSet__ConnectorCredentialPair).where( + and_( + ConnectorCredentialPair.connector_id == connector_id, + ConnectorCredentialPair.credential_id == credential_id, DocumentSet__ConnectorCredentialPair.connector_credential_pair_id - == cc_pair_id + == ConnectorCredentialPair.id, ) - ).all() - - document_set_ids_touched: set[int] = set() - for document_set__cc_pair_relationship in document_set__cc_pair_relationships: - document_set__cc_pair_relationship.is_current = False - - if not document_set__cc_pair_relationship.document_set.is_up_to_date: - raise ValueError( - "Cannot delete CC pair while it is attached to a document set " - "that is syncing. Please wait for the document set to finish " - "syncing, and then try again." 
- ) - - document_set__cc_pair_relationship.document_set.is_up_to_date = False - document_set_ids_touched.add(document_set__cc_pair_relationship.document_set_id) - - return document_set_ids_touched + ) + db_session.execute(delete_stmt) def fetch_document_sets( @@ -431,8 +415,10 @@ def fetch_documents_for_document_set_paginated( def fetch_document_sets_for_documents( - document_ids: list[str], db_session: Session + document_ids: list[str], + db_session: Session, ) -> Sequence[tuple[str, list[str]]]: + """Gives back a list of (document_id, list[document_set_names]) tuples""" stmt = ( select(Document.id, func.array_agg(DocumentSetDBModel.name)) .join( @@ -459,6 +445,10 @@ def fetch_document_sets_for_documents( Document.id == DocumentByConnectorCredentialPair.id, ) .where(Document.id.in_(document_ids)) + # don't include CC pairs that are being deleted + # NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them + # as we can assume their document sets are no longer relevant + .where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING) .where(DocumentSet__ConnectorCredentialPair.is_current == True) # noqa: E712 .group_by(Document.id) ) diff --git a/backend/danswer/db/enums.py b/backend/danswer/db/enums.py index 2a02e078c..55f0b9bcb 100644 --- a/backend/danswer/db/enums.py +++ b/backend/danswer/db/enums.py @@ -33,3 +33,9 @@ class IndexModelStatus(str, PyEnum): class ChatSessionSharedStatus(str, PyEnum): PUBLIC = "public" PRIVATE = "private" + + +class ConnectorCredentialPairStatus(str, PyEnum): + ACTIVE = "ACTIVE" + PAUSED = "PAUSED" + DELETING = "DELETING" diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index 59805059e..2763f397b 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -340,15 +340,14 @@ def expire_index_attempts( db_session.commit() -def cancel_indexing_attempts_for_connector( - connector_id: int, +def 
cancel_indexing_attempts_for_ccpair( + cc_pair_id: int, db_session: Session, include_secondary_index: bool = False, ) -> None: stmt = ( delete(IndexAttempt) - .where(IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id) - .where(ConnectorCredentialPair.connector_id == connector_id) + .where(IndexAttempt.connector_credential_pair_id == cc_pair_id) .where(IndexAttempt.status == IndexingStatus.NOT_STARTED) ) @@ -366,6 +365,8 @@ def cancel_indexing_attempts_for_connector( def cancel_indexing_attempts_past_model( db_session: Session, ) -> None: + """Stops all indexing attempts that are in progress or not started for + any embedding model that not present/future""" db_session.execute( update(IndexAttempt) .where( diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index 927610b5e..806c1a2ac 100644 --- a/backend/danswer/db/models.py +++ b/backend/danswer/db/models.py @@ -43,6 +43,7 @@ from danswer.configs.constants import SearchFeedbackType from danswer.configs.constants import TokenRateLimitScope from danswer.connectors.models import InputType from danswer.db.enums import ChatSessionSharedStatus +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.enums import IndexingStatus from danswer.db.enums import IndexModelStatus from danswer.db.enums import TaskStatus @@ -361,6 +362,9 @@ class ConnectorCredentialPair(Base): nullable=False, ) name: Mapped[str] = mapped_column(String, nullable=False) + status: Mapped[ConnectorCredentialPairStatus] = mapped_column( + Enum(ConnectorCredentialPairStatus, native_enum=False), nullable=False + ) connector_id: Mapped[int] = mapped_column( ForeignKey("connector.id"), primary_key=True ) @@ -490,7 +494,6 @@ class Connector(Base): time_updated: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now() ) - disabled: Mapped[bool] = mapped_column(Boolean, default=False) credentials: Mapped[list["ConnectorCredentialPair"]] = 
relationship( "ConnectorCredentialPair", diff --git a/backend/danswer/server/documents/cc_pair.py b/backend/danswer/server/documents/cc_pair.py index 1cb2385a7..7f8dcef57 100644 --- a/backend/danswer/server/documents/cc_pair.py +++ b/backend/danswer/server/documents/cc_pair.py @@ -1,17 +1,24 @@ from fastapi import APIRouter from fastapi import Depends from fastapi import HTTPException +from pydantic import BaseModel from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from danswer.auth.users import current_admin_user from danswer.auth.users import current_user -from danswer.background.celery.celery_utils import get_deletion_status +from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot from danswer.db.connector_credential_pair import add_credential_to_connector from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.connector_credential_pair import remove_credential_from_connector +from danswer.db.connector_credential_pair import ( + update_connector_credential_pair_from_id, +) from danswer.db.document import get_document_cnts_for_cc_pairs from danswer.db.engine import get_session +from danswer.db.enums import ConnectorCredentialPairStatus +from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair +from danswer.db.index_attempt import cancel_indexing_attempts_past_model from danswer.db.index_attempt import get_index_attempts_for_connector from danswer.db.models import User from danswer.server.documents.models import CCPairFullInfo @@ -58,20 +65,42 @@ def get_cc_pair_full_info( document_count_info_list[0][-1] if document_count_info_list else 0 ) - latest_deletion_attempt = get_deletion_status( - connector_id=cc_pair.connector.id, - credential_id=cc_pair.credential.id, - db_session=db_session, - ) - return CCPairFullInfo.from_models( cc_pair_model=cc_pair, index_attempt_models=list(index_attempts), - latest_deletion_attempt=latest_deletion_attempt, + 
latest_deletion_attempt=get_deletion_attempt_snapshot( + connector_id=cc_pair.connector_id, + credential_id=cc_pair.credential_id, + db_session=db_session, + ), num_docs_indexed=documents_indexed, ) +class CCStatusUpdateRequest(BaseModel): + status: ConnectorCredentialPairStatus + + +@router.put("/admin/cc-pair/{cc_pair_id}/status") +def update_cc_pair_status( + cc_pair_id: int, + status_update_request: CCStatusUpdateRequest, + _: User | None = Depends(current_admin_user), + db_session: Session = Depends(get_session), +) -> None: + if status_update_request.status == ConnectorCredentialPairStatus.PAUSED: + cancel_indexing_attempts_for_ccpair(cc_pair_id, db_session) + + # Just for good measure + cancel_indexing_attempts_past_model(db_session) + + update_connector_credential_pair_from_id( + db_session=db_session, + cc_pair_id=cc_pair_id, + status=status_update_request.status, + ) + + @router.put("/admin/cc-pair/{cc_pair_id}/name") def update_cc_pair_name( cc_pair_id: int, diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index dd17ceab6..e78b2ebb8 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -13,7 +13,7 @@ from sqlalchemy.orm import Session from danswer.auth.users import current_admin_user from danswer.auth.users import current_user -from danswer.background.celery.celery_utils import get_deletion_status +from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot from danswer.configs.app_configs import ENABLED_CONNECTOR_TYPES from danswer.configs.constants import DocumentSource from danswer.configs.constants import FileOrigin @@ -62,8 +62,6 @@ from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed from danswer.db.document import get_document_cnts_for_cc_pairs from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.engine import get_session -from danswer.db.index_attempt 
import cancel_indexing_attempts_for_connector -from danswer.db.index_attempt import cancel_indexing_attempts_past_model from danswer.db.index_attempt import create_index_attempt from danswer.db.index_attempt import get_index_attempts_for_cc_pair from danswer.db.index_attempt import get_latest_finished_index_attempt_for_cc_pair @@ -432,6 +430,7 @@ def get_connector_indexing_status( ConnectorIndexingStatus( cc_pair_id=cc_pair.id, name=cc_pair.name, + cc_pair_status=cc_pair.status, connector=ConnectorSnapshot.from_connector_db_model(connector), credential=CredentialSnapshot.from_credential_db_model(credential), public_doc=cc_pair.is_public, @@ -456,7 +455,7 @@ def get_connector_indexing_status( if latest_index_attempt else None ), - deletion_attempt=get_deletion_status( + deletion_attempt=get_deletion_attempt_snapshot( connector_id=connector.id, credential_id=credential.id, db_session=db_session, @@ -550,12 +549,6 @@ def update_connector_from_model( status_code=404, detail=f"Connector {connector_id} does not exist" ) - if updated_connector.disabled: - cancel_indexing_attempts_for_connector(connector_id, db_session) - - # Just for good measure - cancel_indexing_attempts_past_model(db_session) - return ConnectorSnapshot( id=updated_connector.id, name=updated_connector.name, @@ -570,7 +563,6 @@ def update_connector_from_model( indexing_start=updated_connector.indexing_start, time_created=updated_connector.time_created, time_updated=updated_connector.time_updated, - disabled=updated_connector.disabled, ) @@ -793,7 +785,6 @@ def get_connector_by_id( ], time_created=connector.time_created, time_updated=connector.time_updated, - disabled=connector.disabled, ) diff --git a/backend/danswer/server/documents/models.py b/backend/danswer/server/documents/models.py index 822f7c38d..135de446d 100644 --- a/backend/danswer/server/documents/models.py +++ b/backend/danswer/server/documents/models.py @@ -7,6 +7,7 @@ from pydantic import BaseModel from danswer.configs.app_configs import 
MASK_CREDENTIAL_PREFIX from danswer.configs.constants import DocumentSource from danswer.connectors.models import InputType +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.models import Connector from danswer.db.models import ConnectorCredentialPair from danswer.db.models import Credential @@ -39,7 +40,6 @@ class ConnectorBase(BaseModel): connector_specific_config: dict[str, Any] refresh_freq: int | None # In seconds, None for one time index with no refresh prune_freq: int | None - disabled: bool indexing_start: datetime | None @@ -70,7 +70,6 @@ class ConnectorSnapshot(ConnectorBase): indexing_start=connector.indexing_start, time_created=connector.time_created, time_updated=connector.time_updated, - disabled=connector.disabled, ) @@ -151,6 +150,7 @@ class IndexAttemptSnapshot(BaseModel): class CCPairFullInfo(BaseModel): id: int name: str + status: ConnectorCredentialPairStatus num_docs_indexed: int connector: ConnectorSnapshot credential: CredentialSnapshot @@ -168,6 +168,7 @@ class CCPairFullInfo(BaseModel): return cls( id=cc_pair_model.id, name=cc_pair_model.name, + status=cc_pair_model.status, num_docs_indexed=num_docs_indexed, connector=ConnectorSnapshot.from_connector_db_model( cc_pair_model.connector @@ -188,6 +189,7 @@ class ConnectorIndexingStatus(BaseModel): cc_pair_id: int name: str | None + cc_pair_status: ConnectorCredentialPairStatus connector: ConnectorSnapshot credential: CredentialSnapshot owner: str diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py index e2ec1d2a5..b931db6df 100644 --- a/backend/danswer/server/manage/administrative.py +++ b/backend/danswer/server/manage/administrative.py @@ -13,12 +13,16 @@ from danswer.configs.app_configs import GENERATIVE_MODEL_ACCESS_CHECK_FREQ from danswer.configs.constants import DocumentSource from danswer.configs.constants import KV_GEN_AI_KEY_CHECK_TIME from danswer.db.connector_credential_pair import 
get_connector_credential_pair +from danswer.db.connector_credential_pair import ( + update_connector_credential_pair_from_id, +) from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed from danswer.db.engine import get_session +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.feedback import fetch_docs_ranked_by_boost from danswer.db.feedback import update_document_boost from danswer.db.feedback import update_document_hidden -from danswer.db.index_attempt import cancel_indexing_attempts_for_connector +from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair from danswer.db.models import User from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index @@ -164,8 +168,8 @@ def create_deletion_attempt_for_connector_id( ) # Cancel any scheduled indexing attempts - cancel_indexing_attempts_for_connector( - connector_id=connector_id, db_session=db_session, include_secondary_index=True + cancel_indexing_attempts_for_ccpair( + cc_pair_id=cc_pair.id, db_session=db_session, include_secondary_index=True ) # Check if the deletion attempt should be allowed @@ -178,6 +182,13 @@ def create_deletion_attempt_for_connector_id( detail=deletion_attempt_disallowed_reason, ) + # mark as deleting + update_connector_credential_pair_from_id( + db_session=db_session, + cc_pair_id=cc_pair.id, + status=ConnectorCredentialPairStatus.DELETING, + ) + # actually kick off the deletion cleanup_connector_credential_pair_task.apply_async( kwargs=dict(connector_id=connector_id, credential_id=credential_id), ) diff --git a/backend/danswer/utils/variable_functionality.py b/backend/danswer/utils/variable_functionality.py index e7161132a..3162869d8 100644 --- a/backend/danswer/utils/variable_functionality.py +++ b/backend/danswer/utils/variable_functionality.py @@ -76,3 +76,7 @@ def fetch_versioned_implementation_with_fallback( e, ) return fallback + + +def 
noop_fallback(*args: Any, **kwargs: Any) -> None: + pass diff --git a/backend/ee/danswer/access/access.py b/backend/ee/danswer/access/access.py index 254e76e66..c2b05ee88 100644 --- a/backend/ee/danswer/access/access.py +++ b/backend/ee/danswer/access/access.py @@ -7,7 +7,6 @@ from danswer.access.access import _get_acl_for_user as get_acl_for_user_without_ from danswer.access.models import DocumentAccess from danswer.access.utils import prefix_user_group from danswer.db.models import User -from danswer.server.documents.models import ConnectorCredentialPairIdentifier from ee.danswer.db.user_group import fetch_user_groups_for_documents from ee.danswer.db.user_group import fetch_user_groups_for_user @@ -15,19 +14,16 @@ from ee.danswer.db.user_group import fetch_user_groups_for_user def _get_access_for_documents( document_ids: list[str], db_session: Session, - cc_pair_to_delete: ConnectorCredentialPairIdentifier | None, ) -> dict[str, DocumentAccess]: non_ee_access_dict = get_access_for_documents_without_groups( document_ids=document_ids, db_session=db_session, - cc_pair_to_delete=cc_pair_to_delete, ) user_group_info = { document_id: group_names for document_id, group_names in fetch_user_groups_for_documents( db_session=db_session, document_ids=document_ids, - cc_pair_to_delete=cc_pair_to_delete, ) } diff --git a/backend/ee/danswer/db/connector_credential_pair.py b/backend/ee/danswer/db/connector_credential_pair.py index a49381385..a21729134 100644 --- a/backend/ee/danswer/db/connector_credential_pair.py +++ b/backend/ee/danswer/db/connector_credential_pair.py @@ -1,13 +1,36 @@ +from sqlalchemy import delete from sqlalchemy.orm import Session from danswer.configs.constants import DocumentSource +from danswer.db.connector_credential_pair import get_connector_credential_pair from danswer.db.models import Connector from danswer.db.models import ConnectorCredentialPair +from danswer.db.models import UserGroup__ConnectorCredentialPair from danswer.utils.logger import 
setup_logger logger = setup_logger() +def _delete_connector_credential_pair_user_groups_relationship__no_commit( + db_session: Session, connector_id: int, credential_id: int +) -> None: + cc_pair = get_connector_credential_pair( + db_session=db_session, + connector_id=connector_id, + credential_id=credential_id, + ) + if cc_pair is None: + raise ValueError( + f"ConnectorCredentialPair with connector_id: {connector_id} " + f"and credential_id: {credential_id} not found" + ) + + stmt = delete(UserGroup__ConnectorCredentialPair).where( + UserGroup__ConnectorCredentialPair.cc_pair_id == cc_pair.id, + ) + db_session.execute(stmt) + + def get_cc_pairs_by_source( source_type: DocumentSource, db_session: Session, diff --git a/backend/ee/danswer/db/user_group.py b/backend/ee/danswer/db/user_group.py index 357c17b57..bdcb296fa 100644 --- a/backend/ee/danswer/db/user_group.py +++ b/backend/ee/danswer/db/user_group.py @@ -2,10 +2,13 @@ from collections.abc import Sequence from operator import and_ from uuid import UUID +from sqlalchemy import delete from sqlalchemy import func from sqlalchemy import select from sqlalchemy.orm import Session +from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id +from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.models import ConnectorCredentialPair from danswer.db.models import Document from danswer.db.models import DocumentByConnectorCredentialPair @@ -15,7 +18,6 @@ from danswer.db.models import User from danswer.db.models import User__UserGroup from danswer.db.models import UserGroup from danswer.db.models import UserGroup__ConnectorCredentialPair -from danswer.server.documents.models import ConnectorCredentialPairIdentifier from ee.danswer.server.user_group.models import UserGroupCreate from ee.danswer.server.user_group.models import UserGroupUpdate @@ -90,7 +92,6 @@ def fetch_documents_for_user_group_paginated( def fetch_user_groups_for_documents( db_session: Session, document_ids: 
list[str], - cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None, ) -> Sequence[tuple[int, list[str]]]: stmt = ( select(Document.id, func.array_agg(UserGroup.name)) @@ -114,19 +115,12 @@ def fetch_user_groups_for_documents( .join(Document, Document.id == DocumentByConnectorCredentialPair.id) .where(Document.id.in_(document_ids)) .where(UserGroup__ConnectorCredentialPair.is_current == True) # noqa: E712 + # don't include CC pairs that are being deleted + # NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them + .where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING) .group_by(Document.id) ) - # pretend that the specified cc pair doesn't exist - if cc_pair_to_delete is not None: - stmt = stmt.where( - and_( - ConnectorCredentialPair.connector_id != cc_pair_to_delete.connector_id, - ConnectorCredentialPair.credential_id - != cc_pair_to_delete.credential_id, - ) - ) - return db_session.execute(stmt).all() # type: ignore @@ -343,3 +337,25 @@ def delete_user_group(db_session: Session, user_group: UserGroup) -> None: db_session.delete(user_group) db_session.commit() + + +def delete_user_group_cc_pair_relationship__no_commit( + cc_pair_id: int, db_session: Session +) -> None: + """Deletes all rows from UserGroup__ConnectorCredentialPair where the + connector_credential_pair_id matches the given cc_pair_id. 
+ + Should be used very carefully (only for connectors that are being deleted).""" + cc_pair = get_connector_credential_pair_from_id(cc_pair_id, db_session) + if not cc_pair: + raise ValueError(f"Connector Credential Pair '{cc_pair_id}' does not exist") + + if cc_pair.status != ConnectorCredentialPairStatus.DELETING: + raise ValueError( + f"Connector Credential Pair '{cc_pair_id}' is not in the DELETING state" + ) + + delete_stmt = delete(UserGroup__ConnectorCredentialPair).where( + UserGroup__ConnectorCredentialPair.cc_pair_id == cc_pair_id, + ) + db_session.execute(delete_stmt) diff --git a/backend/scripts/force_delete_connector_by_id.py b/backend/scripts/force_delete_connector_by_id.py index 0ed9b3193..b3a5b8f9e 100644 --- a/backend/scripts/force_delete_connector_by_id.py +++ b/backend/scripts/force_delete_connector_by_id.py @@ -5,6 +5,8 @@ import sys from sqlalchemy import delete from sqlalchemy.orm import Session +from danswer.db.enums import ConnectorCredentialPairStatus + # Modify sys.path current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) @@ -22,7 +24,7 @@ from danswer.db.connector import fetch_connector_by_id from danswer.db.document import get_documents_for_connector_credential_pair from danswer.db.index_attempt import ( delete_index_attempts, - cancel_indexing_attempts_for_connector, + cancel_indexing_attempts_for_ccpair, ) from danswer.db.models import ConnectorCredentialPair from danswer.document_index.interfaces import DocumentIndex @@ -141,10 +143,10 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None: logger.error(f"Connector credential pair with ID {cc_pair_id} not found") return - if not cc_pair.connector.disabled: + if cc_pair.status != ConnectorCredentialPairStatus.PAUSED: logger.error( - f"Connector {cc_pair.connector.name} is not disabled, cannot continue. 
\ - Please navigate to the connector and disbale before attempting again" + f"Connector {cc_pair.connector.name} is not paused, cannot continue. \ + Please navigate to the connector and pause before attempting again" ) return @@ -159,8 +161,8 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None: return logger.info("Cancelling indexing attempt for the connector") - cancel_indexing_attempts_for_connector( - connector_id=connector_id, db_session=db_session, include_secondary_index=True + cancel_indexing_attempts_for_ccpair( + cc_pair_id=cc_pair_id, db_session=db_session, include_secondary_index=True ) validated_cc_pair = get_connector_credential_pair( diff --git a/backend/tests/integration/common/chat.py b/backend/tests/integration/common/chat.py new file mode 100644 index 000000000..cd33d4edc --- /dev/null +++ b/backend/tests/integration/common/chat.py @@ -0,0 +1,66 @@ +import requests +from sqlalchemy.orm import Session + +from danswer.db.models import User + + +def test_create_chat_session_and_send_messages(db_session: Session) -> None: + # Create a test user + test_user = User(email="test@example.com", hashed_password="dummy_hash") + db_session.add(test_user) + db_session.commit() + + base_url = "http://localhost:8080" # Adjust this to your API's base URL + headers = {"Authorization": f"Bearer {test_user.id}"} + + # Create a new chat session + create_session_response = requests.post( + f"{base_url}/chat/create-chat-session", + json={ + "description": "Test Chat", + "persona_id": 1, + }, # Assuming persona_id 1 exists + headers=headers, + ) + assert create_session_response.status_code == 200 + chat_session_id = create_session_response.json()["chat_session_id"] + + # Send first message + first_message = "Hello, this is a test message." 
+ send_message_response = requests.post( + f"{base_url}/chat/send-message", + json={ + "chat_session_id": chat_session_id, + "message": first_message, + "prompt_id": None, + "retrieval_options": {"top_k": 3}, + "stream_response": False, + }, + headers=headers, + ) + assert send_message_response.status_code == 200 + + # Send second message + second_message = "Can you provide more information?" + send_message_response = requests.post( + f"{base_url}/chat/send-message", + json={ + "chat_session_id": chat_session_id, + "message": second_message, + "prompt_id": None, + "retrieval_options": {"top_k": 3}, + "stream_response": False, + }, + headers=headers, + ) + assert send_message_response.status_code == 200 + + # Verify chat session details + get_session_response = requests.get( + f"{base_url}/chat/get-chat-session/{chat_session_id}", headers=headers + ) + assert get_session_response.status_code == 200 + session_details = get_session_response.json() + assert session_details["chat_session_id"] == chat_session_id + assert session_details["description"] == "Test Chat" + assert len(session_details["messages"]) == 4 # 2 user messages + 2 AI responses diff --git a/backend/tests/integration/common/connectors.py b/backend/tests/integration/common/connectors.py new file mode 100644 index 000000000..0477f9ba9 --- /dev/null +++ b/backend/tests/integration/common/connectors.py @@ -0,0 +1,114 @@ +import uuid +from typing import cast + +import requests +from pydantic import BaseModel + +from danswer.configs.constants import DocumentSource +from danswer.db.enums import ConnectorCredentialPairStatus +from tests.integration.common.constants import API_SERVER_URL + + +class ConnectorCreationDetails(BaseModel): + connector_id: int + credential_id: int + cc_pair_id: int + + +class ConnectorClient: + @staticmethod + def create_connector( + name_prefix: str = "test_connector", credential_id: int | None = None + ) -> ConnectorCreationDetails: + unique_id = uuid.uuid4() + + connector_name = 
f"{name_prefix}_{unique_id}" + connector_data = { + "name": connector_name, + "source": DocumentSource.NOT_APPLICABLE, + "input_type": "load_state", + "connector_specific_config": {}, + "refresh_freq": 60, + "disabled": True, + } + response = requests.post( + f"{API_SERVER_URL}/manage/admin/connector", + json=connector_data, + ) + response.raise_for_status() + connector_id = response.json()["id"] + + # associate the credential with the connector + if not credential_id: + print("ID not specified, creating new credential") + # Create a new credential + credential_data = { + "credential_json": {}, + "admin_public": True, + "source": DocumentSource.NOT_APPLICABLE, + } + response = requests.post( + f"{API_SERVER_URL}/manage/credential", + json=credential_data, + ) + response.raise_for_status() + credential_id = cast(int, response.json()["id"]) + + cc_pair_metadata = {"name": f"test_cc_pair_{unique_id}", "is_public": True} + response = requests.put( + f"{API_SERVER_URL}/manage/connector/{connector_id}/credential/{credential_id}", + json=cc_pair_metadata, + ) + response.raise_for_status() + + # fetch the connector credential pair id using the indexing status API + response = requests.get( + f"{API_SERVER_URL}/manage/admin/connector/indexing-status" + ) + response.raise_for_status() + indexing_statuses = response.json() + + cc_pair_id = None + for status in indexing_statuses: + if ( + status["connector"]["id"] == connector_id + and status["credential"]["id"] == credential_id + ): + cc_pair_id = status["cc_pair_id"] + break + + if cc_pair_id is None: + raise ValueError("Could not find the connector credential pair id") + + print( + f"Created connector with connector_id: {connector_id}, credential_id: {credential_id}, cc_pair_id: {cc_pair_id}" + ) + return ConnectorCreationDetails( + connector_id=int(connector_id), + credential_id=int(credential_id), + cc_pair_id=int(cc_pair_id), + ) + + @staticmethod + def update_connector_status( + cc_pair_id: int, status:
ConnectorCredentialPairStatus + ) -> None: + response = requests.put( + f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair_id}/status", + json={"status": status}, + ) + response.raise_for_status() + + @staticmethod + def delete_connector(connector_id: int, credential_id: int) -> None: + response = requests.post( + f"{API_SERVER_URL}/manage/admin/deletion-attempt", + json={"connector_id": connector_id, "credential_id": credential_id}, + ) + response.raise_for_status() + + @staticmethod + def get_connectors() -> list[dict]: + response = requests.get(f"{API_SERVER_URL}/manage/connector") + response.raise_for_status() + return response.json() diff --git a/backend/tests/integration/common/constants.py b/backend/tests/integration/common/constants.py index 304a31b62..c41e3cd56 100644 --- a/backend/tests/integration/common/constants.py +++ b/backend/tests/integration/common/constants.py @@ -1 +1,2 @@ API_SERVER_URL = "http://localhost:8080" +MAX_DELAY = 30 diff --git a/backend/tests/integration/common/document_sets.py b/backend/tests/integration/common/document_sets.py new file mode 100644 index 000000000..d55dce552 --- /dev/null +++ b/backend/tests/integration/common/document_sets.py @@ -0,0 +1,30 @@ +from typing import cast + +import requests + +from danswer.server.features.document_set.models import DocumentSet +from danswer.server.features.document_set.models import DocumentSetCreationRequest +from tests.integration.common.constants import API_SERVER_URL + + +class DocumentSetClient: + @staticmethod + def create_document_set( + doc_set_creation_request: DocumentSetCreationRequest, + ) -> int: + response = requests.post( + f"{API_SERVER_URL}/manage/admin/document-set", + json=doc_set_creation_request.dict(), + ) + response.raise_for_status() + return cast(int, response.json()) + + @staticmethod + def fetch_document_sets() -> list[DocumentSet]: + response = requests.get(f"{API_SERVER_URL}/manage/admin/document-set") + response.raise_for_status() + + document_sets = [ + 
DocumentSet.parse_obj(doc_set_data) for doc_set_data in response.json() + ] + return document_sets diff --git a/backend/tests/integration/common/reset.py b/backend/tests/integration/common/reset.py index 56760a74c..456acd05e 100644 --- a/backend/tests/integration/common/reset.py +++ b/backend/tests/integration/common/reset.py @@ -82,6 +82,10 @@ def reset_postgres(database: str = "postgres") -> None: if table_name == "alembic_version": continue + # Don't touch Kombu + if table_name == "kombu_message" or table_name == "kombu_queue": + continue + cur.execute(f'DELETE FROM "{table_name}"') # Re-enable triggers diff --git a/backend/tests/integration/common/seed_documents.py b/backend/tests/integration/common/seed_documents.py index 3993aadf5..9aebc30a0 100644 --- a/backend/tests/integration/common/seed_documents.py +++ b/backend/tests/integration/common/seed_documents.py @@ -1,10 +1,10 @@ import uuid -from typing import cast import requests from pydantic import BaseModel from danswer.configs.constants import DocumentSource +from tests.integration.common.connectors import ConnectorClient from tests.integration.common.constants import API_SERVER_URL @@ -15,34 +15,12 @@ class SeedDocumentResponse(BaseModel): class TestDocumentClient: @staticmethod - def seed_documents(num_docs: int = 5) -> SeedDocumentResponse: - unique_id = uuid.uuid4() - - # Create a connector - connector_name = f"test_connector_{unique_id}" - connector_data = { - "name": connector_name, - "source": DocumentSource.NOT_APPLICABLE, - "input_type": "load_state", - "connector_specific_config": {}, - "refresh_freq": 60, - "disabled": True, - } - response = requests.post( - f"{API_SERVER_URL}/manage/admin/connector", - json=connector_data, - ) - response.raise_for_status() - connector_id = response.json()["id"] - - # Associate the credential with the connector - cc_pair_metadata = {"name": f"test_cc_pair_{unique_id}", "is_public": True} - response = requests.put( - 
f"{API_SERVER_URL}/manage/connector/{connector_id}/credential/0", - json=cc_pair_metadata, - ) - response.raise_for_status() - cc_pair_id = cast(int, response.json()["data"]) + def seed_documents( + num_docs: int = 5, cc_pair_id: int | None = None + ) -> SeedDocumentResponse: + if not cc_pair_id: + connector_details = ConnectorClient.create_connector() + cc_pair_id = connector_details.cc_pair_id # Create and ingest some documents document_ids: list[str] = [] diff --git a/backend/tests/integration/common/user_groups.py b/backend/tests/integration/common/user_groups.py new file mode 100644 index 000000000..1819b4a3a --- /dev/null +++ b/backend/tests/integration/common/user_groups.py @@ -0,0 +1,24 @@ +from typing import cast + +import requests + +from ee.danswer.server.user_group.models import UserGroup +from ee.danswer.server.user_group.models import UserGroupCreate +from tests.integration.common.constants import API_SERVER_URL + + +class UserGroupClient: + @staticmethod + def create_user_group(user_group_creation_request: UserGroupCreate) -> int: + response = requests.post( + f"{API_SERVER_URL}/manage/admin/user-group", + json=user_group_creation_request.dict(), + ) + response.raise_for_status() + return cast(int, response.json()["id"]) + + @staticmethod + def fetch_user_groups() -> list[UserGroup]: + response = requests.get(f"{API_SERVER_URL}/manage/admin/user-group") + response.raise_for_status() + return [UserGroup(**ug) for ug in response.json()] diff --git a/backend/tests/integration/connector/test_deletion.py b/backend/tests/integration/connector/test_deletion.py new file mode 100644 index 000000000..4d45efa04 --- /dev/null +++ b/backend/tests/integration/connector/test_deletion.py @@ -0,0 +1,190 @@ +import time + +from danswer.db.enums import ConnectorCredentialPairStatus +from danswer.server.features.document_set.models import DocumentSetCreationRequest +from tests.integration.common.connectors import ConnectorClient +from tests.integration.common.constants 
import MAX_DELAY +from tests.integration.common.document_sets import DocumentSetClient +from tests.integration.common.seed_documents import TestDocumentClient +from tests.integration.common.user_groups import UserGroupClient +from tests.integration.common.user_groups import UserGroupCreate +from tests.integration.common.vespa import TestVespaClient + + +def test_connector_deletion(reset: None, vespa_client: TestVespaClient) -> None: + # create connectors + c1_details = ConnectorClient.create_connector(name_prefix="tc1") + c2_details = ConnectorClient.create_connector(name_prefix="tc2") + c1_seed_res = TestDocumentClient.seed_documents( + num_docs=5, cc_pair_id=c1_details.cc_pair_id + ) + c2_seed_res = TestDocumentClient.seed_documents( + num_docs=5, cc_pair_id=c2_details.cc_pair_id + ) + + # create document sets + doc_set_1_id = DocumentSetClient.create_document_set( + DocumentSetCreationRequest( + name="Test Document Set 1", + description="Intially connector to be deleted, should be empty after test", + cc_pair_ids=[c1_details.cc_pair_id], + is_public=True, + users=[], + groups=[], + ) + ) + + doc_set_2_id = DocumentSetClient.create_document_set( + DocumentSetCreationRequest( + name="Test Document Set 2", + description="Intially both connectors, should contain undeleted connector after test", + cc_pair_ids=[c1_details.cc_pair_id, c2_details.cc_pair_id], + is_public=True, + users=[], + groups=[], + ) + ) + + # wait for document sets to be synced + start = time.time() + while True: + doc_sets = DocumentSetClient.fetch_document_sets() + doc_set_1 = next( + (doc_set for doc_set in doc_sets if doc_set.id == doc_set_1_id), None + ) + doc_set_2 = next( + (doc_set for doc_set in doc_sets if doc_set.id == doc_set_2_id), None + ) + + if not doc_set_1 or not doc_set_2: + raise RuntimeError("Document set not found") + + if doc_set_1.is_up_to_date and doc_set_2.is_up_to_date: + break + + if time.time() - start > MAX_DELAY: + raise TimeoutError("Document sets were not synced 
within the max delay") + + time.sleep(2) + + print("Document sets created and synced") + + # if so, create ACLs + user_group_1 = UserGroupClient.create_user_group( + UserGroupCreate( + name="Test User Group 1", user_ids=[], cc_pair_ids=[c1_details.cc_pair_id] + ) + ) + user_group_2 = UserGroupClient.create_user_group( + UserGroupCreate( + name="Test User Group 2", + user_ids=[], + cc_pair_ids=[c1_details.cc_pair_id, c2_details.cc_pair_id], + ) + ) + + # wait for user groups to be available + start = time.time() + while True: + user_groups = {ug.id: ug for ug in UserGroupClient.fetch_user_groups()} + + if not ( + user_group_1 in user_groups.keys() and user_group_2 in user_groups.keys() + ): + raise RuntimeError("User groups not found") + + if ( + user_groups[user_group_1].is_up_to_date + and user_groups[user_group_2].is_up_to_date + ): + break + + if time.time() - start > MAX_DELAY: + raise TimeoutError("User groups were not synced within the max delay") + + time.sleep(2) + + print("User groups created and synced") + + # delete connector 1 + ConnectorClient.update_connector_status( + cc_pair_id=c1_details.cc_pair_id, status=ConnectorCredentialPairStatus.PAUSED + ) + ConnectorClient.delete_connector( + connector_id=c1_details.connector_id, credential_id=c1_details.credential_id + ) + + start = time.time() + while True: + connectors = ConnectorClient.get_connectors() + + if c1_details.connector_id not in [c["id"] for c in connectors]: + break + + if time.time() - start > MAX_DELAY: + raise TimeoutError("Connector 1 was not deleted within the max delay") + + time.sleep(2) + + print("Connector 1 deleted") + + # validate vespa documents + c1_vespa_docs = vespa_client.get_documents_by_id(c1_seed_res.document_ids)[ + "documents" + ] + c2_vespa_docs = vespa_client.get_documents_by_id(c2_seed_res.document_ids)[ + "documents" + ] + + assert len(c1_vespa_docs) == 0 + assert len(c2_vespa_docs) == 5 + + for doc in c2_vespa_docs: + assert doc["fields"]["access_control_list"] == { 
+ "PUBLIC": 1, + "group:Test User Group 2": 1, + } + assert doc["fields"]["document_sets"] == {"Test Document Set 2": 1} + + # check that only connector 1 is deleted + # TODO: check for the CC pair rather than the connector once the refactor is done + all_connectors = ConnectorClient.get_connectors() + assert len(all_connectors) == 1 + assert all_connectors[0]["id"] == c2_details.connector_id + + # validate document sets + all_doc_sets = DocumentSetClient.fetch_document_sets() + assert len(all_doc_sets) == 2 + + doc_set_1_found = False + doc_set_2_found = False + for doc_set in all_doc_sets: + if doc_set.id == doc_set_1_id: + doc_set_1_found = True + assert doc_set.cc_pair_descriptors == [] + + if doc_set.id == doc_set_2_id: + doc_set_2_found = True + assert len(doc_set.cc_pair_descriptors) == 1 + assert doc_set.cc_pair_descriptors[0].id == c2_details.cc_pair_id + + assert doc_set_1_found + assert doc_set_2_found + + # validate user groups + all_user_groups = UserGroupClient.fetch_user_groups() + assert len(all_user_groups) == 2 + + user_group_1_found = False + user_group_2_found = False + for user_group in all_user_groups: + if user_group.id == user_group_1: + user_group_1_found = True + assert user_group.cc_pairs == [] + if user_group.id == user_group_2: + user_group_2_found = True + assert len(user_group.cc_pairs) == 1 + assert user_group.cc_pairs[0].id == c2_details.cc_pair_id + + assert user_group_1_found + assert user_group_2_found diff --git a/backend/tests/regression/answer_quality/api_utils.py b/backend/tests/regression/answer_quality/api_utils.py index 13455832b..5d74b7a5a 100644 --- a/backend/tests/regression/answer_quality/api_utils.py +++ b/backend/tests/regression/answer_quality/api_utils.py @@ -165,7 +165,6 @@ def create_connector(env_name: str, file_paths: list[str]) -> int: connector_specific_config={"file_locations": file_paths}, refresh_freq=None, prune_freq=None, - disabled=False, indexing_start=None, ) diff --git 
a/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx b/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx index 49ac99a8d..7ea03747b 100644 --- a/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx +++ b/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx @@ -1,7 +1,7 @@ "use client"; import { Button } from "@tremor/react"; -import { CCPairFullInfo } from "./types"; +import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types"; import { usePopup } from "@/components/admin/connectors/Popup"; import { FiTrash } from "react-icons/fi"; import { deleteCCPair } from "@/lib/documentDeletion"; @@ -16,7 +16,7 @@ export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) { ccPair?.latest_deletion_attempt?.status === "STARTED"; let tooltip: string; - if (ccPair.connector.disabled) { + if (ccPair.status !== ConnectorCredentialPairStatus.ACTIVE) { if (isDeleting) { tooltip = "This connector is currently being deleted"; } else { @@ -41,7 +41,9 @@ export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) { ) } icon={FiTrash} - disabled={!ccPair.connector.disabled || isDeleting} + disabled={ + ccPair.status === ConnectorCredentialPairStatus.ACTIVE || isDeleting + } tooltip={tooltip} > Delete diff --git a/web/src/app/admin/connector/[ccPairId]/ModifyStatusButtonCluster.tsx b/web/src/app/admin/connector/[ccPairId]/ModifyStatusButtonCluster.tsx index 83d6363f6..10460459e 100644 --- a/web/src/app/admin/connector/[ccPairId]/ModifyStatusButtonCluster.tsx +++ b/web/src/app/admin/connector/[ccPairId]/ModifyStatusButtonCluster.tsx @@ -1,11 +1,11 @@ "use client"; import { Button } from "@tremor/react"; -import { CCPairFullInfo } from "./types"; +import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types"; import { usePopup } from "@/components/admin/connectors/Popup"; -import { disableConnector } from "@/lib/connector"; import { mutate } from "swr"; import { buildCCPairInfoUrl } from "./lib"; +import { setCCPairStatus 
} from "@/lib/ccPair"; export function ModifyStatusButtonCluster({ ccPair, @@ -17,13 +17,16 @@ export function ModifyStatusButtonCluster({ return ( <> {popup} - {ccPair.connector.disabled ? ( + {ccPair.status === ConnectorCredentialPairStatus.PAUSED ? (