From c9a3b45ad46313264042da598420a3e4172d933d Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Mon, 24 Feb 2025 14:41:13 -0800 Subject: [PATCH] more aggressive handling of tasks blocking deletion (#4093) * more aggressive handling of tasks blocking deletion * comment updated --------- Co-authored-by: Richard Kuo (Danswer) --- .../celery/tasks/connector_deletion/tasks.py | 80 ++++++++++++++++++- backend/onyx/redis/redis_connector_stop.py | 24 +++++- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 066f520ec..f54aea791 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -52,6 +52,51 @@ class TaskDependencyError(RuntimeError): with connector deletion.""" +def revoke_tasks_blocking_deletion( + redis_connector: RedisConnector, db_session: Session, app: Celery +) -> None: + search_settings_list = get_all_search_settings(db_session) + for search_settings in search_settings_list: + redis_connector_index = redis_connector.new_index(search_settings.id) + try: + index_payload = redis_connector_index.payload + if index_payload and index_payload.celery_task_id: + app.control.revoke(index_payload.celery_task_id) + task_logger.info( + f"Revoked indexing task {index_payload.celery_task_id}." + ) + except Exception: + task_logger.exception("Exception while revoking indexing task") + + try: + permissions_sync_payload = redis_connector.permissions.payload + if permissions_sync_payload and permissions_sync_payload.celery_task_id: + app.control.revoke(permissions_sync_payload.celery_task_id) + task_logger.info( + f"Revoked permissions sync task {permissions_sync_payload.celery_task_id}." 
+ ) + except Exception: + task_logger.exception("Exception while revoking permissions sync task") + + try: + prune_payload = redis_connector.prune.payload + if prune_payload and prune_payload.celery_task_id: + app.control.revoke(prune_payload.celery_task_id) + task_logger.info(f"Revoked pruning task {prune_payload.celery_task_id}.") + except Exception: + task_logger.exception("Exception while revoking pruning task") + + try: + external_group_sync_payload = redis_connector.external_group_sync.payload + if external_group_sync_payload and external_group_sync_payload.celery_task_id: + app.control.revoke(external_group_sync_payload.celery_task_id) + task_logger.info( + f"Revoked external group sync task {external_group_sync_payload.celery_task_id}." + ) + except Exception: + task_logger.exception("Exception while revoking external group sync task") + + @shared_task( name=OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, ignore_result=True, @@ -70,7 +115,7 @@ def check_for_connector_deletion_task( timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, ) - # these tasks should never overlap + # Prevent this task from overlapping with itself if not lock_beat.acquire(blocking=False): return None @@ -92,9 +137,38 @@ def check_for_connector_deletion_task( ) except TaskDependencyError as e: # this means we wanted to start deleting but dependent tasks were running - # Leave a stop signal to clear indexing and pruning tasks more quickly + # on the first error, we set a stop signal and revoke the dependent tasks + # on subsequent errors, we hard reset blocking fences after our specified timeout + # is exceeded task_logger.info(str(e)) - redis_connector.stop.set_fence(True) + + if not redis_connector.stop.fenced: + # one time revoke of celery tasks + task_logger.info("Revoking any tasks blocking deletion.") + revoke_tasks_blocking_deletion( + redis_connector, db_session, self.app + ) + redis_connector.stop.set_fence(True) + redis_connector.stop.set_timeout() + else: + # stop signal already set + if 
redis_connector.stop.timed_out: + # waiting too long, just reset blocking fences + task_logger.info( + "Timed out waiting for tasks blocking deletion. Resetting blocking fences." + ) + search_settings_list = get_all_search_settings(db_session) + for search_settings in search_settings_list: + redis_connector_index = redis_connector.new_index( + search_settings.id + ) + redis_connector_index.reset() + redis_connector.prune.reset() + redis_connector.permissions.reset() + redis_connector.external_group_sync.reset() + else: + # just wait + pass else: # clear the stop signal if it exists ... no longer needed redis_connector.stop.set_fence(False) diff --git a/backend/onyx/redis/redis_connector_stop.py b/backend/onyx/redis/redis_connector_stop.py index c65c57ff7..3567cf9b6 100644 --- a/backend/onyx/redis/redis_connector_stop.py +++ b/backend/onyx/redis/redis_connector_stop.py @@ -5,7 +5,13 @@ class RedisConnectorStop: """Manages interactions with redis for stop signaling. Should only be accessed through RedisConnector.""" - FENCE_PREFIX = "connectorstop_fence" + PREFIX = "connectorstop" + FENCE_PREFIX = f"{PREFIX}_fence" + + # if this timeout is exceeded, the caller may decide to take more + # drastic measures + TIMEOUT_PREFIX = f"{PREFIX}_timeout" + TIMEOUT_TTL = 300 def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None: self.tenant_id: str | None = tenant_id @@ -13,6 +19,7 @@ class RedisConnectorStop: self.redis = redis self.fence_key: str = f"{self.FENCE_PREFIX}_{id}" + self.timeout_key: str = f"{self.TIMEOUT_PREFIX}_{id}" @property def fenced(self) -> bool: @@ -28,7 +35,22 @@ class RedisConnectorStop: self.redis.set(self.fence_key, 0) + @property + def timed_out(self) -> bool: + if self.redis.exists(self.timeout_key): + return False + + return True + + def set_timeout(self) -> None: + """After calling this, call timed_out to determine if the timeout has been + exceeded.""" + self.redis.set(f"{self.timeout_key}", 0, ex=self.TIMEOUT_TTL) + 
@staticmethod def reset_all(r: redis.Redis) -> None: for key in r.scan_iter(RedisConnectorStop.FENCE_PREFIX + "*"): r.delete(key) + + for key in r.scan_iter(RedisConnectorStop.TIMEOUT_PREFIX + "*"): + r.delete(key)