Small improvements to checkpoint pickup logic (#4634)

This commit is contained in:
Chris Weaver 2025-04-29 14:23:54 -07:00 committed by GitHub
parent 9be3da2357
commit 47767c1666
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -20,8 +20,7 @@ from onyx.utils.object_size_check import deep_getsizeof
logger = setup_logger()
_NUM_RECENT_ATTEMPTS_TO_CONSIDER = 20
_NUM_DOCS_INDEXED_TO_BE_VALID_CHECKPOINT = 100
_NUM_RECENT_ATTEMPTS_TO_CONSIDER = 50
def _build_checkpoint_pointer(index_attempt_id: int) -> str:
@ -80,34 +79,56 @@ def get_latest_valid_checkpoint(
db_session=db_session,
limit=_NUM_RECENT_ATTEMPTS_TO_CONSIDER,
)
# don't keep using checkpoints if we've had a bunch of failed attempts in a row
# where we make no progress. Only do this if we have had at least
# _NUM_RECENT_ATTEMPTS_TO_CONSIDER completed attempts.
if len(checkpoint_candidates) == _NUM_RECENT_ATTEMPTS_TO_CONSIDER:
had_any_progress = False
for candidate in checkpoint_candidates:
if (
candidate.total_docs_indexed is not None
and candidate.total_docs_indexed > 0
) or candidate.status.is_successful():
had_any_progress = True
break
if not had_any_progress:
logger.warning(
f"{_NUM_RECENT_ATTEMPTS_TO_CONSIDER} consecutive failed attempts without progress "
f"found for cc_pair={cc_pair_id}. Ignoring checkpoint to let the run start "
"from scratch."
)
return connector.build_dummy_checkpoint()
# filter out any candidates that don't meet the criteria
checkpoint_candidates = [
candidate
for candidate in checkpoint_candidates
if (
candidate.poll_range_start == window_start
and candidate.poll_range_end == window_end
and candidate.status == IndexingStatus.FAILED
and (
candidate.status == IndexingStatus.FAILED
# if the background job was killed (and thus the attempt was canceled)
# we still want to use the checkpoint so that we can pick up where we left off
or candidate.status == IndexingStatus.CANCELED
)
and candidate.checkpoint_pointer is not None
# NOTE: There are a couple connectors that may make progress but not have
# any "total_docs_indexed". E.g. they are going through
# Slack channels, and tons of them don't have any updates.
# Leaving the below in as historical context / in-case we want to use it again.
# we want to make sure that the checkpoint is actually useful
# if it's only gone through a few docs, it's probably not worth
# using. This also avoids weird cases where a connector is basically
# non-functional but still "makes progress" by slowly moving the
# checkpoint forward run after run
and candidate.total_docs_indexed
and candidate.total_docs_indexed > _NUM_DOCS_INDEXED_TO_BE_VALID_CHECKPOINT
# and candidate.total_docs_indexed
# and candidate.total_docs_indexed > 100
)
]
# don't keep using checkpoints if we've had a bunch of failed attempts in a row
# for now, capped at 10
if len(checkpoint_candidates) == _NUM_RECENT_ATTEMPTS_TO_CONSIDER:
logger.warning(
f"{_NUM_RECENT_ATTEMPTS_TO_CONSIDER} consecutive failed attempts found "
f"for cc_pair={cc_pair_id}. Ignoring checkpoint to let the run start "
"from scratch."
)
return connector.build_dummy_checkpoint()
# assumes latest checkpoint is the furthest along. This only isn't true
# if something else has gone wrong.
latest_valid_checkpoint_candidate = (
@ -116,6 +137,9 @@ def get_latest_valid_checkpoint(
checkpoint = connector.build_dummy_checkpoint()
if latest_valid_checkpoint_candidate is None:
logger.info(
f"No valid checkpoint found for cc_pair={cc_pair_id}. Starting from scratch."
)
return checkpoint
try: