move to just setting keys

This commit is contained in:
Richard Kuo (Danswer) 2025-01-05 03:26:33 -08:00
parent 7fb92d42a0
commit 371d1ccd8f
3 changed files with 23 additions and 6 deletions

View File

@@ -898,8 +898,12 @@ def vespa_metadata_sync_task(
     # the sync might repeat again later
     mark_document_as_synced(document_id, db_session)

+    redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key(
+        document_id
+    )
     r = get_redis_client(tenant_id=tenant_id)
-    r.hdel(RedisConnectorCredentialPair.SYNCING_HASH, document_id)
+    r.delete(redis_syncing_key)
+    # r.hdel(RedisConnectorCredentialPair.SYNCING_HASH, document_id)

     task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}")
 except SoftTimeLimitExceeded:

View File

@@ -30,7 +30,8 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
     FENCE_PREFIX = PREFIX + "_fence"
     TASKSET_PREFIX = PREFIX + "_taskset"
-    SYNCING_HASH = PREFIX + ":vespa_syncing"
+    # SYNCING_HASH = PREFIX + ":vespa_syncing"
+    SYNCING_PREFIX = PREFIX + ":vespa_syncing"

     def __init__(self, tenant_id: str | None, id: int) -> None:
         super().__init__(tenant_id, str(id))
@@ -58,6 +59,10 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
         # the list on the fly
         self.skip_docs = skip_docs

+    @staticmethod
+    def make_redis_syncing_key(doc_id: str) -> str:
+        return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}"
+
     def generate_tasks(
         self,
         celery_app: Celery,
@@ -98,7 +103,11 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
                 continue

             # is the document sync already queued?
-            if redis_client.hexists(doc.id):
+            # if redis_client.hexists(doc.id):
+            #     continue
+
+            redis_syncing_key = self.make_redis_syncing_key(doc.id)
+            if redis_client.exists(redis_syncing_key):
                 continue

             # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
@@ -114,9 +123,11 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
             )

             # track the doc.id in redis so that we don't resubmit it repeatedly
-            redis_client.hset(
-                self.SYNCING_HASH, doc.id, custom_task_id, ex=SYNC_EXPIRATION
-            )
+            # redis_client.hset(
+            #     self.SYNCING_HASH, doc.id, custom_task_id
+            # )
+            redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION)

             # Priority on sync's triggered by new indexing should be medium
             result = celery_app.send_task(

View File

@@ -12,6 +12,7 @@ from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
+from onyx.db.models import Document
 from onyx.redis.redis_object_helper import RedisObjectHelper
 from onyx.utils.variable_functionality import fetch_versioned_implementation
 from onyx.utils.variable_functionality import global_version
@@ -73,6 +74,7 @@ class RedisUserGroup(RedisObjectHelper):
         stmt = construct_document_select_by_usergroup(int(self._id))
         for doc in db_session.scalars(stmt).yield_per(1):
+            doc = cast(Document, doc)
             current_time = time.monotonic()
             if current_time - last_lock_time >= (
                 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4