From 9b6c7625fd136a3c8ad62c87e50fc66d2c4fc63e Mon Sep 17 00:00:00 2001
From: rkuo-danswer
Date: Tue, 8 Apr 2025 12:47:07 -0700
Subject: [PATCH] Bugfix/cloud checkpoint cleanup (#4478)

* use send_task to be consistent

* add pidbox monitoring task

* add logging so we can track the task execution

* log the idletime of the pidbox

---------

Co-authored-by: Richard Kuo (Onyx)
---
 .../background/celery/tasks/beat_schedule.py  | 12 ++++-
 .../background/celery/tasks/indexing/tasks.py | 25 ++++++++--
 .../celery/tasks/monitoring/tasks.py          | 48 +++++++++++++++++++
 .../celery/tasks/tenant_provisioning/tasks.py |  2 +-
 backend/onyx/configs/constants.py             |  7 ++-
 5 files changed, 86 insertions(+), 8 deletions(-)

diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py
index 8bcef470f..c161aff34 100644
--- a/backend/onyx/background/celery/tasks/beat_schedule.py
+++ b/backend/onyx/background/celery/tasks/beat_schedule.py
@@ -179,7 +179,7 @@ beat_cloud_tasks: list[dict] = [
     },
     {
         "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-available-tenants",
-        "task": OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
+        "task": OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
         "schedule": timedelta(minutes=10),
         "options": {
             "queue": OnyxCeleryQueues.MONITORING,
@@ -187,6 +187,16 @@ beat_cloud_tasks: list[dict] = [
             "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
+    {
+        "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-celery-pidbox",
+        "task": OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX,
+        "schedule": timedelta(hours=4),
+        "options": {
+            "queue": OnyxCeleryQueues.MONITORING,
+            "priority": OnyxCeleryPriority.HIGH,
+            "expires": BEAT_EXPIRES_DEFAULT,
+        },
+    },
 ]
 
 # tasks that only run self hosted
diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index 68d1afcd7..2158b283b 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -44,6 +44,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
 from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
 from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
+from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.configs.constants import OnyxRedisConstants
@@ -1234,8 +1235,9 @@ def connector_indexing_proxy_task(
 @shared_task(
     name=OnyxCeleryTask.CHECK_FOR_CHECKPOINT_CLEANUP,
     soft_time_limit=300,
+    bind=True,
 )
-def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
+def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
     """Clean up old checkpoints that are older than 7 days."""
     locked = False
     redis_client = get_redis_client(tenant_id=tenant_id)
@@ -1256,14 +1258,15 @@ def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
                 task_logger.info(
                     f"Cleaning up checkpoint for index attempt {attempt.id}"
                 )
-                cleanup_checkpoint_task.apply_async(
+                self.app.send_task(
+                    OnyxCeleryTask.CLEANUP_CHECKPOINT,
                     kwargs={
                         "index_attempt_id": attempt.id,
                         "tenant_id": tenant_id,
                     },
                     queue=OnyxCeleryQueues.CHECKPOINT_CLEANUP,
+                    priority=OnyxCeleryPriority.MEDIUM,
                 )
-
     except Exception:
         task_logger.exception("Unexpected exception during checkpoint cleanup")
         return None
@@ -1287,5 +1290,17 @@ def cleanup_checkpoint_task(
     self: Task, *, index_attempt_id: int, tenant_id: str | None
 ) -> None:
     """Clean up a checkpoint for a given index attempt"""
-    with get_session_with_current_tenant() as db_session:
-        cleanup_checkpoint(db_session, index_attempt_id)
+
+    start = time.monotonic()
+
+    try:
+        with get_session_with_current_tenant() as db_session:
+            cleanup_checkpoint(db_session, index_attempt_id)
+    finally:
+        elapsed = time.monotonic() - start
+
+        task_logger.info(
+            f"cleanup_checkpoint_task completed: tenant_id={tenant_id} "
+            f"index_attempt_id={index_attempt_id} "
+            f"elapsed={elapsed:.2f}"
+        )
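Switching the dispatch in check_for_checkpoint_cleanup from cleanup_checkpoint_task.apply_async(...) to self.app.send_task(OnyxCeleryTask.CLEANUP_CHECKPOINT, ...) enqueues the work by registered task name instead of by task object, which is the style the commit message means by "use send_task to be consistent"; it also keeps the queue and priority explicit at the call site. A minimal sketch of the two styles follows, using a placeholder app, task name, and queue rather than anything from this patch:

# Sketch only: contrasts dispatch-by-object with dispatch-by-name in Celery.
# The app, task name, and queue below are placeholders, not Onyx code.
from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")


@app.task(name="sketch.cleanup_checkpoint")
def cleanup_checkpoint_sketch(*, index_attempt_id: int, tenant_id: str | None) -> None:
    print(f"cleaning up checkpoint {index_attempt_id} for tenant {tenant_id}")


def dispatch_by_object() -> None:
    # apply_async requires the task object to be imported at the call site.
    cleanup_checkpoint_sketch.apply_async(
        kwargs={"index_attempt_id": 1, "tenant_id": "tenant_a"},
        queue="checkpoint_cleanup",
    )


def dispatch_by_name() -> None:
    # send_task enqueues by registered name only; routing options such as
    # queue and priority are passed the same way as with apply_async.
    app.send_task(
        "sketch.cleanup_checkpoint",
        kwargs={"index_attempt_id": 1, "tenant_id": "tenant_a"},
        queue="checkpoint_cleanup",
        priority=5,
    )

Both calls put an equivalent message on the broker; send_task simply avoids needing the task object in scope and makes the routing decisions visible where the task is enqueued.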
diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py
index 836c58dc5..26c775975 100644
--- a/backend/onyx/background/celery/tasks/monitoring/tasks.py
+++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py
@@ -4,6 +4,7 @@ from collections.abc import Callable
 from datetime import timedelta
 from itertools import islice
 from typing import Any
+from typing import cast
 from typing import Literal
 
 import psutil
@@ -998,3 +999,50 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None:
 
     except Exception:
         task_logger.exception("Error in monitor_process_memory task")
+
+
+@shared_task(
+    name=OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX, ignore_result=True, bind=True
+)
+def cloud_monitor_celery_pidbox(
+    self: Task,
+) -> None:
+    """
+    Celery can leave behind orphaned pidboxes from old workers that are idle and never cleaned up.
+    This task removes them based on idle time to avoid Redis clutter and overflowing the instance.
+    This is a real issue we've observed in production.
+
+    Note:
+    - Setting CELERY_ENABLE_REMOTE_CONTROL = False would prevent pidbox keys entirely,
+      but might also disable features like inspect, broadcast, and worker remote control.
+      Use with caution.
+    """
+
+    num_deleted = 0
+
+    MAX_PIDBOX_IDLE = 24 * 3600  # 1 day in seconds
+    r_celery: Redis = self.app.broker_connection().channel().client  # type: ignore
+    for key in r_celery.scan_iter("*.reply.celery.pidbox"):
+        key_bytes = cast(bytes, key)
+        key_str = key_bytes.decode("utf-8")
+        if key_str.startswith("_kombu"):
+            continue
+
+        idletime_raw = r_celery.object("idletime", key)
+        if idletime_raw is None:
+            continue
+
+        idletime = cast(int, idletime_raw)
+        if idletime < MAX_PIDBOX_IDLE:
+            continue
+
+        r_celery.delete(key)
+        task_logger.info(
+            f"Deleted idle pidbox: pidbox={key_str} "
+            f"idletime={idletime} "
+            f"max_idletime={MAX_PIDBOX_IDLE}"
+        )
+        num_deleted += 1
+
+    # Enable later in case we want some aggregate metrics
+    # task_logger.info(f"Deleted idle pidbox: pidbox={key_str}")
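The cleanup keys off Redis's OBJECT IDLETIME, which reports the number of seconds since a key was last read or written, so a reply pidbox abandoned by a dead worker keeps aging until it crosses MAX_PIDBOX_IDLE and is removed on a later 4-hour run. A standalone sketch of the same inspection against a broker, useful for previewing what would be removed; the connection settings and dry-run printout here are illustrative assumptions, not part of this patch:

# Sketch: list celery pidbox reply keys and how long they have been idle,
# mirroring the check in cloud_monitor_celery_pidbox without deleting anything.
from redis import Redis

MAX_PIDBOX_IDLE = 24 * 3600  # 1 day in seconds, same threshold as the task


def report_idle_pidboxes(r: Redis) -> None:
    for key in r.scan_iter("*.reply.celery.pidbox"):
        key_str = key.decode("utf-8") if isinstance(key, bytes) else key
        if key_str.startswith("_kombu"):
            continue

        # OBJECT IDLETIME: seconds since the key was last read or written.
        idletime = r.object("idletime", key)
        if idletime is None:
            continue

        verdict = "would delete" if idletime >= MAX_PIDBOX_IDLE else "keep"
        print(f"{key_str}: idle {idletime}s -> {verdict}")


if __name__ == "__main__":
    report_idle_pidboxes(Redis(host="localhost", port=6379, db=0))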
+ """ + + num_deleted = 0 + + MAX_PIDBOX_IDLE = 24 * 3600 # 1 day in seconds + r_celery: Redis = self.app.broker_connection().channel().client # type: ignore + for key in r_celery.scan_iter("*.reply.celery.pidbox"): + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if key_str.startswith("_kombu"): + continue + + idletime_raw = r_celery.object("idletime", key) + if idletime_raw is None: + continue + + idletime = cast(int, idletime_raw) + if idletime < MAX_PIDBOX_IDLE: + continue + + r_celery.delete(key) + task_logger.info( + f"Deleted idle pidbox: pidbox={key_str} " + f"idletime={idletime} " + f"max_idletime={MAX_PIDBOX_IDLE}" + ) + num_deleted += 1 + + # Enable later in case we want some aggregate metrics + # task_logger.info(f"Deleted idle pidbox: pidbox={key_str}") diff --git a/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py b/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py index 133b9f95d..50bb201e6 100644 --- a/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py +++ b/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py @@ -35,7 +35,7 @@ _TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes @shared_task( - name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS, + name=OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS, queue=OnyxCeleryQueues.MONITORING, ignore_result=True, soft_time_limit=JOB_TIMEOUT, diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index e6a81de6a..5a3d4faf0 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -398,7 +398,12 @@ class OnyxCeleryTask: CLOUD_MONITOR_CELERY_QUEUES = ( f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_queues" ) - CHECK_AVAILABLE_TENANTS = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants" + CLOUD_CHECK_AVAILABLE_TENANTS = ( + f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants" + ) + CLOUD_MONITOR_CELERY_PIDBOX = ( + f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_pidbox" + ) # Tenant pre-provisioning PRE_PROVISION_TENANT = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_pre_provision_tenant"