Bugfix/connector aborted logging (#3309)

* Improve error logging on task failure.

* Add DB exception hardening to the indexing watchdog

* Log on DB exception
This commit is contained in:
rkuo-danswer
2024-12-02 18:34:40 -08:00
committed by GitHub
parent 3fb2bfefec
commit 9e9b7ed61d
2 changed files with 51 additions and 22 deletions

View File

@@ -580,25 +580,39 @@ def connector_indexing_proxy_task(
if self.request.id and redis_connector_index.terminating(self.request.id): if self.request.id and redis_connector_index.terminating(self.request.id):
task_logger.warning( task_logger.warning(
"Indexing proxy - termination signal detected: " "Indexing watchdog - termination signal detected: "
f"attempt={index_attempt_id} " f"attempt={index_attempt_id} "
f"tenant={tenant_id} " f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} " f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}" f"search_settings={search_settings_id}"
) )
try:
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_canceled( mark_attempt_canceled(
index_attempt_id, index_attempt_id,
db_session, db_session,
"Connector termination signal detected", "Connector termination signal detected",
) )
finally:
# if the DB exceptions, we'll just get an unfriendly failure message
# in the UI instead of the cancellation message
logger.exception(
"Indexing watchdog - transient exception marking index attempt as canceled: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
job.cancel() job.cancel()
break break
# do nothing for ongoing jobs that haven't been stopped
if not job.done(): if not job.done():
# if the spawned task is still running, restart the check once again
# if the index attempt is not in a finished status
try:
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
index_attempt = get_index_attempt( index_attempt = get_index_attempt(
db_session=db_session, index_attempt_id=index_attempt_id db_session=db_session, index_attempt_id=index_attempt_id
@@ -609,10 +623,21 @@ def connector_indexing_proxy_task(
if not index_attempt.is_finished(): if not index_attempt.is_finished():
continue continue
except Exception:
# if the DB exceptioned, just restart the check.
# polling the index attempt status doesn't need to be strongly consistent
logger.exception(
"Indexing watchdog - transient exception looking up index attempt: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
continue
if job.status == "error": if job.status == "error":
task_logger.error( task_logger.error(
f"Indexing watchdog - spawned task exceptioned: " "Indexing watchdog - spawned task exceptioned: "
f"attempt={index_attempt_id} " f"attempt={index_attempt_id} "
f"tenant={tenant_id} " f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} " f"cc_pair={cc_pair_id} "

View File

@@ -654,24 +654,28 @@ def monitor_ccpair_indexing_taskset(
# outer = result.state in READY state # outer = result.state in READY state
status_int = redis_connector_index.get_completion() status_int = redis_connector_index.get_completion()
if status_int is None: # inner signal not set ... possible error if status_int is None: # inner signal not set ... possible error
result_state = result.state task_state = result.state
if ( if (
result_state in READY_STATES task_state in READY_STATES
): # outer signal in terminal state ... possible error ): # outer signal in terminal state ... possible error
# Now double check! # Now double check!
if redis_connector_index.get_completion() is None: if redis_connector_index.get_completion() is None:
# inner signal still not set (and cannot change when outer result_state is READY) # inner signal still not set (and cannot change when outer result_state is READY)
# Task is finished but generator complete isn't set. # Task is finished but generator complete isn't set.
# We have a problem! Worker may have crashed. # We have a problem! Worker may have crashed.
task_result = str(result.result)
task_traceback = str(result.traceback)
msg = ( msg = (
f"Connector indexing aborted or exceptioned: " f"Connector indexing aborted or exceptioned: "
f"attempt={payload.index_attempt_id} " f"attempt={payload.index_attempt_id} "
f"celery_task={payload.celery_task_id} " f"celery_task={payload.celery_task_id} "
f"result_state={result_state} "
f"cc_pair={cc_pair_id} " f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} " f"search_settings={search_settings_id} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}" f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
f"result.state={task_state} "
f"result.result={task_result} "
f"result.traceback={task_traceback}"
) )
task_logger.warning(msg) task_logger.warning(msg)