update indexing

pablodanswer 2024-12-11 19:08:05 -08:00
parent 7c29b1e028
commit 046c0fbe3e
4 changed files with 27 additions and 19 deletions

View File

@@ -143,11 +143,12 @@ class Indexable(abc.ABC):
     """
     Class must implement the ability to index document chunks
     """

     @abc.abstractmethod
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """
         Takes a list of document chunks and indexes them in the document index
@@ -165,9 +165,14 @@ class Indexable(abc.ABC):
         only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
         it is done automatically outside of this code.

+        NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
+        indexed for the given index/tenant. This can be used to optimize the indexing process for
+        new or empty indices.
+
         Parameters:
         - chunks: Document chunks with all of the information needed for indexing to the document
           index.
+        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.

         Returns:
         List of document ids which map to unique documents and are used for deduping chunks
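For context on the new contract: an implementation that honors fresh_index can skip its existence check and cleanup pass entirely when the flag is set. Below is a minimal sketch of that behavior; the Chunk, InsertionRecord, and InMemoryIndex names are illustrative stand-ins invented for this example, not types from the commit:

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the real chunk/record types, for illustration only.
@dataclass(frozen=True)
class Chunk:
    document_id: str
    chunk_id: int
    content: str


@dataclass(frozen=True)
class InsertionRecord:
    document_id: str
    already_existed: bool


class InMemoryIndex:
    """Toy implementation sketching the fresh_index contract."""

    def __init__(self) -> None:
        self._chunks_by_doc: dict[str, list[Chunk]] = {}

    def index(
        self, chunks: list[Chunk], fresh_index: bool = False
    ) -> set[InsertionRecord]:
        existing: set[str] = set()
        if not fresh_index:
            # Re-indexed documents may have shrunk (fewer chunks), so their
            # stale chunks must be cleared before the new ones are written.
            existing = {
                c.document_id for c in chunks if c.document_id in self._chunks_by_doc
            }
            for doc_id in existing:
                del self._chunks_by_doc[doc_id]
        for chunk in chunks:
            self._chunks_by_doc.setdefault(chunk.document_id, []).append(chunk)
        return {
            InsertionRecord(document_id=doc_id, already_existed=doc_id in existing)
            for doc_id in {c.document_id for c in chunks}
        }
```

A caller should only pass fresh_index=True when it can guarantee the index holds no documents (e.g. right after schema creation); otherwise stale chunks from shrunken documents would survive the write.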

View File

@@ -306,6 +306,7 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
         with updating the associated permissions. Assumes that a document will not be split into
@@ -322,26 +323,27 @@ class VespaIndex(DocumentIndex):
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
-            # Check for existing documents, existing documents need to have all of their chunks deleted
-            # prior to indexing as the document size (num chunks) may have shrunk
-            first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
-            for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                existing_docs.update(
-                    get_existing_documents_from_chunks(
-                        chunks=chunk_batch,
-                        index_name=self.index_name,
-                        http_client=http_client,
-                        executor=executor,
-                    )
-                )
-            for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
-                delete_vespa_docs(
-                    document_ids=doc_id_batch,
-                    index_name=self.index_name,
-                    http_client=http_client,
-                    executor=executor,
-                )
+            if not fresh_index:
+                # Check for existing documents, existing documents need to have all of their chunks deleted
+                # prior to indexing as the document size (num chunks) may have shrunk
+                first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
+                for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                    existing_docs.update(
+                        get_existing_documents_from_chunks(
+                            chunks=chunk_batch,
+                            index_name=self.index_name,
+                            http_client=http_client,
+                            executor=executor,
+                        )
+                    )
+                for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
+                    delete_vespa_docs(
+                        document_ids=doc_id_batch,
+                        index_name=self.index_name,
+                        http_client=http_client,
+                        executor=executor,
+                    )

             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
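Both the cleanup pass and the write pass above lean on batch_generator to slice work into bounded units for the thread pool. A rough sketch of such a helper, assuming the semantics its call sites imply (the real utility in the Danswer codebase may differ in signature and location):

```python
from collections.abc import Generator, Iterable
from typing import TypeVar

T = TypeVar("T")


def batch_generator(
    items: Iterable[T], batch_size: int
) -> Generator[list[T], None, None]:
    """Yield successive lists of at most batch_size items from an iterable."""
    batch: list[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly smaller, batch.
        yield batch
```

So `for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):` yields lists of at most BATCH_SIZE chunks, keeping each Vespa HTTP round trip a bounded size regardless of how many chunks the document batch produced.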

View File

@@ -216,7 +216,7 @@ def seed_initial_documents(
     # as we just sent over the Vespa schema and there is a slight delay
     index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks)
+    index_with_retries(chunks=chunks, fresh_index=True)

     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
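The retry wrapper matters here because the Vespa schema was deployed moments earlier and the first index calls can fail transiently. A behavioral sketch of that wrap-then-call pattern follows; the tries/delay parameters and fixed-backoff loop are assumptions for illustration, and the real retry_builder may be implemented quite differently:

```python
import time
from collections.abc import Callable
from typing import ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")


def retry_builder(
    tries: int = 5, delay: float = 1.0
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """Return a decorator that retries a callable with linear back-off."""

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        def wrapped(*args: P.args, **kwargs: P.kwargs) -> R:
            for attempt in range(tries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == tries - 1:
                        raise  # out of attempts, surface the error
                    time.sleep(delay * (attempt + 1))
            raise RuntimeError("unreachable")

        return wrapped

    return decorator


# Usage mirroring the seeding code: the freshly deployed index holds no
# documents yet, so fresh_index=True safely skips the existence check.
# index_with_retries = retry_builder()(document_index.index)
# index_with_retries(chunks=chunks, fresh_index=True)
```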

View File

@@ -22,6 +22,7 @@ from danswer.db.document import check_docs_exist
 from danswer.db.index_attempt import cancel_indexing_attempts_past_model
 from danswer.db.index_attempt import expire_index_attempts
 from danswer.db.llm import fetch_default_provider
+from danswer.seeding.load_docs import seed_initial_documents
 from danswer.db.llm import update_default_provider
 from danswer.db.llm import upsert_llm_provider
 from danswer.db.persona import delete_old_default_personas
@@ -150,7 +151,7 @@ def setup_danswer(
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)

-    # seed_initial_documents(db_session, tenant_id, cohere_enabled)
+    seed_initial_documents(db_session, tenant_id, cohere_enabled)


 def translate_saved_search_settings(db_session: Session) -> None: