mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-03-29 11:12:02 +01:00
tighter signaling to prevent indexing cleanup from hitting tasks that are just starting (#2867)
* better indexing synchronization * add logging for fence wait * handle the task not creating * add more logging * add more logging * raise retry count
This commit is contained in:
parent
802086ee57
commit
e4779c29a7
@ -255,7 +255,19 @@ def try_creating_indexing_task(
|
||||
|
||||
custom_task_id = f"{rci.generator_task_id_prefix}_{uuid4()}"
|
||||
|
||||
# create the index attempt ... just for tracking purposes
|
||||
# set a basic fence to start
|
||||
fence_value = RedisConnectorIndexingFenceData(
|
||||
index_attempt_id=None,
|
||||
started=None,
|
||||
submitted=datetime.now(timezone.utc),
|
||||
celery_task_id=None,
|
||||
)
|
||||
r.set(rci.fence_key, fence_value.model_dump_json())
|
||||
|
||||
# create the index attempt for tracking purposes
|
||||
# code elsewhere checks for index attempts without an associated redis key
|
||||
# and cleans them up
|
||||
# therefore we must create the attempt and the task after the fence goes up
|
||||
index_attempt_id = create_index_attempt(
|
||||
cc_pair.id,
|
||||
search_settings.id,
|
||||
@ -276,17 +288,19 @@ def try_creating_indexing_task(
|
||||
priority=DanswerCeleryPriority.MEDIUM,
|
||||
)
|
||||
if not result:
|
||||
return None
|
||||
raise RuntimeError("send_task for connector_indexing_proxy_task failed.")
|
||||
|
||||
# set this only after all tasks have been added
|
||||
# now fill out the fence with the rest of the data
|
||||
fence_value = RedisConnectorIndexingFenceData(
|
||||
index_attempt_id=index_attempt_id,
|
||||
started=None,
|
||||
submitted=datetime.now(timezone.utc),
|
||||
celery_task_id=result.id,
|
||||
)
|
||||
|
||||
r.set(rci.fence_key, fence_value.model_dump_json())
|
||||
except Exception:
|
||||
r.delete(rci.fence_key)
|
||||
task_logger.exception("Unexpected exception")
|
||||
return None
|
||||
finally:
|
||||
@ -371,6 +385,38 @@ def connector_indexing_task(
|
||||
|
||||
rci = RedisConnectorIndexing(cc_pair_id, search_settings_id)
|
||||
|
||||
while True:
|
||||
# read related data and evaluate/print task progress
|
||||
fence_value = cast(bytes, r.get(rci.fence_key))
|
||||
if fence_value is None:
|
||||
task_logger.info(
|
||||
f"connector_indexing_task: fence_value not found: fence={rci.fence_key}"
|
||||
)
|
||||
raise
|
||||
|
||||
try:
|
||||
fence_json = fence_value.decode("utf-8")
|
||||
fence_data = RedisConnectorIndexingFenceData.model_validate_json(
|
||||
cast(str, fence_json)
|
||||
)
|
||||
except ValueError:
|
||||
task_logger.exception(
|
||||
f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}"
|
||||
)
|
||||
raise
|
||||
|
||||
if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
|
||||
task_logger.info(
|
||||
f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}"
|
||||
)
|
||||
sleep(1)
|
||||
continue
|
||||
|
||||
task_logger.info(
|
||||
f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}"
|
||||
)
|
||||
break
|
||||
|
||||
lock = r.lock(
|
||||
rci.generator_lock_key,
|
||||
timeout=CELERY_INDEXING_LOCK_TIMEOUT,
|
||||
|
@ -21,10 +21,10 @@ from danswer.server.documents.models import ConnectorCredentialPairIdentifier
|
||||
|
||||
|
||||
class RedisConnectorIndexingFenceData(BaseModel):
|
||||
index_attempt_id: int
|
||||
index_attempt_id: int | None
|
||||
started: datetime | None
|
||||
submitted: datetime
|
||||
celery_task_id: str
|
||||
celery_task_id: str | None
|
||||
|
||||
|
||||
@shared_task(
|
||||
|
@ -574,6 +574,10 @@ def monitor_ccpair_indexing_taskset(
|
||||
"monitor_ccpair_indexing_taskset: generator_progress_value is not an integer."
|
||||
)
|
||||
|
||||
if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
|
||||
# the task is still setting up
|
||||
return
|
||||
|
||||
# Read result state BEFORE generator_complete_key to avoid a race condition
|
||||
result: AsyncResult = AsyncResult(fence_data.celery_task_id)
|
||||
result_state = result.state
|
||||
|
@ -373,7 +373,7 @@ class WebConnector(LoadConnector):
|
||||
page.close()
|
||||
except Exception as e:
|
||||
last_error = f"Failed to fetch '{current_url}': {e}"
|
||||
logger.error(last_error)
|
||||
logger.exception(last_error)
|
||||
playwright.stop()
|
||||
restart_playwright = True
|
||||
continue
|
||||
|
@ -118,7 +118,7 @@ def get_existing_documents_from_chunks(
|
||||
return document_ids
|
||||
|
||||
|
||||
@retry(tries=3, delay=1, backoff=2)
|
||||
@retry(tries=5, delay=1, backoff=2)
|
||||
def _index_vespa_chunk(
|
||||
chunk: DocMetadataAwareIndexChunk,
|
||||
index_name: str,
|
||||
|
@ -246,17 +246,17 @@ class CCPairManager:
|
||||
fetched_cc_pair.last_success
|
||||
and fetched_cc_pair.last_success > after
|
||||
):
|
||||
print(f"CC pair {cc_pair.id} indexing complete.")
|
||||
print(f"Indexing complete: cc_pair={cc_pair.id}")
|
||||
return
|
||||
|
||||
elapsed = time.monotonic() - start
|
||||
if elapsed > timeout:
|
||||
raise TimeoutError(
|
||||
f"CC pair {cc_pair.id} indexing was not completed within {timeout} seconds"
|
||||
f"Indexing wait timed out: cc_pair={cc_pair.id} timeout={timeout}s"
|
||||
)
|
||||
|
||||
print(
|
||||
f"CC pair {cc_pair.id} indexing to complete. elapsed={elapsed:.2f} timeout={timeout}"
|
||||
f"Indexing wait for completion: cc_pair={cc_pair.id} elapsed={elapsed:.2f} timeout={timeout}s"
|
||||
)
|
||||
time.sleep(5)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user