From 371d1ccd8fc5bf78ad3d24db1444051134e66c5f Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Sun, 5 Jan 2025 03:26:33 -0800 Subject: [PATCH] move to just setting keys --- .../background/celery/tasks/vespa/tasks.py | 6 +++++- .../redis/redis_connector_credential_pair.py | 21 ++++++++++++++----- backend/onyx/redis/redis_usergroup.py | 2 ++ 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 393e18237..7bef0bcca 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -898,8 +898,12 @@ def vespa_metadata_sync_task( # the sync might repeat again later mark_document_as_synced(document_id, db_session) + redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key( + document_id + ) r = get_redis_client(tenant_id=tenant_id) - r.hdel(RedisConnectorCredentialPair.SYNCING_HASH, document_id) + r.delete(redis_syncing_key) + # r.hdel(RedisConnectorCredentialPair.SYNCING_HASH, document_id) task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}") except SoftTimeLimitExceeded: diff --git a/backend/onyx/redis/redis_connector_credential_pair.py b/backend/onyx/redis/redis_connector_credential_pair.py index 3f5a01d8b..26055028c 100644 --- a/backend/onyx/redis/redis_connector_credential_pair.py +++ b/backend/onyx/redis/redis_connector_credential_pair.py @@ -30,7 +30,8 @@ class RedisConnectorCredentialPair(RedisObjectHelper): FENCE_PREFIX = PREFIX + "_fence" TASKSET_PREFIX = PREFIX + "_taskset" - SYNCING_HASH = PREFIX + ":vespa_syncing" + # SYNCING_HASH = PREFIX + ":vespa_syncing" + SYNCING_PREFIX = PREFIX + ":vespa_syncing" def __init__(self, tenant_id: str | None, id: int) -> None: super().__init__(tenant_id, str(id)) @@ -58,6 +59,10 @@ class RedisConnectorCredentialPair(RedisObjectHelper): # the list on the fly self.skip_docs = skip_docs + @staticmethod + def make_redis_syncing_key(doc_id: str) -> str: + return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" + def generate_tasks( self, celery_app: Celery, @@ -98,7 +103,11 @@ class RedisConnectorCredentialPair(RedisObjectHelper): continue # is the document sync already queued? - if redis_client.hexists(doc.id): + # if redis_client.hexists(doc.id): + # continue + + redis_syncing_key = self.make_redis_syncing_key(doc.id) + if redis_client.exists(redis_syncing_key): continue # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" @@ -114,9 +123,11 @@ class RedisConnectorCredentialPair(RedisObjectHelper): ) # track the doc.id in redis so that we don't resubmit it repeatedly - redis_client.hset( - self.SYNCING_HASH, doc.id, custom_task_id, ex=SYNC_EXPIRATION - ) + # redis_client.hset( + # self.SYNCING_HASH, doc.id, custom_task_id + # ) + + redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION) # Priority on sync's triggered by new indexing should be medium result = celery_app.send_task( diff --git a/backend/onyx/redis/redis_usergroup.py b/backend/onyx/redis/redis_usergroup.py index 3a9a863fa..f7ee6199a 100644 --- a/backend/onyx/redis/redis_usergroup.py +++ b/backend/onyx/redis/redis_usergroup.py @@ -12,6 +12,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.db.models import Document from onyx.redis.redis_object_helper import RedisObjectHelper from onyx.utils.variable_functionality import fetch_versioned_implementation from onyx.utils.variable_functionality import global_version @@ -73,6 +74,7 @@ class RedisUserGroup(RedisObjectHelper): stmt = construct_document_select_by_usergroup(int(self._id)) for doc in db_session.scalars(stmt).yield_per(1): + doc = cast(Document, doc) current_time = time.monotonic() if current_time - last_lock_time >= ( CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4