From 7ccfe85ee5f1d6800f3256557fd57d727ff9502f Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Wed, 29 Jan 2025 22:52:21 -0800 Subject: [PATCH 01/12] WIP --- .../background/celery/tasks/vespa/tasks.py | 236 +++++++++++------- 1 file changed, 141 insertions(+), 95 deletions(-) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 7152903bf..ca5941c75 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -5,7 +5,6 @@ from collections.abc import Callable from datetime import datetime from datetime import timezone from http import HTTPStatus -from typing import Any from typing import cast import httpx @@ -78,9 +77,7 @@ from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_prune import RedisConnectorPrune from onyx.redis.redis_document_set import RedisDocumentSet from onyx.redis.redis_pool import get_redis_client -from onyx.redis.redis_pool import get_redis_replica_client from onyx.redis.redis_pool import redis_lock_dump -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.redis.redis_usergroup import RedisUserGroup from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import fetch_versioned_implementation @@ -820,8 +817,8 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: time_start = time.monotonic() - timings: dict[str, Any] = {} - timings["start"] = time_start + # timings: dict[str, Any] = {} + # timings["start"] = time_start r = get_redis_client(tenant_id=tenant_id) @@ -834,7 +831,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # then read from the replica. In this case, monitoring work could be done on a fence # that no longer exists. To avoid this, we scan from the replica, but double check # the result on the master. - r_replica = get_redis_replica_client(tenant_id=tenant_id) + # r_replica = get_redis_replica_client(tenant_id=tenant_id) lock_beat: RedisLock = r.lock( OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK, @@ -847,7 +844,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: try: # print current queue lengths - phase_start = time.monotonic() + time.monotonic() # we don't need every tenant polling redis for this info. if not MULTI_TENANT or random.randint(1, 10) == 10: r_celery = self.app.broker_connection().channel().client # type: ignore @@ -889,102 +886,151 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: f"external_group_sync={n_external_group_sync} " f"permissions_upsert={n_permissions_upsert} " ) - timings["queues"] = time.monotonic() - phase_start - timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # timings["queues"] = time.monotonic() - phase_start + # timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + + keys = r.smembers("active_fences") + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + + if key_str == RedisConnectorCredentialPair.get_fence_key(): + if r.exists(key_str): + monitor_connector_taskset(r) + elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX): + if r.exists(key_str): + monitor_connector_deletion_taskset(tenant_id, key_bytes, r) + elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX): + if r.exists(key_str): + with get_session_with_tenant(tenant_id) as db_session: + monitor_document_set_taskset( + tenant_id, key_bytes, r, db_session + ) + elif key_str.startswith(RedisUserGroup.FENCE_PREFIX): + if r.exists(key_str): + monitor_usergroup_taskset = ( + fetch_versioned_implementation_with_fallback( + "onyx.background.celery.tasks.vespa.tasks", + "monitor_usergroup_taskset", + noop_fallback, + ) + ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX): + if r.exists(key_str): + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_pruning_taskset( + tenant_id, key_bytes, r, db_session + ) + elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): + if r.exists(key_str): + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_indexing_taskset( + tenant_id, key_bytes, r, db_session + ) + elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): + if r.exists(key_str): + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_permissions_taskset( + tenant_id, key_bytes, r, db_session + ) + else: + pass # scan and monitor activity to completion - phase_start = time.monotonic() - lock_beat.reacquire() - if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()): - if r.exists(RedisConnectorCredentialPair.get_fence_key()): - monitor_connector_taskset(r) - timings["connector"] = time.monotonic() - phase_start - timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # phase_start = time.monotonic() + # lock_beat.reacquire() + # if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()): + # if r.exists(RedisConnectorCredentialPair.get_fence_key()): + # monitor_connector_taskset(r) + # timings["connector"] = time.monotonic() - phase_start + # timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - phase_start = time.monotonic() - lock_beat.reacquire() - for key_bytes in r_replica.scan_iter( - RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - if r.exists(key_bytes): - monitor_connector_deletion_taskset(tenant_id, key_bytes, r) - lock_beat.reacquire() + # phase_start = time.monotonic() + # lock_beat.reacquire() + # for key_bytes in r_replica.scan_iter( + # RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT + # ): + # if r.exists(key_bytes): + # monitor_connector_deletion_taskset(tenant_id, key_bytes, r) + # lock_beat.reacquire() - timings["connector_deletion"] = time.monotonic() - phase_start - timings["connector_deletion_ttl"] = r.ttl( - OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK - ) + # timings["connector_deletion"] = time.monotonic() - phase_start + # timings["connector_deletion_ttl"] = r.ttl( + # OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK + # ) - phase_start = time.monotonic() - lock_beat.reacquire() - for key_bytes in r_replica.scan_iter( - RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - if r.exists(key_bytes): - with get_session_with_tenant(tenant_id) as db_session: - monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) - lock_beat.reacquire() - timings["documentset"] = time.monotonic() - phase_start - timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # phase_start = time.monotonic() + # lock_beat.reacquire() + # for key_bytes in r_replica.scan_iter( + # RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT + # ): + # if r.exists(key_bytes): + # with get_session_with_tenant(tenant_id) as db_session: + # monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) + # lock_beat.reacquire() + # timings["documentset"] = time.monotonic() - phase_start + # timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - phase_start = time.monotonic() - lock_beat.reacquire() - for key_bytes in r_replica.scan_iter( - RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - if r.exists(key_bytes): - monitor_usergroup_taskset = ( - fetch_versioned_implementation_with_fallback( - "onyx.background.celery.tasks.vespa.tasks", - "monitor_usergroup_taskset", - noop_fallback, - ) - ) - with get_session_with_tenant(tenant_id) as db_session: - monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) - lock_beat.reacquire() - timings["usergroup"] = time.monotonic() - phase_start - timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # phase_start = time.monotonic() + # lock_beat.reacquire() + # for key_bytes in r_replica.scan_iter( + # RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT + # ): + # if r.exists(key_bytes): + # monitor_usergroup_taskset = ( + # fetch_versioned_implementation_with_fallback( + # "onyx.background.celery.tasks.vespa.tasks", + # "monitor_usergroup_taskset", + # noop_fallback, + # ) + # ) + # with get_session_with_tenant(tenant_id) as db_session: + # monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + # lock_beat.reacquire() + # timings["usergroup"] = time.monotonic() - phase_start + # timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - phase_start = time.monotonic() - lock_beat.reacquire() - for key_bytes in r_replica.scan_iter( - RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - if r.exists(key_bytes): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) - lock_beat.reacquire() - timings["pruning"] = time.monotonic() - phase_start - timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # phase_start = time.monotonic() + # lock_beat.reacquire() + # for key_bytes in r_replica.scan_iter( + # RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT + # ): + # if r.exists(key_bytes): + # with get_session_with_tenant(tenant_id) as db_session: + # monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) + # lock_beat.reacquire() + # timings["pruning"] = time.monotonic() - phase_start + # timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - phase_start = time.monotonic() - lock_beat.reacquire() - for key_bytes in r_replica.scan_iter( - RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - if r.exists(key_bytes): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) - lock_beat.reacquire() - timings["indexing"] = time.monotonic() - phase_start - timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # phase_start = time.monotonic() + # lock_beat.reacquire() + # for key_bytes in r_replica.scan_iter( + # RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT + # ): + # if r.exists(key_bytes): + # with get_session_with_tenant(tenant_id) as db_session: + # monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) + # lock_beat.reacquire() + # timings["indexing"] = time.monotonic() - phase_start + # timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - phase_start = time.monotonic() - lock_beat.reacquire() - for key_bytes in r_replica.scan_iter( - RedisConnectorPermissionSync.FENCE_PREFIX + "*", - count=SCAN_ITER_COUNT_DEFAULT, - ): - if r.exists(key_bytes): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_permissions_taskset( - tenant_id, key_bytes, r, db_session - ) - lock_beat.reacquire() + # phase_start = time.monotonic() + # lock_beat.reacquire() + # for key_bytes in r_replica.scan_iter( + # RedisConnectorPermissionSync.FENCE_PREFIX + "*", + # count=SCAN_ITER_COUNT_DEFAULT, + # ): + # if r.exists(key_bytes): + # with get_session_with_tenant(tenant_id) as db_session: + # monitor_ccpair_permissions_taskset( + # tenant_id, key_bytes, r, db_session + # ) + # lock_beat.reacquire() - timings["permissions"] = time.monotonic() - phase_start - timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) + # timings["permissions"] = time.monotonic() - phase_start + # timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." @@ -995,8 +1041,8 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: else: task_logger.error( "monitor_vespa_sync - Lock not owned on completion: " - f"tenant={tenant_id} " - f"timings={timings}" + f"tenant={tenant_id}" + # f"timings={timings}" ) redis_lock_dump(lock_beat, r) From d8578bc1cb7c46ad020328d8430c7e2c147fff3b Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 30 Jan 2025 15:21:52 -0800 Subject: [PATCH 02/12] first full cut --- .../onyx/background/celery/apps/primary.py | 16 ++- .../background/celery/tasks/vespa/tasks.py | 99 +++++++++---------- backend/onyx/configs/constants.py | 4 + backend/onyx/redis/redis_connector.py | 2 + .../redis/redis_connector_credential_pair.py | 96 +++++++++++++++--- backend/onyx/redis/redis_connector_delete.py | 4 + .../redis/redis_connector_doc_perm_sync.py | 4 + backend/onyx/redis/redis_connector_index.py | 5 + backend/onyx/redis/redis_connector_prune.py | 4 + backend/onyx/redis/redis_document_set.py | 4 + backend/onyx/redis/redis_pool.py | 1 + backend/onyx/redis/redis_usergroup.py | 4 + 12 files changed, 168 insertions(+), 75 deletions(-) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 435a88e2c..5696d75bc 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -21,13 +21,16 @@ from onyx.background.celery.tasks.indexing.utils import ( get_unfenced_index_attempt_ids, ) from onyx.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME from onyx.db.engine import get_session_with_default_tenant from onyx.db.engine import SqlEngine from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import mark_attempt_canceled -from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair +from onyx.redis.redis_connector_credential_pair import ( + RedisGlobalConnectorCredentialPair, +) from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync @@ -141,23 +144,16 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: r.delete(OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK) r.delete(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - r.delete(RedisConnectorCredentialPair.get_taskset_key()) - r.delete(RedisConnectorCredentialPair.get_fence_key()) + r.delete(OnyxRedisConstants.ACTIVE_FENCES) + RedisGlobalConnectorCredentialPair.reset_all(r) RedisDocumentSet.reset_all(r) - RedisUserGroup.reset_all(r) - RedisConnectorDelete.reset_all(r) - RedisConnectorPrune.reset_all(r) - RedisConnectorIndex.reset_all(r) - RedisConnectorStop.reset_all(r) - RedisConnectorPermissionSync.reset_all(r) - RedisConnectorExternalGroupSync.reset_all(r) # mark orphaned index attempts as failed diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index ca5941c75..3b509ac02 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -5,6 +5,7 @@ from collections.abc import Callable from datetime import datetime from datetime import timezone from http import HTTPStatus +from typing import Any from typing import cast import httpx @@ -35,6 +36,7 @@ from onyx.configs.app_configs import VESPA_SYNC_MAX_TASKS from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks from onyx.db.connector import fetch_connector_by_id from onyx.db.connector_credential_pair import add_deletion_failure_message @@ -71,6 +73,9 @@ from onyx.document_index.interfaces import VespaDocumentFields from onyx.httpx.httpx_pool import HttpxPool from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair +from onyx.redis.redis_connector_credential_pair import ( + RedisGlobalConnectorCredentialPair, +) from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_index import RedisConnectorIndex @@ -201,10 +206,12 @@ def try_generate_stale_document_sync_tasks( tenant_id: str | None, ) -> int | None: # the fence is up, do nothing - if r.exists(RedisConnectorCredentialPair.get_fence_key()): + + redis_global_ccpair = RedisGlobalConnectorCredentialPair(r) + if redis_global_ccpair.fenced: return None - r.delete(RedisConnectorCredentialPair.get_taskset_key()) # delete the taskset + redis_global_ccpair.delete_taskset() # add tasks to celery and build up the task set to monitor in redis stale_doc_count = count_documents_by_needs_sync(db_session) @@ -262,7 +269,7 @@ def try_generate_stale_document_sync_tasks( f"RedisConnector.generate_tasks finished for all cc_pairs. total_tasks_generated={total_tasks_generated}" ) - r.set(RedisConnectorCredentialPair.get_fence_key(), total_tasks_generated) + redis_global_ccpair.set_fence(total_tasks_generated) return total_tasks_generated @@ -413,23 +420,17 @@ def try_generate_user_group_sync_tasks( def monitor_connector_taskset(r: Redis) -> None: - fence_value = r.get(RedisConnectorCredentialPair.get_fence_key()) - if fence_value is None: + redis_global_ccpair = RedisGlobalConnectorCredentialPair(r) + initial_count = redis_global_ccpair.payload + if initial_count is None: return - try: - initial_count = int(cast(int, fence_value)) - except ValueError: - task_logger.error("The value is not an integer.") - return - - count = r.scard(RedisConnectorCredentialPair.get_taskset_key()) + remaining = redis_global_ccpair.get_remaining() task_logger.info( - f"Stale document sync progress: remaining={count} initial={initial_count}" + f"Stale document sync progress: remaining={remaining} initial={initial_count}" ) - if count == 0: - r.delete(RedisConnectorCredentialPair.get_taskset_key()) - r.delete(RedisConnectorCredentialPair.get_fence_key()) + if remaining == 0: + redis_global_ccpair.reset() task_logger.info(f"Successfully synced stale documents. count={initial_count}") @@ -889,52 +890,43 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # timings["queues"] = time.monotonic() - phase_start # timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - keys = r.smembers("active_fences") + keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: key_bytes = cast(bytes, key) - key_str = key_bytes.decode("utf-8") - if key_str == RedisConnectorCredentialPair.get_fence_key(): - if r.exists(key_str): - monitor_connector_taskset(r) - elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX): - if r.exists(key_str): - monitor_connector_deletion_taskset(tenant_id, key_bytes, r) + if not r.exists(key_bytes): + r.srem(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) + continue + + key_str = key_bytes.decode("utf-8") + if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY: + monitor_connector_taskset(r) elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_document_set_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) elif key_str.startswith(RedisUserGroup.FENCE_PREFIX): - if r.exists(key_str): - monitor_usergroup_taskset = ( - fetch_versioned_implementation_with_fallback( - "onyx.background.celery.tasks.vespa.tasks", - "monitor_usergroup_taskset", - noop_fallback, - ) + monitor_usergroup_taskset = ( + fetch_versioned_implementation_with_fallback( + "onyx.background.celery.tasks.vespa.tasks", + "monitor_usergroup_taskset", + noop_fallback, ) - with get_session_with_tenant(tenant_id) as db_session: - monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX): + monitor_connector_deletion_taskset(tenant_id, key_bytes, r) elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_pruning_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_indexing_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_permissions_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_permissions_taskset( + tenant_id, key_bytes, r, db_session + ) else: pass @@ -1035,6 +1027,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." ) + return False finally: if lock_beat.owned(): lock_beat.release() diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 09a90a9ee..b4a47813d 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -302,6 +302,10 @@ class OnyxRedisSignals: VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences" +class OnyxRedisConstants: + ACTIVE_FENCES = "active_fences" + + class OnyxCeleryPriority(int, Enum): HIGHEST = 0 HIGH = auto() diff --git a/backend/onyx/redis/redis_connector.py b/backend/onyx/redis/redis_connector.py index f55ff4a89..196f2306c 100644 --- a/backend/onyx/redis/redis_connector.py +++ b/backend/onyx/redis/redis_connector.py @@ -17,6 +17,8 @@ class RedisConnector: associated background tasks / associated redis interactions.""" def __init__(self, tenant_id: str | None, id: int) -> None: + """id: a connector credential pair id""" + self.tenant_id: str | None = tenant_id self.id: int = id self.redis: redis.Redis = get_redis_client(tenant_id=tenant_id) diff --git a/backend/onyx/redis/redis_connector_credential_pair.py b/backend/onyx/redis/redis_connector_credential_pair.py index e648bc563..52beefa5b 100644 --- a/backend/onyx/redis/redis_connector_credential_pair.py +++ b/backend/onyx/redis/redis_connector_credential_pair.py @@ -2,6 +2,7 @@ import time from typing import cast from uuid import uuid4 +import redis from celery import Celery from redis import Redis from redis.lock import Lock as RedisLock @@ -12,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.document import ( construct_document_select_for_connector_credential_pair_by_needs_sync, @@ -28,10 +30,9 @@ class RedisConnectorCredentialPair(RedisObjectHelper): all connectors and is not per connector.""" PREFIX = "connectorsync" - FENCE_PREFIX = PREFIX + "_fence" TASKSET_PREFIX = PREFIX + "_taskset" - SYNCING_PREFIX = PREFIX + ":vespa_syncing" + # SYNCING_PREFIX = PREFIX + ":vespa_syncing" def __init__(self, tenant_id: str | None, id: int) -> None: super().__init__(tenant_id, str(id)) @@ -39,10 +40,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper): # documents that should be skipped self.skip_docs: set[str] = set() - @classmethod - def get_fence_key(cls) -> str: - return RedisConnectorCredentialPair.FENCE_PREFIX - @classmethod def get_taskset_key(cls) -> str: return RedisConnectorCredentialPair.TASKSET_PREFIX @@ -51,18 +48,18 @@ class RedisConnectorCredentialPair(RedisObjectHelper): def taskset_key(self) -> str: """Notice that this is intentionally reusing the same taskset for all connector syncs""" - # example: connector_taskset + # example: connectorsync_taskset return f"{self.TASKSET_PREFIX}" def set_skip_docs(self, skip_docs: set[str]) -> None: - # documents that should be skipped. Note that this classes updates + # documents that should be skipped. Note that this class updates # the list on the fly self.skip_docs = skip_docs - @staticmethod - def make_redis_syncing_key(doc_id: str) -> str: - """used to create a key in redis to block a doc from syncing""" - return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" + # @staticmethod + # def make_redis_syncing_key(doc_id: str) -> str: + # """used to create a key in redis to block a doc from syncing""" + # return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" def generate_tasks( self, @@ -148,3 +145,78 @@ class RedisConnectorCredentialPair(RedisObjectHelper): break return len(async_results), num_docs + + +class RedisGlobalConnectorCredentialPair: + """This class is used to scan documents by cc_pair in the db and collect them into + a unified set for syncing. + + It differs from the other redis helpers in that the taskset used spans + all connectors and is not per connector.""" + + PREFIX = "connectorsync" + FENCE_KEY = PREFIX + "_fence" + TASKSET_KEY = PREFIX + "_taskset" + + def __init__(self, redis: redis.Redis) -> None: + self.redis = redis + + @property + def fenced(self) -> bool: + if self.redis.exists(self.fence_key): + return True + + return False + + @property + def payload(self) -> int | None: + bytes = self.redis.get(self.fence_key) + if bytes is None: + return None + + progress = int(cast(int, bytes)) + return progress + + def get_remaining(self) -> int: + remaining = cast(int, self.redis.scard(self.taskset_key)) + return remaining + + @property + def fence_key(self) -> str: + """Notice that this is intentionally reusing the same fence for all + connector syncs""" + # example: connectorsync_fence + return f"{self.FENCE_KEY}" + + @property + def taskset_key(self) -> str: + """Notice that this is intentionally reusing the same taskset for all + connector syncs""" + # example: connectorsync_taskset + return f"{self.TASKSET_KEY}" + + def set_fence(self, payload: int | None) -> None: + if payload is None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + self.redis.delete(self.fence_key) + return + + self.redis.set(self.fence_key, payload) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + + def delete_taskset(self) -> None: + self.redis.delete(self.taskset_key) + + def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + self.redis.delete(self.taskset_key) + self.redis.delete(self.fence_key) + + @staticmethod + def reset_all(r: redis.Redis) -> None: + r.srem( + OnyxRedisConstants.ACTIVE_FENCES, + RedisGlobalConnectorCredentialPair.FENCE_KEY, + ) + r.delete(RedisGlobalConnectorCredentialPair.TASKSET_KEY) + r.delete(RedisGlobalConnectorCredentialPair.FENCE_KEY) diff --git a/backend/onyx/redis/redis_connector_delete.py b/backend/onyx/redis/redis_connector_delete.py index 17651bf66..3ac5edb72 100644 --- a/backend/onyx/redis/redis_connector_delete.py +++ b/backend/onyx/redis/redis_connector_delete.py @@ -14,6 +14,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.document import construct_document_select_for_connector_credential_pair from onyx.db.models import Document as DbDocument @@ -69,10 +70,12 @@ class RedisConnectorDelete: def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) def _generate_task_id(self) -> str: # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" @@ -136,6 +139,7 @@ class RedisConnectorDelete: return len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.taskset_key) self.redis.delete(self.fence_key) diff --git a/backend/onyx/redis/redis_connector_doc_perm_sync.py b/backend/onyx/redis/redis_connector_doc_perm_sync.py index 99f891e14..23b0b1657 100644 --- a/backend/onyx/redis/redis_connector_doc_perm_sync.py +++ b/backend/onyx/redis/redis_connector_doc_perm_sync.py @@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT @@ -102,10 +103,12 @@ class RedisConnectorPermissionSync: payload: RedisConnectorPermissionSyncPayload | None, ) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def generator_complete(self) -> int | None: @@ -173,6 +176,7 @@ class RedisConnectorPermissionSync: return len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_complete_key) self.redis.delete(self.taskset_key) diff --git a/backend/onyx/redis/redis_connector_index.py b/backend/onyx/redis/redis_connector_index.py index 5b62da7b6..bd766fc89 100644 --- a/backend/onyx/redis/redis_connector_index.py +++ b/backend/onyx/redis/redis_connector_index.py @@ -5,6 +5,8 @@ from uuid import uuid4 import redis from pydantic import BaseModel +from onyx.configs.constants import OnyxRedisConstants + class RedisConnectorIndexPayload(BaseModel): index_attempt_id: int | None @@ -103,10 +105,12 @@ class RedisConnectorIndex: payload: RedisConnectorIndexPayload | None, ) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) def terminating(self, celery_task_id: str) -> bool: if self.redis.exists(f"{self.terminate_key}_{celery_task_id}"): @@ -188,6 +192,7 @@ class RedisConnectorIndex: return status def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.active_key) self.redis.delete(self.generator_lock_key) self.redis.delete(self.generator_progress_key) diff --git a/backend/onyx/redis/redis_connector_prune.py b/backend/onyx/redis/redis_connector_prune.py index ea4a923eb..bc04cd516 100644 --- a/backend/onyx/redis/redis_connector_prune.py +++ b/backend/onyx/redis/redis_connector_prune.py @@ -11,6 +11,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT @@ -79,10 +80,12 @@ class RedisConnectorPrune: def set_fence(self, value: bool) -> None: if not value: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, 0) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def generator_complete(self) -> int | None: @@ -158,6 +161,7 @@ class RedisConnectorPrune: return len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_complete_key) self.redis.delete(self.taskset_key) diff --git a/backend/onyx/redis/redis_document_set.py b/backend/onyx/redis/redis_document_set.py index aa219d6dd..c0c3ce2a0 100644 --- a/backend/onyx/redis/redis_document_set.py +++ b/backend/onyx/redis/redis_document_set.py @@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.document_set import construct_document_select_by_docset from onyx.db.models import Document from onyx.redis.redis_object_helper import RedisObjectHelper @@ -35,10 +36,12 @@ class RedisDocumentSet(RedisObjectHelper): def set_fence(self, payload: int | None) -> None: if payload is None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def payload(self) -> int | None: @@ -96,6 +99,7 @@ class RedisDocumentSet(RedisObjectHelper): return len(async_results), len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.taskset_key) self.redis.delete(self.fence_key) diff --git a/backend/onyx/redis/redis_pool.py b/backend/onyx/redis/redis_pool.py index 10a2c7655..146bc19a3 100644 --- a/backend/onyx/redis/redis_pool.py +++ b/backend/onyx/redis/redis_pool.py @@ -113,6 +113,7 @@ class TenantRedis(redis.Redis): "reacquire", "create_lock", "startswith", + "smembers", "sadd", "srem", "scard", diff --git a/backend/onyx/redis/redis_usergroup.py b/backend/onyx/redis/redis_usergroup.py index 00981e9a1..880806850 100644 --- a/backend/onyx/redis/redis_usergroup.py +++ b/backend/onyx/redis/redis_usergroup.py @@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.models import Document from onyx.redis.redis_object_helper import RedisObjectHelper from onyx.utils.variable_functionality import fetch_versioned_implementation @@ -36,10 +37,12 @@ class RedisUserGroup(RedisObjectHelper): def set_fence(self, payload: int | None) -> None: if payload is None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def payload(self) -> int | None: @@ -109,6 +112,7 @@ class RedisUserGroup(RedisObjectHelper): return len(async_results), len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.taskset_key) self.redis.delete(self.fence_key) From 30e8fb12e4ffde929b541d239c26eb739281b059 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 30 Jan 2025 15:34:00 -0800 Subject: [PATCH 03/12] remove commented code --- .../background/celery/tasks/vespa/tasks.py | 96 ------------------- 1 file changed, 96 deletions(-) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 3b509ac02..dc5c64fff 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -887,8 +887,6 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: f"external_group_sync={n_external_group_sync} " f"permissions_upsert={n_permissions_upsert} " ) - # timings["queues"] = time.monotonic() - phase_start - # timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: @@ -929,100 +927,6 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: ) else: pass - - # scan and monitor activity to completion - # phase_start = time.monotonic() - # lock_beat.reacquire() - # if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()): - # if r.exists(RedisConnectorCredentialPair.get_fence_key()): - # monitor_connector_taskset(r) - # timings["connector"] = time.monotonic() - phase_start - # timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - - # phase_start = time.monotonic() - # lock_beat.reacquire() - # for key_bytes in r_replica.scan_iter( - # RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - # ): - # if r.exists(key_bytes): - # monitor_connector_deletion_taskset(tenant_id, key_bytes, r) - # lock_beat.reacquire() - - # timings["connector_deletion"] = time.monotonic() - phase_start - # timings["connector_deletion_ttl"] = r.ttl( - # OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK - # ) - - # phase_start = time.monotonic() - # lock_beat.reacquire() - # for key_bytes in r_replica.scan_iter( - # RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - # ): - # if r.exists(key_bytes): - # with get_session_with_tenant(tenant_id) as db_session: - # monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) - # lock_beat.reacquire() - # timings["documentset"] = time.monotonic() - phase_start - # timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - - # phase_start = time.monotonic() - # lock_beat.reacquire() - # for key_bytes in r_replica.scan_iter( - # RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - # ): - # if r.exists(key_bytes): - # monitor_usergroup_taskset = ( - # fetch_versioned_implementation_with_fallback( - # "onyx.background.celery.tasks.vespa.tasks", - # "monitor_usergroup_taskset", - # noop_fallback, - # ) - # ) - # with get_session_with_tenant(tenant_id) as db_session: - # monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) - # lock_beat.reacquire() - # timings["usergroup"] = time.monotonic() - phase_start - # timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - - # phase_start = time.monotonic() - # lock_beat.reacquire() - # for key_bytes in r_replica.scan_iter( - # RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - # ): - # if r.exists(key_bytes): - # with get_session_with_tenant(tenant_id) as db_session: - # monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) - # lock_beat.reacquire() - # timings["pruning"] = time.monotonic() - phase_start - # timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - - # phase_start = time.monotonic() - # lock_beat.reacquire() - # for key_bytes in r_replica.scan_iter( - # RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - # ): - # if r.exists(key_bytes): - # with get_session_with_tenant(tenant_id) as db_session: - # monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) - # lock_beat.reacquire() - # timings["indexing"] = time.monotonic() - phase_start - # timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - - # phase_start = time.monotonic() - # lock_beat.reacquire() - # for key_bytes in r_replica.scan_iter( - # RedisConnectorPermissionSync.FENCE_PREFIX + "*", - # count=SCAN_ITER_COUNT_DEFAULT, - # ): - # if r.exists(key_bytes): - # with get_session_with_tenant(tenant_id) as db_session: - # monitor_ccpair_permissions_taskset( - # tenant_id, key_bytes, r, db_session - # ) - # lock_beat.reacquire() - - # timings["permissions"] = time.monotonic() - phase_start - # timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." From b64545c7c7b77d4155f7ff8f88ed360e7b1a61d2 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 31 Jan 2025 12:12:52 -0800 Subject: [PATCH 04/12] build a lookup table every so often to handle cloud migration --- .../background/celery/tasks/vespa/tasks.py | 39 ++++++++++++++++++- backend/onyx/configs/constants.py | 1 + 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index e7dde69a2..05bd30298 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -38,6 +38,7 @@ from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks +from onyx.configs.constants import OnyxRedisSignals from onyx.db.connector import fetch_connector_by_id from onyx.db.connector_credential_pair import add_deletion_failure_message from onyx.db.connector_credential_pair import ( @@ -82,7 +83,9 @@ from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_prune import RedisConnectorPrune from onyx.redis.redis_document_set import RedisDocumentSet from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import get_redis_replica_client from onyx.redis.redis_pool import redis_lock_dump +from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.redis.redis_usergroup import RedisUserGroup from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import fetch_versioned_implementation @@ -832,7 +835,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # then read from the replica. In this case, monitoring work could be done on a fence # that no longer exists. To avoid this, we scan from the replica, but double check # the result on the master. - # r_replica = get_redis_replica_client(tenant_id=tenant_id) + r_replica = get_redis_replica_client(tenant_id=tenant_id) lock_beat: RedisLock = r.lock( OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK, @@ -888,6 +891,20 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: f"permissions_upsert={n_permissions_upsert} " ) + # we want to run this less frequently than the overall task + if not r.exists(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE): + # build a lookup table of existing fences + # this is just a migration concern and should be unnecessary once + # lookup tables are rolled out + for key_bytes in r_replica.scan_iter(count=SCAN_ITER_COUNT_DEFAULT): + if is_fence(key_bytes): + logger.warning(f"Adding {key_bytes} to the lookup table.") + r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) + + r.set(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE, 1, ex=120) + + # use a lookup table to find active fences. We still have to verify the fence + # exists since it is an optimization and not the source of truth. keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: key_bytes = cast(bytes, key) @@ -1056,3 +1073,23 @@ def vespa_metadata_sync_task( self.retry(exc=e, countdown=countdown) return True + + +def is_fence(key_bytes: bytes) -> bool: + key_str = key_bytes.decode("utf-8") + if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY: + return True + if key_str.startswith(RedisDocumentSet.FENCE_PREFIX): + return True + if key_str.startswith(RedisUserGroup.FENCE_PREFIX): + return True + if key_str.startswith(RedisConnectorDelete.FENCE_PREFIX): + return True + if key_str.startswith(RedisConnectorPrune.FENCE_PREFIX): + return True + if key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): + return True + if key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): + return True + + return False diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index ad3703496..4fd13e6a0 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -302,6 +302,7 @@ class OnyxRedisSignals: VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences" VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = "signal:validate_external_group_sync_fences" VALIDATE_PERMISSION_SYNC_FENCES = "signal:validate_permission_sync_fences" + BUILD_FENCE_LOOKUP_TABLE = "signal:build_fence_lookup_table" class OnyxRedisConstants: From 69f16cc97211f32262c6d8ca403f7c9f499907b4 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 31 Jan 2025 13:23:52 -0800 Subject: [PATCH 05/12] dont add to the lookup table if it already exists --- backend/onyx/background/celery/tasks/vespa/tasks.py | 4 +++- backend/onyx/redis/redis_pool.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 05bd30298..27f1b8b9c 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -897,7 +897,9 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # this is just a migration concern and should be unnecessary once # lookup tables are rolled out for key_bytes in r_replica.scan_iter(count=SCAN_ITER_COUNT_DEFAULT): - if is_fence(key_bytes): + if is_fence(key_bytes) and not r.sismember( + OnyxRedisConstants.ACTIVE_FENCES, key_bytes + ): logger.warning(f"Adding {key_bytes} to the lookup table.") r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) diff --git a/backend/onyx/redis/redis_pool.py b/backend/onyx/redis/redis_pool.py index 07fae38c5..dd2111178 100644 --- a/backend/onyx/redis/redis_pool.py +++ b/backend/onyx/redis/redis_pool.py @@ -114,6 +114,7 @@ class TenantRedis(redis.Redis): "create_lock", "startswith", "smembers", + "sismember", "sadd", "srem", "scard", From 618e4addd88700c82b0043bc4f7ac2f6edc8363f Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 31 Jan 2025 13:25:27 -0800 Subject: [PATCH 06/12] better signal names --- .../celery/tasks/doc_permission_syncing/tasks.py | 4 ++-- .../onyx/background/celery/tasks/indexing/tasks.py | 4 ++-- backend/onyx/background/celery/tasks/vespa/tasks.py | 4 ++-- backend/onyx/configs/constants.py | 12 ++++++++---- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 1791e5585..6d0b11dc5 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -158,7 +158,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool # we want to run this less frequently than the overall task lock_beat.reacquire() - if not r.exists(OnyxRedisSignals.VALIDATE_PERMISSION_SYNC_FENCES): + if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES): # clear any permission fences that don't have associated celery tasks in progress # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), # or be currently executing @@ -169,7 +169,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool "Exception while validating permission sync fences" ) - r.set(OnyxRedisSignals.VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60) + r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 2e3c0e3ae..002582baa 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -224,7 +224,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: lock_beat.reacquire() # we want to run this less frequently than the overall task - if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES): + if not redis_client.exists(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES): # clear any indexing fences that don't have associated celery tasks in progress # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), # or be currently executing @@ -235,7 +235,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: except Exception: task_logger.exception("Exception while validating indexing fences") - redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60) + redis_client.set(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES, 1, ex=60) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 27f1b8b9c..76214c00b 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -892,7 +892,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: ) # we want to run this less frequently than the overall task - if not r.exists(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE): + if not r.exists(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE): # build a lookup table of existing fences # this is just a migration concern and should be unnecessary once # lookup tables are rolled out @@ -903,7 +903,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: logger.warning(f"Adding {key_bytes} to the lookup table.") r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) - r.set(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE, 1, ex=120) + r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=120) # use a lookup table to find active fences. We still have to verify the fence # exists since it is an optimization and not the source of truth. diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 4fd13e6a0..baca5265a 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -299,10 +299,14 @@ class OnyxRedisLocks: class OnyxRedisSignals: - VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences" - VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = "signal:validate_external_group_sync_fences" - VALIDATE_PERMISSION_SYNC_FENCES = "signal:validate_permission_sync_fences" - BUILD_FENCE_LOOKUP_TABLE = "signal:build_fence_lookup_table" + BLOCK_VALIDATE_INDEXING_FENCES = "signal:block_validate_indexing_fences" + BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = ( + "signal:block_validate_external_group_sync_fences" + ) + BLOCK_VALIDATE_PERMISSION_SYNC_FENCES = ( + "signal:block_validate_permission_sync_fences" + ) + BLOCK_BUILD_FENCE_LOOKUP_TABLE = "signal:block_build_fence_lookup_table" class OnyxRedisConstants: From d3cf18160eaac32365daa84d719d7c0c4c6f9d3c Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 31 Jan 2025 16:13:13 -0800 Subject: [PATCH 07/12] lower CLOUD_BEAT_SCHEDULE_MULTIPLIER to 4 --- backend/onyx/background/celery/tasks/beat_schedule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index c2e1f163f..6db15cd83 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -18,7 +18,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8 +CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4 # tasks that only run in the cloud # the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered From 3a950721b98fc363d7bae9179af6ed9047c675a9 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Sun, 2 Feb 2025 01:14:10 -0800 Subject: [PATCH 08/12] get rid of some more scan_iter --- .../tasks/doc_permission_syncing/tasks.py | 23 ++++++++++++------- .../background/celery/tasks/indexing/utils.py | 18 +++++++++++---- .../background/celery/tasks/vespa/tasks.py | 2 +- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 6d0b11dc5..a4608762b 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -3,6 +3,7 @@ from datetime import datetime from datetime import timedelta from datetime import timezone from time import sleep +from typing import Any from typing import cast from uuid import uuid4 @@ -38,6 +39,7 @@ from onyx.configs.constants import DocumentSource from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisSignals from onyx.db.connector import mark_cc_pair_as_permissions_synced @@ -58,7 +60,6 @@ from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyn from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload from onyx.redis.redis_pool import get_redis_client from onyx.redis.redis_pool import redis_lock_dump -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.server.utils import make_short_id from onyx.utils.logger import doc_permission_sync_ctx from onyx.utils.logger import LoggerContextVars @@ -169,7 +170,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool "Exception while validating permission sync fences" ) - r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60) + r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=300) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." @@ -506,12 +507,15 @@ def validate_permission_sync_fences( OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery ) - # validate all existing indexing jobs - for key_bytes in r.scan_iter( - RedisConnectorPermissionSync.FENCE_PREFIX + "*", - count=SCAN_ITER_COUNT_DEFAULT, - ): - lock_beat.reacquire() + # validate all existing permission sync jobs + lock_beat.reacquire() + keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if not key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): + continue + validate_permission_sync_fence( tenant_id, key_bytes, @@ -520,6 +524,9 @@ def validate_permission_sync_fences( r, r_celery, ) + + lock_beat.reacquire() + return diff --git a/backend/onyx/background/celery/tasks/indexing/utils.py b/backend/onyx/background/celery/tasks/indexing/utils.py index e14e79b5f..0eee8a36f 100644 --- a/backend/onyx/background/celery/tasks/indexing/utils.py +++ b/backend/onyx/background/celery/tasks/indexing/utils.py @@ -1,6 +1,8 @@ import time from datetime import datetime from datetime import timezone +from typing import Any +from typing import cast import redis from celery import Celery @@ -19,6 +21,7 @@ from onyx.configs.constants import DocumentSource from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.engine import get_db_current_time from onyx.db.engine import get_session_with_tenant from onyx.db.enums import ConnectorCredentialPairStatus @@ -37,7 +40,6 @@ from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_index import RedisConnectorIndexPayload from onyx.redis.redis_pool import redis_lock_dump -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.utils.logger import setup_logger logger = setup_logger() @@ -304,10 +306,13 @@ def validate_indexing_fences( # Use replica for this because the worst thing that happens # is that we don't run the validation on this pass - for key_bytes in r_replica.scan_iter( - RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - lock_beat.reacquire() + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if not key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): + continue + with get_session_with_tenant(tenant_id) as db_session: validate_indexing_fence( tenant_id, @@ -316,6 +321,9 @@ def validate_indexing_fences( r_celery, db_session, ) + + lock_beat.reacquire() + return diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 76214c00b..e50c0fdfc 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -903,7 +903,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: logger.warning(f"Adding {key_bytes} to the lookup table.") r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) - r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=120) + r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=300) # use a lookup table to find active fences. We still have to verify the fence # exists since it is an optimization and not the source of truth. From fd947aadea493812e02a468b833b9e9593aef1dc Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 3 Feb 2025 00:32:23 -0800 Subject: [PATCH 09/12] slow down to 8 again --- backend/onyx/background/celery/tasks/beat_schedule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 6db15cd83..c2e1f163f 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -18,7 +18,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4 +CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8 # tasks that only run in the cloud # the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered From 6f018d75ee98df5f9594a1f34734b50caba96513 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 3 Feb 2025 10:10:05 -0800 Subject: [PATCH 10/12] use replica, remove some commented code --- .../celery/tasks/doc_permission_syncing/tasks.py | 9 +++++++-- .../onyx/background/celery/tasks/vespa/tasks.py | 9 --------- .../redis/redis_connector_credential_pair.py | 16 ---------------- 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index a4608762b..a17be42dc 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -59,6 +59,7 @@ from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import get_redis_replica_client from onyx.redis.redis_pool import redis_lock_dump from onyx.server.utils import make_short_id from onyx.utils.logger import doc_permission_sync_ctx @@ -124,6 +125,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool # we need to use celery's redis client to access its redis data # (which lives on a different db number) r = get_redis_client(tenant_id=tenant_id) + r_replica = get_redis_replica_client(tenant_id=tenant_id) r_celery: Redis = self.app.broker_connection().channel().client # type: ignore lock_beat: RedisLock = r.lock( @@ -164,7 +166,9 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), # or be currently executing try: - validate_permission_sync_fences(tenant_id, r, r_celery, lock_beat) + validate_permission_sync_fences( + tenant_id, r, r_replica, r_celery, lock_beat + ) except Exception: task_logger.exception( "Exception while validating permission sync fences" @@ -487,6 +491,7 @@ def update_external_document_permissions_task( def validate_permission_sync_fences( tenant_id: str | None, r: Redis, + r_replica: Redis, r_celery: Redis, lock_beat: RedisLock, ) -> None: @@ -509,7 +514,7 @@ def validate_permission_sync_fences( # validate all existing permission sync jobs lock_beat.reacquire() - keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: key_bytes = cast(bytes, key) key_str = key_bytes.decode("utf-8") diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index e50c0fdfc..af2d2aad7 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -1025,15 +1025,6 @@ def vespa_metadata_sync_task( # the sync might repeat again later mark_document_as_synced(document_id, db_session) - # this code checks for and removes a per document sync key that is - # used to block out the same doc from continualy resyncing - # a quick hack that is only needed for production issues - # redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key( - # document_id - # ) - # r = get_redis_client(tenant_id=tenant_id) - # r.delete(redis_syncing_key) - elapsed = time.monotonic() - start task_logger.info( f"doc={document_id} " diff --git a/backend/onyx/redis/redis_connector_credential_pair.py b/backend/onyx/redis/redis_connector_credential_pair.py index 52beefa5b..db8d526c0 100644 --- a/backend/onyx/redis/redis_connector_credential_pair.py +++ b/backend/onyx/redis/redis_connector_credential_pair.py @@ -32,8 +32,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper): PREFIX = "connectorsync" TASKSET_PREFIX = PREFIX + "_taskset" - # SYNCING_PREFIX = PREFIX + ":vespa_syncing" - def __init__(self, tenant_id: str | None, id: int) -> None: super().__init__(tenant_id, str(id)) @@ -56,11 +54,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper): # the list on the fly self.skip_docs = skip_docs - # @staticmethod - # def make_redis_syncing_key(doc_id: str) -> str: - # """used to create a key in redis to block a doc from syncing""" - # return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" - def generate_tasks( self, max_tasks: int, @@ -108,15 +101,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper): if doc.id in self.skip_docs: continue - # an arbitrary number in seconds to prevent the same doc from syncing repeatedly - # SYNC_EXPIRATION = 24 * 60 * 60 - - # a quick hack that can be uncommented to prevent a doc from resyncing over and over - # redis_syncing_key = self.make_redis_syncing_key(doc.id) - # if redis_client.exists(redis_syncing_key): - # continue - # redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION) - # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" # the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac" # we prefix the task id so it's easier to keep track of who created the task From 02a068a68b5929beb4061285c2edbd4ab4fc4f93 Mon Sep 17 00:00:00 2001 From: Richard Kuo Date: Mon, 3 Feb 2025 23:59:36 -0800 Subject: [PATCH 11/12] multiplier from 8 to 4 --- backend/onyx/background/celery/tasks/beat_schedule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index c2e1f163f..6db15cd83 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -18,7 +18,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8 +CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4 # tasks that only run in the cloud # the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered From aff4ee5ebff9e6a7aa02b8b3f80cb9782c6c96a6 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Tue, 4 Feb 2025 15:56:18 -0800 Subject: [PATCH 12/12] commented code --- backend/onyx/background/celery/tasks/vespa/tasks.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index af2d2aad7..a31e6a9c6 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -821,9 +821,6 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: time_start = time.monotonic() - # timings: dict[str, Any] = {} - # timings["start"] = time_start - r = get_redis_client(tenant_id=tenant_id) # Replica usage notes