diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 842d3679a..d86db37ad 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -389,6 +389,8 @@ def monitor_connector_deletion_taskset( db_session=db_session, cc_pair_id=cc_pair_id, ) + credential_id_to_delete: int | None = None + connector_id_to_delete: int | None = None if not cc_pair: task_logger.warning( f"Connector deletion - cc_pair not found: cc_pair={cc_pair_id}" @@ -443,26 +445,35 @@ def monitor_connector_deletion_taskset( db_session=db_session, ) + # Store IDs before potentially expiring cc_pair + connector_id_to_delete = cc_pair.connector_id + credential_id_to_delete = cc_pair.credential_id + # Explicitly delete document by connector credential pair records before deleting the connector # This is needed because connector_id is a primary key in that table and cascading deletes won't work delete_all_documents_by_connector_credential_pair__no_commit( db_session=db_session, - connector_id=cc_pair.connector_id, - credential_id=cc_pair.credential_id, + connector_id=connector_id_to_delete, + credential_id=credential_id_to_delete, ) + # Flush to ensure document deletion happens before connector deletion db_session.flush() + # Expire the cc_pair to ensure SQLAlchemy doesn't try to manage its state + # related to the deleted DocumentByConnectorCredentialPair during commit + db_session.expire(cc_pair) + # finally, delete the cc-pair delete_connector_credential_pair__no_commit( db_session=db_session, - connector_id=cc_pair.connector_id, - credential_id=cc_pair.credential_id, + connector_id=connector_id_to_delete, + credential_id=credential_id_to_delete, ) # if there are no credentials left, delete the connector connector = fetch_connector_by_id( db_session=db_session, - connector_id=cc_pair.connector_id, + connector_id=connector_id_to_delete, ) if not connector or not len(connector.credentials): task_logger.info( @@ -495,15 +506,15 @@ def monitor_connector_deletion_taskset( task_logger.exception( f"Connector deletion exceptioned: " - f"cc_pair={cc_pair_id} connector={cc_pair.connector_id} credential={cc_pair.credential_id}" + f"cc_pair={cc_pair_id} connector={connector_id_to_delete} credential={credential_id_to_delete}" ) raise e task_logger.info( f"Connector deletion succeeded: " f"cc_pair={cc_pair_id} " - f"connector={cc_pair.connector_id} " - f"credential={cc_pair.credential_id} " + f"connector={connector_id_to_delete} " + f"credential={credential_id_to_delete} " f"docs_deleted={fence_data.num_tasks}" ) @@ -553,7 +564,7 @@ def validate_connector_deletion_fences( def validate_connector_deletion_fence( tenant_id: str, key_bytes: bytes, - queued_tasks: set[str], + queued_upsert_tasks: set[str], r: Redis, ) -> None: """Checks for the error condition where an indexing fence is set but the associated celery tasks don't exist. @@ -640,7 +651,7 @@ def validate_connector_deletion_fence( member_bytes = cast(bytes, member) member_str = member_bytes.decode("utf-8") - if member_str in queued_tasks: + if member_str in queued_upsert_tasks: continue tasks_not_in_celery += 1