From d8578bc1cb7c46ad020328d8430c7e2c147fff3b Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 30 Jan 2025 15:21:52 -0800 Subject: [PATCH] first full cut --- .../onyx/background/celery/apps/primary.py | 16 ++- .../background/celery/tasks/vespa/tasks.py | 99 +++++++++---------- backend/onyx/configs/constants.py | 4 + backend/onyx/redis/redis_connector.py | 2 + .../redis/redis_connector_credential_pair.py | 96 +++++++++++++++--- backend/onyx/redis/redis_connector_delete.py | 4 + .../redis/redis_connector_doc_perm_sync.py | 4 + backend/onyx/redis/redis_connector_index.py | 5 + backend/onyx/redis/redis_connector_prune.py | 4 + backend/onyx/redis/redis_document_set.py | 4 + backend/onyx/redis/redis_pool.py | 1 + backend/onyx/redis/redis_usergroup.py | 4 + 12 files changed, 168 insertions(+), 75 deletions(-) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 435a88e2c..5696d75bc 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -21,13 +21,16 @@ from onyx.background.celery.tasks.indexing.utils import ( get_unfenced_index_attempt_ids, ) from onyx.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME from onyx.db.engine import get_session_with_default_tenant from onyx.db.engine import SqlEngine from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import mark_attempt_canceled -from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair +from onyx.redis.redis_connector_credential_pair import ( + RedisGlobalConnectorCredentialPair, +) from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync @@ -141,23 +144,16 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: r.delete(OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK) r.delete(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - r.delete(RedisConnectorCredentialPair.get_taskset_key()) - r.delete(RedisConnectorCredentialPair.get_fence_key()) + r.delete(OnyxRedisConstants.ACTIVE_FENCES) + RedisGlobalConnectorCredentialPair.reset_all(r) RedisDocumentSet.reset_all(r) - RedisUserGroup.reset_all(r) - RedisConnectorDelete.reset_all(r) - RedisConnectorPrune.reset_all(r) - RedisConnectorIndex.reset_all(r) - RedisConnectorStop.reset_all(r) - RedisConnectorPermissionSync.reset_all(r) - RedisConnectorExternalGroupSync.reset_all(r) # mark orphaned index attempts as failed diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index ca5941c75..3b509ac02 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -5,6 +5,7 @@ from collections.abc import Callable from datetime import datetime from datetime import timezone from http import HTTPStatus +from typing import Any from typing import cast import httpx @@ -35,6 +36,7 @@ from onyx.configs.app_configs import VESPA_SYNC_MAX_TASKS from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks from onyx.db.connector import fetch_connector_by_id from onyx.db.connector_credential_pair import add_deletion_failure_message @@ -71,6 +73,9 @@ from onyx.document_index.interfaces import VespaDocumentFields from onyx.httpx.httpx_pool import HttpxPool from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair +from onyx.redis.redis_connector_credential_pair import ( + RedisGlobalConnectorCredentialPair, +) from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_index import RedisConnectorIndex @@ -201,10 +206,12 @@ def try_generate_stale_document_sync_tasks( tenant_id: str | None, ) -> int | None: # the fence is up, do nothing - if r.exists(RedisConnectorCredentialPair.get_fence_key()): + + redis_global_ccpair = RedisGlobalConnectorCredentialPair(r) + if redis_global_ccpair.fenced: return None - r.delete(RedisConnectorCredentialPair.get_taskset_key()) # delete the taskset + redis_global_ccpair.delete_taskset() # add tasks to celery and build up the task set to monitor in redis stale_doc_count = count_documents_by_needs_sync(db_session) @@ -262,7 +269,7 @@ def try_generate_stale_document_sync_tasks( f"RedisConnector.generate_tasks finished for all cc_pairs. total_tasks_generated={total_tasks_generated}" ) - r.set(RedisConnectorCredentialPair.get_fence_key(), total_tasks_generated) + redis_global_ccpair.set_fence(total_tasks_generated) return total_tasks_generated @@ -413,23 +420,17 @@ def try_generate_user_group_sync_tasks( def monitor_connector_taskset(r: Redis) -> None: - fence_value = r.get(RedisConnectorCredentialPair.get_fence_key()) - if fence_value is None: + redis_global_ccpair = RedisGlobalConnectorCredentialPair(r) + initial_count = redis_global_ccpair.payload + if initial_count is None: return - try: - initial_count = int(cast(int, fence_value)) - except ValueError: - task_logger.error("The value is not an integer.") - return - - count = r.scard(RedisConnectorCredentialPair.get_taskset_key()) + remaining = redis_global_ccpair.get_remaining() task_logger.info( - f"Stale document sync progress: remaining={count} initial={initial_count}" + f"Stale document sync progress: remaining={remaining} initial={initial_count}" ) - if count == 0: - r.delete(RedisConnectorCredentialPair.get_taskset_key()) - r.delete(RedisConnectorCredentialPair.get_fence_key()) + if remaining == 0: + redis_global_ccpair.reset() task_logger.info(f"Successfully synced stale documents. count={initial_count}") @@ -889,52 +890,43 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # timings["queues"] = time.monotonic() - phase_start # timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - keys = r.smembers("active_fences") + keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: key_bytes = cast(bytes, key) - key_str = key_bytes.decode("utf-8") - if key_str == RedisConnectorCredentialPair.get_fence_key(): - if r.exists(key_str): - monitor_connector_taskset(r) - elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX): - if r.exists(key_str): - monitor_connector_deletion_taskset(tenant_id, key_bytes, r) + if not r.exists(key_bytes): + r.srem(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) + continue + + key_str = key_bytes.decode("utf-8") + if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY: + monitor_connector_taskset(r) elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_document_set_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) elif key_str.startswith(RedisUserGroup.FENCE_PREFIX): - if r.exists(key_str): - monitor_usergroup_taskset = ( - fetch_versioned_implementation_with_fallback( - "onyx.background.celery.tasks.vespa.tasks", - "monitor_usergroup_taskset", - noop_fallback, - ) + monitor_usergroup_taskset = ( + fetch_versioned_implementation_with_fallback( + "onyx.background.celery.tasks.vespa.tasks", + "monitor_usergroup_taskset", + noop_fallback, ) - with get_session_with_tenant(tenant_id) as db_session: - monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX): + monitor_connector_deletion_taskset(tenant_id, key_bytes, r) elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_pruning_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_indexing_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): - if r.exists(key_str): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_permissions_taskset( - tenant_id, key_bytes, r, db_session - ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_permissions_taskset( + tenant_id, key_bytes, r, db_session + ) else: pass @@ -1035,6 +1027,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." ) + return False finally: if lock_beat.owned(): lock_beat.release() diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 09a90a9ee..b4a47813d 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -302,6 +302,10 @@ class OnyxRedisSignals: VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences" +class OnyxRedisConstants: + ACTIVE_FENCES = "active_fences" + + class OnyxCeleryPriority(int, Enum): HIGHEST = 0 HIGH = auto() diff --git a/backend/onyx/redis/redis_connector.py b/backend/onyx/redis/redis_connector.py index f55ff4a89..196f2306c 100644 --- a/backend/onyx/redis/redis_connector.py +++ b/backend/onyx/redis/redis_connector.py @@ -17,6 +17,8 @@ class RedisConnector: associated background tasks / associated redis interactions.""" def __init__(self, tenant_id: str | None, id: int) -> None: + """id: a connector credential pair id""" + self.tenant_id: str | None = tenant_id self.id: int = id self.redis: redis.Redis = get_redis_client(tenant_id=tenant_id) diff --git a/backend/onyx/redis/redis_connector_credential_pair.py b/backend/onyx/redis/redis_connector_credential_pair.py index e648bc563..52beefa5b 100644 --- a/backend/onyx/redis/redis_connector_credential_pair.py +++ b/backend/onyx/redis/redis_connector_credential_pair.py @@ -2,6 +2,7 @@ import time from typing import cast from uuid import uuid4 +import redis from celery import Celery from redis import Redis from redis.lock import Lock as RedisLock @@ -12,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.document import ( construct_document_select_for_connector_credential_pair_by_needs_sync, @@ -28,10 +30,9 @@ class RedisConnectorCredentialPair(RedisObjectHelper): all connectors and is not per connector.""" PREFIX = "connectorsync" - FENCE_PREFIX = PREFIX + "_fence" TASKSET_PREFIX = PREFIX + "_taskset" - SYNCING_PREFIX = PREFIX + ":vespa_syncing" + # SYNCING_PREFIX = PREFIX + ":vespa_syncing" def __init__(self, tenant_id: str | None, id: int) -> None: super().__init__(tenant_id, str(id)) @@ -39,10 +40,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper): # documents that should be skipped self.skip_docs: set[str] = set() - @classmethod - def get_fence_key(cls) -> str: - return RedisConnectorCredentialPair.FENCE_PREFIX - @classmethod def get_taskset_key(cls) -> str: return RedisConnectorCredentialPair.TASKSET_PREFIX @@ -51,18 +48,18 @@ class RedisConnectorCredentialPair(RedisObjectHelper): def taskset_key(self) -> str: """Notice that this is intentionally reusing the same taskset for all connector syncs""" - # example: connector_taskset + # example: connectorsync_taskset return f"{self.TASKSET_PREFIX}" def set_skip_docs(self, skip_docs: set[str]) -> None: - # documents that should be skipped. Note that this classes updates + # documents that should be skipped. Note that this class updates # the list on the fly self.skip_docs = skip_docs - @staticmethod - def make_redis_syncing_key(doc_id: str) -> str: - """used to create a key in redis to block a doc from syncing""" - return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" + # @staticmethod + # def make_redis_syncing_key(doc_id: str) -> str: + # """used to create a key in redis to block a doc from syncing""" + # return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" def generate_tasks( self, @@ -148,3 +145,78 @@ class RedisConnectorCredentialPair(RedisObjectHelper): break return len(async_results), num_docs + + +class RedisGlobalConnectorCredentialPair: + """This class is used to scan documents by cc_pair in the db and collect them into + a unified set for syncing. + + It differs from the other redis helpers in that the taskset used spans + all connectors and is not per connector.""" + + PREFIX = "connectorsync" + FENCE_KEY = PREFIX + "_fence" + TASKSET_KEY = PREFIX + "_taskset" + + def __init__(self, redis: redis.Redis) -> None: + self.redis = redis + + @property + def fenced(self) -> bool: + if self.redis.exists(self.fence_key): + return True + + return False + + @property + def payload(self) -> int | None: + bytes = self.redis.get(self.fence_key) + if bytes is None: + return None + + progress = int(cast(int, bytes)) + return progress + + def get_remaining(self) -> int: + remaining = cast(int, self.redis.scard(self.taskset_key)) + return remaining + + @property + def fence_key(self) -> str: + """Notice that this is intentionally reusing the same fence for all + connector syncs""" + # example: connectorsync_fence + return f"{self.FENCE_KEY}" + + @property + def taskset_key(self) -> str: + """Notice that this is intentionally reusing the same taskset for all + connector syncs""" + # example: connectorsync_taskset + return f"{self.TASKSET_KEY}" + + def set_fence(self, payload: int | None) -> None: + if payload is None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + self.redis.delete(self.fence_key) + return + + self.redis.set(self.fence_key, payload) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + + def delete_taskset(self) -> None: + self.redis.delete(self.taskset_key) + + def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + self.redis.delete(self.taskset_key) + self.redis.delete(self.fence_key) + + @staticmethod + def reset_all(r: redis.Redis) -> None: + r.srem( + OnyxRedisConstants.ACTIVE_FENCES, + RedisGlobalConnectorCredentialPair.FENCE_KEY, + ) + r.delete(RedisGlobalConnectorCredentialPair.TASKSET_KEY) + r.delete(RedisGlobalConnectorCredentialPair.FENCE_KEY) diff --git a/backend/onyx/redis/redis_connector_delete.py b/backend/onyx/redis/redis_connector_delete.py index 17651bf66..3ac5edb72 100644 --- a/backend/onyx/redis/redis_connector_delete.py +++ b/backend/onyx/redis/redis_connector_delete.py @@ -14,6 +14,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.document import construct_document_select_for_connector_credential_pair from onyx.db.models import Document as DbDocument @@ -69,10 +70,12 @@ class RedisConnectorDelete: def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) def _generate_task_id(self) -> str: # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" @@ -136,6 +139,7 @@ class RedisConnectorDelete: return len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.taskset_key) self.redis.delete(self.fence_key) diff --git a/backend/onyx/redis/redis_connector_doc_perm_sync.py b/backend/onyx/redis/redis_connector_doc_perm_sync.py index 99f891e14..23b0b1657 100644 --- a/backend/onyx/redis/redis_connector_doc_perm_sync.py +++ b/backend/onyx/redis/redis_connector_doc_perm_sync.py @@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT @@ -102,10 +103,12 @@ class RedisConnectorPermissionSync: payload: RedisConnectorPermissionSyncPayload | None, ) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def generator_complete(self) -> int | None: @@ -173,6 +176,7 @@ class RedisConnectorPermissionSync: return len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_complete_key) self.redis.delete(self.taskset_key) diff --git a/backend/onyx/redis/redis_connector_index.py b/backend/onyx/redis/redis_connector_index.py index 5b62da7b6..bd766fc89 100644 --- a/backend/onyx/redis/redis_connector_index.py +++ b/backend/onyx/redis/redis_connector_index.py @@ -5,6 +5,8 @@ from uuid import uuid4 import redis from pydantic import BaseModel +from onyx.configs.constants import OnyxRedisConstants + class RedisConnectorIndexPayload(BaseModel): index_attempt_id: int | None @@ -103,10 +105,12 @@ class RedisConnectorIndex: payload: RedisConnectorIndexPayload | None, ) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) def terminating(self, celery_task_id: str) -> bool: if self.redis.exists(f"{self.terminate_key}_{celery_task_id}"): @@ -188,6 +192,7 @@ class RedisConnectorIndex: return status def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.active_key) self.redis.delete(self.generator_lock_key) self.redis.delete(self.generator_progress_key) diff --git a/backend/onyx/redis/redis_connector_prune.py b/backend/onyx/redis/redis_connector_prune.py index ea4a923eb..bc04cd516 100644 --- a/backend/onyx/redis/redis_connector_prune.py +++ b/backend/onyx/redis/redis_connector_prune.py @@ -11,6 +11,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT @@ -79,10 +80,12 @@ class RedisConnectorPrune: def set_fence(self, value: bool) -> None: if not value: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, 0) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def generator_complete(self) -> int | None: @@ -158,6 +161,7 @@ class RedisConnectorPrune: return len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_complete_key) self.redis.delete(self.taskset_key) diff --git a/backend/onyx/redis/redis_document_set.py b/backend/onyx/redis/redis_document_set.py index aa219d6dd..c0c3ce2a0 100644 --- a/backend/onyx/redis/redis_document_set.py +++ b/backend/onyx/redis/redis_document_set.py @@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.document_set import construct_document_select_by_docset from onyx.db.models import Document from onyx.redis.redis_object_helper import RedisObjectHelper @@ -35,10 +36,12 @@ class RedisDocumentSet(RedisObjectHelper): def set_fence(self, payload: int | None) -> None: if payload is None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def payload(self) -> int | None: @@ -96,6 +99,7 @@ class RedisDocumentSet(RedisObjectHelper): return len(async_results), len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.taskset_key) self.redis.delete(self.fence_key) diff --git a/backend/onyx/redis/redis_pool.py b/backend/onyx/redis/redis_pool.py index 10a2c7655..146bc19a3 100644 --- a/backend/onyx/redis/redis_pool.py +++ b/backend/onyx/redis/redis_pool.py @@ -113,6 +113,7 @@ class TenantRedis(redis.Redis): "reacquire", "create_lock", "startswith", + "smembers", "sadd", "srem", "scard", diff --git a/backend/onyx/redis/redis_usergroup.py b/backend/onyx/redis/redis_usergroup.py index 00981e9a1..880806850 100644 --- a/backend/onyx/redis/redis_usergroup.py +++ b/backend/onyx/redis/redis_usergroup.py @@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.models import Document from onyx.redis.redis_object_helper import RedisObjectHelper from onyx.utils.variable_functionality import fetch_versioned_implementation @@ -36,10 +37,12 @@ class RedisUserGroup(RedisObjectHelper): def set_fence(self, payload: int | None) -> None: if payload is None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) @property def payload(self) -> int | None: @@ -109,6 +112,7 @@ class RedisUserGroup(RedisObjectHelper): return len(async_results), len(async_results) def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.taskset_key) self.redis.delete(self.fence_key)