fix stale indexing tasks being allowed to run after a restart

This commit is contained in:
Richard Kuo (Danswer)
2024-11-05 15:45:13 -08:00
committed by Chris Weaver
parent 08600db41d
commit 001fcb3359
2 changed files with 20 additions and 7 deletions

View File

@@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
) )
if attempt_id: if attempt_id:
task_logger.info( task_logger.info(
f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}" f"Indexing queued: index_attempt={attempt_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings_instance.id} "
) )
tasks_created += 1 tasks_created += 1
except SoftTimeLimitExceeded: except SoftTimeLimitExceeded:
@@ -529,6 +531,13 @@ def connector_indexing_task(
sleep(1) sleep(1)
continue continue
if payload.index_attempt_id != index_attempt_id:
raise ValueError(
f"connector_indexing_task - id mismatch. Task may be left over from previous run.: "
f"task_index_attempt={index_attempt_id} "
f"payload_index_attempt={payload.index_attempt_id}"
)
logger.info( logger.info(
f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}" f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}"
) )
@@ -614,7 +623,6 @@ def connector_indexing_task(
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e)) mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))
redis_connector_index.reset()
raise e raise e
finally: finally:
if lock.owned(): if lock.owned():

View File

@@ -690,12 +690,17 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
for a in attempts: for a in attempts:
# if attempts exist in the db but we don't detect them in redis, mark them as failed # if attempts exist in the db but we don't detect them in redis, mark them as failed
failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart." fence_key = RedisConnectorIndex.fence_key_with_ids(
if not r.exists( a.connector_credential_pair_id, a.search_settings_id
RedisConnectorIndex.fence_key_with_ids( )
a.connector_credential_pair_id, a.search_settings_id if not r.exists(fence_key):
failure_reason = (
f"Unknown index attempt. Might be left over from a process restart: "
f"index_attempt={a.id} "
f"cc_pair={a.connector_credential_pair_id} "
f"search_settings={a.search_settings_id}"
) )
): task_logger.warning(failure_reason)
mark_attempt_failed(a.id, db_session, failure_reason=failure_reason) mark_attempt_failed(a.id, db_session, failure_reason=failure_reason)
lock_beat.reacquire() lock_beat.reacquire()