From 47767c1666fd53e66ee3e17b03bd26e3ced6e52d Mon Sep 17 00:00:00 2001
From: Chris Weaver <25087905+Weves@users.noreply.github.com>
Date: Tue, 29 Apr 2025 14:23:54 -0700
Subject: [PATCH] Small improvements to checkpoint pickup logic (#4634)

---
 .../indexing/checkpointing_utils.py | 54 +++++++++++++------
 1 file changed, 39 insertions(+), 15 deletions(-)

diff --git a/backend/onyx/background/indexing/checkpointing_utils.py b/backend/onyx/background/indexing/checkpointing_utils.py
index e55e23cfac..15981b77a9 100644
--- a/backend/onyx/background/indexing/checkpointing_utils.py
+++ b/backend/onyx/background/indexing/checkpointing_utils.py
@@ -20,8 +20,7 @@ from onyx.utils.object_size_check import deep_getsizeof
 
 logger = setup_logger()
 
-_NUM_RECENT_ATTEMPTS_TO_CONSIDER = 20
-_NUM_DOCS_INDEXED_TO_BE_VALID_CHECKPOINT = 100
+_NUM_RECENT_ATTEMPTS_TO_CONSIDER = 50
 
 
 def _build_checkpoint_pointer(index_attempt_id: int) -> str:
@@ -80,34 +79,56 @@ get_latest_valid_checkpoint(
         db_session=db_session,
         limit=_NUM_RECENT_ATTEMPTS_TO_CONSIDER,
     )
+
+    # don't keep using checkpoints if we've had a bunch of failed attempts in a row
+    # where we make no progress. Only do this if we have had at least
+    # _NUM_RECENT_ATTEMPTS_TO_CONSIDER completed attempts.
+    if len(checkpoint_candidates) == _NUM_RECENT_ATTEMPTS_TO_CONSIDER:
+        had_any_progress = False
+        for candidate in checkpoint_candidates:
+            if (
+                candidate.total_docs_indexed is not None
+                and candidate.total_docs_indexed > 0
+            ) or candidate.status.is_successful():
+                had_any_progress = True
+                break
+
+        if not had_any_progress:
+            logger.warning(
+                f"{_NUM_RECENT_ATTEMPTS_TO_CONSIDER} consecutive failed attempts without progress "
+                f"found for cc_pair={cc_pair_id}. Ignoring checkpoint to let the run start "
+                "from scratch."
+            )
+            return connector.build_dummy_checkpoint()
+
+    # filter out any candidates that don't meet the criteria
     checkpoint_candidates = [
         candidate
         for candidate in checkpoint_candidates
         if (
             candidate.poll_range_start == window_start
             and candidate.poll_range_end == window_end
-            and candidate.status == IndexingStatus.FAILED
+            and (
+                candidate.status == IndexingStatus.FAILED
+                # if the background job was killed (and thus the attempt was canceled)
+                # we still want to use the checkpoint so that we can pick up where we left off
+                or candidate.status == IndexingStatus.CANCELED
+            )
             and candidate.checkpoint_pointer is not None
+            # NOTE: There are a couple connectors that may make progress but not have
+            # any "total_docs_indexed". E.g. they are going through
+            # Slack channels, and tons of them don't have any updates.
+            # Leaving the below in as historical context / in-case we want to use it again.
             # we want to make sure that the checkpoint is actually useful
             # if it's only gone through a few docs, it's probably not worth
             # using. This also avoids weird cases where a connector is basically
             # non-functional but still "makes progress" by slowly moving the
             # checkpoint forward run after run
-            and candidate.total_docs_indexed
-            and candidate.total_docs_indexed > _NUM_DOCS_INDEXED_TO_BE_VALID_CHECKPOINT
+            # and candidate.total_docs_indexed
+            # and candidate.total_docs_indexed > 100
        )
     ]
 
-    # don't keep using checkpoints if we've had a bunch of failed attempts in a row
-    # for now, capped at 10
-    if len(checkpoint_candidates) == _NUM_RECENT_ATTEMPTS_TO_CONSIDER:
-        logger.warning(
-            f"{_NUM_RECENT_ATTEMPTS_TO_CONSIDER} consecutive failed attempts found "
-            f"for cc_pair={cc_pair_id}. Ignoring checkpoint to let the run start "
-            "from scratch."
-        )
-        return connector.build_dummy_checkpoint()
-
     # assumes latest checkpoint is the furthest along. This only isn't true
     # if something else has gone wrong.
     latest_valid_checkpoint_candidate = (
@@ -116,6 +137,9 @@ get_latest_valid_checkpoint(
 
     checkpoint = connector.build_dummy_checkpoint()
     if latest_valid_checkpoint_candidate is None:
+        logger.info(
+            f"No valid checkpoint found for cc_pair={cc_pair_id}. Starting from scratch."
+        )
        return checkpoint
 
     try:
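For context, the bailout the second hunk introduces can be read in isolation as the standalone sketch below; it is not part of the patch, and the `Attempt` dataclass and `should_discard_checkpoints` helper are illustrative stand-ins for the real index-attempt model rather than names from the codebase.

    # Minimal sketch of the "no progress in the last N attempts" check added above.
    # Attempt models only the two fields the check reads.
    from dataclasses import dataclass

    _NUM_RECENT_ATTEMPTS_TO_CONSIDER = 50


    @dataclass
    class Attempt:
        total_docs_indexed: int | None
        succeeded: bool  # stand-in for candidate.status.is_successful()


    def should_discard_checkpoints(recent_attempts: list[Attempt]) -> bool:
        # Only discard when we have a full window of recent attempts and none of
        # them indexed a single document or finished successfully.
        if len(recent_attempts) < _NUM_RECENT_ATTEMPTS_TO_CONSIDER:
            return False
        return not any(
            (a.total_docs_indexed is not None and a.total_docs_indexed > 0)
            or a.succeeded
            for a in recent_attempts
        )


    if __name__ == "__main__":
        stalled = [Attempt(total_docs_indexed=0, succeeded=False)] * 50
        print(should_discard_checkpoints(stalled))  # True -> start from scratch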