From 3a950721b98fc363d7bae9179af6ed9047c675a9 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Sun, 2 Feb 2025 01:14:10 -0800 Subject: [PATCH] get rid of some more scan_iter --- .../tasks/doc_permission_syncing/tasks.py | 23 ++++++++++++------- .../background/celery/tasks/indexing/utils.py | 18 +++++++++++---- .../background/celery/tasks/vespa/tasks.py | 2 +- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 6d0b11dc5..a4608762b 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -3,6 +3,7 @@ from datetime import datetime from datetime import timedelta from datetime import timezone from time import sleep +from typing import Any from typing import cast from uuid import uuid4 @@ -38,6 +39,7 @@ from onyx.configs.constants import DocumentSource from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisSignals from onyx.db.connector import mark_cc_pair_as_permissions_synced @@ -58,7 +60,6 @@ from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyn from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload from onyx.redis.redis_pool import get_redis_client from onyx.redis.redis_pool import redis_lock_dump -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.server.utils import make_short_id from onyx.utils.logger import doc_permission_sync_ctx from onyx.utils.logger import LoggerContextVars @@ -169,7 +170,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool "Exception while validating permission sync fences" ) - r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60) + r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=300) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." @@ -506,12 +507,15 @@ def validate_permission_sync_fences( OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery ) - # validate all existing indexing jobs - for key_bytes in r.scan_iter( - RedisConnectorPermissionSync.FENCE_PREFIX + "*", - count=SCAN_ITER_COUNT_DEFAULT, - ): - lock_beat.reacquire() + # validate all existing permission sync jobs + lock_beat.reacquire() + keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if not key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX): + continue + validate_permission_sync_fence( tenant_id, key_bytes, @@ -520,6 +524,9 @@ def validate_permission_sync_fences( r, r_celery, ) + + lock_beat.reacquire() + return diff --git a/backend/onyx/background/celery/tasks/indexing/utils.py b/backend/onyx/background/celery/tasks/indexing/utils.py index e14e79b5f..0eee8a36f 100644 --- a/backend/onyx/background/celery/tasks/indexing/utils.py +++ b/backend/onyx/background/celery/tasks/indexing/utils.py @@ -1,6 +1,8 @@ import time from datetime import datetime from datetime import timezone +from typing import Any +from typing import cast import redis from celery import Celery @@ -19,6 +21,7 @@ from onyx.configs.constants import DocumentSource from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.db.engine import get_db_current_time from onyx.db.engine import get_session_with_tenant from onyx.db.enums import ConnectorCredentialPairStatus @@ -37,7 +40,6 @@ from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_index import RedisConnectorIndexPayload from onyx.redis.redis_pool import redis_lock_dump -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.utils.logger import setup_logger logger = setup_logger() @@ -304,10 +306,13 @@ def validate_indexing_fences( # Use replica for this because the worst thing that happens # is that we don't run the validation on this pass - for key_bytes in r_replica.scan_iter( - RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - lock_beat.reacquire() + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if not key_str.startswith(RedisConnectorIndex.FENCE_PREFIX): + continue + with get_session_with_tenant(tenant_id) as db_session: validate_indexing_fence( tenant_id, @@ -316,6 +321,9 @@ def validate_indexing_fences( r_celery, db_session, ) + + lock_beat.reacquire() + return diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 76214c00b..e50c0fdfc 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -903,7 +903,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: logger.warning(f"Adding {key_bytes} to the lookup table.") r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes) - r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=120) + r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=300) # use a lookup table to find active fences. We still have to verify the fence # exists since it is an optimization and not the source of truth.