Fix indexing job cleanup

Weves 2023-09-09 12:41:36 -07:00 committed by Chris Weaver
parent 67c26f89e8
commit e72f26ef53


@@ -42,6 +42,10 @@ from danswer.utils.logger import setup_logger
 
 logger = setup_logger()
 
+_UNEXPECTED_STATE_FAILURE_REASON = (
+    "Stopped mid run, likely due to the background process being killed"
+)
+
 
 def should_create_new_indexing(
     connector: Connector, last_index: IndexAttempt | None, db_session: Session
@@ -55,44 +59,58 @@ def should_create_new_indexing(
     return time_since_index.total_seconds() >= connector.refresh_freq
 
 
-def create_indexing_jobs(db_session: Session, existing_jobs: dict[int, Future]) -> None:
-    connectors = fetch_connectors(db_session)
-
-    # clean up in-progress jobs that were never completed
-    for connector in connectors:
-        in_progress_indexing_attempts = get_inprogress_index_attempts(
-            connector.id, db_session
+def mark_run_failed(
+    db_session: Session, index_attempt: IndexAttempt, failure_reason: str
+) -> None:
+    """Marks the `index_attempt` row as failed + updates the
+    `connector_credential_pair` to reflect that the run failed"""
+    logger.warning(
+        f"Marking in-progress attempt 'connector: {index_attempt.connector_id}, "
+        f"credential: {index_attempt.credential_id}' as failed"
+    )
+    mark_attempt_failed(
+        index_attempt=index_attempt,
+        db_session=db_session,
+        failure_reason=failure_reason,
+    )
+    if (
+        index_attempt.connector_id is not None
+        and index_attempt.credential_id is not None
+    ):
+        update_connector_credential_pair(
+            db_session=db_session,
+            connector_id=index_attempt.connector_id,
+            credential_id=index_attempt.credential_id,
+            attempt_status=IndexingStatus.FAILED,
+        )
         )
 
-        for attempt in in_progress_indexing_attempts:
-            # if a job is still going, don't touch it
-            if attempt.id in existing_jobs:
-                continue
-
-            logger.warning(
-                f"Marking in-progress attempt 'connector: {attempt.connector_id}, "
-                f"credential: {attempt.credential_id}' as failed"
-            )
-            mark_attempt_failed(
-                attempt,
-                db_session,
-                failure_reason="Stopped mid run, likely due to the background process being killed",
-            )
-            if attempt.connector_id is not None and attempt.credential_id is not None:
-                update_connector_credential_pair(
-                    db_session=db_session,
-                    connector_id=attempt.connector_id,
-                    credential_id=attempt.credential_id,
-                    attempt_status=IndexingStatus.FAILED,
-                )
-
-    # potentially kick off new runs
-    enabled_connectors = [
-        connector for connector in connectors if not connector.disabled
-    ]
 
+def create_indexing_jobs(db_session: Session, existing_jobs: dict[int, Future]) -> None:
+    """Creates new indexing jobs for each connector / credential pair which is:
+    1. Enabled
+    2. `refresh_frequency` time has passed since the last indexing run for this pair
+    3. There is not already an ongoing indexing attempt for this pair
+    """
+    ongoing_pairs: set[tuple[int | None, int | None]] = set()
+    for attempt_id in existing_jobs:
+        attempt = get_index_attempt(db_session=db_session, index_attempt_id=attempt_id)
+        if attempt is None:
+            logger.error(
+                f"Unable to find IndexAttempt for ID '{attempt_id}' when creating "
+                "indexing jobs"
+            )
+            continue
+        ongoing_pairs.add((attempt.connector_id, attempt.credential_id))
+
+    enabled_connectors = fetch_connectors(db_session, disabled_status=False)
     for connector in enabled_connectors:
         for association in connector.credentials:
             credential = association.credential
+
+            # check if there is an ongoing indexing attempt for this connector + credential pair
+            if (connector.id, credential.id) in ongoing_pairs:
+                continue
+
             last_attempt = get_last_attempt(connector.id, credential.id, db_session)
             if not should_create_new_indexing(connector, last_attempt, db_session):
                 continue
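
For clarity, here is a minimal, self-contained sketch of the de-duplication step the new `create_indexing_jobs` performs: collect the (connector_id, credential_id) pairs that already have an in-flight attempt, then skip those pairs when scheduling. The `FakeAttempt` dataclass and the in-memory dicts are hypothetical stand-ins for the real `IndexAttempt` rows and database lookups, not the actual Danswer API.

    from dataclasses import dataclass

    @dataclass
    class FakeAttempt:  # hypothetical stand-in for the real IndexAttempt row
        id: int
        connector_id: int | None
        credential_id: int | None

    # existing_jobs is keyed by IndexAttempt.id, mirroring dict[int, Future]
    attempts_by_id = {
        1: FakeAttempt(id=1, connector_id=10, credential_id=20),
        2: FakeAttempt(id=2, connector_id=11, credential_id=21),
    }
    existing_jobs = {1: object(), 2: object()}

    # same pattern as above: collect pairs that already have an ongoing attempt
    ongoing_pairs = {
        (attempts_by_id[attempt_id].connector_id, attempts_by_id[attempt_id].credential_id)
        for attempt_id in existing_jobs
    }

    # only pairs without an ongoing attempt get a new indexing job
    candidate_pairs = [(10, 20), (12, 22)]
    for connector_id, credential_id in candidate_pairs:
        if (connector_id, credential_id) in ongoing_pairs:
            continue
        print(f"would schedule indexing for connector={connector_id}, credential={credential_id}")
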
@@ -111,11 +129,12 @@ def cleanup_indexing_jobs(
 ) -> dict[int, Future]:
     existing_jobs_copy = existing_jobs.copy()
 
     # clean up completed jobs
     for attempt_id, job in existing_jobs.items():
         # do nothing for ongoing jobs
         if not job.done():
             continue
 
+        # cleanup completed job
         job.release()
         del existing_jobs_copy[attempt_id]
         index_attempt = get_index_attempt(
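
The loop above prunes finished jobs from a copy of `existing_jobs` while iterating over the original, so the dict being iterated is never mutated. Below is a rough illustration of that copy-and-prune pattern, using `concurrent.futures` from the standard library in place of whatever distributed `Future` type the real `existing_jobs` holds (so there is no `release()` equivalent here).

    import time
    from concurrent.futures import Future, ThreadPoolExecutor

    def fake_index(attempt_id: int) -> int:
        time.sleep(0.05 * attempt_id)  # pretend higher ids take longer
        return attempt_id

    executor = ThreadPoolExecutor(max_workers=3)
    existing_jobs: dict[int, Future] = {i: executor.submit(fake_index, i) for i in (1, 2, 3)}
    time.sleep(0.08)  # give the short jobs time to finish

    # copy-and-prune, as in cleanup_indexing_jobs
    existing_jobs_copy = existing_jobs.copy()
    for attempt_id, job in existing_jobs.items():
        if not job.done():
            continue  # leave running jobs untouched
        del existing_jobs_copy[attempt_id]

    print(sorted(existing_jobs_copy))  # ids of jobs that were still running
    executor.shutdown(wait=True)
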
@@ -129,24 +148,40 @@ def cleanup_indexing_jobs(
             continue
 
         if index_attempt.status == IndexingStatus.IN_PROGRESS:
-            logger.warning(
-                f"Marking in-progress attempt 'connector: {index_attempt.connector_id}, "
-                f"credential: {index_attempt.credential_id}' as failed"
-            )
-            mark_attempt_failed(
-                index_attempt=index_attempt,
+            mark_run_failed(
                 db_session=db_session,
-                failure_reason="Stopped mid run, likely due to the background process being killed",
+                index_attempt=index_attempt,
+                failure_reason=_UNEXPECTED_STATE_FAILURE_REASON,
             )
-            if (
-                index_attempt.connector_id is not None
-                and index_attempt.credential_id is not None
-            ):
-                update_connector_credential_pair(
+
+    # clean up in-progress jobs that were never completed
+    connectors = fetch_connectors(db_session)
+    for connector in connectors:
+        in_progress_indexing_attempts = get_inprogress_index_attempts(
+            connector.id, db_session
+        )
+        for index_attempt in in_progress_indexing_attempts:
+            if index_attempt.id in existing_jobs:
+                # check to see if the job has been updated in the last hour, if not
+                # assume it to be frozen in some bad state and just mark it as failed.
+                # Note: this relies on the fact that the `time_updated` field is
+                # constantly updated every batch of documents indexed
+                current_db_time = get_db_current_time(db_session=db_session)
+                time_since_update = current_db_time - index_attempt.time_updated
+                if time_since_update.seconds > 60 * 60:
+                    existing_jobs[index_attempt.id].cancel()
+                    mark_run_failed(
+                        db_session=db_session,
+                        index_attempt=index_attempt,
+                        failure_reason="Indexing run frozen - no updates in last hour. "
+                        "The run will be re-attempted at next scheduled indexing time.",
+                    )
+            else:
+                # If job isn't known, simply mark it as failed
+                mark_run_failed(
                     db_session=db_session,
-                    connector_id=index_attempt.connector_id,
-                    credential_id=index_attempt.credential_id,
-                    attempt_status=IndexingStatus.FAILED,
+                    index_attempt=index_attempt,
+                    failure_reason=_UNEXPECTED_STATE_FAILURE_REASON,
                 )
 
     return existing_jobs_copy
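
The staleness check above relies on `time_updated` being bumped after every batch of documents indexed: once the database clock is more than an hour past the last update, the run is treated as frozen, its Future is cancelled, and the attempt is marked failed so it can be retried at the next scheduled indexing time. A small standalone illustration of that threshold logic follows; plain datetimes stand in for the DB clock and the ORM row, and `total_seconds()` is used here for simplicity where the diff compares `timedelta.seconds`.

    from datetime import datetime, timedelta, timezone

    ONE_HOUR_SECONDS = 60 * 60

    def looks_frozen(time_updated: datetime, current_db_time: datetime) -> bool:
        # no update for over an hour -> assume the indexing run is stuck
        return (current_db_time - time_updated).total_seconds() > ONE_HOUR_SECONDS

    now = datetime(2023, 9, 9, 12, 0, tzinfo=timezone.utc)
    print(looks_frozen(now - timedelta(minutes=30), now))  # False: updated recently
    print(looks_frozen(now - timedelta(hours=2), now))     # True: would be cancelled and marked failed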