From 1cbc067483ffc9935b761b22c1d2977f01cabf5a Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Wed, 9 Oct 2024 13:37:34 -0700 Subject: [PATCH] print various celery queue lengths (#2729) * print various celery queue lengths * use the correct redis client * mypy ignore --- .../background/celery/tasks/vespa/tasks.py | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 3f347cbab..39b6f8a91 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import Session from danswer.access.access import get_access_for_document from danswer.background.celery.celery_app import celery_app from danswer.background.celery.celery_app import task_logger +from danswer.background.celery.celery_redis import celery_get_queue_length from danswer.background.celery.celery_redis import RedisConnectorCredentialPair from danswer.background.celery.celery_redis import RedisConnectorDeletion from danswer.background.celery.celery_redis import RedisConnectorPruning @@ -18,6 +19,7 @@ from danswer.background.celery.celery_redis import RedisDocumentSet from danswer.background.celery.celery_redis import RedisUserGroup from danswer.configs.app_configs import JOB_TIMEOUT from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT +from danswer.configs.constants import DanswerCeleryQueues from danswer.configs.constants import DanswerRedisLocks from danswer.db.connector import fetch_connector_by_id from danswer.db.connector import mark_ccpair_as_pruned @@ -468,8 +470,8 @@ def monitor_ccpair_pruning_taskset( r.delete(rcp.fence_key) -@shared_task(name="monitor_vespa_sync", soft_time_limit=300) -def monitor_vespa_sync() -> None: +@shared_task(name="monitor_vespa_sync", soft_time_limit=300, bind=True) +def monitor_vespa_sync(self: Task) -> 
None: """This is a celery beat task that monitors and finalizes metadata sync tasksets. It scans for fence values and then gets the counts of any associated tasksets. If the count is 0, that means all tasks finished and we should clean up. @@ -479,7 +481,7 @@ def monitor_vespa_sync() -> None: """ r = get_redis_client() - lock_beat = r.lock( + lock_beat: redis.lock.Lock = r.lock( DanswerRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK, timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, ) @@ -489,16 +491,37 @@ def monitor_vespa_sync() -> None: if not lock_beat.acquire(blocking=False): return + # print current queue lengths + r_celery = self.app.broker_connection().channel().client # type: ignore + n_celery = celery_get_queue_length("celery", r_celery) + n_sync = celery_get_queue_length( + DanswerCeleryQueues.VESPA_METADATA_SYNC, r_celery + ) + n_deletion = celery_get_queue_length( + DanswerCeleryQueues.CONNECTOR_DELETION, r_celery + ) + n_pruning = celery_get_queue_length( + DanswerCeleryQueues.CONNECTOR_PRUNING, r_celery + ) + + task_logger.info( + f"Queue lengths: celery={n_celery} sync={n_sync} deletion={n_deletion} pruning={n_pruning}" + ) + + lock_beat.reacquire() if r.exists(RedisConnectorCredentialPair.get_fence_key()): monitor_connector_taskset(r) + lock_beat.reacquire() for key_bytes in r.scan_iter(RedisConnectorDeletion.FENCE_PREFIX + "*"): monitor_connector_deletion_taskset(key_bytes, r) with Session(get_sqlalchemy_engine()) as db_session: + lock_beat.reacquire() for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"): monitor_document_set_taskset(key_bytes, r, db_session) + lock_beat.reacquire() for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"): monitor_usergroup_taskset = ( fetch_versioned_implementation_with_fallback( @@ -509,6 +532,7 @@ def monitor_vespa_sync() -> None: ) monitor_usergroup_taskset(key_bytes, r, db_session) + lock_beat.reacquire() for key_bytes in r.scan_iter(RedisConnectorPruning.FENCE_PREFIX + "*"): 
monitor_ccpair_pruning_taskset(key_bytes, r, db_session)