diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py
index c3c8c42ab..a286f6657 100644
--- a/backend/onyx/background/celery/tasks/vespa/tasks.py
+++ b/backend/onyx/background/celery/tasks/vespa/tasks.py
@@ -1,3 +1,4 @@
+import random
 import time
 import traceback
 from datetime import datetime
@@ -77,6 +78,7 @@ from onyx.utils.variable_functionality import (
 )
 from onyx.utils.variable_functionality import global_version
 from onyx.utils.variable_functionality import noop_fallback
+from shared_configs.configs import MULTI_TENANT
 
 logger = setup_logger()
 
@@ -781,66 +783,72 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             return False
 
         # print current queue lengths
-        r_celery = self.app.broker_connection().channel().client  # type: ignore
-        n_celery = celery_get_queue_length("celery", r_celery)
-        n_indexing = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
-        )
-        n_sync = celery_get_queue_length(OnyxCeleryQueues.VESPA_METADATA_SYNC, r_celery)
-        n_deletion = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
-        )
-        n_pruning = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery
-        )
-        n_permissions_sync = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery
-        )
-        n_external_group_sync = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery
-        )
-        n_permissions_upsert = celery_get_queue_length(
-            OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery
-        )
+        phase_start = time.monotonic()
+        # we don't need every tenant polling redis for this info.
+        if not MULTI_TENANT or random.randint(1, 100) == 100:
+            r_celery = self.app.broker_connection().channel().client  # type: ignore
+            n_celery = celery_get_queue_length("celery", r_celery)
+            n_indexing = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
+            )
+            n_sync = celery_get_queue_length(
+                OnyxCeleryQueues.VESPA_METADATA_SYNC, r_celery
+            )
+            n_deletion = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
+            )
+            n_pruning = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery
+            )
+            n_permissions_sync = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery
+            )
+            n_external_group_sync = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery
+            )
+            n_permissions_upsert = celery_get_queue_length(
+                OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery
+            )
 
-        prefetched = celery_get_unacked_task_ids(
-            OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
-        )
+            prefetched = celery_get_unacked_task_ids(
+                OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
+            )
 
-        task_logger.info(
-            f"Queue lengths: celery={n_celery} "
-            f"indexing={n_indexing} "
-            f"indexing_prefetched={len(prefetched)} "
-            f"sync={n_sync} "
-            f"deletion={n_deletion} "
-            f"pruning={n_pruning} "
-            f"permissions_sync={n_permissions_sync} "
-            f"external_group_sync={n_external_group_sync} "
-            f"permissions_upsert={n_permissions_upsert} "
-        )
-
-        timings["queues"] = time.monotonic() - timings["start"]
+            task_logger.info(
+                f"Queue lengths: celery={n_celery} "
+                f"indexing={n_indexing} "
+                f"indexing_prefetched={len(prefetched)} "
+                f"sync={n_sync} "
+                f"deletion={n_deletion} "
+                f"pruning={n_pruning} "
+                f"permissions_sync={n_permissions_sync} "
+                f"external_group_sync={n_external_group_sync} "
+                f"permissions_upsert={n_permissions_upsert} "
+            )
+        timings["queues"] = time.monotonic() - phase_start
 
         # scan and monitor activity to completion
+        phase_start = time.monotonic()
         lock_beat.reacquire()
         if r.exists(RedisConnectorCredentialPair.get_fence_key()):
             monitor_connector_taskset(r)
+        timings["connector"] = time.monotonic() - phase_start
 
-        timings["connector"] = time.monotonic() - timings["queues"]
-
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
+        timings["connector_deletion"] = time.monotonic() - phase_start
 
-        timings["connector_deletion"] = time.monotonic() - timings["connector"]
-
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
+        timings["document_set"] = time.monotonic() - phase_start
 
-        timings["document_set"] = time.monotonic() - timings["connector_deletion"]
-
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
@@ -850,29 +858,29 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             )
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
+        timings["usergroup"] = time.monotonic() - phase_start
 
-        timings["usergroup"] = time.monotonic() - timings["document_set"]
-
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
+        timings["pruning"] = time.monotonic() - phase_start
 
-        timings["pruning"] = time.monotonic() - timings["usergroup"]
-
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
+        timings["indexing"] = time.monotonic() - phase_start
 
-        timings["indexing"] = time.monotonic() - timings["pruning"]
-
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)
 
-        timings["permissions"] = time.monotonic() - timings["indexing"]
+        timings["permissions"] = time.monotonic() - phase_start
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
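The diff makes two distinct changes. First, queue-length polling is now sampled: in multi-tenant deployments (`MULTI_TENANT` true) a beat run only hits Redis for queue statistics when `random.randint(1, 100) == 100`, i.e. roughly 1 run in 100, while single-tenant deployments keep polling every time. Second, the timing bookkeeping is corrected: the old code subtracted a previously stored `timings` value (a duration) from `time.monotonic()` (a timestamp), so later entries were not meaningful per-phase durations; resetting `phase_start` before each phase makes every `timings` value an independent wall-clock measurement. A minimal, self-contained sketch of both patterns, outside the Onyx codebase (the helpers `should_poll_queues` and `timed_phases` are illustrative names, not real functions from the repo):

```python
import random
import time

# Illustrative stand-in: in the real code this comes from shared_configs.configs.
MULTI_TENANT = True


def should_poll_queues(one_in: int = 100) -> bool:
    # Single-tenant: always poll. Multi-tenant: poll on roughly 1 in `one_in` runs.
    return not MULTI_TENANT or random.randint(1, one_in) == one_in


def timed_phases() -> dict[str, float]:
    # Reset phase_start before each phase so every stored value is an
    # independent duration rather than an offset from a previous entry.
    timings: dict[str, float] = {}

    phase_start = time.monotonic()
    if should_poll_queues():
        time.sleep(0.01)  # stand-in for the Redis queue-length polling
    timings["queues"] = time.monotonic() - phase_start

    phase_start = time.monotonic()
    time.sleep(0.02)  # stand-in for monitoring the connector taskset
    timings["connector"] = time.monotonic() - phase_start

    return timings


if __name__ == "__main__":
    print(timed_phases())
```

The reset-per-phase pattern is what lets each `timings` entry be logged or compared on its own; under the old arithmetic, a slow early phase silently corrupted every later number.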