Bugfix/celery light backoff (#2880)

* logging cleanup

* raise the default VESPA_REQUEST_TIMEOUT from 5 to 15

* implement backoff for document index methods specifically

* do not retry on 400 BAD_REQUEST

* handle RetryError

* actually check status code and fix type errors
This commit is contained in:
rkuo-danswer
2024-10-28 09:14:51 -07:00
committed by GitHub
parent 52bd1ad8ef
commit 1d89fea73e
7 changed files with 143 additions and 38 deletions

View File

@ -81,7 +81,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
"Soft time limit exceeded, task is being terminated gracefully." "Soft time limit exceeded, task is being terminated gracefully."
) )
except Exception: except Exception:
task_logger.exception("Unexpected exception") task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally: finally:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()

View File

@ -175,7 +175,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
) )
if attempt_id: if attempt_id:
task_logger.info( task_logger.info(
f"Indexing queued: cc_pair_id={cc_pair.id} index_attempt_id={attempt_id}" f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}"
) )
tasks_created += 1 tasks_created += 1
except SoftTimeLimitExceeded: except SoftTimeLimitExceeded:
@ -183,7 +183,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"Soft time limit exceeded, task is being terminated gracefully." "Soft time limit exceeded, task is being terminated gracefully."
) )
except Exception: except Exception:
task_logger.exception("Unexpected exception") task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally: finally:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
@ -366,7 +366,12 @@ def try_creating_indexing_task(
r.set(rci.fence_key, fence_value.model_dump_json()) r.set(rci.fence_key, fence_value.model_dump_json())
except Exception: except Exception:
r.delete(rci.fence_key) r.delete(rci.fence_key)
task_logger.exception("Unexpected exception") task_logger.exception(
f"Unexpected exception: "
f"tenant={tenant_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
)
return None return None
finally: finally:
if lock.owned(): if lock.owned():
@ -470,10 +475,9 @@ def connector_indexing_task(
# read related data and evaluate/print task progress # read related data and evaluate/print task progress
fence_value = cast(bytes, r.get(rci.fence_key)) fence_value = cast(bytes, r.get(rci.fence_key))
if fence_value is None: if fence_value is None:
task_logger.info( raise ValueError(
f"connector_indexing_task: fence_value not found: fence={rci.fence_key}" f"connector_indexing_task: fence_value not found: fence={rci.fence_key}"
) )
raise RuntimeError(f"Fence not found: fence={rci.fence_key}")
try: try:
fence_json = fence_value.decode("utf-8") fence_json = fence_value.decode("utf-8")

View File

@ -79,13 +79,13 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
if not tasks_created: if not tasks_created:
continue continue
task_logger.info(f"Pruning queued: cc_pair_id={cc_pair.id}") task_logger.info(f"Pruning queued: cc_pair={cc_pair.id}")
except SoftTimeLimitExceeded: except SoftTimeLimitExceeded:
task_logger.info( task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully." "Soft time limit exceeded, task is being terminated gracefully."
) )
except Exception: except Exception:
task_logger.exception("Unexpected exception") task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally: finally:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
@ -201,7 +201,7 @@ def try_creating_prune_generator_task(
# set this only after all tasks have been added # set this only after all tasks have been added
r.set(rcp.fence_key, 1) r.set(rcp.fence_key, 1)
except Exception: except Exception:
task_logger.exception("Unexpected exception") task_logger.exception(f"Unexpected exception: cc_pair={cc_pair.id}")
return None return None
finally: finally:
if lock.owned(): if lock.owned():
@ -300,7 +300,7 @@ def connector_pruning_generator_task(
rcp.documents_to_prune = set(doc_ids_to_remove) rcp.documents_to_prune = set(doc_ids_to_remove)
task_logger.info( task_logger.info(
f"RedisConnectorPruning.generate_tasks starting. cc_pair_id={cc_pair_id}" f"RedisConnectorPruning.generate_tasks starting. cc_pair={cc_pair.id}"
) )
tasks_generated = rcp.generate_tasks( tasks_generated = rcp.generate_tasks(
self.app, db_session, r, None, tenant_id self.app, db_session, r, None, tenant_id
@ -310,7 +310,7 @@ def connector_pruning_generator_task(
task_logger.info( task_logger.info(
f"RedisConnectorPruning.generate_tasks finished. " f"RedisConnectorPruning.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" f"cc_pair={cc_pair.id} tasks_generated={tasks_generated}"
) )
r.set(rcp.generator_complete_key, tasks_generated) r.set(rcp.generator_complete_key, tasks_generated)

View File

@ -0,0 +1,40 @@
import httpx
from tenacity import retry
from tenacity import retry_if_exception_type
from tenacity import stop_after_delay
from tenacity import wait_random_exponential
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import VespaDocumentFields
class RetryDocumentIndex:
    """Wrap a DocumentIndex so that single-document calls are retried when
    they fail with httpx.ReadTimeout.

    Any other exception type is not retried by this wrapper.

    The delay between attempts comes from tenacity's wait_random_exponential,
    which implements exponential backoff with full jitter as described in:
    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
    """

    # Cap handed to wait_random_exponential(max=...).
    MAX_WAIT = 30

    # Budget handed to stop_after_delay(...).
    # STOP_AFTER + MAX_WAIT should stay slightly below (by about 5?) the
    # celery soft_time_limit of the tasks that use this wrapper.
    STOP_AFTER = 70

    def __init__(self, index: DocumentIndex):
        """Store the DocumentIndex that the retried calls are forwarded to."""
        self.index: DocumentIndex = index

    @retry(
        stop=stop_after_delay(STOP_AFTER),
        wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
        retry=retry_if_exception_type(httpx.ReadTimeout),
    )
    def delete_single(self, doc_id: str) -> int:
        """Call delete_single(doc_id) on the wrapped index and return its
        result, retrying on httpx.ReadTimeout."""
        affected = self.index.delete_single(doc_id)
        return affected

    @retry(
        stop=stop_after_delay(STOP_AFTER),
        wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
        retry=retry_if_exception_type(httpx.ReadTimeout),
    )
    def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
        """Call update_single(doc_id, fields) on the wrapped index and return
        its result, retrying on httpx.ReadTimeout."""
        affected = self.index.update_single(doc_id, fields)
        return affected

View File

@ -1,9 +1,14 @@
from http import HTTPStatus
import httpx
from celery import shared_task from celery import shared_task
from celery import Task from celery import Task
from celery.exceptions import SoftTimeLimitExceeded from celery.exceptions import SoftTimeLimitExceeded
from tenacity import RetryError
from danswer.access.access import get_access_for_document from danswer.access.access import get_access_for_document
from danswer.background.celery.apps.app_base import task_logger from danswer.background.celery.apps.app_base import task_logger
from danswer.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
from danswer.db.document import delete_document_by_connector_credential_pair__no_commit from danswer.db.document import delete_document_by_connector_credential_pair__no_commit
from danswer.db.document import delete_documents_complete__no_commit from danswer.db.document import delete_documents_complete__no_commit
from danswer.db.document import get_document from danswer.db.document import get_document
@ -20,12 +25,17 @@ from danswer.server.documents.models import ConnectorCredentialPairIdentifier
DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3 DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3
# Passed as soft_time_limit= to the @shared_task decorators that use it.
# 5 seconds more than RetryDocumentIndex STOP_AFTER+MAX_WAIT (70 + 30 = 100).
LIGHT_SOFT_TIME_LIMIT = 105
# Passed as time_limit= alongside the soft limit; always 15 above it.
LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15
@shared_task( @shared_task(
name="document_by_cc_pair_cleanup_task", name="document_by_cc_pair_cleanup_task",
bind=True, soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
soft_time_limit=45, time_limit=LIGHT_TIME_LIMIT,
time_limit=60,
max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES, max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES,
bind=True,
) )
def document_by_cc_pair_cleanup_task( def document_by_cc_pair_cleanup_task(
self: Task, self: Task,
@ -49,7 +59,7 @@ def document_by_cc_pair_cleanup_task(
connector / credential pair from the access list connector / credential pair from the access list
(6) delete all relevant entries from postgres (6) delete all relevant entries from postgres
""" """
task_logger.info(f"tenant_id={tenant_id} document_id={document_id}") task_logger.info(f"tenant={tenant_id} doc={document_id}")
try: try:
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
@ -57,17 +67,19 @@ def document_by_cc_pair_cleanup_task(
chunks_affected = 0 chunks_affected = 0
curr_ind_name, sec_ind_name = get_both_index_names(db_session) curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index( doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
) )
retry_index = RetryDocumentIndex(doc_index)
count = get_document_connector_count(db_session, document_id) count = get_document_connector_count(db_session, document_id)
if count == 1: if count == 1:
# count == 1 means this is the only remaining cc_pair reference to the doc # count == 1 means this is the only remaining cc_pair reference to the doc
# delete it from vespa and the db # delete it from vespa and the db
action = "delete" action = "delete"
chunks_affected = document_index.delete_single(document_id) chunks_affected = retry_index.delete_single(document_id)
delete_documents_complete__no_commit( delete_documents_complete__no_commit(
db_session=db_session, db_session=db_session,
document_ids=[document_id], document_ids=[document_id],
@ -97,9 +109,7 @@ def document_by_cc_pair_cleanup_task(
) )
# update Vespa. OK if doc doesn't exist. Raises exception otherwise. # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
chunks_affected = document_index.update_single( chunks_affected = retry_index.update_single(document_id, fields=fields)
document_id, fields=fields
)
# there are still other cc_pair references to the doc, so just resync to Vespa # there are still other cc_pair references to the doc, so just resync to Vespa
delete_document_by_connector_credential_pair__no_commit( delete_document_by_connector_credential_pair__no_commit(
@ -118,19 +128,41 @@ def document_by_cc_pair_cleanup_task(
db_session.commit() db_session.commit()
task_logger.info( task_logger.info(
f"tenant_id={tenant_id} " f"tenant={tenant_id} "
f"document_id={document_id} " f"doc={document_id} "
f"action={action} " f"action={action} "
f"refcount={count} " f"refcount={count} "
f"chunks={chunks_affected}" f"chunks={chunks_affected}"
) )
except SoftTimeLimitExceeded: except SoftTimeLimitExceeded:
task_logger.info( task_logger.info(
f"SoftTimeLimitExceeded exception. tenant_id={tenant_id} doc_id={document_id}" f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}"
) )
return False return False
except Exception as e: except Exception as ex:
task_logger.exception("Unexpected exception") if isinstance(ex, RetryError):
task_logger.info(f"Retry failed: {ex.last_attempt.attempt_number}")
# only set the inner exception if it is of type Exception
e_temp = ex.last_attempt.exception()
if isinstance(e_temp, Exception):
e = e_temp
else:
e = ex
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: "
f"tenant={tenant_id} "
f"doc={document_id} "
f"status={e.response.status_code}"
)
return False
task_logger.exception(
f"Unexpected exception: tenant={tenant_id} doc={document_id}"
)
if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES: if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
# Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64
@ -141,7 +173,7 @@ def document_by_cc_pair_cleanup_task(
# eventually gets fixed out of band via stale document reconciliation # eventually gets fixed out of band via stale document reconciliation
task_logger.info( task_logger.info(
f"Max retries reached. Marking doc as dirty for reconciliation: " f"Max retries reached. Marking doc as dirty for reconciliation: "
f"tenant_id={tenant_id} document_id={document_id}" f"tenant={tenant_id} doc={document_id}"
) )
with get_session_with_tenant(tenant_id): with get_session_with_tenant(tenant_id):
mark_document_as_modified(document_id, db_session) mark_document_as_modified(document_id, db_session)

View File

@ -4,6 +4,7 @@ from datetime import timezone
from http import HTTPStatus from http import HTTPStatus
from typing import cast from typing import cast
import httpx
import redis import redis
from celery import Celery from celery import Celery
from celery import shared_task from celery import shared_task
@ -13,6 +14,7 @@ from celery.result import AsyncResult
from celery.states import READY_STATES from celery.states import READY_STATES
from redis import Redis from redis import Redis
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from tenacity import RetryError
from danswer.access.access import get_access_for_document from danswer.access.access import get_access_for_document
from danswer.background.celery.apps.app_base import task_logger from danswer.background.celery.apps.app_base import task_logger
@ -29,6 +31,9 @@ from danswer.background.celery.tasks.shared.RedisConnectorDeletionFenceData impo
from danswer.background.celery.tasks.shared.RedisConnectorIndexingFenceData import ( from danswer.background.celery.tasks.shared.RedisConnectorIndexingFenceData import (
RedisConnectorIndexingFenceData, RedisConnectorIndexingFenceData,
) )
from danswer.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
from danswer.background.celery.tasks.shared.tasks import LIGHT_SOFT_TIME_LIMIT
from danswer.background.celery.tasks.shared.tasks import LIGHT_TIME_LIMIT
from danswer.configs.app_configs import JOB_TIMEOUT from danswer.configs.app_configs import JOB_TIMEOUT
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from danswer.configs.constants import DanswerCeleryQueues from danswer.configs.constants import DanswerCeleryQueues
@ -152,7 +157,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
"Soft time limit exceeded, task is being terminated gracefully." "Soft time limit exceeded, task is being terminated gracefully."
) )
except Exception: except Exception:
task_logger.exception("Unexpected exception") task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally: finally:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
@ -809,22 +814,22 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
@shared_task( @shared_task(
name="vespa_metadata_sync_task", name="vespa_metadata_sync_task",
bind=True, bind=True,
soft_time_limit=45, soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
time_limit=60, time_limit=LIGHT_TIME_LIMIT,
max_retries=3, max_retries=3,
) )
def vespa_metadata_sync_task( def vespa_metadata_sync_task(
self: Task, document_id: str, tenant_id: str | None self: Task, document_id: str, tenant_id: str | None
) -> bool: ) -> bool:
task_logger.info(f"document_id={document_id}")
try: try:
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
curr_ind_name, sec_ind_name = get_both_index_names(db_session) curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index( doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
) )
retry_index = RetryDocumentIndex(doc_index)
doc = get_document(document_id, db_session) doc = get_document(document_id, db_session)
if not doc: if not doc:
return False return False
@ -846,19 +851,43 @@ def vespa_metadata_sync_task(
) )
# update Vespa. OK if doc doesn't exist. Raises exception otherwise. # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
chunks_affected = document_index.update_single(document_id, fields=fields) chunks_affected = retry_index.update_single(document_id, fields)
# update db last. Worst case = we crash right before this and # update db last. Worst case = we crash right before this and
# the sync might repeat again later # the sync might repeat again later
mark_document_as_synced(document_id, db_session) mark_document_as_synced(document_id, db_session)
task_logger.info( task_logger.info(
f"document_id={document_id} action=sync chunks={chunks_affected}" f"tenant={tenant_id} doc={document_id} action=sync chunks={chunks_affected}"
) )
except SoftTimeLimitExceeded: except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc_id={document_id}") task_logger.info(
except Exception as e: f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}"
task_logger.exception("Unexpected exception") )
except Exception as ex:
if isinstance(ex, RetryError):
task_logger.warning(f"Retry failed: {ex.last_attempt.attempt_number}")
# only set the inner exception if it is of type Exception
e_temp = ex.last_attempt.exception()
if isinstance(e_temp, Exception):
e = e_temp
else:
e = ex
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: "
f"tenant={tenant_id} "
f"doc={document_id} "
f"status={e.response.status_code}"
)
return False
task_logger.exception(
f"Unexpected exception: tenant={tenant_id} doc={document_id}"
)
# Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 # Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64
countdown = 2 ** (self.request.retries + 4) countdown = 2 ** (self.request.retries + 4)

View File

@ -437,7 +437,7 @@ CUSTOM_ANSWER_VALIDITY_CONDITIONS = json.loads(
os.environ.get("CUSTOM_ANSWER_VALIDITY_CONDITIONS", "[]") os.environ.get("CUSTOM_ANSWER_VALIDITY_CONDITIONS", "[]")
) )
VESPA_REQUEST_TIMEOUT = int(os.environ.get("VESPA_REQUEST_TIMEOUT") or "5") VESPA_REQUEST_TIMEOUT = int(os.environ.get("VESPA_REQUEST_TIMEOUT") or "15")
SYSTEM_RECURSION_LIMIT = int(os.environ.get("SYSTEM_RECURSION_LIMIT") or "1000") SYSTEM_RECURSION_LIMIT = int(os.environ.get("SYSTEM_RECURSION_LIMIT") or "1000")