From d972a78f45a696999abf727f9b38c49fada5ea47 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 9 Jan 2025 17:39:45 -0800 Subject: [PATCH] Make connector pause and delete fast (#3646) * first cut * refresh on delete --------- Co-authored-by: Richard Kuo (Danswer) --- .../onyx/background/celery/celery_utils.py | 22 +++-- backend/onyx/server/documents/cc_pair.py | 97 +++++-------------- .../connector/[ccPairId]/DeletionButton.tsx | 32 ++++-- .../app/admin/connector/[ccPairId]/page.tsx | 2 +- 4 files changed, 63 insertions(+), 90 deletions(-) diff --git a/backend/onyx/background/celery/celery_utils.py b/backend/onyx/background/celery/celery_utils.py index fc6fef1fa..394dff352 100644 --- a/backend/onyx/background/celery/celery_utils.py +++ b/backend/onyx/background/celery/celery_utils.py @@ -14,6 +14,7 @@ from onyx.connectors.interfaces import PollConnector from onyx.connectors.interfaces import SlimConnector from onyx.connectors.models import Document from onyx.db.connector_credential_pair import get_connector_credential_pair +from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.enums import TaskStatus from onyx.db.models import TaskQueueState from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface @@ -41,14 +42,21 @@ def _get_deletion_status( return None redis_connector = RedisConnector(tenant_id, cc_pair.id) - if not redis_connector.delete.fenced: - return None + if redis_connector.delete.fenced: + return TaskQueueState( + task_id="", + task_name=redis_connector.delete.fence_key, + status=TaskStatus.STARTED, + ) - return TaskQueueState( - task_id="", - task_name=redis_connector.delete.fence_key, - status=TaskStatus.STARTED, - ) + if cc_pair.status == ConnectorCredentialPairStatus.DELETING: + return TaskQueueState( + task_id="", + task_name=redis_connector.delete.fence_key, + status=TaskStatus.PENDING, + ) + + return None def get_deletion_attempt_snapshot( diff --git a/backend/onyx/server/documents/cc_pair.py b/backend/onyx/server/documents/cc_pair.py index cf8746953..64086de5d 100644 --- a/backend/onyx/server/documents/cc_pair.py +++ b/backend/onyx/server/documents/cc_pair.py @@ -164,17 +164,12 @@ def update_cc_pair_status( db_session: Session = Depends(get_session), tenant_id: str | None = Depends(get_current_tenant_id), ) -> JSONResponse: - """This method may wait up to 30 seconds if pausing the connector due to the need to - terminate tasks in progress. Tasks are not guaranteed to terminate within the - timeout. + """This method returns nearly immediately. It simply sets some signals and + optimistically assumes any running background processes will clean themselves up. + This is done to improve the perceived end user experience. Returns HTTPStatus.OK if everything finished. - Returns HTTPStatus.ACCEPTED if the connector is being paused, but background tasks - did not finish within the timeout. """ - WAIT_TIMEOUT = 15.0 - still_terminating = False - cc_pair = get_connector_credential_pair_from_id( cc_pair_id=cc_pair_id, db_session=db_session, @@ -188,73 +183,37 @@ def update_cc_pair_status( detail="Connection not found for current user's permissions", ) + redis_connector = RedisConnector(tenant_id, cc_pair_id) if status_update_request.status == ConnectorCredentialPairStatus.PAUSED: + redis_connector.stop.set_fence(True) + search_settings_list: list[SearchSettings] = get_active_search_settings( db_session ) - redis_connector = RedisConnector(tenant_id, cc_pair_id) + while True: + for search_settings in search_settings_list: + redis_connector_index = redis_connector.new_index(search_settings.id) + if not redis_connector_index.fenced: + continue - try: - redis_connector.stop.set_fence(True) - while True: - logger.debug( - f"Wait for indexing soft termination starting: cc_pair={cc_pair_id}" - ) - wait_succeeded = redis_connector.wait_for_indexing_termination( - search_settings_list, WAIT_TIMEOUT - ) - if wait_succeeded: - logger.debug( - f"Wait for indexing soft termination succeeded: cc_pair={cc_pair_id}" - ) - break + index_payload = redis_connector_index.payload + if not index_payload: + continue - logger.debug( - "Wait for indexing soft termination timed out. " - f"Moving to hard termination: cc_pair={cc_pair_id} timeout={WAIT_TIMEOUT:.2f}" - ) + if not index_payload.celery_task_id: + continue - for search_settings in search_settings_list: - redis_connector_index = redis_connector.new_index( - search_settings.id - ) - if not redis_connector_index.fenced: - continue + # Revoke the task to prevent it from running + primary_app.control.revoke(index_payload.celery_task_id) - index_payload = redis_connector_index.payload - if not index_payload: - continue + # If it is running, then signaling for termination will get the + # watchdog thread to kill the spawned task + redis_connector_index.set_terminate(index_payload.celery_task_id) - if not index_payload.celery_task_id: - continue - - # Revoke the task to prevent it from running - primary_app.control.revoke(index_payload.celery_task_id) - - # If it is running, then signaling for termination will get the - # watchdog thread to kill the spawned task - redis_connector_index.set_terminate(index_payload.celery_task_id) - - logger.debug( - f"Wait for indexing hard termination starting: cc_pair={cc_pair_id}" - ) - wait_succeeded = redis_connector.wait_for_indexing_termination( - search_settings_list, WAIT_TIMEOUT - ) - if wait_succeeded: - logger.debug( - f"Wait for indexing hard termination succeeded: cc_pair={cc_pair_id}" - ) - break - - logger.debug( - f"Wait for indexing hard termination timed out: cc_pair={cc_pair_id}" - ) - still_terminating = True - break - finally: - redis_connector.stop.set_fence(False) + break + else: + redis_connector.stop.set_fence(False) update_connector_credential_pair_from_id( db_session=db_session, @@ -264,14 +223,6 @@ def update_cc_pair_status( db_session.commit() - if still_terminating: - return JSONResponse( - status_code=HTTPStatus.ACCEPTED, - content={ - "message": "Request accepted, background task termination still in progress" - }, - ) - return JSONResponse( status_code=HTTPStatus.OK, content={"message": str(HTTPStatus.OK)} ) diff --git a/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx b/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx index ccef14b5a..fe430af33 100644 --- a/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx +++ b/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx @@ -8,7 +8,13 @@ import { deleteCCPair } from "@/lib/documentDeletion"; import { mutate } from "swr"; import { buildCCPairInfoUrl } from "./lib"; -export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) { +export function DeletionButton({ + ccPair, + refresh, +}: { + ccPair: CCPairFullInfo; + refresh: () => void; +}) { const { popup, setPopup } = usePopup(); const isDeleting = @@ -31,14 +37,22 @@ export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) { {popup}