From 006fd4c43821cb6a01641d0bd2c7ebd3ad01d459 Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Wed, 29 Nov 2023 02:08:50 -0800 Subject: [PATCH] Ingestion API now always updates regardless of document updated_at (#786) --- backend/danswer/indexing/indexing_pipeline.py | 23 ++++++++++++------- backend/danswer/server/danswer_api.py | 2 +- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py index 644a35a0d..f94e637e1 100644 --- a/backend/danswer/indexing/indexing_pipeline.py +++ b/backend/danswer/indexing/indexing_pipeline.py @@ -69,6 +69,7 @@ def _indexing_pipeline( document_index: DocumentIndex, documents: list[Document], index_attempt_metadata: IndexAttemptMetadata, + ignore_time_skip: bool = False, ) -> tuple[int, int]: """Takes different pieces of the indexing pipeline and applies it to a batch of documents Note that the documents should already be batched at this point so that it does not inflate the @@ -87,14 +88,18 @@ def _indexing_pipeline( } updatable_docs: list[Document] = [] - for doc in documents: - if ( - doc.id in id_update_time_map - and doc.doc_updated_at - and doc.doc_updated_at <= id_update_time_map[doc.id] - ): - continue - updatable_docs.append(doc) + if ignore_time_skip: + updatable_docs = documents + else: + for doc in documents: + if ( + doc.id in id_update_time_map + and doc.doc_updated_at + and doc.doc_updated_at <= id_update_time_map[doc.id] + ): + continue + updatable_docs.append(doc) + updatable_ids = [doc.id for doc in updatable_docs] # Acquires a lock on the documents so that no other process can modify them @@ -175,6 +180,7 @@ def build_indexing_pipeline( chunker: Chunker | None = None, embedder: Embedder | None = None, document_index: DocumentIndex | None = None, + ignore_time_skip: bool = False, ) -> IndexingPipelineProtocol: """Builds a pipline which takes in a list (batch) of docs and indexes them.""" chunker = chunker or DefaultChunker() @@ -188,4 +194,5 @@ def build_indexing_pipeline( chunker=chunker, embedder=embedder, document_index=document_index, + ignore_time_skip=ignore_time_skip, ) diff --git a/backend/danswer/server/danswer_api.py b/backend/danswer/server/danswer_api.py index 3ad87b515..624e67362 100644 --- a/backend/danswer/server/danswer_api.py +++ b/backend/danswer/server/danswer_api.py @@ -141,7 +141,7 @@ def document_ingestion( if document.source == DocumentSource.INGESTION_API: document.source = DocumentSource.FILE - indexing_pipeline = build_indexing_pipeline() + indexing_pipeline = build_indexing_pipeline(ignore_time_skip=True) new_doc, chunks = indexing_pipeline( documents=[document],