diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py
index 4525b1e9425a..8e445b4f3f41 100644
--- a/backend/danswer/background/celery/tasks/indexing/tasks.py
+++ b/backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -580,39 +580,64 @@ def connector_indexing_proxy_task(
         if self.request.id and redis_connector_index.terminating(self.request.id):
             task_logger.warning(
-                "Indexing proxy - termination signal detected: "
+                "Indexing watchdog - termination signal detected: "
                 f"attempt={index_attempt_id} "
                 f"tenant={tenant_id} "
                 f"cc_pair={cc_pair_id} "
                 f"search_settings={search_settings_id}"
             )
 
-            with get_session_with_tenant(tenant_id) as db_session:
-                mark_attempt_canceled(
-                    index_attempt_id,
-                    db_session,
-                    "Connector termination signal detected",
+            try:
+                with get_session_with_tenant(tenant_id) as db_session:
+                    mark_attempt_canceled(
+                        index_attempt_id,
+                        db_session,
+                        "Connector termination signal detected",
+                    )
+            except Exception:
+                # if the DB exceptions, we'll just get an unfriendly failure message
+                # in the UI instead of the cancellation message
+                logger.exception(
+                    "Indexing watchdog - transient exception marking index attempt as canceled: "
+                    f"attempt={index_attempt_id} "
+                    f"tenant={tenant_id} "
+                    f"cc_pair={cc_pair_id} "
+                    f"search_settings={search_settings_id}"
                 )
 
-            job.cancel()
+            job.cancel()
+            break
 
-        # do nothing for ongoing jobs that haven't been stopped
         if not job.done():
-            with get_session_with_tenant(tenant_id) as db_session:
-                index_attempt = get_index_attempt(
-                    db_session=db_session, index_attempt_id=index_attempt_id
+            # if the spawned task is still running, restart the check once again
+            # if the index attempt is not in a finished status
+            try:
+                with get_session_with_tenant(tenant_id) as db_session:
+                    index_attempt = get_index_attempt(
+                        db_session=db_session, index_attempt_id=index_attempt_id
+                    )
+
+                    if not index_attempt:
+                        continue
+
+                    if not index_attempt.is_finished():
+                        continue
+            except Exception:
+                # if the DB exceptioned, just restart the check.
+                # polling the index attempt status doesn't need to be strongly consistent
+                logger.exception(
+                    "Indexing watchdog - transient exception looking up index attempt: "
+                    f"attempt={index_attempt_id} "
+                    f"tenant={tenant_id} "
+                    f"cc_pair={cc_pair_id} "
+                    f"search_settings={search_settings_id}"
                 )
-
-            if not index_attempt:
-                continue
-
-            if not index_attempt.is_finished():
-                continue
+                continue
 
         if job.status == "error":
             task_logger.error(
-                f"Indexing watchdog - spawned task exceptioned: "
+                "Indexing watchdog - spawned task exceptioned: "
                 f"attempt={index_attempt_id} "
                 f"tenant={tenant_id} "
                 f"cc_pair={cc_pair_id} "
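The hunk above makes the watchdog's DB access best-effort: a transient database error now logs and retries on the next poll instead of killing the proxy task. Below is a minimal, self-contained sketch of that pattern; `get_attempt_status` and its simulated failures are hypothetical stand-ins for `get_index_attempt` plus the tenant session, not the real Danswer helpers.

```python
import logging
import random
import time

logger = logging.getLogger(__name__)


def get_attempt_status(attempt_id: int) -> str | None:
    """Hypothetical stand-in for the tenant-session DB lookup; fails sometimes."""
    if random.random() < 0.2:
        raise ConnectionError("simulated transient DB error")
    return random.choice([None, "in_progress", "success", "canceled"])


def watch_attempt(attempt_id: int, poll_interval: float = 0.1) -> None:
    """Poll until the attempt reaches a terminal status, surviving DB hiccups."""
    while True:
        time.sleep(poll_interval)
        try:
            status = get_attempt_status(attempt_id)
            if status is None or status == "in_progress":
                continue  # not finished yet; keep polling
        except Exception:
            # Polling needs no strong consistency: a transient DB error just
            # means "check again on the next iteration" rather than crashing.
            logger.exception("transient exception looking up attempt %s", attempt_id)
            continue
        logger.info("attempt %s reached terminal status %r", attempt_id, status)
        break


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    watch_attempt(attempt_id=1)
```

The design choice mirrors the diff's comment: the status poll doesn't need to be strongly consistent, so swallowing the exception and retrying is safe, whereas the cancellation path still proceeds to `job.cancel()` even if the DB write fails.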
diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py
index f491ff27b235..13835b5d2c6a 100644
--- a/backend/danswer/background/celery/tasks/vespa/tasks.py
+++ b/backend/danswer/background/celery/tasks/vespa/tasks.py
@@ -654,24 +654,28 @@ def monitor_ccpair_indexing_taskset(
     # outer = result.state in READY state
     status_int = redis_connector_index.get_completion()
     if status_int is None:  # inner signal not set ... possible error
-        result_state = result.state
+        task_state = result.state
         if (
-            result_state in READY_STATES
+            task_state in READY_STATES
         ):  # outer signal in terminal state ... possible error
             # Now double check!
             if redis_connector_index.get_completion() is None:
                 # inner signal still not set (and cannot change when outer result_state is READY)
                 # Task is finished but generator complete isn't set.
                 # We have a problem! Worker may have crashed.
+                task_result = str(result.result)
+                task_traceback = str(result.traceback)
+
                 msg = (
                     f"Connector indexing aborted or exceptioned: "
                     f"attempt={payload.index_attempt_id} "
                     f"celery_task={payload.celery_task_id} "
-                    f"result_state={result_state} "
                     f"cc_pair={cc_pair_id} "
                     f"search_settings={search_settings_id} "
-                    f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
+                    f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
+                    f"result.state={task_state} "
+                    f"result.result={task_result} "
+                    f"result.traceback={task_traceback}"
                 )
                 task_logger.warning(msg)
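This hunk enriches the "worker may have crashed" warning with Celery's public `AsyncResult` fields `.state`, `.result`, and `.traceback`. A short sketch of that diagnostic formatting, assuming a result object is already in hand (the helper name is illustrative, not Danswer's):

```python
from celery.result import AsyncResult
from celery.states import READY_STATES


def format_ready_state_diagnostics(result: AsyncResult) -> str | None:
    """Summarize a terminal task for logging; returns None if still running.

    Hypothetical helper: the fields used here mirror Celery's public
    AsyncResult API (.state, .result, .traceback), nothing Danswer-specific.
    """
    if result.state not in READY_STATES:
        return None
    # str() is defensive: .result holds the exception instance on failure and
    # .traceback may be None, neither of which formats reliably on its own.
    return (
        f"result.state={result.state} "
        f"result.result={str(result.result)} "
        f"result.traceback={str(result.traceback)}"
    )
```

One reason to stringify these into the log line at warning time is that result-backend entries typically expire (Celery's `result_expires` setting), so the log preserves the failure detail after the backend record is gone.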