This commit is contained in:
Richard Kuo (Danswer) 2025-01-29 22:52:21 -08:00
parent 95701db1bd
commit 7ccfe85ee5

View File

@ -5,7 +5,6 @@ from collections.abc import Callable
from datetime import datetime
from datetime import timezone
from http import HTTPStatus
from typing import Any
from typing import cast
import httpx
@ -78,9 +77,7 @@ from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_document_set import RedisDocumentSet
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_usergroup import RedisUserGroup
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_versioned_implementation
@ -820,8 +817,8 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
time_start = time.monotonic()
timings: dict[str, Any] = {}
timings["start"] = time_start
# timings: dict[str, Any] = {}
# timings["start"] = time_start
r = get_redis_client(tenant_id=tenant_id)
@ -834,7 +831,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
# then read from the replica. In this case, monitoring work could be done on a fence
# that no longer exists. To avoid this, we scan from the replica, but double check
# the result on the master.
r_replica = get_redis_replica_client(tenant_id=tenant_id)
# r_replica = get_redis_replica_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK,
@ -847,7 +844,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
try:
# print current queue lengths
phase_start = time.monotonic()
time.monotonic()
# we don't need every tenant polling redis for this info.
if not MULTI_TENANT or random.randint(1, 10) == 10:
r_celery = self.app.broker_connection().channel().client # type: ignore
@ -889,102 +886,151 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
f"external_group_sync={n_external_group_sync} "
f"permissions_upsert={n_permissions_upsert} "
)
timings["queues"] = time.monotonic() - phase_start
timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# timings["queues"] = time.monotonic() - phase_start
# timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
keys = r.smembers("active_fences")
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if key_str == RedisConnectorCredentialPair.get_fence_key():
if r.exists(key_str):
monitor_connector_taskset(r)
elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX):
if r.exists(key_str):
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX):
if r.exists(key_str):
with get_session_with_tenant(tenant_id) as db_session:
monitor_document_set_taskset(
tenant_id, key_bytes, r, db_session
)
elif key_str.startswith(RedisUserGroup.FENCE_PREFIX):
if r.exists(key_str):
monitor_usergroup_taskset = (
fetch_versioned_implementation_with_fallback(
"onyx.background.celery.tasks.vespa.tasks",
"monitor_usergroup_taskset",
noop_fallback,
)
)
with get_session_with_tenant(tenant_id) as db_session:
monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
if r.exists(key_str):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_pruning_taskset(
tenant_id, key_bytes, r, db_session
)
elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
if r.exists(key_str):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_indexing_taskset(
tenant_id, key_bytes, r, db_session
)
elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
if r.exists(key_str):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_permissions_taskset(
tenant_id, key_bytes, r, db_session
)
else:
pass
# scan and monitor activity to completion
phase_start = time.monotonic()
lock_beat.reacquire()
if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()):
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
monitor_connector_taskset(r)
timings["connector"] = time.monotonic() - phase_start
timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# phase_start = time.monotonic()
# lock_beat.reacquire()
# if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()):
# if r.exists(RedisConnectorCredentialPair.get_fence_key()):
# monitor_connector_taskset(r)
# timings["connector"] = time.monotonic() - phase_start
# timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
lock_beat.reacquire()
# phase_start = time.monotonic()
# lock_beat.reacquire()
# for key_bytes in r_replica.scan_iter(
# RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
# ):
# if r.exists(key_bytes):
# monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
# lock_beat.reacquire()
timings["connector_deletion"] = time.monotonic() - phase_start
timings["connector_deletion_ttl"] = r.ttl(
OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK
)
# timings["connector_deletion"] = time.monotonic() - phase_start
# timings["connector_deletion_ttl"] = r.ttl(
# OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK
# )
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
with get_session_with_tenant(tenant_id) as db_session:
monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["documentset"] = time.monotonic() - phase_start
timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# phase_start = time.monotonic()
# lock_beat.reacquire()
# for key_bytes in r_replica.scan_iter(
# RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
# ):
# if r.exists(key_bytes):
# with get_session_with_tenant(tenant_id) as db_session:
# monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
# lock_beat.reacquire()
# timings["documentset"] = time.monotonic() - phase_start
# timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
monitor_usergroup_taskset = (
fetch_versioned_implementation_with_fallback(
"onyx.background.celery.tasks.vespa.tasks",
"monitor_usergroup_taskset",
noop_fallback,
)
)
with get_session_with_tenant(tenant_id) as db_session:
monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["usergroup"] = time.monotonic() - phase_start
timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# phase_start = time.monotonic()
# lock_beat.reacquire()
# for key_bytes in r_replica.scan_iter(
# RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
# ):
# if r.exists(key_bytes):
# monitor_usergroup_taskset = (
# fetch_versioned_implementation_with_fallback(
# "onyx.background.celery.tasks.vespa.tasks",
# "monitor_usergroup_taskset",
# noop_fallback,
# )
# )
# with get_session_with_tenant(tenant_id) as db_session:
# monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
# lock_beat.reacquire()
# timings["usergroup"] = time.monotonic() - phase_start
# timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["pruning"] = time.monotonic() - phase_start
timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# phase_start = time.monotonic()
# lock_beat.reacquire()
# for key_bytes in r_replica.scan_iter(
# RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
# ):
# if r.exists(key_bytes):
# with get_session_with_tenant(tenant_id) as db_session:
# monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
# lock_beat.reacquire()
# timings["pruning"] = time.monotonic() - phase_start
# timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["indexing"] = time.monotonic() - phase_start
timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# phase_start = time.monotonic()
# lock_beat.reacquire()
# for key_bytes in r_replica.scan_iter(
# RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
# ):
# if r.exists(key_bytes):
# with get_session_with_tenant(tenant_id) as db_session:
# monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
# lock_beat.reacquire()
# timings["indexing"] = time.monotonic() - phase_start
# timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
if r.exists(key_bytes):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_permissions_taskset(
tenant_id, key_bytes, r, db_session
)
lock_beat.reacquire()
# phase_start = time.monotonic()
# lock_beat.reacquire()
# for key_bytes in r_replica.scan_iter(
# RedisConnectorPermissionSync.FENCE_PREFIX + "*",
# count=SCAN_ITER_COUNT_DEFAULT,
# ):
# if r.exists(key_bytes):
# with get_session_with_tenant(tenant_id) as db_session:
# monitor_ccpair_permissions_taskset(
# tenant_id, key_bytes, r, db_session
# )
# lock_beat.reacquire()
timings["permissions"] = time.monotonic() - phase_start
timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# timings["permissions"] = time.monotonic() - phase_start
# timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
@ -995,8 +1041,8 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
else:
task_logger.error(
"monitor_vespa_sync - Lock not owned on completion: "
f"tenant={tenant_id} "
f"timings={timings}"
f"tenant={tenant_id}"
# f"timings={timings}"
)
redis_lock_dump(lock_beat, r)