build a lookup table every so often to handle cloud migration

This commit is contained in:
Richard Kuo (Danswer) 2025-01-31 12:12:52 -08:00
parent 5232aeacad
commit b64545c7c7
2 changed files with 39 additions and 1 deletions

View File

@ -38,6 +38,7 @@ from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import fetch_connector_by_id
from onyx.db.connector_credential_pair import add_deletion_failure_message
from onyx.db.connector_credential_pair import (
@ -82,7 +83,9 @@ from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_document_set import RedisDocumentSet
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_usergroup import RedisUserGroup
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_versioned_implementation
@ -832,7 +835,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
# then read from the replica. In this case, monitoring work could be done on a fence
# that no longer exists. To avoid this, we scan from the replica, but double check
# the result on the master.
# r_replica = get_redis_replica_client(tenant_id=tenant_id)
r_replica = get_redis_replica_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK,
@ -888,6 +891,20 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
f"permissions_upsert={n_permissions_upsert} "
)
# we want to run this less frequently than the overall task
if not r.exists(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE):
# build a lookup table of existing fences
# this is just a migration concern and should be unnecessary once
# lookup tables are rolled out
for key_bytes in r_replica.scan_iter(count=SCAN_ITER_COUNT_DEFAULT):
if is_fence(key_bytes):
logger.warning(f"Adding {key_bytes} to the lookup table.")
r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
r.set(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE, 1, ex=120)
# use a lookup table to find active fences. We still have to verify the fence
# exists since it is an optimization and not the source of truth.
keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
@ -1056,3 +1073,23 @@ def vespa_metadata_sync_task(
self.retry(exc=e, countdown=countdown)
return True
def is_fence(key_bytes: bytes) -> bool:
key_str = key_bytes.decode("utf-8")
if key_str == RedisGlobalConnectorCredentialPair.FENCE_KEY:
return True
if key_str.startswith(RedisDocumentSet.FENCE_PREFIX):
return True
if key_str.startswith(RedisUserGroup.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorDelete.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
return True
return False

View File

@ -302,6 +302,7 @@ class OnyxRedisSignals:
VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences"
VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = "signal:validate_external_group_sync_fences"
VALIDATE_PERMISSION_SYNC_FENCES = "signal:validate_permission_sync_fences"
BUILD_FENCE_LOOKUP_TABLE = "signal:build_fence_lookup_table"
class OnyxRedisConstants: