diff --git a/backend/alembic/versions/5f4b8568a221_add_removed_documents_to_index_attempt.py b/backend/alembic/versions/5f4b8568a221_add_removed_documents_to_index_attempt.py
new file mode 100644
index 000000000..db3e70896
--- /dev/null
+++ b/backend/alembic/versions/5f4b8568a221_add_removed_documents_to_index_attempt.py
@@ -0,0 +1,27 @@
+"""add removed documents to index_attempt
+
+Revision ID: 5f4b8568a221
+Revises: 8987770549c0
+Create Date: 2024-02-16 15:02:03.319907
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = "5f4b8568a221"
+down_revision = "8987770549c0"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.add_column(
+        "index_attempt",
+        sa.Column("docs_removed_from_index", sa.Integer()),
+    )
+    op.execute("UPDATE index_attempt SET docs_removed_from_index = 0")
+
+
+def downgrade() -> None:
+    op.drop_column("index_attempt", "docs_removed_from_index")
diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py
index 845850144..8b086dccb 100644
--- a/backend/danswer/background/connector_deletion.py
+++ b/backend/danswer/background/connector_deletion.py
@@ -47,6 +47,10 @@ def _delete_connector_credential_pair_batch(
     credential_id: int,
     document_index: DocumentIndex,
 ) -> None:
+    """
+    Removes a batch of document IDs from a cc-pair. If no other cc-pair uses a
+    document anymore, it is permanently deleted.
+    """
     with Session(get_sqlalchemy_engine()) as db_session:
         # acquire lock for all documents in this batch so that indexing can't
         # override the deletion
diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py
index 70d64d880..a5db15524 100644
--- a/backend/danswer/background/indexing/run_indexing.py
+++ b/backend/danswer/background/indexing/run_indexing.py
@@ -6,6 +6,9 @@ from datetime import timezone
 
 from sqlalchemy.orm import Session
 
+from danswer.background.connector_deletion import (
+    _delete_connector_credential_pair_batch,
+)
 from danswer.background.indexing.checkpointing import get_time_windows_for_index_attempt
 from danswer.configs.app_configs import POLL_CONNECTOR_OFFSET
 from danswer.connectors.factory import instantiate_connector
@@ -18,6 +21,7 @@ from danswer.db.connector import disable_connector
 from danswer.db.connector_credential_pair import get_last_successful_attempt_time
 from danswer.db.connector_credential_pair import update_connector_credential_pair
 from danswer.db.credentials import backend_update_credential_json
+from danswer.db.document import get_documents_for_connector_credential_pair
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.index_attempt import get_index_attempt
 from danswer.db.index_attempt import mark_attempt_failed
@@ -41,8 +45,14 @@ def _get_document_generator(
     attempt: IndexAttempt,
     start_time: datetime,
     end_time: datetime,
-) -> GenerateDocumentsOutput:
-    """NOTE: `start_time` and `end_time` are only used for poll connectors"""
+) -> tuple[GenerateDocumentsOutput, bool]:
+    """
+    NOTE: `start_time` and `end_time` are only used for poll connectors
+
+    Returns an iterator of document batches and whether the returned documents
+    are the complete list of the connector's existing documents. If the task is
+    of type LOAD_STATE, the list is considered complete; otherwise it is incomplete.
+ """ task = attempt.connector.input_type try: @@ -64,7 +74,7 @@ def _get_document_generator( if task == InputType.LOAD_STATE: assert isinstance(runnable_connector, LoadConnector) doc_batch_generator = runnable_connector.load_from_state() - + is_listing_complete = True elif task == InputType.POLL: assert isinstance(runnable_connector, PollConnector) if attempt.connector_id is None or attempt.credential_id is None: @@ -77,12 +87,13 @@ def _get_document_generator( doc_batch_generator = runnable_connector.poll_source( start=start_time.timestamp(), end=end_time.timestamp() ) + is_listing_complete = False else: # Event types cannot be handled by a background type raise RuntimeError(f"Invalid task type: {task}") - return doc_batch_generator + return doc_batch_generator, is_listing_complete def _run_indexing( @@ -162,7 +173,7 @@ def _run_indexing( datetime(1970, 1, 1, tzinfo=timezone.utc), ) - doc_batch_generator = _get_document_generator( + doc_batch_generator, is_listing_complete = _get_document_generator( db_session=db_session, attempt=index_attempt, start_time=window_start, @@ -170,6 +181,7 @@ def _run_indexing( ) try: + all_connector_doc_ids = set() for doc_batch in doc_batch_generator: # Check if connector is disabled mid run and stop if so unless it's the secondary # index being built. We want to populate it even for paused connectors @@ -202,6 +214,7 @@ def _run_indexing( net_doc_change += new_docs chunk_count += total_batch_chunks document_count += len(doc_batch) + all_connector_doc_ids.update(doc.id for doc in doc_batch) # commit transaction so that the `update` below begins # with a brand new transaction. Postgres uses the start @@ -216,6 +229,40 @@ def _run_indexing( index_attempt=index_attempt, total_docs_indexed=document_count, new_docs_indexed=net_doc_change, + docs_removed_from_index=0, + ) + + if is_listing_complete: + # clean up all documents from the index that have not been returned from the connector + all_indexed_document_ids = { + d.id + for d in get_documents_for_connector_credential_pair( + db_session=db_session, + connector_id=index_attempt.connector_id, + credential_id=index_attempt.credential_id, + ) + } + doc_ids_to_remove = list( + all_indexed_document_ids - all_connector_doc_ids + ) + logger.debug( + f"Cleaning up {len(doc_ids_to_remove)} documents that are not contained in the newest connector state" + ) + + # delete docs from cc-pair and receive the number of completely deleted docs in return + _delete_connector_credential_pair_batch( + document_ids=doc_ids_to_remove, + connector_id=index_attempt.connector_id, + credential_id=index_attempt.credential_id, + document_index=document_index, + ) + + update_docs_indexed( + db_session=db_session, + index_attempt=index_attempt, + total_docs_indexed=document_count, + new_docs_indexed=net_doc_change, + docs_removed_from_index=len(doc_ids_to_remove), ) run_end_dt = window_end diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index 05a23395d..ce913098e 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -115,9 +115,11 @@ def update_docs_indexed( index_attempt: IndexAttempt, total_docs_indexed: int, new_docs_indexed: int, + docs_removed_from_index: int, ) -> None: index_attempt.total_docs_indexed = total_docs_indexed index_attempt.new_docs_indexed = new_docs_indexed + index_attempt.docs_removed_from_index = docs_removed_from_index db_session.add(index_attempt) db_session.commit() diff --git a/backend/danswer/db/models.py 
index ecd96103f..b338d3e7a 100644
--- a/backend/danswer/db/models.py
+++ b/backend/danswer/db/models.py
@@ -427,6 +427,7 @@ class IndexAttempt(Base):
     # The two below may be slightly out of sync if user switches Embedding Model
     new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
     total_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
+    docs_removed_from_index: Mapped[int | None] = mapped_column(Integer, default=0)
     # only filled if status = "failed"
     error_msg: Mapped[str | None] = mapped_column(Text, default=None)
     # only filled if status = "failed" AND an unhandled exception caused the failure
diff --git a/backend/danswer/server/documents/models.py b/backend/danswer/server/documents/models.py
index cac00578e..d574cc361 100644
--- a/backend/danswer/server/documents/models.py
+++ b/backend/danswer/server/documents/models.py
@@ -31,6 +31,7 @@ class IndexAttemptSnapshot(BaseModel):
     status: IndexingStatus | None
     new_docs_indexed: int  # only includes completely new docs
     total_docs_indexed: int  # includes docs that are updated
+    docs_removed_from_index: int
     error_msg: str | None
     full_exception_trace: str | None
     time_started: str | None
@@ -45,6 +46,7 @@ class IndexAttemptSnapshot(BaseModel):
             status=index_attempt.status,
             new_docs_indexed=index_attempt.new_docs_indexed or 0,
             total_docs_indexed=index_attempt.total_docs_indexed or 0,
+            docs_removed_from_index=index_attempt.docs_removed_from_index or 0,
             error_msg=index_attempt.error_msg,
             full_exception_trace=index_attempt.full_exception_trace,
             time_started=index_attempt.time_started.isoformat()
diff --git a/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx b/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx
index d2fd7facd..5f87b95bb 100644
--- a/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx
+++ b/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx
@@ -80,6 +80,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
           <TableHeaderCell>Time Started</TableHeaderCell>
           <TableHeaderCell>Status</TableHeaderCell>
           <TableHeaderCell>New Doc Cnt</TableHeaderCell>
+          <TableHeaderCell>Removed Doc Cnt</TableHeaderCell>
           <TableHeaderCell>Total Doc Cnt</TableHeaderCell>
           <TableHeaderCell>Error Msg</TableHeaderCell>
         </TableRow>
@@ -109,6 +110,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
                   )}
                 </TableCell>
                 <TableCell>{indexAttempt.new_docs_indexed}</TableCell>
+                <TableCell>{indexAttempt.docs_removed_from_index}</TableCell>
                 <TableCell>{indexAttempt.total_docs_indexed}</TableCell>
                 <TableCell>
                   <Text className="flex flex-wrap whitespace-normal">
diff --git a/web/src/lib/types.ts b/web/src/lib/types.ts
index 1935ae776..18b9e336f 100644
--- a/web/src/lib/types.ts
+++ b/web/src/lib/types.ts
@@ -165,6 +165,7 @@ export interface IndexAttemptSnapshot {
   id: number;
   status: ValidStatuses | null;
   new_docs_indexed: number;
+  docs_removed_from_index: number;
   total_docs_indexed: number;
   error_msg: string | null;
   full_exception_trace: string | null;
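The core of this change is a set difference computed once a run is known to have listed the connector's complete document set (LOAD_STATE runs): every document ID that is currently indexed for the cc-pair but was not returned by the connector is treated as deleted at the source and removed. What follows is a minimal, self-contained sketch of that decision logic, not the actual code above; prune_stale_docs, indexed_doc_ids, and connector_doc_ids are hypothetical names standing in for get_documents_for_connector_credential_pair and the ID bookkeeping in _run_indexing.

def prune_stale_docs(
    indexed_doc_ids: set[str],
    connector_doc_ids: set[str],
    is_listing_complete: bool,
) -> list[str]:
    """Return the document IDs that should be removed from the index."""
    # A POLL run only sees a time window, so an ID missing from its output
    # may still exist at the source; pruning is only safe for complete listings.
    if not is_listing_complete:
        return []
    # Anything the index knows about that the connector no longer returned
    # is assumed to have been deleted at the source.
    return list(indexed_doc_ids - connector_doc_ids)

# Example: "b" was indexed earlier but no longer exists at the source.
assert prune_stale_docs({"a", "b"}, {"a"}, is_listing_complete=True) == ["b"]
assert prune_stale_docs({"a", "b"}, {"a"}, is_listing_complete=False) == []

This is also why _get_document_generator now returns the is_listing_complete flag: POLL connectors skip the cleanup entirely, while LOAD_STATE connectors pay for it with one extra fetch of the cc-pair's indexed document IDs per run.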