diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py
index 0af1f0195..3bcb650e7 100644
--- a/backend/danswer/background/celery/tasks/indexing/tasks.py
+++ b/backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -192,7 +192,8 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                 )
                 if attempt_id:
                     task_logger.info(
-                        f"Indexing queued: index_attempt={attempt_id} "
+                        f"Connector indexing queued: "
+                        f"index_attempt={attempt_id} "
                         f"cc_pair={cc_pair.id} "
                         f"search_settings={search_settings_instance.id} "
                     )
@@ -383,7 +384,6 @@ def try_creating_indexing_task(
         payload.index_attempt_id = index_attempt_id
         payload.celery_task_id = result.id
         redis_connector_index.set_fence(payload)
-
     except Exception:
         redis_connector_index.set_fence(None)
         task_logger.exception(
@@ -516,7 +516,8 @@ def connector_indexing_task(
         logger.debug("Sentry DSN not provided, skipping Sentry initialization")
 
     logger.info(
-        f"Indexing spawned task starting: attempt={index_attempt_id} "
+        f"Indexing spawned task starting: "
+        f"attempt={index_attempt_id} "
         f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py
index 3cee42ad7..2719a4d06 100644
--- a/backend/danswer/background/celery/tasks/shared/tasks.py
+++ b/backend/danswer/background/celery/tasks/shared/tasks.py
@@ -177,7 +177,17 @@ def document_by_cc_pair_cleanup_task(
             f"Max celery task retries reached. Marking doc as dirty for reconciliation: "
             f"tenant={tenant_id} doc={document_id}"
         )
-        with get_session_with_tenant(tenant_id):
+        with get_session_with_tenant(tenant_id) as db_session:
+            # delete the cc pair relationship now and let reconciliation clean it up
+            # in vespa
+            delete_document_by_connector_credential_pair__no_commit(
+                db_session=db_session,
+                document_id=document_id,
+                connector_credential_pair_identifier=ConnectorCredentialPairIdentifier(
+                    connector_id=connector_id,
+                    credential_id=credential_id,
+                ),
+            )
             mark_document_as_modified(document_id, db_session)
         return False
 
diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py
index f4b182ac9..12a1fe30d 100644
--- a/backend/danswer/background/celery/tasks/vespa/tasks.py
+++ b/backend/danswer/background/celery/tasks/vespa/tasks.py
@@ -444,11 +444,22 @@ def monitor_connector_deletion_taskset(
                 db_session, cc_pair.connector_id, cc_pair.credential_id
             )
             if len(doc_ids) > 0:
-                # if this happens, documents somehow got added while deletion was in progress. Likely a bug
-                # gating off pruning and indexing work before deletion starts
+                # NOTE(rkuo): if this happens, documents somehow got added while
+                # deletion was in progress. Likely a bug gating off pruning and indexing
+                # work before deletion starts.
                 task_logger.warning(
-                    f"Connector deletion - documents still found after taskset completion: "
-                    f"cc_pair={cc_pair_id} num={len(doc_ids)}"
+                    "Connector deletion - documents still found after taskset completion. "
+                    "Clearing the current deletion attempt and allowing deletion to restart: "
+                    f"cc_pair={cc_pair_id} "
+                    f"docs_deleted={fence_data.num_tasks} "
+                    f"docs_remaining={len(doc_ids)}"
+                )
+
+                # We don't want to waive off why we get into this state, but resetting
+                # our attempt and letting the deletion restart is a good way to recover
+                redis_connector.delete.reset()
+                raise RuntimeError(
+                    "Connector deletion - documents still found after taskset completion"
                 )
 
             # clean up the rest of the related Postgres entities
@@ -512,8 +523,7 @@
             f"docs_deleted={fence_data.num_tasks}"
         )
 
-        redis_connector.delete.taskset_clear()
-        redis_connector.delete.set_fence(None)
+        redis_connector.delete.reset()
 
 
 def monitor_ccpair_pruning_taskset(
@@ -645,26 +655,34 @@ def monitor_ccpair_indexing_taskset(
     result_state = result.state
 
     status_int = redis_connector_index.get_completion()
-    if status_int is None:
+    if status_int is None:  # completion signal not set ... check for errors
+        # If we get here, and then the task both sets the completion signal and finishes,
+        # we will incorrectly abort the task. We must check result state, then check
+        # get_completion again to avoid the race condition.
         if result_state in READY_STATES:
-            # IF the task state is READY, THEN generator_complete should be set
-            # if it isn't, then the worker crashed
-            task_logger.info(
-                f"Connector indexing aborted: "
-                f"cc_pair={cc_pair_id} "
-                f"search_settings={search_settings_id} "
-                f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
-            )
-
-            index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
-            if index_attempt:
-                mark_attempt_failed(
-                    index_attempt_id=payload.index_attempt_id,
-                    db_session=db_session,
-                    failure_reason="Connector indexing aborted or exceptioned.",
+            if redis_connector_index.get_completion() is None:
+                # IF the task state is READY, THEN generator_complete should be set
+                # if it isn't, then the worker crashed
+                msg = (
+                    f"Connector indexing aborted or exceptioned: "
+                    f"attempt={payload.index_attempt_id} "
+                    f"celery_task={payload.celery_task_id} "
+                    f"result_state={result_state} "
+                    f"cc_pair={cc_pair_id} "
+                    f"search_settings={search_settings_id} "
+                    f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
                 )
+                task_logger.warning(msg)
 
-            redis_connector_index.reset()
+                index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
+                if index_attempt:
+                    mark_attempt_failed(
+                        index_attempt_id=payload.index_attempt_id,
+                        db_session=db_session,
+                        failure_reason=msg,
+                    )
+
+                redis_connector_index.reset()
         return
 
     status_enum = HTTPStatus(status_int)
diff --git a/backend/danswer/redis/redis_connector_delete.py b/backend/danswer/redis/redis_connector_delete.py
index 6de4a9ec0..51b59ca92 100644
--- a/backend/danswer/redis/redis_connector_delete.py
+++ b/backend/danswer/redis/redis_connector_delete.py
@@ -14,6 +14,7 @@ from danswer.configs.constants import DanswerCeleryPriority
 from danswer.configs.constants import DanswerCeleryQueues
 from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
 from danswer.db.document import construct_document_select_for_connector_credential_pair
+from danswer.db.models import Document as DbDocument
 
 
 class RedisConnectorDeletionFenceData(BaseModel):
@@ -98,7 +99,8 @@ class RedisConnectorDelete:
         stmt = construct_document_select_for_connector_credential_pair(
             cc_pair.connector_id, cc_pair.credential_id
         )
-        for doc in db_session.scalars(stmt).yield_per(1):
+        for doc_temp in db_session.scalars(stmt).yield_per(1):
+            doc: DbDocument = doc_temp
             current_time = time.monotonic()
             if current_time - last_lock_time >= (
                 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
@@ -130,6 +132,10 @@
 
         return len(async_results)
 
+    def reset(self) -> None:
+        self.redis.delete(self.taskset_key)
+        self.redis.delete(self.fence_key)
+
     @staticmethod
     def remove_from_taskset(id: int, task_id: str, r: redis.Redis) -> None:
         taskset_key = f"{RedisConnectorDelete.TASKSET_PREFIX}_{id}"
diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py
index cc6b1419b..9b9da834e 100644
--- a/backend/danswer/server/documents/connector.py
+++ b/backend/danswer/server/documents/connector.py
@@ -865,21 +865,31 @@ def connector_run_once(
         )
         if attempt_id:
             logger.info(
-                f"try_creating_indexing_task succeeded: cc_pair={cc_pair.id} attempt_id={attempt_id}"
+                f"connector_run_once - try_creating_indexing_task succeeded: "
+                f"connector={run_info.connector_id} "
+                f"cc_pair={cc_pair.id} "
+                f"attempt={attempt_id} "
             )
             index_attempt_ids.append(attempt_id)
         else:
-            logger.info(f"try_creating_indexing_task failed: cc_pair={cc_pair.id}")
+            logger.info(
+                f"connector_run_once - try_creating_indexing_task failed: "
+                f"connector={run_info.connector_id} "
+                f"cc_pair={cc_pair.id}"
+            )
 
     if not index_attempt_ids:
+        msg = "No new indexing attempts created, indexing jobs are queued or running."
+        logger.info(msg)
         raise HTTPException(
             status_code=400,
-            detail="No new indexing attempts created, indexing jobs are queued or running.",
+            detail=msg,
         )
 
+    msg = f"Successfully created {len(index_attempt_ids)} index attempts. {index_attempt_ids}"
     return StatusResponse(
         success=True,
-        message=f"Successfully created {len(index_attempt_ids)} index attempts",
+        message=msg,
         data=index_attempt_ids,
     )
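A minimal sketch of the completion-signal double-check that the monitor_ccpair_indexing_taskset hunk above relies on. This is illustrative only; the helper name should_mark_attempt_failed and the simplified READY_STATES set are stand-ins introduced here, not code from this PR or from Celery.

```python
from typing import Callable, Optional

# Simplified stand-in for celery.states.READY_STATES (assumption, not the real constant)
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def should_mark_attempt_failed(
    get_completion: Callable[[], Optional[int]], result_state: str
) -> bool:
    """Return True only when the spawned task reached a READY state without ever
    setting its completion signal, i.e. the worker most likely crashed."""
    if get_completion() is not None:
        return False  # completion signal present: the normal success/failure path handles it

    if result_state not in READY_STATES:
        return False  # task still running; re-evaluate on the next monitoring pass

    # The task went READY after the first check, so it may have set the signal
    # in between. Re-read the signal before declaring the attempt failed.
    return get_completion() is None
```

The second get_completion() call is what closes the window where the task sets the signal and finishes between the first check and the result-state check; in the diff it appears as the nested `if redis_connector_index.get_completion() is None:` guard in front of mark_attempt_failed and redis_connector_index.reset().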