Mirror of https://github.com/danswer-ai/danswer.git (synced 2025-04-19 01:01:14 +02:00)
Bugfix/cloud checkpoint cleanup (#4478)
* use send_task to be consistent
* add pidbox monitoring task
* add logging so we can track the task execution
* log the idletime of the pidbox

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
Parent: 634d990cb8
Commit: 9b6c7625fd
@@ -179,7 +179,7 @@ beat_cloud_tasks: list[dict] = [
     },
     {
         "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-available-tenants",
-        "task": OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
+        "task": OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
         "schedule": timedelta(minutes=10),
         "options": {
             "queue": OnyxCeleryQueues.MONITORING,
@@ -187,6 +187,16 @@ beat_cloud_tasks: list[dict] = [
             "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
+    {
+        "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-celery-pidbox",
+        "task": OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX,
+        "schedule": timedelta(hours=4),
+        "options": {
+            "queue": OnyxCeleryQueues.MONITORING,
+            "priority": OnyxCeleryPriority.HIGH,
+            "expires": BEAT_EXPIRES_DEFAULT,
+        },
+    },
 ]

 # tasks that only run self hosted
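For context: entries in beat_cloud_tasks follow Celery's beat-schedule shape (a task name, a schedule interval, and routing options), so the new pidbox entry simply registers the cleanup task to run every four hours on the monitoring queue. The sketch below shows how such an entry could be wired into a bare Celery app's beat schedule; it is illustrative only, and the app name, broker URL, and the literal queue, priority, and expiry values are assumptions, not Onyx's actual wiring.

# Minimal sketch (not Onyx code) of a Celery beat entry analogous to the one added above.
from datetime import timedelta

from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")  # assumed broker

# Celery beat expects a mapping of entry-name -> {"task", "schedule", "options"}.
app.conf.beat_schedule = {
    "cloud_monitor-celery-pidbox": {
        "task": "cloud_monitor_celery_pidbox",  # registered task name (placeholder)
        "schedule": timedelta(hours=4),         # run every 4 hours
        "options": {
            "queue": "monitoring",              # route to a monitoring queue
            "priority": 1,                      # broker task priority (semantics depend on the broker)
            "expires": 60,                      # drop the run if it has not started within 60 seconds
        },
    },
}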
@@ -44,6 +44,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
 from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
 from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
+from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.configs.constants import OnyxRedisConstants
@@ -1234,8 +1235,9 @@ def connector_indexing_proxy_task(
 @shared_task(
     name=OnyxCeleryTask.CHECK_FOR_CHECKPOINT_CLEANUP,
     soft_time_limit=300,
+    bind=True,
 )
-def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
+def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
     """Clean up old checkpoints that are older than 7 days."""
     locked = False
     redis_client = get_redis_client(tenant_id=tenant_id)
@@ -1256,14 +1258,15 @@ def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
                 task_logger.info(
                     f"Cleaning up checkpoint for index attempt {attempt.id}"
                 )
-                cleanup_checkpoint_task.apply_async(
+                self.app.send_task(
+                    OnyxCeleryTask.CLEANUP_CHECKPOINT,
                     kwargs={
                         "index_attempt_id": attempt.id,
                         "tenant_id": tenant_id,
                     },
                     queue=OnyxCeleryQueues.CHECKPOINT_CLEANUP,
+                    priority=OnyxCeleryPriority.MEDIUM,
                 )

     except Exception:
         task_logger.exception("Unexpected exception during checkpoint cleanup")
         return None
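The switch from cleanup_checkpoint_task.apply_async(...) to self.app.send_task(OnyxCeleryTask.CLEANUP_CHECKPOINT, ...) matches the commit note "use send_task to be consistent": send_task enqueues a task by its registered name through the Celery app, so the caller does not need to import the task object it dispatches, and bind=True is what makes self (and therefore self.app) available. A hedged sketch of the two dispatch styles, with placeholder task and queue names:

# Minimal sketch (illustrative, not Onyx code) of apply_async vs send_task.
from celery import Celery, Task, shared_task

app = Celery("example", broker="redis://localhost:6379/0")  # assumed broker


@shared_task(bind=True, name="example.parent_task")
def parent_task(self: Task) -> None:
    # Style 1: apply_async on an imported task object. This requires importing
    # the task function, which couples the calling module to its definition:
    #     some_imported_task.apply_async(kwargs={"x": 1}, queue="some_queue")

    # Style 2: send_task dispatches by registered name through the app, so the
    # caller only needs the name string (or a constant holding it).
    self.app.send_task(
        "example.child_task",  # registered task name (placeholder)
        kwargs={"x": 1},
        queue="some_queue",
        priority=5,
    )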
@@ -1287,5 +1290,17 @@ def cleanup_checkpoint_task(
     self: Task, *, index_attempt_id: int, tenant_id: str | None
 ) -> None:
     """Clean up a checkpoint for a given index attempt"""
-    with get_session_with_current_tenant() as db_session:
-        cleanup_checkpoint(db_session, index_attempt_id)
+
+    start = time.monotonic()
+
+    try:
+        with get_session_with_current_tenant() as db_session:
+            cleanup_checkpoint(db_session, index_attempt_id)
+    finally:
+        elapsed = time.monotonic() - start
+
+        task_logger.info(
+            f"cleanup_checkpoint_task completed: tenant_id={tenant_id} "
+            f"index_attempt_id={index_attempt_id} "
+            f"elapsed={elapsed:.2f}"
+        )
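This try/finally wrapper corresponds to the commit note "add logging so we can track the task execution": the elapsed time is logged whether the cleanup succeeds or raises. A small standalone sketch of the same pattern; the function and logger names are placeholders, not Onyx code:

import logging
import time

logger = logging.getLogger(__name__)


def run_cleanup(index_attempt_id: int) -> None:
    """Placeholder for the real checkpoint cleanup call."""


def cleanup_with_timing(index_attempt_id: int) -> None:
    # time.monotonic() is used for durations because it never moves backwards,
    # unlike time.time(), which can jump with clock adjustments.
    start = time.monotonic()
    try:
        run_cleanup(index_attempt_id)
    finally:
        # finally guarantees the duration is logged on success and on failure.
        elapsed = time.monotonic() - start
        logger.info(
            "cleanup completed: index_attempt_id=%s elapsed=%.2f",
            index_attempt_id,
            elapsed,
        )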
@@ -4,6 +4,7 @@ from collections.abc import Callable
 from datetime import timedelta
 from itertools import islice
 from typing import Any
+from typing import cast
 from typing import Literal

 import psutil
@@ -998,3 +999,50 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None:

     except Exception:
         task_logger.exception("Error in monitor_process_memory task")
+
+
+@shared_task(
+    name=OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX, ignore_result=True, bind=True
+)
+def cloud_monitor_celery_pidbox(
+    self: Task,
+) -> None:
+    """
+    Celery can leave behind orphaned pidboxes from old workers that are idle and never cleaned up.
+    This task removes them based on idle time to avoid Redis clutter and overflowing the instance.
+    This is a real issue we've observed in production.
+
+    Note:
+    - Setting CELERY_ENABLE_REMOTE_CONTROL = False would prevent pidbox keys entirely,
+      but might also disable features like inspect, broadcast, and worker remote control.
+      Use with caution.
+    """
+
+    num_deleted = 0
+
+    MAX_PIDBOX_IDLE = 24 * 3600  # 1 day in seconds
+    r_celery: Redis = self.app.broker_connection().channel().client  # type: ignore
+    for key in r_celery.scan_iter("*.reply.celery.pidbox"):
+        key_bytes = cast(bytes, key)
+        key_str = key_bytes.decode("utf-8")
+        if key_str.startswith("_kombu"):
+            continue
+
+        idletime_raw = r_celery.object("idletime", key)
+        if idletime_raw is None:
+            continue
+
+        idletime = cast(int, idletime_raw)
+        if idletime < MAX_PIDBOX_IDLE:
+            continue
+
+        r_celery.delete(key)
+        task_logger.info(
+            f"Deleted idle pidbox: pidbox={key_str} "
+            f"idletime={idletime} "
+            f"max_idletime={MAX_PIDBOX_IDLE}"
+        )
+        num_deleted += 1
+
+    # Enable later in case we want some aggregate metrics
+    # task_logger.info(f"Deleted idle pidbox: pidbox={key_str}")
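The new task leans on Redis's OBJECT IDLETIME, which reports how many seconds have passed since a key was last read or written, exposed in redis-py as Redis.object("idletime", key). The sketch below reproduces the same check as a standalone script outside Celery; the connection details and the 24-hour threshold mirror the task above but are illustrative assumptions.

# Minimal sketch (illustrative): report idle Celery pidbox reply keys via OBJECT IDLETIME.
from typing import cast

import redis

r = redis.Redis(host="localhost", port=6379, db=0)  # assumed connection details

MAX_IDLE_SECONDS = 24 * 3600  # same 1-day threshold as the task above

for key in r.scan_iter("*.reply.celery.pidbox"):
    idletime = r.object("idletime", key)  # seconds since the key was last touched
    if idletime is None:
        continue
    if cast(int, idletime) >= MAX_IDLE_SECONDS:
        # The Onyx task deletes the key at this point; here we only report it.
        print(f"idle pidbox: {key!r} idle for {idletime}s")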
@@ -35,7 +35,7 @@ _TENANT_PROVISIONING_TIME_LIMIT = 60 * 10  # 10 minutes


 @shared_task(
-    name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
+    name=OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
     queue=OnyxCeleryQueues.MONITORING,
     ignore_result=True,
     soft_time_limit=JOB_TIMEOUT,
@@ -398,7 +398,12 @@ class OnyxCeleryTask:
     CLOUD_MONITOR_CELERY_QUEUES = (
         f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_queues"
     )
-    CHECK_AVAILABLE_TENANTS = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
+    CLOUD_CHECK_AVAILABLE_TENANTS = (
+        f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
+    )
+    CLOUD_MONITOR_CELERY_PIDBOX = (
+        f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_pidbox"
+    )

     # Tenant pre-provisioning
     PRE_PROVISION_TENANT = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_pre_provision_tenant"