diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py
index 62ea96658b36..caa697f88371 100644
--- a/backend/onyx/background/celery/apps/primary.py
+++ b/backend/onyx/background/celery/apps/primary.py
@@ -89,12 +89,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     app_base.wait_for_db(sender, **kwargs)
     app_base.wait_for_vespa(sender, **kwargs)
 
+    logger.info("Running as the primary celery worker.")
+
     # Less startup checks in multi-tenant case
     if MULTI_TENANT:
         return
 
-    logger.info("Running as the primary celery worker.")
-
     # This is singleton work that should be done on startup exactly once
     # by the primary worker. This is unnecessary in the multi tenant scenario
     r = get_redis_client(tenant_id=None)
diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py
index 57d8793eadcc..c3c8c42abcda 100644
--- a/backend/onyx/background/celery/tasks/vespa/tasks.py
+++ b/backend/onyx/background/celery/tasks/vespa/tasks.py
@@ -760,7 +760,13 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
 
     Returns True if the task actually did work, False if it exited early to prevent overlap
     """
+    task_logger.info(f"monitor_vespa_sync starting: tenant={tenant_id}")
+
     time_start = time.monotonic()
+
+    timings: dict[str, float] = {}
+    timings["start"] = time_start
+
     r = get_redis_client(tenant_id=tenant_id)
 
     lock_beat: RedisLock = r.lock(
@@ -771,6 +777,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     try:
         # prevent overlapping tasks
         if not lock_beat.acquire(blocking=False):
+            task_logger.info("monitor_vespa_sync exiting due to overlap")
             return False
 
         # print current queue lengths
@@ -812,20 +819,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             f"permissions_upsert={n_permissions_upsert} "
         )
 
+        timings["queues"] = time.monotonic() - timings["start"]
+
         # scan and monitor activity to completion
         lock_beat.reacquire()
         if r.exists(RedisConnectorCredentialPair.get_fence_key()):
             monitor_connector_taskset(r)
 
+        timings["connector"] = time.monotonic() - timings["queues"]
+
         for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
 
+        timings["connector_deletion"] = time.monotonic() - timings["connector"]
+
         for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
 
+        timings["document_set"] = time.monotonic() - timings["connector_deletion"]
+
         for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
@@ -836,21 +851,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
 
+        timings["usergroup"] = time.monotonic() - timings["document_set"]
+
         for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
 
+        timings["pruning"] = time.monotonic() - timings["usergroup"]
+
         for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
 
+        timings["indexing"] = time.monotonic() - timings["pruning"]
+
         for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)
 
+        timings["permissions"] = time.monotonic() - timings["indexing"]
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
@@ -858,9 +880,24 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     finally:
         if lock_beat.owned():
             lock_beat.release()
+        else:
+            t = timings
+            task_logger.error(
+                "monitor_vespa_sync - Lock not owned on completion: "
+                f"tenant={tenant_id} "
+                f"queues={t.get('queues')} "
+                f"connector={t.get('connector')} "
+                f"connector_deletion={t.get('connector_deletion')} "
+                f"document_set={t.get('document_set')} "
+                f"usergroup={t.get('usergroup')} "
+                f"pruning={t.get('pruning')} "
+                f"indexing={t.get('indexing')} "
+                f"permissions={t.get('permissions')}"
+            )
+            redis_lock_dump(lock_beat, r)
 
     time_elapsed = time.monotonic() - time_start
-    task_logger.debug(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
+    task_logger.info(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
 
     return True
 
diff --git a/backend/onyx/redis/redis_pool.py b/backend/onyx/redis/redis_pool.py
index e253caaf00ec..acca2db8a567 100644
--- a/backend/onyx/redis/redis_pool.py
+++ b/backend/onyx/redis/redis_pool.py
@@ -282,7 +282,7 @@ def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
         remote_token = None
 
     logger.warning(
-        f"RedisLock diagnostic logging: "
+        f"RedisLock diagnostic: "
         f"name={name} "
         f"locked={locked} "
         f"owned={owned} "
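A note on the `timings` bookkeeping added above: the first entry (`queues`) subtracts the start timestamp and therefore holds an elapsed time, but each later entry subtracts the previous entry's value, which is a duration, from `time.monotonic()`, so those values mix an absolute monotonic timestamp with an elapsed time. If the intent is per-stage durations, keeping a running checkpoint makes the arithmetic consistent. A minimal sketch under that assumption; the stage names mirror the patch, while the driver and stand-in work functions are illustrative and not part of the change:

```python
import logging
import time

logger = logging.getLogger(__name__)


def monitor_with_timings() -> None:
    """Toy driver showing per-stage timing bookkeeping with time.monotonic()."""

    def _noop() -> None:
        time.sleep(0.01)  # stand-in for real monitoring work

    # Stage names mirror the patch; each value is a stand-in work function.
    stages = {
        "queues": _noop,
        "connector": _noop,
        "connector_deletion": _noop,
        "document_set": _noop,
        "usergroup": _noop,
        "pruning": _noop,
        "indexing": _noop,
        "permissions": _noop,
    }

    time_start = time.monotonic()
    last = time_start
    timings: dict[str, float] = {}

    for name, work in stages.items():
        work()
        now = time.monotonic()
        timings[name] = now - last  # duration of this stage only
        last = now

    total = time.monotonic() - time_start
    logger.info(
        "monitor timings: total=%.2f %s",
        total,
        " ".join(f"{k}={v:.2f}" for k, v in timings.items()),
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    monitor_with_timings()
```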
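For context on the lock handling that the new `else` branch diagnoses: `monitor_vespa_sync` takes a non-blocking Redis lock so overlapping beat runs exit early, calls `reacquire()` between scans to keep the lock from expiring, and releases it in `finally` only if it still owns it. The added branch fires when ownership was lost before completion (typically because the pass outlived the lock timeout) and logs the accumulated timings plus a lock dump. A minimal standalone sketch of that acquire/reacquire/owned pattern, assuming a vanilla redis-py client; the lock name, timeout, and stand-in work loop are illustrative:

```python
import time

from redis import Redis
from redis.lock import Lock as RedisLock

LOCK_NAME = "example:monitor_lock"  # illustrative name
LOCK_TIMEOUT = 300  # seconds before Redis expires the lock on its own


def monitor_once(r: Redis) -> bool:
    """Run one guarded monitoring pass; returns False if another run holds the lock."""
    lock_beat: RedisLock = r.lock(LOCK_NAME, timeout=LOCK_TIMEOUT)

    try:
        # non-blocking acquire: overlapping invocations bail out immediately
        if not lock_beat.acquire(blocking=False):
            return False

        for _ in range(5):  # stand-in for the fence-key scan loops
            lock_beat.reacquire()  # reset the TTL so long scans keep ownership
            time.sleep(0.1)  # stand-in for monitoring work
    finally:
        if lock_beat.owned():
            lock_beat.release()
        else:
            # Lock expired mid-run (work took longer than LOCK_TIMEOUT);
            # this is the case the patch now logs with timings plus a lock dump.
            print("lock ownership lost during monitoring pass")

    return True


if __name__ == "__main__":
    monitor_once(Redis(host="localhost", port=6379))
```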