diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index eb4284ff1eb..0eea0a7aa3b 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -899,6 +899,9 @@ def connector_indexing_proxy_task(
     TODO(rkuo): refactor this so that there is a single return path
     where we canonically log the result of running this function.
+
+    NOTE: we try/except all db access in this function because as a watchdog, this function
+    needs to be extremely stable.
     """
 
     start = time.monotonic()
@@ -1016,7 +1019,7 @@ def connector_indexing_proxy_task(
             job.release()
             break
 
-        # if a termination signal is detected, clean up and break
+        # if a termination signal is detected, break (exit point will clean up)
         if self.request.id and redis_connector_index.terminating(self.request.id):
             task_logger.warning(
                 log_builder.build("Indexing watchdog - termination signal detected")
             )
@@ -1025,6 +1028,7 @@ def connector_indexing_proxy_task(
             result.status = IndexingWatchdogTerminalStatus.TERMINATED_BY_SIGNAL
             break
 
+        # if activity timeout is detected, break (exit point will clean up)
         if not redis_connector_index.connector_active():
             task_logger.warning(
                 log_builder.build(
@@ -1033,25 +1037,6 @@ def connector_indexing_proxy_task(
                 )
             )
 
-            try:
-                with get_session_with_current_tenant() as db_session:
-                    mark_attempt_failed(
-                        index_attempt_id,
-                        db_session,
-                        "Indexing watchdog - activity timeout exceeded: "
-                        f"attempt={index_attempt_id} "
-                        f"timeout={CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s",
-                    )
-            except Exception:
-                # if the DB exceptions, we'll just get an unfriendly failure message
-                # in the UI instead of the cancellation message
-                logger.exception(
-                    log_builder.build(
-                        "Indexing watchdog - transient exception marking index attempt as failed"
-                    )
-                )
-
-            job.cancel()
             result.status = (
                 IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT
             )
             break
@@ -1071,8 +1056,6 @@ def connector_indexing_proxy_task(
                 if not index_attempt.is_finished():
                     continue
         except Exception:
-            # if the DB exceptioned, just restart the check.
-            # polling the index attempt status doesn't need to be strongly consistent
             task_logger.exception(
                 log_builder.build(
                     "Indexing watchdog - transient exception looking up index attempt"
@@ -1139,8 +1122,6 @@ def connector_indexing_proxy_task(
                     "Connector termination signal detected",
                 )
         except Exception:
-            # if the DB exceptions, we'll just get an unfriendly failure message
-            # in the UI instead of the cancellation message
             task_logger.exception(
                 log_builder.build(
                     "Indexing watchdog - transient exception marking index attempt as canceled"
@@ -1148,6 +1129,25 @@ def connector_indexing_proxy_task(
                 )
             )
 
        job.cancel()
+    elif result.status == IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT:
+        try:
+            with get_session_with_current_tenant() as db_session:
+                mark_attempt_failed(
+                    index_attempt_id,
+                    db_session,
+                    "Indexing watchdog - activity timeout exceeded: "
+                    f"attempt={index_attempt_id} "
+                    f"timeout={CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s",
+                )
+        except Exception:
+            logger.exception(
+                log_builder.build(
+                    "Indexing watchdog - transient exception marking index attempt as failed"
+                )
+            )
+        job.cancel()
+    else:
+        pass
 
     task_logger.info(
         log_builder.build(
diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py
index d750a5dcff9..76dc20ecb06 100644
--- a/backend/onyx/indexing/indexing_pipeline.py
+++ b/backend/onyx/indexing/indexing_pipeline.py
@@ -173,7 +173,10 @@ def index_doc_batch_with_handler(
             tenant_id=tenant_id,
         )
     except Exception as e:
-        logger.exception(f"Failed to index document batch: {document_batch}")
+        # don't log the batch directly, it's too much text
+        document_ids = [doc.id for doc in document_batch]
+        logger.exception(f"Failed to index document batch: {document_ids}")
+
         index_pipeline_result = IndexingPipelineResult(
             new_docs=0,
             total_docs=len(document_batch),
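Reviewer note: the tasks.py change is easiest to see outside diff form. The watchdog loop now only records a terminal `result.status` and breaks; cleanup (the `mark_attempt_failed` DB write and `job.cancel()`) runs once at the exit point, with every DB access wrapped in try/except per the new docstring NOTE. Below is a minimal sketch of that pattern, assuming illustrative names (`WatchdogStatus`, `Job`, `watchdog`, `mark_attempt_failed_in_db`) rather than the real onyx APIs:

```python
# Minimal sketch of the single-exit-point watchdog pattern this diff adopts.
# All names here are illustrative stand-ins, not the real onyx APIs.
import enum
import logging

logger = logging.getLogger(__name__)


class WatchdogStatus(enum.Enum):
    SUCCEEDED = enum.auto()
    TERMINATED_BY_SIGNAL = enum.auto()
    TERMINATED_BY_ACTIVITY_TIMEOUT = enum.auto()


class Job:
    """Stand-in for the spawned indexing job handle."""

    def done(self) -> bool:
        return False

    def cancel(self) -> None:
        logger.info("job canceled")


def mark_attempt_failed_in_db(reason: str) -> None:
    """Stand-in for the mark_attempt_failed(...) DB write; may raise."""
    logger.info("attempt marked failed: %s", reason)


def watchdog(job: Job, terminating, connector_active) -> WatchdogStatus:
    # monitoring loop: detect a terminal condition, record it, break --
    # no cleanup in here, so the loop cannot fail while handling a failure
    while True:
        if job.done():
            status = WatchdogStatus.SUCCEEDED
            break
        if terminating():
            status = WatchdogStatus.TERMINATED_BY_SIGNAL
            break
        if not connector_active():
            status = WatchdogStatus.TERMINATED_BY_ACTIVITY_TIMEOUT
            break

    # single exit point: cleanup is keyed off the terminal status, and the
    # DB write is try/except'd so a flaky DB degrades the UI message but
    # never crashes the watchdog itself
    if status is WatchdogStatus.TERMINATED_BY_ACTIVITY_TIMEOUT:
        try:
            mark_attempt_failed_in_db("activity timeout exceeded")
        except Exception:
            logger.exception("transient exception marking attempt as failed")
        job.cancel()
    return status
```

Keeping the loop body free of DB access, and guarding the one DB write at the exit, is the stability property the new docstring NOTE calls out: database trouble can change what message the UI shows, but it can no longer stall or crash the watchdog.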
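Reviewer note: the indexing_pipeline.py change is the usual "log identifiers, not payloads" fix, since a document batch can carry a large amount of extracted text. A sketch of the new failure handling, with `Document` as a hypothetical stand-in for the real document model and a simulated failure in place of the real indexing call:

```python
# Sketch of the logging change: on failure, log document IDs rather than the
# whole batch. `Document` is a hypothetical stand-in, not the onyx class.
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class Document:
    id: str
    content: str  # potentially megabytes of extracted text


def index_batch_with_handler(document_batch: list[Document]) -> None:
    try:
        # placeholder for the real indexing call
        raise RuntimeError("simulated indexing failure")
    except Exception:
        # don't log the batch directly, it's too much text
        document_ids = [doc.id for doc in document_batch]
        logger.exception(f"Failed to index document batch: {document_ids}")
```

`logger.exception` still attaches the full traceback, so no debugging detail is lost; only the payload of the log line shrinks.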