Ingestion API now always updates regardless of document updated_at (#786)

This commit is contained in:
Yuhong Sun 2023-11-29 02:08:50 -08:00 committed by GitHub
parent 9b7069a043
commit 006fd4c438
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 9 deletions

View File

@ -69,6 +69,7 @@ def _indexing_pipeline(
document_index: DocumentIndex, document_index: DocumentIndex,
documents: list[Document], documents: list[Document],
index_attempt_metadata: IndexAttemptMetadata, index_attempt_metadata: IndexAttemptMetadata,
ignore_time_skip: bool = False,
) -> tuple[int, int]: ) -> tuple[int, int]:
"""Takes different pieces of the indexing pipeline and applies it to a batch of documents """Takes different pieces of the indexing pipeline and applies it to a batch of documents
Note that the documents should already be batched at this point so that it does not inflate the Note that the documents should already be batched at this point so that it does not inflate the
@ -87,14 +88,18 @@ def _indexing_pipeline(
} }
updatable_docs: list[Document] = [] updatable_docs: list[Document] = []
for doc in documents: if ignore_time_skip:
if ( updatable_docs = documents
doc.id in id_update_time_map else:
and doc.doc_updated_at for doc in documents:
and doc.doc_updated_at <= id_update_time_map[doc.id] if (
): doc.id in id_update_time_map
continue and doc.doc_updated_at
updatable_docs.append(doc) and doc.doc_updated_at <= id_update_time_map[doc.id]
):
continue
updatable_docs.append(doc)
updatable_ids = [doc.id for doc in updatable_docs] updatable_ids = [doc.id for doc in updatable_docs]
# Acquires a lock on the documents so that no other process can modify them # Acquires a lock on the documents so that no other process can modify them
@ -175,6 +180,7 @@ def build_indexing_pipeline(
chunker: Chunker | None = None, chunker: Chunker | None = None,
embedder: Embedder | None = None, embedder: Embedder | None = None,
document_index: DocumentIndex | None = None, document_index: DocumentIndex | None = None,
ignore_time_skip: bool = False,
) -> IndexingPipelineProtocol: ) -> IndexingPipelineProtocol:
"""Builds a pipeline which takes in a list (batch) of docs and indexes them.""" """Builds a pipeline which takes in a list (batch) of docs and indexes them."""
chunker = chunker or DefaultChunker() chunker = chunker or DefaultChunker()
@ -188,4 +194,5 @@ def build_indexing_pipeline(
chunker=chunker, chunker=chunker,
embedder=embedder, embedder=embedder,
document_index=document_index, document_index=document_index,
ignore_time_skip=ignore_time_skip,
) )

View File

@ -141,7 +141,7 @@ def document_ingestion(
if document.source == DocumentSource.INGESTION_API: if document.source == DocumentSource.INGESTION_API:
document.source = DocumentSource.FILE document.source = DocumentSource.FILE
indexing_pipeline = build_indexing_pipeline() indexing_pipeline = build_indexing_pipeline(ignore_time_skip=True)
new_doc, chunks = indexing_pipeline( new_doc, chunks = indexing_pipeline(
documents=[document], documents=[document],