Merge pull request #3442 from onyx-dot-app/vespa_seeding_fix

Update initial seeding for latency requirements
Authored by pablonyx on 2024-12-12 09:59:50 -08:00, committed by GitHub
4 changed files with 29 additions and 18 deletions

View File

@@ -148,6 +148,7 @@ class Indexable(abc.ABC):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """
         Takes a list of document chunks and indexes them in the document index

@@ -165,9 +166,14 @@ class Indexable(abc.ABC):
         only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
         it is done automatically outside of this code.

+        NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
+        indexed for the given index/tenant. This can be used to optimize the indexing process for
+        new or empty indices.
+
         Parameters:
         - chunks: Document chunks with all of the information needed for indexing to the document
                   index.
+        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.

         Returns:
             List of document ids which map to unique documents and are used for deduping chunks
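For context, a hedged sketch of how a caller would use the new flag; document_index and chunks here are placeholders for whatever DocumentIndex implementation and pre-built chunk list the caller already has, not code from this PR:

    # Regular indexing run: documents may already exist in the index, so the
    # implementation should clean up any stale chunks first
    # (fresh_index defaults to False).
    insertion_records = document_index.index(chunks=chunks)

    # First-time seeding of a brand-new index/tenant: nothing has been indexed
    # yet, so the existence check can safely be skipped.
    insertion_records = document_index.index(chunks=chunks, fresh_index=True)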

View File

@@ -306,6 +306,7 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
         with updating the associated permissions. Assumes that a document will not be split into

@@ -322,26 +323,29 @@ class VespaIndex(DocumentIndex):
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
-            # Check for existing documents, existing documents need to have all of their chunks deleted
-            # prior to indexing as the document size (num chunks) may have shrunk
-            first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
-            for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                existing_docs.update(
-                    get_existing_documents_from_chunks(
-                        chunks=chunk_batch,
-                        index_name=self.index_name,
-                        http_client=http_client,
-                        executor=executor,
-                    )
-                )
-            for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
-                delete_vespa_docs(
-                    document_ids=doc_id_batch,
-                    index_name=self.index_name,
-                    http_client=http_client,
-                    executor=executor,
-                )
+            if not fresh_index:
+                # Check for existing documents, existing documents need to have all of their chunks deleted
+                # prior to indexing as the document size (num chunks) may have shrunk
+                first_chunks = [
+                    chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
+                ]
+                for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                    existing_docs.update(
+                        get_existing_documents_from_chunks(
+                            chunks=chunk_batch,
+                            index_name=self.index_name,
+                            http_client=http_client,
+                            executor=executor,
+                        )
+                    )
+                for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
+                    delete_vespa_docs(
+                        document_ids=doc_id_batch,
+                        index_name=self.index_name,
+                        http_client=http_client,
+                        executor=executor,
+                    )

             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
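The hunk above wraps the existence check and the per-document delete pass in "if not fresh_index", so initial seeding avoids one lookup and one delete round trip per document against a Vespa index that is known to be empty. The batch_generator helper it relies on is not shown in the diff; a minimal equivalent (an assumption about its behavior, not the project's actual implementation) would be:

    from collections.abc import Generator, Iterable
    from itertools import islice
    from typing import TypeVar

    T = TypeVar("T")

    def batch_generator(
        items: Iterable[T], batch_size: int
    ) -> Generator[list[T], None, None]:
        # Yield successive batches of at most batch_size items; the final
        # batch may be shorter than batch_size.
        iterator = iter(items)
        while batch := list(islice(iterator, batch_size)):
            yield batch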

View File

@@ -216,7 +216,7 @@ def seed_initial_documents(
     # as we just sent over the Vespa schema and there is a slight delay
     index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks)
+    index_with_retries(chunks=chunks, fresh_index=True)

     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
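The retry_builder()(document_index.index) pattern wraps the index call in a retry decorator because the Vespa schema was only just deployed and the index may not accept writes immediately. The project's actual retry_builder is not part of this diff; a hypothetical stand-in with the same call shape, using illustrative defaults, could look like:

    import time
    from collections.abc import Callable
    from typing import Any

    def simple_retry_builder(
        tries: int = 5, delay: float = 0.5, backoff: float = 2.0
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        # Hypothetical stand-in for retry_builder: returns a decorator that
        # retries the wrapped call with exponential backoff before giving up.
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            def wrapped(*args: Any, **kwargs: Any) -> Any:
                wait = delay
                for attempt in range(tries):
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        if attempt == tries - 1:
                            raise
                        time.sleep(wait)
                        wait *= backoff
            return wrapped
        return decorator

    # Usage mirroring the seeding code above (document_index and chunks are
    # assumed to exist in the surrounding scope):
    index_with_retries = simple_retry_builder()(document_index.index)
    index_with_retries(chunks=chunks, fresh_index=True)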

View File

@@ -39,6 +39,7 @@ from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
 from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
 from danswer.natural_language_processing.search_nlp_models import warm_up_cross_encoder
+from danswer.seeding.load_docs import seed_initial_documents
 from danswer.seeding.load_yamls import load_chat_yamls
 from danswer.server.manage.llm.models import LLMProviderUpsertRequest
 from danswer.server.settings.store import load_settings

@@ -150,7 +151,7 @@ def setup_danswer(
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)

-    # seed_initial_documents(db_session, tenant_id, cohere_enabled)
+    seed_initial_documents(db_session, tenant_id, cohere_enabled)

 def translate_saved_search_settings(db_session: Session) -> None: