Merge pull request #3856 from onyx-dot-app/feature/no_scan_iter

lessen usage of scan_iter
This commit is contained in:
rkuo-danswer 2025-02-04 15:57:03 -08:00 committed by GitHub
commit c0271a948a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 239 additions and 160 deletions

View File

@ -21,13 +21,16 @@ from onyx.background.celery.tasks.indexing.utils import (
get_unfenced_index_attempt_ids,
)
from onyx.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
from onyx.db.engine import get_session_with_default_tenant
from onyx.db.engine import SqlEngine
from onyx.db.index_attempt import get_index_attempt
from onyx.db.index_attempt import mark_attempt_canceled
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from onyx.redis.redis_connector_credential_pair import (
RedisGlobalConnectorCredentialPair,
)
from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
@ -141,23 +144,16 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
r.delete(OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK)
r.delete(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
r.delete(RedisConnectorCredentialPair.get_taskset_key())
r.delete(RedisConnectorCredentialPair.get_fence_key())
r.delete(OnyxRedisConstants.ACTIVE_FENCES)
RedisGlobalConnectorCredentialPair.reset_all(r)
RedisDocumentSet.reset_all(r)
RedisUserGroup.reset_all(r)
RedisConnectorDelete.reset_all(r)
RedisConnectorPrune.reset_all(r)
RedisConnectorIndex.reset_all(r)
RedisConnectorStop.reset_all(r)
RedisConnectorPermissionSync.reset_all(r)
RedisConnectorExternalGroupSync.reset_all(r)
# mark orphaned index attempts as failed

View File

@ -18,7 +18,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds)
# hack to slow down task dispatch in the cloud until
# we have a better implementation (backpressure, etc)
CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8
CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4
# tasks that only run in the cloud
# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered

View File

@ -3,6 +3,7 @@ from datetime import datetime
from datetime import timedelta
from datetime import timezone
from time import sleep
from typing import Any
from typing import cast
from uuid import uuid4
@ -38,6 +39,7 @@ from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import mark_cc_pair_as_permissions_synced
@ -57,8 +59,8 @@ from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.server.utils import make_short_id
from onyx.utils.logger import doc_permission_sync_ctx
from onyx.utils.logger import LoggerContextVars
@ -123,6 +125,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
# we need to use celery's redis client to access its redis data
# (which lives on a different db number)
r = get_redis_client(tenant_id=tenant_id)
r_replica = get_redis_replica_client(tenant_id=tenant_id)
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
@ -158,18 +161,20 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
# we want to run this less frequently than the overall task
lock_beat.reacquire()
if not r.exists(OnyxRedisSignals.VALIDATE_PERMISSION_SYNC_FENCES):
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES):
# clear any permission fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
validate_permission_sync_fences(tenant_id, r, r_celery, lock_beat)
validate_permission_sync_fences(
tenant_id, r, r_replica, r_celery, lock_beat
)
except Exception:
task_logger.exception(
"Exception while validating permission sync fences"
)
r.set(OnyxRedisSignals.VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60)
r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=300)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
@ -486,6 +491,7 @@ def update_external_document_permissions_task(
def validate_permission_sync_fences(
tenant_id: str | None,
r: Redis,
r_replica: Redis,
r_celery: Redis,
lock_beat: RedisLock,
) -> None:
@ -506,12 +512,15 @@ def validate_permission_sync_fences(
OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery
)
# validate all existing indexing jobs
for key_bytes in r.scan_iter(
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
lock_beat.reacquire()
# validate all existing permission sync jobs
lock_beat.reacquire()
keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if not key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
continue
validate_permission_sync_fence(
tenant_id,
key_bytes,
@ -520,6 +529,9 @@ def validate_permission_sync_fences(
r,
r_celery,
)
lock_beat.reacquire()
return

View File

@ -224,7 +224,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
lock_beat.reacquire()
# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
if not redis_client.exists(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES):
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
@ -235,7 +235,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
except Exception:
task_logger.exception("Exception while validating indexing fences")
redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)
redis_client.set(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES, 1, ex=60)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."

View File

@ -1,6 +1,8 @@
import time
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
import redis
from celery import Celery
@ -19,6 +21,7 @@ from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.engine import get_db_current_time
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import ConnectorCredentialPairStatus
@ -37,7 +40,6 @@ from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.utils.logger import setup_logger
logger = setup_logger()
@ -304,10 +306,13 @@ def validate_indexing_fences(
# Use replica for this because the worst thing that happens
# is that we don't run the validation on this pass
for key_bytes in r_replica.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
lock_beat.reacquire()
keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if not key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
continue
with get_session_with_tenant(tenant_id) as db_session:
validate_indexing_fence(
tenant_id,
@ -316,6 +321,9 @@ def validate_indexing_fences(
r_celery,
db_session,
)
lock_beat.reacquire()
return

View File

@ -36,7 +36,9 @@ from onyx.configs.app_configs import VESPA_SYNC_MAX_TASKS
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import fetch_connector_by_id
from onyx.db.connector_credential_pair import add_deletion_failure_message
from onyx.db.connector_credential_pair import (
@ -72,6 +74,9 @@ from onyx.document_index.interfaces import VespaDocumentFields
from onyx.httpx.httpx_pool import HttpxPool
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from onyx.redis.redis_connector_credential_pair import (
RedisGlobalConnectorCredentialPair,
)
from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_index import RedisConnectorIndex
@ -204,10 +209,12 @@ def try_generate_stale_document_sync_tasks(
tenant_id: str | None,
) -> int | None:
# the fence is up, do nothing
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
redis_global_ccpair = RedisGlobalConnectorCredentialPair(r)
if redis_global_ccpair.fenced:
return None
r.delete(RedisConnectorCredentialPair.get_taskset_key()) # delete the taskset
redis_global_ccpair.delete_taskset()
# add tasks to celery and build up the task set to monitor in redis
stale_doc_count = count_documents_by_needs_sync(db_session)
@ -265,7 +272,7 @@ def try_generate_stale_document_sync_tasks(
f"RedisConnector.generate_tasks finished for all cc_pairs. total_tasks_generated={total_tasks_generated}"
)
r.set(RedisConnectorCredentialPair.get_fence_key(), total_tasks_generated)
redis_global_ccpair.set_fence(total_tasks_generated)
return total_tasks_generated
@ -416,23 +423,17 @@ def try_generate_user_group_sync_tasks(
def monitor_connector_taskset(r: Redis) -> None:
fence_value = r.get(RedisConnectorCredentialPair.get_fence_key())
if fence_value is None:
redis_global_ccpair = RedisGlobalConnectorCredentialPair(r)
initial_count = redis_global_ccpair.payload
if initial_count is None:
return
try:
initial_count = int(cast(int, fence_value))
except ValueError:
task_logger.error("The value is not an integer.")
return
count = r.scard(RedisConnectorCredentialPair.get_taskset_key())
remaining = redis_global_ccpair.get_remaining()
task_logger.info(
f"Stale document sync progress: remaining={count} initial={initial_count}"
f"Stale document sync progress: remaining={remaining} initial={initial_count}"
)
if count == 0:
r.delete(RedisConnectorCredentialPair.get_taskset_key())
r.delete(RedisConnectorCredentialPair.get_fence_key())
if remaining == 0:
redis_global_ccpair.reset()
task_logger.info(f"Successfully synced stale documents. count={initial_count}")
@ -820,9 +821,6 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
time_start = time.monotonic()
timings: dict[str, Any] = {}
timings["start"] = time_start
r = get_redis_client(tenant_id=tenant_id)
# Replica usage notes
@ -847,7 +845,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
try:
# print current queue lengths
phase_start = time.monotonic()
time.monotonic()
# we don't need every tenant polling redis for this info.
if not MULTI_TENANT or random.randint(1, 10) == 10:
r_celery = self.app.broker_connection().channel().client # type: ignore
@ -889,50 +887,38 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
f"external_group_sync={n_external_group_sync} "
f"permissions_upsert={n_permissions_upsert} "
)
timings["queues"] = time.monotonic() - phase_start
timings["queues_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
# scan and monitor activity to completion
phase_start = time.monotonic()
lock_beat.reacquire()
if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()):
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
# we want to run this less frequently than the overall task
if not r.exists(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE):
# build a lookup table of existing fences
# this is just a migration concern and should be unnecessary once
# lookup tables are rolled out
for key_bytes in r_replica.scan_iter(count=SCAN_ITER_COUNT_DEFAULT):
if is_fence(key_bytes) and not r.sismember(
OnyxRedisConstants.ACTIVE_FENCES, key_bytes
):
logger.warning(f"Adding {key_bytes} to the lookup table.")
r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=300)
# use a lookup table to find active fences. We still have to verify the fence
# exists since it is an optimization and not the source of truth.
keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
if not r.exists(key_bytes):
r.srem(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
continue
key_str = key_bytes.decode("utf-8")
if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY:
monitor_connector_taskset(r)
timings["connector"] = time.monotonic() - phase_start
timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
lock_beat.reacquire()
timings["connector_deletion"] = time.monotonic() - phase_start
timings["connector_deletion_ttl"] = r.ttl(
OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK
)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
elif key_str.startswith(RedisDocumentSet.FENCE_PREFIX):
with get_session_with_tenant(tenant_id) as db_session:
monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["documentset"] = time.monotonic() - phase_start
timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
elif key_str.startswith(RedisUserGroup.FENCE_PREFIX):
monitor_usergroup_taskset = (
fetch_versioned_implementation_with_fallback(
"onyx.background.celery.tasks.vespa.tasks",
@ -942,49 +928,21 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
)
with get_session_with_tenant(tenant_id) as db_session:
monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["usergroup"] = time.monotonic() - phase_start
timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
elif key_str.startswith(RedisConnectorDelete.FENCE_PREFIX):
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
elif key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["pruning"] = time.monotonic() - phase_start
timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
if r.exists(key_bytes):
elif key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
timings["indexing"] = time.monotonic() - phase_start
timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r_replica.scan_iter(
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
if r.exists(key_bytes):
elif key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_permissions_taskset(
tenant_id, key_bytes, r, db_session
)
lock_beat.reacquire()
timings["permissions"] = time.monotonic() - phase_start
timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
else:
pass
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
@ -999,8 +957,8 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
else:
task_logger.error(
"monitor_vespa_sync - Lock not owned on completion: "
f"tenant={tenant_id} "
f"timings={timings}"
f"tenant={tenant_id}"
# f"timings={timings}"
)
redis_lock_dump(lock_beat, r)
@ -1064,15 +1022,6 @@ def vespa_metadata_sync_task(
# the sync might repeat again later
mark_document_as_synced(document_id, db_session)
# this code checks for and removes a per document sync key that is
# used to block out the same doc from continualy resyncing
# a quick hack that is only needed for production issues
# redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key(
# document_id
# )
# r = get_redis_client(tenant_id=tenant_id)
# r.delete(redis_syncing_key)
elapsed = time.monotonic() - start
task_logger.info(
f"doc={document_id} "
@ -1114,3 +1063,23 @@ def vespa_metadata_sync_task(
self.retry(exc=e, countdown=countdown)
return True
def is_fence(key_bytes: bytes) -> bool:
key_str = key_bytes.decode("utf-8")
if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY:
return True
if key_str.startswith(RedisDocumentSet.FENCE_PREFIX):
return True
if key_str.startswith(RedisUserGroup.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorDelete.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
return True
return False

View File

@ -306,9 +306,18 @@ class OnyxRedisLocks:
class OnyxRedisSignals:
VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences"
VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = "signal:validate_external_group_sync_fences"
VALIDATE_PERMISSION_SYNC_FENCES = "signal:validate_permission_sync_fences"
BLOCK_VALIDATE_INDEXING_FENCES = "signal:block_validate_indexing_fences"
BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = (
"signal:block_validate_external_group_sync_fences"
)
BLOCK_VALIDATE_PERMISSION_SYNC_FENCES = (
"signal:block_validate_permission_sync_fences"
)
BLOCK_BUILD_FENCE_LOOKUP_TABLE = "signal:block_build_fence_lookup_table"
class OnyxRedisConstants:
ACTIVE_FENCES = "active_fences"
class OnyxCeleryPriority(int, Enum):

View File

@ -17,6 +17,8 @@ class RedisConnector:
associated background tasks / associated redis interactions."""
def __init__(self, tenant_id: str | None, id: int) -> None:
"""id: a connector credential pair id"""
self.tenant_id: str | None = tenant_id
self.id: int = id
self.redis: redis.Redis = get_redis_client(tenant_id=tenant_id)

View File

@ -2,6 +2,7 @@ import time
from typing import cast
from uuid import uuid4
import redis
from celery import Celery
from redis import Redis
from redis.lock import Lock as RedisLock
@ -12,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import (
construct_document_select_for_connector_credential_pair_by_needs_sync,
@ -28,21 +30,14 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
all connectors and is not per connector."""
PREFIX = "connectorsync"
FENCE_PREFIX = PREFIX + "_fence"
TASKSET_PREFIX = PREFIX + "_taskset"
SYNCING_PREFIX = PREFIX + ":vespa_syncing"
def __init__(self, tenant_id: str | None, id: int) -> None:
super().__init__(tenant_id, str(id))
# documents that should be skipped
self.skip_docs: set[str] = set()
@classmethod
def get_fence_key(cls) -> str:
return RedisConnectorCredentialPair.FENCE_PREFIX
@classmethod
def get_taskset_key(cls) -> str:
return RedisConnectorCredentialPair.TASKSET_PREFIX
@ -51,19 +46,14 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
def taskset_key(self) -> str:
"""Notice that this is intentionally reusing the same taskset for all
connector syncs"""
# example: connector_taskset
# example: connectorsync_taskset
return f"{self.TASKSET_PREFIX}"
def set_skip_docs(self, skip_docs: set[str]) -> None:
# documents that should be skipped. Note that this classes updates
# documents that should be skipped. Note that this class updates
# the list on the fly
self.skip_docs = skip_docs
@staticmethod
def make_redis_syncing_key(doc_id: str) -> str:
"""used to create a key in redis to block a doc from syncing"""
return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}"
def generate_tasks(
self,
max_tasks: int,
@ -111,15 +101,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
if doc.id in self.skip_docs:
continue
# an arbitrary number in seconds to prevent the same doc from syncing repeatedly
# SYNC_EXPIRATION = 24 * 60 * 60
# a quick hack that can be uncommented to prevent a doc from resyncing over and over
# redis_syncing_key = self.make_redis_syncing_key(doc.id)
# if redis_client.exists(redis_syncing_key):
# continue
# redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION)
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
# the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
# we prefix the task id so it's easier to keep track of who created the task
@ -148,3 +129,78 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
break
return len(async_results), num_docs
class RedisGlobalConnectorCredentialPair:
"""This class is used to scan documents by cc_pair in the db and collect them into
a unified set for syncing.
It differs from the other redis helpers in that the taskset used spans
all connectors and is not per connector."""
PREFIX = "connectorsync"
FENCE_KEY = PREFIX + "_fence"
TASKSET_KEY = PREFIX + "_taskset"
def __init__(self, redis: redis.Redis) -> None:
self.redis = redis
@property
def fenced(self) -> bool:
if self.redis.exists(self.fence_key):
return True
return False
@property
def payload(self) -> int | None:
bytes = self.redis.get(self.fence_key)
if bytes is None:
return None
progress = int(cast(int, bytes))
return progress
def get_remaining(self) -> int:
remaining = cast(int, self.redis.scard(self.taskset_key))
return remaining
@property
def fence_key(self) -> str:
"""Notice that this is intentionally reusing the same fence for all
connector syncs"""
# example: connectorsync_fence
return f"{self.FENCE_KEY}"
@property
def taskset_key(self) -> str:
"""Notice that this is intentionally reusing the same taskset for all
connector syncs"""
# example: connectorsync_taskset
return f"{self.TASKSET_KEY}"
def set_fence(self, payload: int | None) -> None:
if payload is None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def delete_taskset(self) -> None:
self.redis.delete(self.taskset_key)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key)
@staticmethod
def reset_all(r: redis.Redis) -> None:
r.srem(
OnyxRedisConstants.ACTIVE_FENCES,
RedisGlobalConnectorCredentialPair.FENCE_KEY,
)
r.delete(RedisGlobalConnectorCredentialPair.TASKSET_KEY)
r.delete(RedisGlobalConnectorCredentialPair.FENCE_KEY)

View File

@ -14,6 +14,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import construct_document_select_for_connector_credential_pair
from onyx.db.models import Document as DbDocument
@ -69,10 +70,12 @@ class RedisConnectorDelete:
def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None:
if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def _generate_task_id(self) -> str:
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
@ -136,6 +139,7 @@ class RedisConnectorDelete:
return len(async_results)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key)

View File

@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
@ -111,10 +112,12 @@ class RedisConnectorPermissionSync:
payload: RedisConnectorPermissionSyncPayload | None,
) -> None:
if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def set_active(self) -> None:
"""This sets a signal to keep the permissioning flow from getting cleaned up within
@ -196,6 +199,7 @@ class RedisConnectorPermissionSync:
return len(async_results)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.active_key)
self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key)

View File

@ -5,6 +5,8 @@ from uuid import uuid4
import redis
from pydantic import BaseModel
from onyx.configs.constants import OnyxRedisConstants
class RedisConnectorIndexPayload(BaseModel):
index_attempt_id: int | None
@ -103,10 +105,12 @@ class RedisConnectorIndex:
payload: RedisConnectorIndexPayload | None,
) -> None:
if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def terminating(self, celery_task_id: str) -> bool:
if self.redis.exists(f"{self.terminate_key}_{celery_task_id}"):
@ -188,6 +192,7 @@ class RedisConnectorIndex:
return status
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.active_key)
self.redis.delete(self.generator_lock_key)
self.redis.delete(self.generator_progress_key)

View File

@ -11,6 +11,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
@ -79,10 +80,12 @@ class RedisConnectorPrune:
def set_fence(self, value: bool) -> None:
if not value:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, 0)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property
def generator_complete(self) -> int | None:
@ -158,6 +161,7 @@ class RedisConnectorPrune:
return len(async_results)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key)
self.redis.delete(self.taskset_key)

View File

@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.document_set import construct_document_select_by_docset
from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper
@ -35,10 +36,12 @@ class RedisDocumentSet(RedisObjectHelper):
def set_fence(self, payload: int | None) -> None:
if payload is None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property
def payload(self) -> int | None:
@ -96,6 +99,7 @@ class RedisDocumentSet(RedisObjectHelper):
return len(async_results), len(async_results)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key)

View File

@ -113,6 +113,8 @@ class TenantRedis(redis.Redis):
"reacquire",
"create_lock",
"startswith",
"smembers",
"sismember",
"sadd",
"srem",
"scard",

View File

@ -13,6 +13,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper
from onyx.utils.variable_functionality import fetch_versioned_implementation
@ -36,10 +37,12 @@ class RedisUserGroup(RedisObjectHelper):
def set_fence(self, payload: int | None) -> None:
if payload is None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload)
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
@property
def payload(self) -> int | None:
@ -109,6 +112,7 @@ class RedisUserGroup(RedisObjectHelper):
return len(async_results), len(async_results)
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key)