Merge pull request #3607 from onyx-dot-app/bugfix/locking_redux

add detailed timings to monitor vespa sync
rkuo-danswer
2025-01-05 20:03:17 -08:00
committed by GitHub
3 changed files with 41 additions and 4 deletions


@@ -89,12 +89,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     app_base.wait_for_db(sender, **kwargs)
     app_base.wait_for_vespa(sender, **kwargs)
-    logger.info("Running as the primary celery worker.")

     # Less startup checks in multi-tenant case
     if MULTI_TENANT:
         return

+    logger.info("Running as the primary celery worker.")

     # This is singleton work that should be done on startup exactly once
     # by the primary worker. This is unnecessary in the multi tenant scenario
     r = get_redis_client(tenant_id=None)
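
Note: this hunk moves the "Running as the primary celery worker." log below the multi-tenant early-return, so it only fires on the path where the singleton startup work actually runs. A stripped-down sketch of the resulting control flow (the wait_for_* stubs and the MULTI_TENANT flag are stand-ins for the repo's actual imports):

import logging

logger = logging.getLogger(__name__)
MULTI_TENANT = False  # stand-in for the repo's configuration flag


def wait_for_db(sender, **kwargs):  # stub for app_base.wait_for_db
    pass


def wait_for_vespa(sender, **kwargs):  # stub for app_base.wait_for_vespa
    pass


def on_worker_init(sender, **kwargs):
    wait_for_db(sender, **kwargs)
    wait_for_vespa(sender, **kwargs)

    # Multi-tenant workers skip the singleton startup work entirely,
    # so they now also skip the "primary worker" log line.
    if MULTI_TENANT:
        return

    logger.info("Running as the primary celery worker.")
    # ... singleton startup work, done exactly once by the primary worker ...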


@@ -760,7 +760,13 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     Returns True if the task actually did work, False if it exited early to prevent overlap
     """
+    task_logger.info(f"monitor_vespa_sync starting: tenant={tenant_id}")
+
     time_start = time.monotonic()
+
+    timings: dict[str, float] = {}
+    timings["start"] = time_start
+
     r = get_redis_client(tenant_id=tenant_id)

     lock_beat: RedisLock = r.lock(
@@ -771,6 +777,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     try:
         # prevent overlapping tasks
         if not lock_beat.acquire(blocking=False):
+            task_logger.info("monitor_vespa_sync exiting due to overlap")
             return False

         # print current queue lengths
@@ -812,20 +819,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             f"permissions_upsert={n_permissions_upsert} "
         )

+        timings["queues"] = time.monotonic() - timings["start"]
+
         # scan and monitor activity to completion
         lock_beat.reacquire()
         if r.exists(RedisConnectorCredentialPair.get_fence_key()):
             monitor_connector_taskset(r)
+        timings["connector"] = time.monotonic() - timings["queues"]

         for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
+        timings["connector_deletion"] = time.monotonic() - timings["connector"]

         for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
+        timings["document_set"] = time.monotonic() - timings["connector_deletion"]

         for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
@@ -836,21 +851,28 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
+        timings["usergroup"] = time.monotonic() - timings["document_set"]

         for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
+        timings["pruning"] = time.monotonic() - timings["usergroup"]

         for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
+        timings["indexing"] = time.monotonic() - timings["pruning"]

         for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
             lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)
+        timings["permissions"] = time.monotonic() - timings["indexing"]

     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
@@ -858,9 +880,24 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     finally:
         if lock_beat.owned():
             lock_beat.release()
+        else:
+            t = timings
+            task_logger.error(
+                "monitor_vespa_sync - Lock not owned on completion: "
+                f"tenant={tenant_id} "
+                f"queues={t.get('queues')} "
+                f"connector={t.get('connector')} "
+                f"connector_deletion={t.get('connector_deletion')} "
+                f"document_set={t.get('document_set')} "
+                f"usergroup={t.get('usergroup')} "
+                f"pruning={t.get('pruning')} "
+                f"indexing={t.get('indexing')} "
+                f"permissions={t.get('permissions')}"
+            )
+            redis_lock_dump(lock_beat, r)

     time_elapsed = time.monotonic() - time_start
-    task_logger.debug(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
+    task_logger.info(f"monitor_vespa_sync finished: elapsed={time_elapsed:.2f}")
     return True
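
The timing hunks above thread a timings dict through monitor_vespa_sync so that the new else branch in the finally block can report how far each monitoring phase had progressed when the lock was lost. A minimal, self-contained sketch of checkpoint-style phase timing (the checkpoint helper is illustrative, not code from the repo):

import time

timings: dict[str, float] = {}
_last = time.monotonic()


def checkpoint(name: str) -> None:
    # Record seconds elapsed since the previous checkpoint under `name`.
    global _last
    now = time.monotonic()
    timings[name] = now - _last
    _last = now


time.sleep(0.05)  # stand-in for the "queues" phase
checkpoint("queues")
time.sleep(0.10)  # stand-in for the "connector" phase
checkpoint("connector")
print(timings)  # e.g. {'queues': 0.05..., 'connector': 0.10...}

Logging t.get('phase') rather than t['phase'] in the error branch means a phase that never ran (for example, after SoftTimeLimitExceeded fired) shows up as None instead of raising KeyError.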


@@ -282,7 +282,7 @@ def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
         remote_token = None

     logger.warning(
-        f"RedisLock diagnostic logging: "
+        f"RedisLock diagnostic: "
         f"name={name} "
         f"locked={locked} "
         f"owned={owned} "