mirror of https://github.com/danswer-ai/danswer.git
add detailed timings to monitor vespa sync
@@ -761,6 +761,10 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     Returns True if the task actually did work, False if it exited early to prevent overlap
     """
     time_start = time.monotonic()
+
+    timings: dict[str, float] = {}
+    timings["start"] = time_start
+
     r = get_redis_client(tenant_id=tenant_id)

     lock_beat: RedisLock = r.lock(
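For context on the new bookkeeping: timings is an ordinary dict of floats, and every reading comes from time.monotonic(), so the stored values are immune to wall-clock adjustments. A minimal standalone sketch of the same phase-timing idea (the function and phase names below are illustrative, not from the repo):

import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("phase_timing_demo")


def run_with_phase_timings() -> dict[str, float]:
    # time.monotonic() never jumps backwards (unlike time.time() under NTP
    # adjustments), so differences between readings are safe as durations.
    timings: dict[str, float] = {}
    time_start = time.monotonic()
    timings["start"] = time_start

    time.sleep(0.05)  # stand-in for "log the celery queue lengths"
    timings["queues"] = time.monotonic() - time_start

    time.sleep(0.10)  # stand-in for "monitor the connector taskset"
    timings["connector"] = time.monotonic() - time_start

    # "start" holds a timestamp; the later entries hold seconds elapsed since
    # start, so per-phase cost is the difference of consecutive entries.
    logger.info(
        "queues=%.3fs connector=%.3fs",
        timings["queues"],
        timings["connector"] - timings["queues"],
    )
    return timings


if __name__ == "__main__":
    run_with_phase_timings()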
@@ -812,20 +816,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             f"permissions_upsert={n_permissions_upsert} "
         )

+        timings["queues"] = time.monotonic() - timings["start"]
+
         # scan and monitor activity to completion
         lock_beat.reacquire()
         if r.exists(RedisConnectorCredentialPair.get_fence_key()):
             monitor_connector_taskset(r)

+        timings["connector"] = time.monotonic() - timings["queues"]
+
         for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_connector_deletion_taskset(tenant_id, key_bytes, r)

+        timings["connector_deletion"] = time.monotonic() - timings["connector"]
+
         for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)

+        timings["document_set"] = time.monotonic() - timings["connector_deletion"]
+
         for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
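One detail worth flagging in this hunk: timings["queues"] holds an elapsed value (now minus the start timestamp), while each later assignment subtracts the previous entry, i.e. a fresh monotonic reading minus a duration, so the entries after "queues" are not plain per-phase durations. If per-phase numbers are the goal, one alternative is to store monotonic checkpoints and derive deltas at logging time. A small sketch of that checkpoint variant, standard library only, with illustrative names:

import time


def checkpoint(timings: dict[str, float], name: str) -> None:
    # Store an absolute monotonic timestamp for each completed phase.
    timings[name] = time.monotonic()


def phase_durations(timings: dict[str, float]) -> dict[str, float]:
    # Turn ordered checkpoints into per-phase durations (dicts preserve
    # insertion order in Python 3.7+).
    names = list(timings)
    return {
        names[i]: timings[names[i]] - timings[names[i - 1]]
        for i in range(1, len(names))
    }


timings: dict[str, float] = {"start": time.monotonic()}
time.sleep(0.02)  # stand-in for scanning connector deletion fences
checkpoint(timings, "connector_deletion")
time.sleep(0.03)  # stand-in for scanning document set fences
checkpoint(timings, "document_set")
print(phase_durations(timings))  # e.g. {'connector_deletion': 0.02, 'document_set': 0.03}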
@@ -836,21 +848,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)

+        timings["usergroup"] = time.monotonic() - timings["document_set"]
+
         for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)

+        timings["pruning"] = time.monotonic() - timings["usergroup"]
+
         for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)

+        timings["indexing"] = time.monotonic() - timings["pruning"]
+
         for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)

+        timings["permissions"] = time.monotonic() - timings["indexing"]
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
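On the scan pattern used throughout these loops: r.scan_iter(PREFIX + "*") walks matching keys incrementally via SCAN rather than a blocking KEYS call, and lock_beat.reacquire() resets the beat lock's TTL so a slow pass does not let it expire mid-loop. A self-contained sketch of that combination with redis-py, assuming a Redis server on localhost and illustrative key names:

import redis

# Assumes a Redis server on localhost:6379; the prefix and keys are made up.
r = redis.Redis(host="localhost", port=6379)

FENCE_PREFIX = "demo_connectordeletion_fence_"
for i in range(3):
    r.set(f"{FENCE_PREFIX}{i}", 1, ex=60)

lock = r.lock("demo_beat_lock", timeout=30)
if lock.acquire(blocking=False):
    try:
        # scan_iter yields keys in batches using SCAN cursors, so it stays
        # friendly to the server even on a large keyspace.
        for key_bytes in r.scan_iter(FENCE_PREFIX + "*"):
            # Refresh the lock's TTL on every iteration so a long scan does
            # not let the lock lapse while work is still in progress.
            lock.reacquire()
            print("monitoring fence:", key_bytes.decode())
    finally:
        if lock.owned():
            lock.release()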
@@ -858,6 +877,21 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     finally:
         if lock_beat.owned():
             lock_beat.release()
+        else:
+            t = timings
+            task_logger.error(
+                "monitor_vespa_sync - Lock not owned on completion: "
+                f"tenant={tenant_id} "
+                f"queues={t.get('queues')} "
+                f"connector={t.get('connector')} "
+                f"connector_deletion={t.get('connector_deletion')} "
+                f"document_set={t.get('document_set')} "
+                f"usergroup={t.get('usergroup')} "
+                f"pruning={t.get('pruning')} "
+                f"indexing={t.get('indexing')} "
+                f"permissions={t.get('permissions')}"
+            )
+            redis_lock_dump(lock_beat, r)

     time_elapsed = time.monotonic() - time_start
     task_logger.debug(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
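The new else branch only fires when the beat lock is no longer owned at cleanup time, which usually means its TTL lapsed during a slow phase. Logging the partial timings via t.get(...) keeps the message well formed even for phases that were never reached: those simply render as None. A short sketch of how ownership can be lost and what the fallback log looks like, assuming a local Redis and a deliberately tiny lock timeout for the demo:

import time

import redis

# Assumes a Redis server on localhost:6379.
r = redis.Redis(host="localhost", port=6379)

lock = r.lock("demo_beat_lock_ttl", timeout=1)  # 1s TTL, just for the demo
if not lock.acquire(blocking=False):
    raise SystemExit("demo lock already held; try again")

time.sleep(1.5)  # a "phase" that outlives the lock's TTL, no reacquire()

if lock.owned():
    lock.release()
else:
    # Same shape as the new error path: dict.get() returns None for phases
    # that never ran, so the log line still formats cleanly.
    timings: dict[str, float] = {"queues": 0.4}
    print(
        "lock not owned on completion: "
        f"queues={timings.get('queues')} "
        f"connector={timings.get('connector')}"
    )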
@@ -282,7 +282,7 @@ def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
         remote_token = None

     logger.warning(
-        f"RedisLock diagnostic logging: "
+        f"RedisLock diagnostic: "
         f"name={name} "
         f"locked={locked} "
         f"owned={owned} "
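redis_lock_dump itself is only touched here to shorten its log prefix; its full body is not part of this diff. For orientation, here is a sketch of the kind of state such a diagnostic can report with redis-py. The name/locked/owned/remote_token fields mirror what is visible in the hunk; local_token and ttl are assumptions, and the helper name is illustrative:

import logging

import redis
from redis.lock import Lock

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("lock_dump_demo")


def dump_lock_state(lock: Lock, r: redis.Redis) -> None:
    # Compare the client-side token with the token stored in Redis so a
    # mismatch (another holder, or an expired key) is visible in the logs.
    name = lock.name
    remote_token_raw = r.get(name)
    remote_token = remote_token_raw.decode("utf-8") if remote_token_raw else None
    logger.warning(
        f"RedisLock diagnostic: "
        f"name={name} "
        f"locked={lock.locked()} "
        f"owned={lock.owned()} "
        f"local_token={lock.local.token} "
        f"remote_token={remote_token} "
        f"ttl={r.ttl(name)}"
    )


if __name__ == "__main__":
    r = redis.Redis(host="localhost", port=6379)
    lock = r.lock("demo_diag_lock", timeout=30)
    lock.acquire(blocking=False)
    dump_lock_state(lock, r)
    if lock.owned():
        lock.release()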