From 046c0fbe3e157336cd6cd760bb697bbcc8591b4a Mon Sep 17 00:00:00 2001
From: pablodanswer
Date: Wed, 11 Dec 2024 19:08:05 -0800
Subject: [PATCH] add fresh_index fast path and re-enable document seeding

Add a fresh_index flag to Indexable.index() so callers that know the
index/tenant has no existing documents (e.g. initial document seeding)
can skip the existing-document lookup and chunk-cleanup pass, and
re-enable seed_initial_documents() during setup.
---
 backend/danswer/document_index/interfaces.py  |  7 +++-
 backend/danswer/document_index/vespa/index.py | 34 ++++++++++---------
 backend/danswer/seeding/load_docs.py          |  2 +-
 backend/danswer/setup.py                      |  3 +-
 4 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py
index 84dcbf4847..dc6d19f9aa 100644
--- a/backend/danswer/document_index/interfaces.py
+++ b/backend/danswer/document_index/interfaces.py
@@ -143,11 +143,11 @@ class Indexable(abc.ABC):
     """
     Class must implement the ability to index document chunks
     """
-
     @abc.abstractmethod
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """
         Takes a list of document chunks and indexes them in the document index
@@ -165,9 +165,14 @@ class Indexable(abc.ABC):
         only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
         it is done automatically outside of this code.
 
+        NOTE: When fresh_index is True, the caller asserts that no documents have been
+        previously indexed for the given index/tenant. Implementations can use this to skip
+        the existing-document cleanup pass when indexing into new or empty indices.
+
         Parameters:
         - chunks: Document chunks with all of the information needed for indexing to the document
           index.
+        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.
 
         Returns:
             List of document ids which map to unique documents and are used for deduping chunks
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index ebe6daca1a..d3f491db6a 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -306,6 +306,7 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
         with updating the associated permissions. Assumes that a document will not be split into
@@ -322,26 +323,27 @@ class VespaIndex(DocumentIndex):
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
-            # Check for existing documents, existing documents need to have all of their chunks deleted
-            # prior to indexing as the document size (num chunks) may have shrunk
-            first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
-            for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                existing_docs.update(
-                    get_existing_documents_from_chunks(
-                        chunks=chunk_batch,
+            if not fresh_index:
+                # Check for existing documents, existing documents need to have all of their chunks deleted
+                # prior to indexing as the document size (num chunks) may have shrunk
+                first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
+                for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                    existing_docs.update(
+                        get_existing_documents_from_chunks(
+                            chunks=chunk_batch,
+                            index_name=self.index_name,
+                            http_client=http_client,
+                            executor=executor,
+                        )
+                    )
+
+                for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
+                    delete_vespa_docs(
+                        document_ids=doc_id_batch,
                         index_name=self.index_name,
                         http_client=http_client,
                         executor=executor,
                     )
-                )
-
-            for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
-                delete_vespa_docs(
-                    document_ids=doc_id_batch,
-                    index_name=self.index_name,
-                    http_client=http_client,
-                    executor=executor,
-                )
 
         for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
             batch_index_vespa_chunks(
diff --git a/backend/danswer/seeding/load_docs.py b/backend/danswer/seeding/load_docs.py
index 5fe591423f..e7061fa131 100644
--- a/backend/danswer/seeding/load_docs.py
+++ b/backend/danswer/seeding/load_docs.py
@@ -216,7 +216,7 @@ def seed_initial_documents(
 
     # as we just sent over the Vespa schema and there is a slight delay
     index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks)
+    index_with_retries(chunks=chunks, fresh_index=True)
 
     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
diff --git a/backend/danswer/setup.py b/backend/danswer/setup.py
index 9571ac2892..2468d45470 100644
--- a/backend/danswer/setup.py
+++ b/backend/danswer/setup.py
@@ -22,6 +22,7 @@ from danswer.db.document import check_docs_exist
 from danswer.db.index_attempt import cancel_indexing_attempts_past_model
 from danswer.db.index_attempt import expire_index_attempts
 from danswer.db.llm import fetch_default_provider
 from danswer.db.llm import update_default_provider
 from danswer.db.llm import upsert_llm_provider
+from danswer.seeding.load_docs import seed_initial_documents
 from danswer.db.persona import delete_old_default_personas
@@ -150,7 +151,7 @@ def setup_danswer(
         # update multipass indexing setting based on GPU availability
         update_default_multipass_indexing(db_session)
 
-    # seed_initial_documents(db_session, tenant_id, cohere_enabled)
+    seed_initial_documents(db_session, tenant_id, cohere_enabled)
 
 
 def translate_saved_search_settings(db_session: Session) -> None:
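
Reviewer note, not part of the patch: the sketch below is a minimal model of
the semantics fresh_index introduces. ToyIndex and its (doc_id, chunk_id, text)
tuples are hypothetical stand-ins for VespaIndex and
DocMetadataAwareIndexChunk; only the index(chunks, fresh_index=False)
signature, the chunk_id == 0 existence check, and the skip-cleanup-when-fresh
behavior are taken from the diff above.

    # Toy model only; real names and wiring are in the diff above.
    from collections import defaultdict


    class ToyIndex:
        def __init__(self) -> None:
            # doc_id -> ordered chunk texts (stand-in for Vespa documents)
            self.docs: dict[str, list[str]] = defaultdict(list)

        def index(
            self,
            chunks: list[tuple[str, int, str]],  # (doc_id, chunk_id, text)
            fresh_index: bool = False,
        ) -> set[str]:
            if not fresh_index:
                # Re-indexed documents must have their old chunks deleted
                # first: the new version may have fewer chunks, and stale ones
                # would otherwise survive. fresh_index=True skips this pass.
                for doc_id, chunk_id, _ in chunks:
                    if chunk_id == 0 and doc_id in self.docs:
                        del self.docs[doc_id]
            for doc_id, _, text in chunks:
                self.docs[doc_id].append(text)
            return {doc_id for doc_id, _, _ in chunks}


    # Seeding an empty index (as load_docs.py now does) skips the cleanup:
    idx = ToyIndex()
    idx.index([("doc-1", 0, "hello"), ("doc-1", 1, "world")], fresh_index=True)
    assert idx.docs["doc-1"] == ["hello", "world"]

    # A later re-index of the same document (default path) clears stale chunks:
    idx.index([("doc-1", 0, "hello v2")])
    assert idx.docs["doc-1"] == ["hello v2"]

On the real VespaIndex, the pass being skipped is the batched
get_existing_documents_from_chunks lookup plus the delete_vespa_docs calls in
the hunk above; on a genuinely fresh index those HTTP round trips would find
and delete nothing, which is why the flag is safe only when the index/tenant
is known to be empty.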