first full cut

This commit is contained in:
Richard Kuo (Danswer) 2025-01-30 15:21:52 -08:00
parent 7ccfe85ee5
commit d8578bc1cb
12 changed files with 168 additions and 75 deletions

View File

@ -21,13 +21,16 @@ from onyx.background.celery.tasks.indexing.utils import (
get_unfenced_index_attempt_ids, get_unfenced_index_attempt_ids,
) )
from onyx.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT from onyx.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME from onyx.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
from onyx.db.engine import get_session_with_default_tenant from onyx.db.engine import get_session_with_default_tenant
from onyx.db.engine import SqlEngine from onyx.db.engine import SqlEngine
from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import get_index_attempt
from onyx.db.index_attempt import mark_attempt_canceled from onyx.db.index_attempt import mark_attempt_canceled
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair from onyx.redis.redis_connector_credential_pair import (
RedisGlobalConnectorCredentialPair,
)
from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
@ -141,23 +144,16 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
r.delete(OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK) r.delete(OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK)
r.delete(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) r.delete(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
r.delete(RedisConnectorCredentialPair.get_taskset_key()) r.delete(OnyxRedisConstants.ACTIVE_FENCES)
r.delete(RedisConnectorCredentialPair.get_fence_key())
RedisGlobalConnectorCredentialPair.reset_all(r)
RedisDocumentSet.reset_all(r) RedisDocumentSet.reset_all(r)
RedisUserGroup.reset_all(r) RedisUserGroup.reset_all(r)
RedisConnectorDelete.reset_all(r) RedisConnectorDelete.reset_all(r)
RedisConnectorPrune.reset_all(r) RedisConnectorPrune.reset_all(r)
RedisConnectorIndex.reset_all(r) RedisConnectorIndex.reset_all(r)
RedisConnectorStop.reset_all(r) RedisConnectorStop.reset_all(r)
RedisConnectorPermissionSync.reset_all(r) RedisConnectorPermissionSync.reset_all(r)
RedisConnectorExternalGroupSync.reset_all(r) RedisConnectorExternalGroupSync.reset_all(r)
# mark orphaned index attempts as failed # mark orphaned index attempts as failed

View File

@ -5,6 +5,7 @@ from collections.abc import Callable
from datetime import datetime from datetime import datetime
from datetime import timezone from datetime import timezone
from http import HTTPStatus from http import HTTPStatus
from typing import Any
from typing import cast from typing import cast
import httpx import httpx
@ -35,6 +36,7 @@ from onyx.configs.app_configs import VESPA_SYNC_MAX_TASKS
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisLocks
from onyx.db.connector import fetch_connector_by_id from onyx.db.connector import fetch_connector_by_id
from onyx.db.connector_credential_pair import add_deletion_failure_message from onyx.db.connector_credential_pair import add_deletion_failure_message
@ -71,6 +73,9 @@ from onyx.document_index.interfaces import VespaDocumentFields
from onyx.httpx.httpx_pool import HttpxPool from onyx.httpx.httpx_pool import HttpxPool
from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from onyx.redis.redis_connector_credential_pair import (
RedisGlobalConnectorCredentialPair,
)
from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_index import RedisConnectorIndex
@ -201,10 +206,12 @@ def try_generate_stale_document_sync_tasks(
tenant_id: str | None, tenant_id: str | None,
) -> int | None: ) -> int | None:
# the fence is up, do nothing # the fence is up, do nothing
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
redis_global_ccpair = RedisGlobalConnectorCredentialPair(r)
if redis_global_ccpair.fenced:
return None return None
r.delete(RedisConnectorCredentialPair.get_taskset_key()) # delete the taskset redis_global_ccpair.delete_taskset()
# add tasks to celery and build up the task set to monitor in redis # add tasks to celery and build up the task set to monitor in redis
stale_doc_count = count_documents_by_needs_sync(db_session) stale_doc_count = count_documents_by_needs_sync(db_session)
@ -262,7 +269,7 @@ def try_generate_stale_document_sync_tasks(
f"RedisConnector.generate_tasks finished for all cc_pairs. total_tasks_generated={total_tasks_generated}" f"RedisConnector.generate_tasks finished for all cc_pairs. total_tasks_generated={total_tasks_generated}"
) )
r.set(RedisConnectorCredentialPair.get_fence_key(), total_tasks_generated) redis_global_ccpair.set_fence(total_tasks_generated)
return total_tasks_generated return total_tasks_generated
@ -413,23 +420,17 @@ def try_generate_user_group_sync_tasks(
def monitor_connector_taskset(r: Redis) -> None: def monitor_connector_taskset(r: Redis) -> None:
fence_value = r.get(RedisConnectorCredentialPair.get_fence_key()) redis_global_ccpair = RedisGlobalConnectorCredentialPair(r)
if fence_value is None: initial_count = redis_global_ccpair.payload
if initial_count is None:
return return
try: remaining = redis_global_ccpair.get_remaining()
initial_count = int(cast(int, fence_value))
except ValueError:
task_logger.error("The value is not an integer.")
return
count = r.scard(RedisConnectorCredentialPair.get_taskset_key())
task_logger.info( task_logger.info(
f"Stale document sync progress: remaining={count} initial={initial_count}" f"Stale document sync progress: remaining={remaining} initial={initial_count}"
) )
if count == 0: if remaining == 0:
r.delete(RedisConnectorCredentialPair.get_taskset_key()) redis_global_ccpair.reset()
r.delete(RedisConnectorCredentialPair.get_fence_key())
task_logger.info(f"Successfully synced stale documents. count={initial_count}") task_logger.info(f"Successfully synced stale documents. count={initial_count}")
@ -889,52 +890,43 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
# timings["queues"] = time.monotonic() - phase_start # timings["queues"] = time.monotonic() - phase_start
# timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) # timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
keys = r.smembers("active_fences") keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys: for key in keys:
key_bytes = cast(bytes, key) key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if key_str == RedisConnectorCredentialPair.get_fence_key(): if not r.exists(key_bytes):
if r.exists(key_str): r.srem(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
monitor_connector_taskset(r) continue
elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX):
if r.exists(key_str): key_str = key_bytes.decode("utf-8")
monitor_connector_deletion_taskset(tenant_id, key_bytes, r) if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY:
monitor_connector_taskset(r)
elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX): elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX):
if r.exists(key_str): with get_session_with_tenant(tenant_id) as db_session:
with get_session_with_tenant(tenant_id) as db_session: monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
monitor_document_set_taskset(
tenant_id, key_bytes, r, db_session
)
elif key_str.startswith(RedisUserGroup.FENCE_PREFIX): elif key_str.startswith(RedisUserGroup.FENCE_PREFIX):
if r.exists(key_str): monitor_usergroup_taskset = (
monitor_usergroup_taskset = ( fetch_versioned_implementation_with_fallback(
fetch_versioned_implementation_with_fallback( "onyx.background.celery.tasks.vespa.tasks",
"onyx.background.celery.tasks.vespa.tasks", "monitor_usergroup_taskset",
"monitor_usergroup_taskset", noop_fallback,
noop_fallback,
)
) )
with get_session_with_tenant(tenant_id) as db_session: )
monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) with get_session_with_tenant(tenant_id) as db_session:
monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX):
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX): elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
if r.exists(key_str): with get_session_with_tenant(tenant_id) as db_session:
with get_session_with_tenant(tenant_id) as db_session: monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
monitor_ccpair_pruning_taskset(
tenant_id, key_bytes, r, db_session
)
elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
if r.exists(key_str): with get_session_with_tenant(tenant_id) as db_session:
with get_session_with_tenant(tenant_id) as db_session: monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
monitor_ccpair_indexing_taskset(
tenant_id, key_bytes, r, db_session
)
elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
if r.exists(key_str): with get_session_with_tenant(tenant_id) as db_session:
with get_session_with_tenant(tenant_id) as db_session: monitor_ccpair_permissions_taskset(
monitor_ccpair_permissions_taskset( tenant_id, key_bytes, r, db_session
tenant_id, key_bytes, r, db_session )
)
else: else:
pass pass
@ -1035,6 +1027,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
task_logger.info( task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully." "Soft time limit exceeded, task is being terminated gracefully."
) )
return False
finally: finally:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()

View File

@ -302,6 +302,10 @@ class OnyxRedisSignals:
VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences" VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences"
class OnyxRedisConstants:
ACTIVE_FENCES = "active_fences"
class OnyxCeleryPriority(int, Enum): class OnyxCeleryPriority(int, Enum):
HIGHEST = 0 HIGHEST = 0
HIGH = auto() HIGH = auto()

View File

@ -17,6 +17,8 @@ class RedisConnector:
associated background tasks / associated redis interactions.""" associated background tasks / associated redis interactions."""
def __init__(self, tenant_id: str | None, id: int) -> None: def __init__(self, tenant_id: str | None, id: int) -> None:
"""id: a connector credential pair id"""
self.tenant_id: str | None = tenant_id self.tenant_id: str | None = tenant_id
self.id: int = id self.id: int = id
self.redis: redis.Redis = get_redis_client(tenant_id=tenant_id) self.redis: redis.Redis = get_redis_client(tenant_id=tenant_id)

View File

@ -2,6 +2,7 @@ import time
from typing import cast from typing import cast
from uuid import uuid4 from uuid import uuid4
import redis
from celery import Celery from celery import Celery
from redis import Redis from redis import Redis
from redis.lock import Lock as RedisLock from redis.lock import Lock as RedisLock
@ -12,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import ( from onyx.db.document import (
construct_document_select_for_connector_credential_pair_by_needs_sync, construct_document_select_for_connector_credential_pair_by_needs_sync,
@ -28,10 +30,9 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
all connectors and is not per connector.""" all connectors and is not per connector."""
PREFIX = "connectorsync" PREFIX = "connectorsync"
FENCE_PREFIX = PREFIX + "_fence"
TASKSET_PREFIX = PREFIX + "_taskset" TASKSET_PREFIX = PREFIX + "_taskset"
SYNCING_PREFIX = PREFIX + ":vespa_syncing" # SYNCING_PREFIX = PREFIX + ":vespa_syncing"
def __init__(self, tenant_id: str | None, id: int) -> None: def __init__(self, tenant_id: str | None, id: int) -> None:
super().__init__(tenant_id, str(id)) super().__init__(tenant_id, str(id))
@ -39,10 +40,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
# documents that should be skipped # documents that should be skipped
self.skip_docs: set[str] = set() self.skip_docs: set[str] = set()
@classmethod
def get_fence_key(cls) -> str:
return RedisConnectorCredentialPair.FENCE_PREFIX
@classmethod @classmethod
def get_taskset_key(cls) -> str: def get_taskset_key(cls) -> str:
return RedisConnectorCredentialPair.TASKSET_PREFIX return RedisConnectorCredentialPair.TASKSET_PREFIX
@ -51,18 +48,18 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
def taskset_key(self) -> str: def taskset_key(self) -> str:
"""Notice that this is intentionally reusing the same taskset for all """Notice that this is intentionally reusing the same taskset for all
connector syncs""" connector syncs"""
# example: connector_taskset # example: connectorsync_taskset
return f"{self.TASKSET_PREFIX}" return f"{self.TASKSET_PREFIX}"
def set_skip_docs(self, skip_docs: set[str]) -> None: def set_skip_docs(self, skip_docs: set[str]) -> None:
# documents that should be skipped. Note that this classes updates # documents that should be skipped. Note that this class updates
# the list on the fly # the list on the fly
self.skip_docs = skip_docs self.skip_docs = skip_docs
@staticmethod # @staticmethod
def make_redis_syncing_key(doc_id: str) -> str: # def make_redis_syncing_key(doc_id: str) -> str:
"""used to create a key in redis to block a doc from syncing""" # """used to create a key in redis to block a doc from syncing"""
return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" # return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}"
def generate_tasks( def generate_tasks(
self, self,
@ -148,3 +145,78 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
break break
return len(async_results), num_docs return len(async_results), num_docs
class RedisGlobalConnectorCredentialPair:
"""This class is used to scan documents by cc_pair in the db and collect them into
a unified set for syncing.
It differs from the other redis helpers in that the taskset used spans
all connectors and is not per connector."""
PREFIX = "connectorsync"
FENCE_KEY = PREFIX + "_fence"
TASKSET_KEY = PREFIX + "_taskset"
def __init__(self, redis: redis.Redis) -> None:
self.redis = redis
@property
def fenced(self) -> bool:
if self.redis.exists(self.fence_key):
return True
return False
@property
def payload(self) -> int | None:
bytes = self.redis.get(self.fence_key)
if bytes is None:
return None
progress = int(cast(int, bytes))
return progress
def get_remaining(self) -> int:
remaining = cast(int, self.redis.scard(self.taskset_key))
return remaining
@property
def fence_key(self) -> str:
"""Notice that this is intentionally reusing the same fence for all
connector syncs"""
# example: connectorsync_fence
return f"{self.FENCE_KEY}"
@property
def taskset_key(self) -> str:
"""Notice that this is intentionally reusing the same taskset for all
connector syncs"""
# example: connectorsync_taskset
return f"{self.TASKSET_KEY}"
def set_fence(self, payload: int | None) -> None:
if payload is None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def delete_taskset(self) -> None:
self.redis.delete(self.taskset_key)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key)
@staticmethod
def reset_all(r: redis.Redis) -> None:
r.srem(
OnyxRedisConstants.ACTIVE_FENCES,
RedisGlobalConnectorCredentialPair.FENCE_KEY,
)
r.delete(RedisGlobalConnectorCredentialPair.TASKSET_KEY)
r.delete(RedisGlobalConnectorCredentialPair.FENCE_KEY)

View File

@ -14,6 +14,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import construct_document_select_for_connector_credential_pair from onyx.db.document import construct_document_select_for_connector_credential_pair
from onyx.db.models import Document as DbDocument from onyx.db.models import Document as DbDocument
@ -69,10 +70,12 @@ class RedisConnectorDelete:
def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None: def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None:
if not payload: if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)
return return
self.redis.set(self.fence_key, payload.model_dump_json()) self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def _generate_task_id(self) -> str: def _generate_task_id(self) -> str:
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
@ -136,6 +139,7 @@ class RedisConnectorDelete:
return len(async_results) return len(async_results)
def reset(self) -> None: def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key) self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)

View File

@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
@ -102,10 +103,12 @@ class RedisConnectorPermissionSync:
payload: RedisConnectorPermissionSyncPayload | None, payload: RedisConnectorPermissionSyncPayload | None,
) -> None: ) -> None:
if not payload: if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)
return return
self.redis.set(self.fence_key, payload.model_dump_json()) self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property @property
def generator_complete(self) -> int | None: def generator_complete(self) -> int | None:
@ -173,6 +176,7 @@ class RedisConnectorPermissionSync:
return len(async_results) return len(async_results)
def reset(self) -> None: def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key) self.redis.delete(self.generator_complete_key)
self.redis.delete(self.taskset_key) self.redis.delete(self.taskset_key)

View File

@ -5,6 +5,8 @@ from uuid import uuid4
import redis import redis
from pydantic import BaseModel from pydantic import BaseModel
from onyx.configs.constants import OnyxRedisConstants
class RedisConnectorIndexPayload(BaseModel): class RedisConnectorIndexPayload(BaseModel):
index_attempt_id: int | None index_attempt_id: int | None
@ -103,10 +105,12 @@ class RedisConnectorIndex:
payload: RedisConnectorIndexPayload | None, payload: RedisConnectorIndexPayload | None,
) -> None: ) -> None:
if not payload: if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)
return return
self.redis.set(self.fence_key, payload.model_dump_json()) self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def terminating(self, celery_task_id: str) -> bool: def terminating(self, celery_task_id: str) -> bool:
if self.redis.exists(f"{self.terminate_key}_{celery_task_id}"): if self.redis.exists(f"{self.terminate_key}_{celery_task_id}"):
@ -188,6 +192,7 @@ class RedisConnectorIndex:
return status return status
def reset(self) -> None: def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.active_key) self.redis.delete(self.active_key)
self.redis.delete(self.generator_lock_key) self.redis.delete(self.generator_lock_key)
self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_progress_key)

View File

@ -11,6 +11,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
@ -79,10 +80,12 @@ class RedisConnectorPrune:
def set_fence(self, value: bool) -> None: def set_fence(self, value: bool) -> None:
if not value: if not value:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)
return return
self.redis.set(self.fence_key, 0) self.redis.set(self.fence_key, 0)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property @property
def generator_complete(self) -> int | None: def generator_complete(self) -> int | None:
@ -158,6 +161,7 @@ class RedisConnectorPrune:
return len(async_results) return len(async_results)
def reset(self) -> None: def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key) self.redis.delete(self.generator_complete_key)
self.redis.delete(self.taskset_key) self.redis.delete(self.taskset_key)

View File

@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.document_set import construct_document_select_by_docset from onyx.db.document_set import construct_document_select_by_docset
from onyx.db.models import Document from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper from onyx.redis.redis_object_helper import RedisObjectHelper
@ -35,10 +36,12 @@ class RedisDocumentSet(RedisObjectHelper):
def set_fence(self, payload: int | None) -> None: def set_fence(self, payload: int | None) -> None:
if payload is None: if payload is None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)
return return
self.redis.set(self.fence_key, payload) self.redis.set(self.fence_key, payload)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property @property
def payload(self) -> int | None: def payload(self) -> int | None:
@ -96,6 +99,7 @@ class RedisDocumentSet(RedisObjectHelper):
return len(async_results), len(async_results) return len(async_results), len(async_results)
def reset(self) -> None: def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key) self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)

View File

@ -113,6 +113,7 @@ class TenantRedis(redis.Redis):
"reacquire", "reacquire",
"create_lock", "create_lock",
"startswith", "startswith",
"smembers",
"sadd", "sadd",
"srem", "srem",
"scard", "scard",

View File

@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.models import Document from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper from onyx.redis.redis_object_helper import RedisObjectHelper
from onyx.utils.variable_functionality import fetch_versioned_implementation from onyx.utils.variable_functionality import fetch_versioned_implementation
@ -36,10 +37,12 @@ class RedisUserGroup(RedisObjectHelper):
def set_fence(self, payload: int | None) -> None: def set_fence(self, payload: int | None) -> None:
if payload is None: if payload is None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)
return return
self.redis.set(self.fence_key, payload) self.redis.set(self.fence_key, payload)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property @property
def payload(self) -> int | None: def payload(self) -> int | None:
@ -109,6 +112,7 @@ class RedisUserGroup(RedisObjectHelper):
return len(async_results), len(async_results) return len(async_results), len(async_results)
def reset(self) -> None: def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key) self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key) self.redis.delete(self.fence_key)