From 114326d11a63925251ac4ca2dda632a7eeec2b75 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 17 Oct 2024 12:43:34 -0700 Subject: [PATCH] fix sync to use update_single (#2822) --- .../background/celery/tasks/shared/tasks.py | 2 +- .../danswer/background/celery/tasks/vespa/tasks.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py index b065122be841..05408d58f977 100644 --- a/backend/danswer/background/celery/tasks/shared/tasks.py +++ b/backend/danswer/background/celery/tasks/shared/tasks.py @@ -111,7 +111,7 @@ def document_by_cc_pair_cleanup_task( pass task_logger.info( - f"document_id={document_id} refcount={count} action={action} chunks={chunks_affected}" + f"document_id={document_id} action={action} refcount={count} chunks={chunks_affected}" ) db_session.commit() except SoftTimeLimitExceeded: diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index e958d63ad302..c50237b606a8 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -45,7 +45,7 @@ from danswer.db.models import DocumentSet from danswer.db.models import UserGroup from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index -from danswer.document_index.interfaces import UpdateRequest +from danswer.document_index.interfaces import VespaDocumentFields from danswer.redis.redis_pool import get_redis_client from danswer.utils.variable_functionality import fetch_versioned_implementation from danswer.utils.variable_functionality import ( @@ -609,20 +609,24 @@ def vespa_metadata_sync_task( doc_access = get_access_for_document( document_id=document_id, db_session=db_session ) - update_request = UpdateRequest( - document_ids=[document_id], + + fields = VespaDocumentFields( document_sets=update_doc_sets, access=doc_access, boost=doc.boost, hidden=doc.hidden, ) - # update Vespa - document_index.update(update_requests=[update_request]) + # update Vespa. OK if doc doesn't exist. Raises exception otherwise. + chunks_affected = document_index.update_single(document_id, fields=fields) # update db last. Worst case = we crash right before this and # the sync might repeat again later mark_document_as_synced(document_id, db_session) + + task_logger.info( + f"document_id={document_id} action=sync chunks={chunks_affected}" + ) except SoftTimeLimitExceeded: task_logger.info(f"SoftTimeLimitExceeded exception. doc_id={document_id}") except Exception as e: