mirror of https://github.com/danswer-ai/danswer.git
fix timing calculations and don't spam the queue lengths check from every task
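The commit makes two changes to the monitor_vespa_sync beat task. First, per-phase timings are now measured against a phase_start marker that is reset before each phase instead of subtracting the previously stored timing value; the old code mixed a time.monotonic() timestamp with an already-computed duration, so every phase after the first reported a meaningless number. Second, the queue-length logging is gated so that in multi-tenant deployments only roughly one task run in a hundred polls Redis for queue depths.

A minimal sketch of the timing fix, separate from the repository code (the phase names and sleeps are placeholders; timings["start"] is presumed to be a timestamp recorded earlier in the real function):

import time


def timed_phases() -> dict[str, float]:
    """Illustrative only: time each phase against its own start marker."""
    timings: dict[str, float] = {}

    # Old (buggy) pattern: timings["queues"] would hold a duration in seconds,
    # so the next assignment subtracts a duration from a timestamp:
    #   timings["queues"] = time.monotonic() - timings["start"]
    #   timings["connector"] = time.monotonic() - timings["queues"]  # nonsense value
    #
    # New pattern: reset phase_start before each phase and subtract it afterwards.
    phase_start = time.monotonic()
    time.sleep(0.01)  # stand-in for the "queues" phase work
    timings["queues"] = time.monotonic() - phase_start

    phase_start = time.monotonic()
    time.sleep(0.02)  # stand-in for the "connector" phase work
    timings["connector"] = time.monotonic() - phase_start

    return timings


if __name__ == "__main__":
    print(timed_phases())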
@@ -1,3 +1,4 @@
+import random
 import time
 import traceback
 from datetime import datetime
@@ -77,6 +78,7 @@ from onyx.utils.variable_functionality import (
 )
 from onyx.utils.variable_functionality import global_version
 from onyx.utils.variable_functionality import noop_fallback
+from shared_configs.configs import MULTI_TENANT
 
 logger = setup_logger()
 
@@ -781,66 +783,72 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             return False
 
         # print current queue lengths
-        r_celery = self.app.broker_connection().channel().client  # type: ignore
-        n_celery = celery_get_queue_length("celery", r_celery)
-        n_indexing = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
-        )
-        n_sync = celery_get_queue_length(OnyxCeleryQueues.VESPA_METADATA_SYNC, r_celery)
-        n_deletion = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
-        )
-        n_pruning = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery
-        )
-        n_permissions_sync = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery
-        )
-        n_external_group_sync = celery_get_queue_length(
-            OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery
-        )
-        n_permissions_upsert = celery_get_queue_length(
-            OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery
-        )
-
-        prefetched = celery_get_unacked_task_ids(
-            OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
-        )
-
-        task_logger.info(
-            f"Queue lengths: celery={n_celery} "
-            f"indexing={n_indexing} "
-            f"indexing_prefetched={len(prefetched)} "
-            f"sync={n_sync} "
-            f"deletion={n_deletion} "
-            f"pruning={n_pruning} "
-            f"permissions_sync={n_permissions_sync} "
-            f"external_group_sync={n_external_group_sync} "
-            f"permissions_upsert={n_permissions_upsert} "
-        )
-
-        timings["queues"] = time.monotonic() - timings["start"]
+        phase_start = time.monotonic()
+        # we don't need every tenant polling redis for this info.
+        if not MULTI_TENANT or random.randint(1, 100) == 100:
+            r_celery = self.app.broker_connection().channel().client  # type: ignore
+            n_celery = celery_get_queue_length("celery", r_celery)
+            n_indexing = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
+            )
+            n_sync = celery_get_queue_length(
+                OnyxCeleryQueues.VESPA_METADATA_SYNC, r_celery
+            )
+            n_deletion = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
+            )
+            n_pruning = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery
+            )
+            n_permissions_sync = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery
+            )
+            n_external_group_sync = celery_get_queue_length(
+                OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery
+            )
+            n_permissions_upsert = celery_get_queue_length(
+                OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery
+            )
+
+            prefetched = celery_get_unacked_task_ids(
+                OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
+            )
+
+            task_logger.info(
+                f"Queue lengths: celery={n_celery} "
+                f"indexing={n_indexing} "
+                f"indexing_prefetched={len(prefetched)} "
+                f"sync={n_sync} "
+                f"deletion={n_deletion} "
+                f"pruning={n_pruning} "
+                f"permissions_sync={n_permissions_sync} "
+                f"external_group_sync={n_external_group_sync} "
+                f"permissions_upsert={n_permissions_upsert} "
+            )
+        timings["queues"] = time.monotonic() - phase_start
 
         # scan and monitor activity to completion
+        phase_start = time.monotonic()
         lock_beat.reacquire()
         if r.exists(RedisConnectorCredentialPair.get_fence_key()):
             monitor_connector_taskset(r)
-
-        timings["connector"] = time.monotonic() - timings["queues"]
+        timings["connector"] = time.monotonic() - phase_start
 
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
-
-        timings["connector_deletion"] = time.monotonic() - timings["connector"]
+        timings["connector_deletion"] = time.monotonic() - phase_start
 
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
-
-        timings["document_set"] = time.monotonic() - timings["connector_deletion"]
+        timings["document_set"] = time.monotonic() - phase_start
 
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
@@ -850,29 +858,29 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             )
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
-
-        timings["usergroup"] = time.monotonic() - timings["document_set"]
+        timings["usergroup"] = time.monotonic() - phase_start
 
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
-
-        timings["pruning"] = time.monotonic() - timings["usergroup"]
+        timings["pruning"] = time.monotonic() - phase_start
 
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
-
-        timings["indexing"] = time.monotonic() - timings["pruning"]
+        timings["indexing"] = time.monotonic() - phase_start
 
+        phase_start = time.monotonic()
         for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)
 
-        timings["permissions"] = time.monotonic() - timings["indexing"]
+        timings["permissions"] = time.monotonic() - phase_start
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
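The sampling that keeps multi-tenant deployments from hammering Redis reduces to a one-line guard. A standalone sketch under assumed names (MULTI_TENANT here is a stand-in for shared_configs.configs.MULTI_TENANT, and log_queue_lengths stands in for the queue-length block in the diff above):

import random

MULTI_TENANT = True  # stand-in for shared_configs.configs.MULTI_TENANT


def log_queue_lengths() -> None:
    # Stand-in for reading the Celery queue depths from Redis and emitting
    # the "Queue lengths: ..." log line.
    print("Queue lengths: ...")


def maybe_log_queue_lengths() -> None:
    # Single-tenant installs always log; multi-tenant installs sample roughly
    # 1 run in 100 so that many tenants' beat tasks don't all poll Redis.
    if not MULTI_TENANT or random.randint(1, 100) == 100:
        log_queue_lengths()


if __name__ == "__main__":
    maybe_log_queue_lengths()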