comment out the per doc sync hack (#3620)

* comment out the per doc sync hack

* fix commented code

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
rkuo-danswer 2025-01-07 11:44:15 -08:00 committed by GitHub
parent 5b5c1166ca
commit 7cd76ec404
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 16 additions and 23 deletions

View File

@@ -955,12 +955,14 @@ def vespa_metadata_sync_task(
# the sync might repeat again later # the sync might repeat again later
mark_document_as_synced(document_id, db_session) mark_document_as_synced(document_id, db_session)
redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key( # this code checks for and removes a per document sync key that is
document_id # used to block out the same doc from continually resyncing
) # a quick hack that is only needed for production issues
r = get_redis_client(tenant_id=tenant_id) # redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key(
r.delete(redis_syncing_key) # document_id
# r.hdel(RedisConnectorCredentialPair.SYNCING_HASH, document_id) # )
# r = get_redis_client(tenant_id=tenant_id)
# r.delete(redis_syncing_key)
task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}") task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}")
except SoftTimeLimitExceeded: except SoftTimeLimitExceeded:

View File

@@ -30,7 +30,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
FENCE_PREFIX = PREFIX + "_fence" FENCE_PREFIX = PREFIX + "_fence"
TASKSET_PREFIX = PREFIX + "_taskset" TASKSET_PREFIX = PREFIX + "_taskset"
# SYNCING_HASH = PREFIX + ":vespa_syncing"
SYNCING_PREFIX = PREFIX + ":vespa_syncing" SYNCING_PREFIX = PREFIX + ":vespa_syncing"
def __init__(self, tenant_id: str | None, id: int) -> None: def __init__(self, tenant_id: str | None, id: int) -> None:
@@ -61,6 +60,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
@staticmethod @staticmethod
def make_redis_syncing_key(doc_id: str) -> str: def make_redis_syncing_key(doc_id: str) -> str:
"""used to create a key in redis to block a doc from syncing"""
return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}"
def generate_tasks( def generate_tasks(
@@ -71,9 +71,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
lock: RedisLock, lock: RedisLock,
tenant_id: str | None, tenant_id: str | None,
) -> tuple[int, int] | None: ) -> tuple[int, int] | None:
# an arbitrary number in seconds to prevent the same doc from syncing repeatedly
SYNC_EXPIRATION = 24 * 60 * 60
last_lock_time = time.monotonic() last_lock_time = time.monotonic()
async_results = [] async_results = []
@@ -102,13 +99,14 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
if doc.id in self.skip_docs: if doc.id in self.skip_docs:
continue continue
# is the document sync already queued? # an arbitrary number in seconds to prevent the same doc from syncing repeatedly
# if redis_client.hexists(doc.id): # SYNC_EXPIRATION = 24 * 60 * 60
# continue
redis_syncing_key = self.make_redis_syncing_key(doc.id) # a quick hack that can be uncommented to prevent a doc from resyncing over and over
if redis_client.exists(redis_syncing_key): # redis_syncing_key = self.make_redis_syncing_key(doc.id)
continue # if redis_client.exists(redis_syncing_key):
# continue
# redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION)
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
# the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac" # the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
@@ -122,13 +120,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
RedisConnectorCredentialPair.get_taskset_key(), custom_task_id RedisConnectorCredentialPair.get_taskset_key(), custom_task_id
) )
# track the doc.id in redis so that we don't resubmit it repeatedly
# redis_client.hset(
# self.SYNCING_HASH, doc.id, custom_task_id
# )
redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION)
# Priority on syncs triggered by new indexing should be medium # Priority on syncs triggered by new indexing should be medium
result = celery_app.send_task( result = celery_app.send_task(
OnyxCeleryTask.VESPA_METADATA_SYNC_TASK, OnyxCeleryTask.VESPA_METADATA_SYNC_TASK,