From 829d04c9043c108760ad6591280bb5cb1e73a58a Mon Sep 17 00:00:00 2001
From: Weves
Date: Mon, 2 Oct 2023 00:02:43 -0700
Subject: [PATCH] Add multi-threading to improve speed of updates / indexing

---
 backend/danswer/datastores/vespa/store.py | 262 ++++++++++++++--------
 1 file changed, 163 insertions(+), 99 deletions(-)

diff --git a/backend/danswer/datastores/vespa/store.py b/backend/danswer/datastores/vespa/store.py
index 0fc67c1d5..1736194fa 100644
--- a/backend/danswer/datastores/vespa/store.py
+++ b/backend/danswer/datastores/vespa/store.py
@@ -1,6 +1,9 @@
+import concurrent.futures
 import json
 import string
+import time
 from collections.abc import Mapping
+from dataclasses import dataclass
 from typing import Any
 from typing import cast
 from uuid import UUID
@@ -42,6 +45,7 @@ from danswer.datastores.interfaces import IndexFilter
 from danswer.datastores.interfaces import UpdateRequest
 from danswer.datastores.vespa.utils import remove_invalid_unicode_chars
 from danswer.search.semantic_search import embed_query
+from danswer.utils.batching import batch_generator
 from danswer.utils.logger import setup_logger
 
 logger = setup_logger()
@@ -56,10 +60,20 @@ DOCUMENT_ID_ENDPOINT = (
 )
 SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
 _BATCH_SIZE = 100  # Specific to Vespa
+_NUM_THREADS = (
+    16  # since Vespa doesn't allow batching of inserts / updates, we use threads
+)
 # Specific to Vespa, needed for highlighting matching keywords / section
 CONTENT_SUMMARY = "content_summary"
 
 
+@dataclass
+class _VespaUpdateRequest:
+    document_id: str
+    url: str
+    update_request: dict[str, dict]
+
+
 def _does_document_exist(
     doc_chunk_id: str,
 ) -> bool:
@@ -108,103 +122,124 @@ def _delete_vespa_doc_chunks(document_id: str) -> bool:
     return not any(failures)
 
 
-def _index_vespa_chunks(
-    chunks: list[DocMetadataAwareIndexChunk],
-) -> set[DocumentInsertionRecord]:
+def _index_vespa_chunk(
+    chunk: DocMetadataAwareIndexChunk, already_existing_documents: set[str]
+) -> bool:
     json_header = {
         "Content-Type": "application/json",
     }
+    document = chunk.source_document
+    # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself
+    vespa_chunk_id = str(get_uuid_from_chunk(chunk))
+
+    # Delete all chunks related to the document if (1) it already exists and
+    # (2) this is our first time running into it during this indexing attempt
+    chunk_exists = _does_document_exist(vespa_chunk_id)
+    if chunk_exists and document.id not in already_existing_documents:
+        deletion_success = _delete_vespa_doc_chunks(document.id)
+        if not deletion_success:
+            raise RuntimeError(
+                f"Failed to delete pre-existing chunks for document with id: {document.id}"
+            )
+
+    embeddings = chunk.embeddings
+    embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding}
+    if embeddings.mini_chunk_embeddings:
+        for ind, m_c_embed in enumerate(embeddings.mini_chunk_embeddings):
+            embeddings_name_vector_map[f"mini_chunk_{ind}"] = m_c_embed
+
+    vespa_document_fields = {
+        DOCUMENT_ID: document.id,
+        CHUNK_ID: chunk.chunk_id,
+        BLURB: chunk.blurb,
+        # this duplication of `content` is needed for keyword highlighting :(
+        CONTENT: chunk.content,
+        CONTENT_SUMMARY: chunk.content,
+        SOURCE_TYPE: str(document.source.value),
+        SOURCE_LINKS: json.dumps(chunk.source_links),
+        SEMANTIC_IDENTIFIER: document.semantic_identifier,
+        SECTION_CONTINUATION: chunk.section_continuation,
+        METADATA: json.dumps(document.metadata),
+        EMBEDDINGS: embeddings_name_vector_map,
+        BOOST: DEFAULT_BOOST,
+        # the only `set` vespa has is `weightedset`, so we have to give each
+        # element an arbitrary weight
+        ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()},
+        DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
+    }
+
+    def _index_chunk(
+        url: str,
+        headers: dict[str, str],
+        fields: dict[str, Any],
+    ) -> Response:
+        logger.debug(f'Indexing to URL "{url}"')
+        res = requests.post(url, headers=headers, json={"fields": fields})
+        try:
+            res.raise_for_status()
+            return res
+        except Exception as e:
+            logger.error(
+                f"Failed to index document: '{document.id}'. Got response: '{res.text}'"
+            )
+            raise e
+
+    vespa_url = f"{DOCUMENT_ID_ENDPOINT}/{vespa_chunk_id}"
+    try:
+        _index_chunk(vespa_url, json_header, vespa_document_fields)
+    except HTTPError as e:
+        if cast(Response, e.response).status_code != 400:
+            raise e
+
+        # if it's a 400 response, try again with invalid unicode chars removed
+        # only doing this on error to avoid having to go through the content
+        # char by char every time
+        vespa_document_fields[BLURB] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[BLURB])
+        )
+        vespa_document_fields[SEMANTIC_IDENTIFIER] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[SEMANTIC_IDENTIFIER])
+        )
+        vespa_document_fields[CONTENT] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[CONTENT])
+        )
+        vespa_document_fields[CONTENT_SUMMARY] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[CONTENT_SUMMARY])
+        )
+        _index_chunk(vespa_url, json_header, vespa_document_fields)
+
+    return chunk_exists
+
+
+def _index_vespa_chunks(
+    chunks: list[DocMetadataAwareIndexChunk],
+) -> set[DocumentInsertionRecord]:
     insertion_records: set[DocumentInsertionRecord] = set()
     # document ids of documents that existed BEFORE this indexing
     already_existing_documents: set[str] = set()
-    for chunk in chunks:
-        document = chunk.source_document
-        # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself
-        vespa_chunk_id = str(get_uuid_from_chunk(chunk))
 
-        # Delete all chunks related to the document if (1) it already exists and
-        # (2) this is our first time running into it during this indexing attempt
-        chunk_exists = _does_document_exist(vespa_chunk_id)
-        if chunk_exists and document.id not in already_existing_documents:
-            deletion_success = _delete_vespa_doc_chunks(document.id)
-            if not deletion_success:
-                raise RuntimeError(
-                    f"Failed to delete pre-existing chunks for with document with id: {document.id}"
+    # use threads to parallelize since Vespa doesn't allow batching of updates
+    with concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor:
+        for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
+            future_to_chunk = {
+                executor.submit(
+                    _index_vespa_chunk, chunk, already_existing_documents
+                ): chunk
+                for chunk in chunk_batch
+            }
+            for future in concurrent.futures.as_completed(future_to_chunk):
+                chunk = future_to_chunk[future]
+                chunk_already_existed = future.result()
+                if chunk_already_existed:
+                    already_existing_documents.add(chunk.source_document.id)
+
+                insertion_records.add(
+                    DocumentInsertionRecord(
+                        document_id=chunk.source_document.id,
+                        already_existed=chunk.source_document.id
+                        in already_existing_documents,
+                    )
                 )
-            already_existing_documents.add(document.id)
-
-        embeddings = chunk.embeddings
-        embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding}
-        if embeddings.mini_chunk_embeddings:
-            for ind, m_c_embed in enumerate(embeddings.mini_chunk_embeddings):
-                embeddings_name_vector_map[f"mini_chunk_{ind}"] = m_c_embed
-
-        vespa_document_fields = {
-            DOCUMENT_ID: document.id,
-            CHUNK_ID: chunk.chunk_id,
-            BLURB: chunk.blurb,
-            # this duplication of `content` is needed for keyword highlighting :(
-            CONTENT: chunk.content,
-            CONTENT_SUMMARY: chunk.content,
-            SOURCE_TYPE: str(document.source.value),
-            SOURCE_LINKS: json.dumps(chunk.source_links),
-            SEMANTIC_IDENTIFIER: document.semantic_identifier,
-            SECTION_CONTINUATION: chunk.section_continuation,
-            METADATA: json.dumps(document.metadata),
-            EMBEDDINGS: embeddings_name_vector_map,
-            BOOST: DEFAULT_BOOST,
-            # the only `set` vespa has is `weightedset`, so we have to give each
-            # element an arbitrary weight
-            ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()},
-            DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
-        }
-
-        def _index_chunk(
-            url: str,
-            headers: dict[str, str],
-            fields: dict[str, Any],
-        ) -> Response:
-            logger.debug(f'Indexing to URL "{url}"')
-            res = requests.post(url, headers=headers, json={"fields": fields})
-            try:
-                res.raise_for_status()
-                return res
-            except Exception as e:
-                logger.error(
-                    f"Failed to index document: '{document.id}'. Got response: '{res.text}'"
-                )
-                raise e
-
-        vespa_url = f"{DOCUMENT_ID_ENDPOINT}/{vespa_chunk_id}"
-        try:
-            _index_chunk(vespa_url, json_header, vespa_document_fields)
-        except HTTPError as e:
-            if cast(Response, e.response).status_code != 400:
-                raise e
-
-            # if it's a 400 response, try again with invalid unicode chars removed
-            # only doing this on error to avoid having to go through the content
-            # char by char every time
-            vespa_document_fields[BLURB] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[BLURB])
-            )
-            vespa_document_fields[SEMANTIC_IDENTIFIER] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[SEMANTIC_IDENTIFIER])
-            )
-            vespa_document_fields[CONTENT] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[CONTENT])
-            )
-            vespa_document_fields[CONTENT_SUMMARY] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[CONTENT_SUMMARY])
-            )
-            _index_chunk(vespa_url, json_header, vespa_document_fields)
-
-        insertion_records.add(
-            DocumentInsertionRecord(
-                document_id=document.id,
-                already_existed=document.id in already_existing_documents,
-            )
-        )
 
     return insertion_records
 
@@ -368,11 +403,38 @@ class VespaIndex(DocumentIndex):
     ) -> set[DocumentInsertionRecord]:
         return _index_vespa_chunks(chunks=chunks)
 
+    @staticmethod
+    def _apply_updates_batched(
+        updates: list[_VespaUpdateRequest],
+        batch_size: int = _BATCH_SIZE,
+    ) -> None:
+        """Runs a batch of updates in parallel via the ThreadPoolExecutor."""
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=_NUM_THREADS
+        ) as executor:
+            for update_batch in batch_generator(updates, batch_size):
+                future_to_document_id = {
+                    executor.submit(
+                        requests.put,
+                        update.url,
+                        headers={"Content-Type": "application/json"},
+                        data=json.dumps(update.update_request),
+                    ): update.document_id
+                    for update in update_batch
+                }
+                for future in concurrent.futures.as_completed(future_to_document_id):
+                    res = future.result()
+                    try:
+                        res.raise_for_status()
+                    except requests.HTTPError as e:
+                        failure_msg = f"Failed to update document: {future_to_document_id[future]}"
+                        raise requests.HTTPError(failure_msg) from e
+
     def update(self, update_requests: list[UpdateRequest]) -> None:
         logger.info(f"Updating {len(update_requests)} documents in Vespa")
+        start = time.time()
 
"application/json"} - + processed_updates_requests: list[_VespaUpdateRequest] = [] for update_request in update_requests: if ( update_request.boost is None @@ -400,16 +462,18 @@ class VespaIndex(DocumentIndex): for document_id in update_request.document_ids: for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(document_id): - url = f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}" - res = requests.put( - url, headers=json_header, data=json.dumps(update_dict) + processed_updates_requests.append( + _VespaUpdateRequest( + document_id=document_id, + url=f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}", + update_request=update_dict, + ) ) - try: - res.raise_for_status() - except requests.HTTPError as e: - failure_msg = f"Failed to update document: {document_id}" - raise requests.HTTPError(failure_msg) from e + self._apply_updates_batched(processed_updates_requests) + logger.info( + "Finished updating Vespa documents in %s seconds", time.time() - start + ) def delete(self, doc_ids: list[str]) -> None: logger.info(f"Deleting {len(doc_ids)} documents from Vespa")