From 1d89fea73ed4aabe836f18f80c56f9df296f8a50 Mon Sep 17 00:00:00 2001
From: rkuo-danswer
Date: Mon, 28 Oct 2024 09:14:51 -0700
Subject: [PATCH] Bugfix/celery light backoff (#2880)

* logging cleanup
* raise vespa_timeout to 15 by default
* implement backoff for document index methods specifically
* do not retry on 400 BAD_REQUEST
* handle RetryError
* actually check status code and fix type errors
---
 .../celery/tasks/connector_deletion/tasks.py  |  2 +-
 .../background/celery/tasks/indexing/tasks.py | 14 +++--
 .../background/celery/tasks/pruning/tasks.py  | 10 +--
 .../celery/tasks/shared/RetryDocumentIndex.py | 40 ++++++++++++
 .../background/celery/tasks/shared/tasks.py   | 62 ++++++++++++++-----
 .../background/celery/tasks/vespa/tasks.py    | 51 +++++++++++----
 backend/danswer/configs/app_configs.py        |  2 +-
 7 files changed, 143 insertions(+), 38 deletions(-)
 create mode 100644 backend/danswer/background/celery/tasks/shared/RetryDocumentIndex.py

diff --git a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py
index 59d236cde..0b8c98c18 100644
--- a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py
+++ b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py
@@ -81,7 +81,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None:
                 "Soft time limit exceeded, task is being terminated gracefully."
             )
     except Exception:
-        task_logger.exception("Unexpected exception")
+        task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
     finally:
         if lock_beat.owned():
             lock_beat.release()
diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py
index 1d8723f11..7c1460cbd 100644
--- a/backend/danswer/background/celery/tasks/indexing/tasks.py
+++ b/backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -175,7 +175,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                     )
                     if attempt_id:
                         task_logger.info(
-                            f"Indexing queued: cc_pair_id={cc_pair.id} index_attempt_id={attempt_id}"
+                            f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}"
                         )
                         tasks_created += 1
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
         )
     except Exception:
-        task_logger.exception("Unexpected exception")
+        task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
     finally:
         if lock_beat.owned():
             lock_beat.release()
@@ -366,7 +366,12 @@ def try_creating_indexing_task(
             r.set(rci.fence_key, fence_value.model_dump_json())
     except Exception:
         r.delete(rci.fence_key)
-        task_logger.exception("Unexpected exception")
+        task_logger.exception(
+            f"Unexpected exception: "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair.id} "
+            f"search_settings={search_settings.id}"
+        )
         return None
     finally:
         if lock.owned():
@@ -470,10 +475,9 @@ def connector_indexing_task(
     # read related data and evaluate/print task progress
     fence_value = cast(bytes, r.get(rci.fence_key))
     if fence_value is None:
-        task_logger.info(
+        raise ValueError(
             f"connector_indexing_task: fence_value not found: fence={rci.fence_key}"
         )
-        raise RuntimeError(f"Fence not found: fence={rci.fence_key}")

     try:
         fence_json = fence_value.decode("utf-8")
diff --git a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py
index 2e68986e8..d9579ccf9 100644
--- a/backend/danswer/background/celery/tasks/pruning/tasks.py
+++ b/backend/danswer/background/celery/tasks/pruning/tasks.py
@@ -79,13 +79,13 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
                 if not tasks_created:
                     continue
-                task_logger.info(f"Pruning queued: cc_pair_id={cc_pair.id}")
+                task_logger.info(f"Pruning queued: cc_pair={cc_pair.id}")

     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
         )
     except Exception:
-        task_logger.exception("Unexpected exception")
+        task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
     finally:
         if lock_beat.owned():
             lock_beat.release()
@@ -201,7 +201,7 @@ def try_creating_prune_generator_task(
         # set this only after all tasks have been added
         r.set(rcp.fence_key, 1)
     except Exception:
-        task_logger.exception("Unexpected exception")
+        task_logger.exception(f"Unexpected exception: cc_pair={cc_pair.id}")
         return None
     finally:
         if lock.owned():
@@ -300,7 +300,7 @@ def connector_pruning_generator_task(
             rcp.documents_to_prune = set(doc_ids_to_remove)

             task_logger.info(
-                f"RedisConnectorPruning.generate_tasks starting. cc_pair_id={cc_pair_id}"
+                f"RedisConnectorPruning.generate_tasks starting. cc_pair={cc_pair.id}"
             )
             tasks_generated = rcp.generate_tasks(
                 self.app, db_session, r, None, tenant_id
             )
             if tasks_generated is None:
                 return None

             task_logger.info(
                 f"RedisConnectorPruning.generate_tasks finished. "
-                f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
+                f"cc_pair={cc_pair.id} tasks_generated={tasks_generated}"
             )

             r.set(rcp.generator_complete_key, tasks_generated)
diff --git a/backend/danswer/background/celery/tasks/shared/RetryDocumentIndex.py b/backend/danswer/background/celery/tasks/shared/RetryDocumentIndex.py
new file mode 100644
index 000000000..bdaca0d81
--- /dev/null
+++ b/backend/danswer/background/celery/tasks/shared/RetryDocumentIndex.py
@@ -0,0 +1,40 @@
+import httpx
+from tenacity import retry
+from tenacity import retry_if_exception_type
+from tenacity import stop_after_delay
+from tenacity import wait_random_exponential
+
+from danswer.document_index.interfaces import DocumentIndex
+from danswer.document_index.interfaces import VespaDocumentFields
+
+
+class RetryDocumentIndex:
+    """A wrapper class to help with specific retries against Vespa involving
+    read timeouts.
+
+    wait_random_exponential implements full jitter as per this article:
+    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/"""
+
+    MAX_WAIT = 30
+
+    # STOP_AFTER + MAX_WAIT should be slightly less (5?) than the celery soft_time_limit
+    STOP_AFTER = 70
+
+    def __init__(self, index: DocumentIndex):
+        self.index: DocumentIndex = index
+
+    @retry(
+        retry=retry_if_exception_type(httpx.ReadTimeout),
+        wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
+        stop=stop_after_delay(STOP_AFTER),
+    )
+    def delete_single(self, doc_id: str) -> int:
+        return self.index.delete_single(doc_id)
+
+    @retry(
+        retry=retry_if_exception_type(httpx.ReadTimeout),
+        wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
+        stop=stop_after_delay(STOP_AFTER),
+    )
+    def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
+        return self.index.update_single(doc_id, fields)
diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py
index 52a49d467..116e7e1ff 100644
--- a/backend/danswer/background/celery/tasks/shared/tasks.py
+++ b/backend/danswer/background/celery/tasks/shared/tasks.py
@@ -1,9 +1,14 @@
+from http import HTTPStatus
+
+import httpx
 from celery import shared_task
 from celery import Task
 from celery.exceptions import SoftTimeLimitExceeded
+from tenacity import RetryError

 from danswer.access.access import get_access_for_document
 from danswer.background.celery.apps.app_base import task_logger
+from danswer.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
 from danswer.db.document import delete_document_by_connector_credential_pair__no_commit
 from danswer.db.document import delete_documents_complete__no_commit
 from danswer.db.document import get_document
@@ -20,12 +25,17 @@ from danswer.server.documents.models import ConnectorCredentialPairIdentifier

 DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3

+# 5 seconds more than RetryDocumentIndex STOP_AFTER+MAX_WAIT
+LIGHT_SOFT_TIME_LIMIT = 105
+LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15
+
+
 @shared_task(
     name="document_by_cc_pair_cleanup_task",
-    bind=True,
-    soft_time_limit=45,
-    time_limit=60,
+    soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
+    time_limit=LIGHT_TIME_LIMIT,
     max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES,
+    bind=True,
 )
 def document_by_cc_pair_cleanup_task(
     self: Task,
@@ -49,7 +59,7 @@ def document_by_cc_pair_cleanup_task(
     connector / credential pair from the access list
     (6) delete all relevant entries from postgres
     """
-    task_logger.info(f"tenant_id={tenant_id} document_id={document_id}")
+    task_logger.info(f"tenant={tenant_id} doc={document_id}")

     try:
         with get_session_with_tenant(tenant_id) as db_session:
@@ -57,17 +67,19 @@ def document_by_cc_pair_cleanup_task(
             chunks_affected = 0

             curr_ind_name, sec_ind_name = get_both_index_names(db_session)
-            document_index = get_default_document_index(
+            doc_index = get_default_document_index(
                 primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
             )

+            retry_index = RetryDocumentIndex(doc_index)
+
             count = get_document_connector_count(db_session, document_id)
             if count == 1:
                 # count == 1 means this is the only remaining cc_pair reference to the doc
                 # delete it from vespa and the db
                 action = "delete"
-                chunks_affected = document_index.delete_single(document_id)
+                chunks_affected = retry_index.delete_single(document_id)
                 delete_documents_complete__no_commit(
                     db_session=db_session,
                     document_ids=[document_id],
@@ -97,9 +109,7 @@ def document_by_cc_pair_cleanup_task(
                 )

                 # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
-                chunks_affected = document_index.update_single(
-                    document_id, fields=fields
-                )
+                chunks_affected = retry_index.update_single(document_id, fields=fields)

                 # there are still other cc_pair references to the doc, so just resync to Vespa
                 delete_document_by_connector_credential_pair__no_commit(
@@ -118,19 +128,41 @@ def document_by_cc_pair_cleanup_task(
             db_session.commit()

             task_logger.info(
-                f"tenant_id={tenant_id} "
-                f"document_id={document_id} "
+                f"tenant={tenant_id} "
+                f"doc={document_id} "
                 f"action={action} "
                 f"refcount={count} "
                 f"chunks={chunks_affected}"
             )
     except SoftTimeLimitExceeded:
         task_logger.info(
-            f"SoftTimeLimitExceeded exception. tenant_id={tenant_id} doc_id={document_id}"
+            f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}"
         )
         return False
-    except Exception as e:
-        task_logger.exception("Unexpected exception")
+    except Exception as ex:
+        if isinstance(ex, RetryError):
+            task_logger.info(f"Retry failed: {ex.last_attempt.attempt_number}")
+
+            # only set the inner exception if it is of type Exception
+            e_temp = ex.last_attempt.exception()
+            if isinstance(e_temp, Exception):
+                e = e_temp
+        else:
+            e = ex
+
+        if isinstance(e, httpx.HTTPStatusError):
+            if e.response.status_code == HTTPStatus.BAD_REQUEST:
+                task_logger.exception(
+                    f"Non-retryable HTTPStatusError: "
+                    f"tenant={tenant_id} "
+                    f"doc={document_id} "
+                    f"status={e.response.status_code}"
+                )
+                return False
+
+        task_logger.exception(
+            f"Unexpected exception: tenant={tenant_id} doc={document_id}"
+        )

         if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
             # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64
@@ -141,7 +173,7 @@ def document_by_cc_pair_cleanup_task(
             # eventually gets fixed out of band via stale document reconciliation
             task_logger.info(
                 f"Max retries reached. Marking doc as dirty for reconciliation: "
-                f"tenant_id={tenant_id} document_id={document_id}"
+                f"tenant={tenant_id} doc={document_id}"
             )
             with get_session_with_tenant(tenant_id):
                 mark_document_as_modified(document_id, db_session)
diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py
index fcc4d2aa5..b058a97e1 100644
--- a/backend/danswer/background/celery/tasks/vespa/tasks.py
+++ b/backend/danswer/background/celery/tasks/vespa/tasks.py
@@ -4,6 +4,7 @@ from datetime import timezone
 from http import HTTPStatus
 from typing import cast

+import httpx
 import redis
 from celery import Celery
 from celery import shared_task
@@ -13,6 +14,7 @@ from celery.result import AsyncResult
 from celery.states import READY_STATES
 from redis import Redis
 from sqlalchemy.orm import Session
+from tenacity import RetryError

 from danswer.access.access import get_access_for_document
 from danswer.background.celery.apps.app_base import task_logger
@@ -29,6 +31,9 @@ from danswer.background.celery.tasks.shared.RedisConnectorDeletionFenceData import (
 from danswer.background.celery.tasks.shared.RedisConnectorIndexingFenceData import (
     RedisConnectorIndexingFenceData,
 )
+from danswer.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
+from danswer.background.celery.tasks.shared.tasks import LIGHT_SOFT_TIME_LIMIT
+from danswer.background.celery.tasks.shared.tasks import LIGHT_TIME_LIMIT
 from danswer.configs.app_configs import JOB_TIMEOUT
 from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
 from danswer.configs.constants import DanswerCeleryQueues
@@ -152,7 +157,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
                 "Soft time limit exceeded, task is being terminated gracefully."
             )
     except Exception:
-        task_logger.exception("Unexpected exception")
+        task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
     finally:
         if lock_beat.owned():
             lock_beat.release()
@@ -809,22 +814,22 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:

 @shared_task(
     name="vespa_metadata_sync_task",
     bind=True,
-    soft_time_limit=45,
-    time_limit=60,
+    soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
+    time_limit=LIGHT_TIME_LIMIT,
     max_retries=3,
 )
 def vespa_metadata_sync_task(
     self: Task, document_id: str, tenant_id: str | None
 ) -> bool:
-    task_logger.info(f"document_id={document_id}")
-
     try:
         with get_session_with_tenant(tenant_id) as db_session:
             curr_ind_name, sec_ind_name = get_both_index_names(db_session)
-            document_index = get_default_document_index(
+            doc_index = get_default_document_index(
                 primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
             )
+            retry_index = RetryDocumentIndex(doc_index)
+
             doc = get_document(document_id, db_session)
             if not doc:
                 return False
@@ -846,19 +851,43 @@ def vespa_metadata_sync_task(
             )

             # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
-            chunks_affected = document_index.update_single(document_id, fields=fields)
+            chunks_affected = retry_index.update_single(document_id, fields)

             # update db last. Worst case = we crash right before this and
             # the sync might repeat again later
             mark_document_as_synced(document_id, db_session)

             task_logger.info(
-                f"document_id={document_id} action=sync chunks={chunks_affected}"
+                f"tenant={tenant_id} doc={document_id} action=sync chunks={chunks_affected}"
             )
     except SoftTimeLimitExceeded:
-        task_logger.info(f"SoftTimeLimitExceeded exception. doc_id={document_id}")
doc_id={document_id}") - except Exception as e: - task_logger.exception("Unexpected exception") + task_logger.info( + f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" + ) + except Exception as ex: + if isinstance(ex, RetryError): + task_logger.warning(f"Retry failed: {ex.last_attempt.attempt_number}") + + # only set the inner exception if it is of type Exception + e_temp = ex.last_attempt.exception() + if isinstance(e_temp, Exception): + e = e_temp + else: + e = ex + + if isinstance(e, httpx.HTTPStatusError): + if e.response.status_code == HTTPStatus.BAD_REQUEST: + task_logger.exception( + f"Non-retryable HTTPStatusError: " + f"tenant={tenant_id} " + f"doc={document_id} " + f"status={e.response.status_code}" + ) + return False + + task_logger.exception( + f"Unexpected exception: tenant={tenant_id} doc={document_id}" + ) # Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 countdown = 2 ** (self.request.retries + 4) diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index 1e9e0099d..6d6cc6b96 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -437,7 +437,7 @@ CUSTOM_ANSWER_VALIDITY_CONDITIONS = json.loads( os.environ.get("CUSTOM_ANSWER_VALIDITY_CONDITIONS", "[]") ) -VESPA_REQUEST_TIMEOUT = int(os.environ.get("VESPA_REQUEST_TIMEOUT") or "5") +VESPA_REQUEST_TIMEOUT = int(os.environ.get("VESPA_REQUEST_TIMEOUT") or "15") SYSTEM_RECURSION_LIMIT = int(os.environ.get("SYSTEM_RECURSION_LIMIT") or "1000")