get rid of some more scan_iter

This commit is contained in:
Richard Kuo (Danswer) 2025-02-02 01:14:10 -08:00
parent bbee2865e9
commit 3a950721b9
3 changed files with 29 additions and 14 deletions

View File

@ -3,6 +3,7 @@ from datetime import datetime
from datetime import timedelta
from datetime import timezone
from time import sleep
from typing import Any
from typing import cast
from uuid import uuid4
@ -38,6 +39,7 @@ from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import mark_cc_pair_as_permissions_synced
@ -58,7 +60,6 @@ from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyn
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.server.utils import make_short_id
from onyx.utils.logger import doc_permission_sync_ctx
from onyx.utils.logger import LoggerContextVars
@ -169,7 +170,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
"Exception while validating permission sync fences"
)
r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60)
r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=300)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
@ -506,12 +507,15 @@ def validate_permission_sync_fences(
OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery
)
# validate all existing indexing jobs
for key_bytes in r.scan_iter(
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
lock_beat.reacquire()
# validate all existing permission sync jobs
lock_beat.reacquire()
keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if not key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
continue
validate_permission_sync_fence(
tenant_id,
key_bytes,
@ -520,6 +524,9 @@ def validate_permission_sync_fences(
r,
r_celery,
)
lock_beat.reacquire()
return

View File

@ -1,6 +1,8 @@
import time
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
import redis
from celery import Celery
@ -19,6 +21,7 @@ from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.engine import get_db_current_time
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import ConnectorCredentialPairStatus
@ -37,7 +40,6 @@ from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.utils.logger import setup_logger
logger = setup_logger()
@ -304,10 +306,13 @@ def validate_indexing_fences(
# Use replica for this because the worst thing that happens
# is that we don't run the validation on this pass
for key_bytes in r_replica.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
lock_beat.reacquire()
keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if not key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
continue
with get_session_with_tenant(tenant_id) as db_session:
validate_indexing_fence(
tenant_id,
@ -316,6 +321,9 @@ def validate_indexing_fences(
r_celery,
db_session,
)
lock_beat.reacquire()
return

View File

@ -903,7 +903,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
logger.warning(f"Adding {key_bytes} to the lookup table.")
r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=120)
r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=300)
# use a lookup table to find active fences. We still have to verify the fence
# exists since it is an optimization and not the source of truth.