From 6e5d9f33d24c7286c2d49cddf6ef8ebd676b353e Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Thu, 23 May 2024 23:44:32 -0700 Subject: [PATCH] Fix Undeleteable Connectors (#1507) --- ...remove_last_attempt_status_from_cc_pair.py | 31 ++++++++++++++++ backend/danswer/background/celery/celery.py | 4 +- .../background/indexing/run_indexing.py | 37 +++++++------------ backend/danswer/background/update.py | 28 -------------- .../danswer/connectors/discourse/connector.py | 2 +- .../connectors/productboard/connector.py | 2 +- .../danswer/db/connector_credential_pair.py | 30 +-------------- backend/danswer/db/deletion_attempt.py | 24 ++++++++++-- backend/danswer/db/models.py | 3 -- backend/danswer/server/documents/connector.py | 5 ++- .../danswer/server/manage/administrative.py | 4 +- 11 files changed, 78 insertions(+), 92 deletions(-) create mode 100644 backend/alembic/versions/ec85f2b3c544_remove_last_attempt_status_from_cc_pair.py diff --git a/backend/alembic/versions/ec85f2b3c544_remove_last_attempt_status_from_cc_pair.py b/backend/alembic/versions/ec85f2b3c544_remove_last_attempt_status_from_cc_pair.py new file mode 100644 index 0000000000..8b982c32e2 --- /dev/null +++ b/backend/alembic/versions/ec85f2b3c544_remove_last_attempt_status_from_cc_pair.py @@ -0,0 +1,31 @@ +"""Remove Last Attempt Status from CC Pair + +Revision ID: ec85f2b3c544 +Revises: 3879338f8ba1 +Create Date: 2024-05-23 21:39:46.126010 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "ec85f2b3c544" +down_revision = "3879338f8ba1" +branch_labels: None = None +depends_on: None = None + + +def upgrade() -> None: + op.drop_column("connector_credential_pair", "last_attempt_status") + + +def downgrade() -> None: + op.add_column( + "connector_credential_pair", + sa.Column( + "last_attempt_status", + sa.VARCHAR(), + autoincrement=False, + nullable=True, + ), + ) diff --git a/backend/danswer/background/celery/celery.py b/backend/danswer/background/celery/celery.py index 1a5ae1f977..91fef292db 100644 --- a/backend/danswer/background/celery/celery.py +++ b/backend/danswer/background/celery/celery.py @@ -68,7 +68,9 @@ def cleanup_connector_credential_pair_task( f"{connector_id} and Credential ID: {credential_id} does not exist." ) - deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair) + deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed( + connector_credential_pair=cc_pair, db_session=db_session + ) if deletion_attempt_disallowed_reason: raise ValueError(deletion_attempt_disallowed_reason) diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index e78d5558d2..18b30113cd 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -160,19 +160,19 @@ def _run_indexing( source_type=db_connector.source, ) ): - window_start = max( - window_start - timedelta(minutes=POLL_CONNECTOR_OFFSET), - datetime(1970, 1, 1, tzinfo=timezone.utc), - ) - - doc_batch_generator, is_listing_complete = _get_document_generator( - db_session=db_session, - attempt=index_attempt, - start_time=window_start, - end_time=window_end, - ) - try: + window_start = max( + window_start - timedelta(minutes=POLL_CONNECTOR_OFFSET), + datetime(1970, 1, 1, tzinfo=timezone.utc), + ) + + doc_batch_generator, is_listing_complete = _get_document_generator( + db_session=db_session, + attempt=index_attempt, + start_time=window_start, + end_time=window_end, + ) + all_connector_doc_ids: set[str] = set() for doc_batch in doc_batch_generator: # Check if connector is disabled mid run and stop if so unless it's the secondary @@ -263,7 +263,6 @@ def _run_indexing( db_session=db_session, connector_id=db_connector.id, credential_id=db_credential.id, - attempt_status=IndexingStatus.IN_PROGRESS, net_docs=net_doc_change, run_dt=run_end_dt, ) @@ -294,7 +293,6 @@ def _run_indexing( db_session=db_session, connector_id=index_attempt.connector.id, credential_id=index_attempt.credential.id, - attempt_status=IndexingStatus.FAILED, net_docs=net_doc_change, ) raise e @@ -309,7 +307,6 @@ def _run_indexing( db_session=db_session, connector_id=db_connector.id, credential_id=db_credential.id, - attempt_status=IndexingStatus.SUCCESS, run_dt=run_end_dt, ) @@ -343,15 +340,7 @@ def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexA # only commit once, to make sure this all happens in a single transaction mark_attempt_in_progress__no_commit(attempt) - is_primary = attempt.embedding_model.status == IndexModelStatus.PRESENT - if is_primary: - update_connector_credential_pair( - db_session=db_session, - connector_id=attempt.connector.id, - credential_id=attempt.credential.id, - attempt_status=IndexingStatus.IN_PROGRESS, - ) - else: + if attempt.embedding_model.status != IndexModelStatus.PRESENT: db_session.commit() return attempt diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index ae62969657..8b115e4482 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -17,8 +17,6 @@ from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP from danswer.configs.app_configs import NUM_INDEXING_WORKERS from danswer.db.connector import fetch_connectors -from danswer.db.connector_credential_pair import mark_all_in_progress_cc_pairs_failed -from danswer.db.connector_credential_pair import update_connector_credential_pair from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.engine import get_db_current_time @@ -119,17 +117,6 @@ def _mark_run_failed( db_session=db_session, failure_reason=failure_reason, ) - if ( - index_attempt.connector_id is not None - and index_attempt.credential_id is not None - and index_attempt.embedding_model.status == IndexModelStatus.PRESENT - ): - update_connector_credential_pair( - db_session=db_session, - connector_id=index_attempt.connector_id, - credential_id=index_attempt.credential_id, - attempt_status=IndexingStatus.FAILED, - ) """Main funcs""" @@ -192,16 +179,6 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None: connector.id, credential.id, model.id, db_session ) - # CC-Pair will have the status that it should for the primary index - # Will be re-sync-ed once the indices are swapped - if model.status == IndexModelStatus.PRESENT: - update_connector_credential_pair( - db_session=db_session, - connector_id=connector.id, - credential_id=credential.id, - attempt_status=IndexingStatus.NOT_STARTED, - ) - def cleanup_indexing_jobs( existing_jobs: dict[int, Future | SimpleJob], @@ -391,11 +368,6 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non existing_jobs: dict[int, Future | SimpleJob] = {} - with Session(engine) as db_session: - # Previous version did not always clean up cc-pairs well leaving some connectors undeleteable - # This ensures that bad states get cleaned up - mark_all_in_progress_cc_pairs_failed(db_session) - while True: start = time.time() start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") diff --git a/backend/danswer/connectors/discourse/connector.py b/backend/danswer/connectors/discourse/connector.py index 47d817d113..1bf64a6b36 100644 --- a/backend/danswer/connectors/discourse/connector.py +++ b/backend/danswer/connectors/discourse/connector.py @@ -193,7 +193,7 @@ class DiscourseConnector(PollConnector): start=start_datetime, end=end_datetime ) - return self._yield_discourse_documents(latest_topic_ids) + yield from self._yield_discourse_documents(latest_topic_ids) if __name__ == "__main__": diff --git a/backend/danswer/connectors/productboard/connector.py b/backend/danswer/connectors/productboard/connector.py index 1c013f42b5..9ef301aa76 100644 --- a/backend/danswer/connectors/productboard/connector.py +++ b/backend/danswer/connectors/productboard/connector.py @@ -207,7 +207,7 @@ class ProductboardConnector(PollConnector): ): return True else: - logger.error(f"Unable to find updated_at for document '{document.id}'") + logger.debug(f"Unable to find updated_at for document '{document.id}'") return False diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py index 25c646072e..31f2982db6 100644 --- a/backend/danswer/db/connector_credential_pair.py +++ b/backend/danswer/db/connector_credential_pair.py @@ -4,7 +4,6 @@ from fastapi import HTTPException from sqlalchemy import delete from sqlalchemy import desc from sqlalchemy import select -from sqlalchemy import update from sqlalchemy.orm import Session from danswer.db.connector import fetch_connector_by_id @@ -96,7 +95,6 @@ def update_connector_credential_pair( db_session: Session, connector_id: int, credential_id: int, - attempt_status: IndexingStatus, net_docs: int | None = None, run_dt: datetime | None = None, ) -> None: @@ -107,13 +105,9 @@ def update_connector_credential_pair( f"and credential id {credential_id}" ) return - cc_pair.last_attempt_status = attempt_status # simply don't update last_successful_index_time if run_dt is not specified # at worst, this would result in re-indexing documents that were already indexed - if ( - attempt_status == IndexingStatus.SUCCESS - or attempt_status == IndexingStatus.IN_PROGRESS - ) and run_dt is not None: + if run_dt is not None: cc_pair.last_successful_index_time = run_dt if net_docs is not None: cc_pair.total_docs_indexed += net_docs @@ -132,20 +126,6 @@ def delete_connector_credential_pair__no_commit( db_session.execute(stmt) -def mark_all_in_progress_cc_pairs_failed( - db_session: Session, -) -> None: - stmt = ( - update(ConnectorCredentialPair) - .where( - ConnectorCredentialPair.last_attempt_status == IndexingStatus.IN_PROGRESS - ) - .values(last_attempt_status=IndexingStatus.FAILED) - ) - db_session.execute(stmt) - db_session.commit() - - def associate_default_cc_pair(db_session: Session) -> None: existing_association = ( db_session.query(ConnectorCredentialPair) @@ -297,12 +277,4 @@ def resync_cc_pair( last_success.time_started if last_success else None ) - last_run = find_latest_index_attempt( - connector_id=cc_pair.connector_id, - credential_id=cc_pair.credential_id, - only_include_success=False, - db_session=db_session, - ) - cc_pair.last_attempt_status = last_run.status if last_run else None - db_session.commit() diff --git a/backend/danswer/db/deletion_attempt.py b/backend/danswer/db/deletion_attempt.py index e324c1c27a..b66e6f5852 100644 --- a/backend/danswer/db/deletion_attempt.py +++ b/backend/danswer/db/deletion_attempt.py @@ -1,9 +1,14 @@ +from sqlalchemy.orm import Session + +from danswer.db.embedding_model import get_current_db_embedding_model +from danswer.db.index_attempt import get_last_attempt from danswer.db.models import ConnectorCredentialPair from danswer.db.models import IndexingStatus def check_deletion_attempt_is_allowed( connector_credential_pair: ConnectorCredentialPair, + db_session: Session, allow_scheduled: bool = False, ) -> str | None: """ @@ -21,9 +26,22 @@ def check_deletion_attempt_is_allowed( if not connector_credential_pair.connector.disabled: return base_error_msg + " Connector must be paused." - if connector_credential_pair.last_attempt_status == IndexingStatus.IN_PROGRESS or ( - connector_credential_pair.last_attempt_status == IndexingStatus.NOT_STARTED - and not allow_scheduled + connector_id = connector_credential_pair.connector_id + credential_id = connector_credential_pair.credential_id + current_embedding_model = get_current_db_embedding_model(db_session) + + last_indexing = get_last_attempt( + connector_id=connector_id, + credential_id=credential_id, + embedding_model_id=current_embedding_model.id, + db_session=db_session, + ) + + if not last_indexing: + return None + + if last_indexing.status == IndexingStatus.IN_PROGRESS or ( + last_indexing.status == IndexingStatus.NOT_STARTED and not allow_scheduled ): return ( base_error_msg diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index f2bc7cdb94..a77223394f 100644 --- a/backend/danswer/db/models.py +++ b/backend/danswer/db/models.py @@ -270,9 +270,6 @@ class ConnectorCredentialPair(Base): last_successful_index_time: Mapped[datetime.datetime | None] = mapped_column( DateTime(timezone=True), default=None ) - last_attempt_status: Mapped[IndexingStatus | None] = mapped_column( - Enum(IndexingStatus, native_enum=False) - ) total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0) connector: Mapped["Connector"] = relationship( diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index f612d7a879..e183346805 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -418,7 +418,9 @@ def get_connector_indexing_status( credential=CredentialSnapshot.from_credential_db_model(credential), public_doc=cc_pair.is_public, owner=credential.user.email if credential.user else "", - last_status=cc_pair.last_attempt_status, + last_status=latest_index_attempt.status + if latest_index_attempt + else None, last_success=cc_pair.last_successful_index_time, docs_indexed=cc_pair_to_document_cnt.get( (connector.id, credential.id), 0 @@ -438,6 +440,7 @@ def get_connector_indexing_status( ), is_deletable=check_deletion_attempt_is_allowed( connector_credential_pair=cc_pair, + db_session=db_session, # allow scheduled indexing attempts here, since on deletion request we will cancel them allow_scheduled=True, ) diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py index 7fdfba768f..c60206ca3f 100644 --- a/backend/danswer/server/manage/administrative.py +++ b/backend/danswer/server/manage/administrative.py @@ -175,7 +175,9 @@ def create_deletion_attempt_for_connector_id( ) # Check if the deletion attempt should be allowed - deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair) + deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed( + connector_credential_pair=cc_pair, db_session=db_session + ) if deletion_attempt_disallowed_reason: raise HTTPException( status_code=400,