Add multi-threading to improve speed of updates / indexing

Author: Weves
Date: 2023-10-02 00:02:43 -07:00
Committed by: Chris Weaver
Parent: 351475de28
Commit: 829d04c904


@@ -1,6 +1,9 @@
+import concurrent.futures
 import json
 import string
+import time
 from collections.abc import Mapping
+from dataclasses import dataclass
 from typing import Any
 from typing import cast
 from uuid import UUID
@@ -42,6 +45,7 @@ from danswer.datastores.interfaces import IndexFilter
 from danswer.datastores.interfaces import UpdateRequest
 from danswer.datastores.vespa.utils import remove_invalid_unicode_chars
 from danswer.search.semantic_search import embed_query
+from danswer.utils.batching import batch_generator
 from danswer.utils.logger import setup_logger

 logger = setup_logger()
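The only new dependency here, batch_generator, lives in danswer.utils.batching and is not part of this diff. As a rough sketch of the contract the new code relies on (yield the input in lists of at most batch_size items), something like the following would do; the actual implementation in the repo may differ:

from collections.abc import Generator
from collections.abc import Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def batch_generator(items: Iterable[T], batch_size: int) -> Generator[list[T], None, None]:
    """Yield successive lists of at most `batch_size` items from `items`."""
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        yield batch


# e.g. list(batch_generator(range(5), 2)) == [[0, 1], [2, 3], [4]]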
@@ -56,10 +60,20 @@ DOCUMENT_ID_ENDPOINT = (
 )
 SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
 _BATCH_SIZE = 100  # Specific to Vespa
+_NUM_THREADS = (
+    16  # since Vespa doesn't allow batching of inserts / updates, we use threads
+)
 # Specific to Vespa, needed for highlighting matching keywords / section
 CONTENT_SUMMARY = "content_summary"


+@dataclass
+class _VespaUpdateRequest:
+    document_id: str
+    url: str
+    update_request: dict[str, dict]
+
+
 def _does_document_exist(
     doc_chunk_id: str,
 ) -> bool:
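The comment on _NUM_THREADS carries the core idea of the commit: per that comment, Vespa accepts one insert / update per request, so throughput comes from issuing those requests concurrently rather than batching them. A stripped-down sketch of the fan-out pattern the rest of the diff applies (send_one here is a stand-in, not code from the repo):

import concurrent.futures


def send_one(doc_id: str) -> str:
    # stand-in for a single requests.post / requests.put to Vespa
    return f"indexed {doc_id}"


def send_all(doc_ids: list[str], num_threads: int = 16) -> list[str]:
    # one worker thread per in-flight request, mirroring _NUM_THREADS above
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(send_one, doc_id) for doc_id in doc_ids]
        return [future.result() for future in futures]


print(send_all(["a", "b", "c"]))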
@@ -108,103 +122,124 @@ def _delete_vespa_doc_chunks(document_id: str) -> bool:
     return not any(failures)


-def _index_vespa_chunks(
-    chunks: list[DocMetadataAwareIndexChunk],
-) -> set[DocumentInsertionRecord]:
+def _index_vespa_chunk(
+    chunk: DocMetadataAwareIndexChunk, already_existing_documents: set[str]
+) -> bool:
     json_header = {
         "Content-Type": "application/json",
     }
+    document = chunk.source_document
+    # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself
+    vespa_chunk_id = str(get_uuid_from_chunk(chunk))
+
+    # Delete all chunks related to the document if (1) it already exists and
+    # (2) this is our first time running into it during this indexing attempt
+    chunk_exists = _does_document_exist(vespa_chunk_id)
+    if chunk_exists and document.id not in already_existing_documents:
+        deletion_success = _delete_vespa_doc_chunks(document.id)
+        if not deletion_success:
+            raise RuntimeError(
+                f"Failed to delete pre-existing chunks for with document with id: {document.id}"
+            )
+
+    embeddings = chunk.embeddings
+    embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding}
+    if embeddings.mini_chunk_embeddings:
+        for ind, m_c_embed in enumerate(embeddings.mini_chunk_embeddings):
+            embeddings_name_vector_map[f"mini_chunk_{ind}"] = m_c_embed
+
+    vespa_document_fields = {
+        DOCUMENT_ID: document.id,
+        CHUNK_ID: chunk.chunk_id,
+        BLURB: chunk.blurb,
+        # this duplication of `content` is needed for keyword highlighting :(
+        CONTENT: chunk.content,
+        CONTENT_SUMMARY: chunk.content,
+        SOURCE_TYPE: str(document.source.value),
+        SOURCE_LINKS: json.dumps(chunk.source_links),
+        SEMANTIC_IDENTIFIER: document.semantic_identifier,
+        SECTION_CONTINUATION: chunk.section_continuation,
+        METADATA: json.dumps(document.metadata),
+        EMBEDDINGS: embeddings_name_vector_map,
+        BOOST: DEFAULT_BOOST,
+        # the only `set` vespa has is `weightedset`, so we have to give each
+        # element an arbitrary weight
+        ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()},
+        DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
+    }
+
+    def _index_chunk(
+        url: str,
+        headers: dict[str, str],
+        fields: dict[str, Any],
+    ) -> Response:
+        logger.debug(f'Indexing to URL "{url}"')
+        res = requests.post(url, headers=headers, json={"fields": fields})
+        try:
+            res.raise_for_status()
+            return res
+        except Exception as e:
+            logger.error(
+                f"Failed to index document: '{document.id}'. Got response: '{res.text}'"
+            )
+            raise e
+
+    vespa_url = f"{DOCUMENT_ID_ENDPOINT}/{vespa_chunk_id}"
+    try:
+        _index_chunk(vespa_url, json_header, vespa_document_fields)
+    except HTTPError as e:
+        if cast(Response, e.response).status_code != 400:
+            raise e
+
+        # if it's a 400 response, try again with invalid unicode chars removed
+        # only doing this on error to avoid having to go through the content
+        # char by char every time
+        vespa_document_fields[BLURB] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[BLURB])
+        )
+        vespa_document_fields[SEMANTIC_IDENTIFIER] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[SEMANTIC_IDENTIFIER])
+        )
+        vespa_document_fields[CONTENT] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[CONTENT])
+        )
+        vespa_document_fields[CONTENT_SUMMARY] = remove_invalid_unicode_chars(
+            cast(str, vespa_document_fields[CONTENT_SUMMARY])
+        )
+        _index_chunk(vespa_url, json_header, vespa_document_fields)
+
+    return chunk_exists
+
+
+def _index_vespa_chunks(
+    chunks: list[DocMetadataAwareIndexChunk],
+) -> set[DocumentInsertionRecord]:
     insertion_records: set[DocumentInsertionRecord] = set()
     # document ids of documents that existed BEFORE this indexing
     already_existing_documents: set[str] = set()
-    for chunk in chunks:
-        document = chunk.source_document
-        # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself
-        vespa_chunk_id = str(get_uuid_from_chunk(chunk))
-
-        # Delete all chunks related to the document if (1) it already exists and
-        # (2) this is our first time running into it during this indexing attempt
-        chunk_exists = _does_document_exist(vespa_chunk_id)
-        if chunk_exists and document.id not in already_existing_documents:
-            deletion_success = _delete_vespa_doc_chunks(document.id)
-            if not deletion_success:
-                raise RuntimeError(
-                    f"Failed to delete pre-existing chunks for with document with id: {document.id}"
-                )
-            already_existing_documents.add(document.id)
-
-        embeddings = chunk.embeddings
-        embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding}
-        if embeddings.mini_chunk_embeddings:
-            for ind, m_c_embed in enumerate(embeddings.mini_chunk_embeddings):
-                embeddings_name_vector_map[f"mini_chunk_{ind}"] = m_c_embed
-
-        vespa_document_fields = {
-            DOCUMENT_ID: document.id,
-            CHUNK_ID: chunk.chunk_id,
-            BLURB: chunk.blurb,
-            # this duplication of `content` is needed for keyword highlighting :(
-            CONTENT: chunk.content,
-            CONTENT_SUMMARY: chunk.content,
-            SOURCE_TYPE: str(document.source.value),
-            SOURCE_LINKS: json.dumps(chunk.source_links),
-            SEMANTIC_IDENTIFIER: document.semantic_identifier,
-            SECTION_CONTINUATION: chunk.section_continuation,
-            METADATA: json.dumps(document.metadata),
-            EMBEDDINGS: embeddings_name_vector_map,
-            BOOST: DEFAULT_BOOST,
-            # the only `set` vespa has is `weightedset`, so we have to give each
-            # element an arbitrary weight
-            ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()},
-            DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
-        }
-
-        def _index_chunk(
-            url: str,
-            headers: dict[str, str],
-            fields: dict[str, Any],
-        ) -> Response:
-            logger.debug(f'Indexing to URL "{url}"')
-            res = requests.post(url, headers=headers, json={"fields": fields})
-            try:
-                res.raise_for_status()
-                return res
-            except Exception as e:
-                logger.error(
-                    f"Failed to index document: '{document.id}'. Got response: '{res.text}'"
-                )
-                raise e
-
-        vespa_url = f"{DOCUMENT_ID_ENDPOINT}/{vespa_chunk_id}"
-        try:
-            _index_chunk(vespa_url, json_header, vespa_document_fields)
-        except HTTPError as e:
-            if cast(Response, e.response).status_code != 400:
-                raise e
-
-            # if it's a 400 response, try again with invalid unicode chars removed
-            # only doing this on error to avoid having to go through the content
-            # char by char every time
-            vespa_document_fields[BLURB] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[BLURB])
-            )
-            vespa_document_fields[SEMANTIC_IDENTIFIER] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[SEMANTIC_IDENTIFIER])
-            )
-            vespa_document_fields[CONTENT] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[CONTENT])
-            )
-            vespa_document_fields[CONTENT_SUMMARY] = remove_invalid_unicode_chars(
-                cast(str, vespa_document_fields[CONTENT_SUMMARY])
-            )
-            _index_chunk(vespa_url, json_header, vespa_document_fields)
-
-        insertion_records.add(
-            DocumentInsertionRecord(
-                document_id=document.id,
-                already_existed=document.id in already_existing_documents,
-            )
-        )
+
+    # use threads to parallelize since Vespa doesn't allow batching of updates
+    with concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor:
+        for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
+            future_to_chunk = {
+                executor.submit(
+                    _index_vespa_chunk, chunk, already_existing_documents
+                ): chunk
+                for chunk in chunk_batch
+            }
+            for future in concurrent.futures.as_completed(future_to_chunk):
+                chunk = future_to_chunk[future]
+                chunk_already_existed = future.result()
+                if chunk_already_existed:
+                    already_existing_documents.add(chunk.source_document.id)
+
+                insertion_records.add(
+                    DocumentInsertionRecord(
+                        document_id=chunk.source_document.id,
+                        already_existed=chunk.source_document.id
+                        in already_existing_documents,
+                    )
+                )

     return insertion_records
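One property of the future_to_chunk / as_completed pattern above worth noting: future.result() re-raises any exception thrown inside _index_vespa_chunk on the calling thread, so a failed delete or insert still aborts _index_vespa_chunks much like it did in the old sequential loop. A tiny self-contained illustration of that behavior (not repo code):

import concurrent.futures


def flaky(n: int) -> int:
    if n == 2:
        raise RuntimeError("simulated failed chunk")
    return n * n


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_input = {executor.submit(flaky, n): n for n in range(4)}
    for future in concurrent.futures.as_completed(future_to_input):
        try:
            print(future_to_input[future], future.result())
        except RuntimeError as err:
            # the worker's exception surfaces here, on the main thread
            print(f"input {future_to_input[future]} failed: {err}")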
@@ -368,11 +403,38 @@ class VespaIndex(DocumentIndex):
     ) -> set[DocumentInsertionRecord]:
         return _index_vespa_chunks(chunks=chunks)

+    @staticmethod
+    def _apply_updates_batched(
+        updates: list[_VespaUpdateRequest],
+        batch_size: int = _BATCH_SIZE,
+    ) -> None:
+        """Runs a batch of updates in parallel via the ThreadPoolExecutor."""
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=_NUM_THREADS
+        ) as executor:
+            for update_batch in batch_generator(updates, batch_size):
+                future_to_document_id = {
+                    executor.submit(
+                        requests.put,
+                        update.url,
+                        headers={"Content-Type": "application/json"},
+                        data=json.dumps(update.update_request),
+                    ): update.document_id
+                    for update in update_batch
+                }
+                for future in concurrent.futures.as_completed(future_to_document_id):
+                    res = future.result()
+                    try:
+                        res.raise_for_status()
+                    except requests.HTTPError as e:
+                        failure_msg = f"Failed to update document: {future_to_document_id[future]}"
+                        raise requests.HTTPError(failure_msg) from e
+
     def update(self, update_requests: list[UpdateRequest]) -> None:
         logger.info(f"Updating {len(update_requests)} documents in Vespa")
+        start = time.time()

-        json_header = {"Content-Type": "application/json"}
+        processed_updates_requests: list[_VespaUpdateRequest] = []
         for update_request in update_requests:
             if (
                 update_request.boost is None
@@ -400,16 +462,18 @@ class VespaIndex(DocumentIndex):

         for document_id in update_request.document_ids:
             for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(document_id):
-                url = f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}"
-                res = requests.put(
-                    url, headers=json_header, data=json.dumps(update_dict)
+                processed_updates_requests.append(
+                    _VespaUpdateRequest(
+                        document_id=document_id,
+                        url=f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}",
+                        update_request=update_dict,
+                    )
                 )

-                try:
-                    res.raise_for_status()
-                except requests.HTTPError as e:
-                    failure_msg = f"Failed to update document: {document_id}"
-                    raise requests.HTTPError(failure_msg) from e
+        self._apply_updates_batched(processed_updates_requests)
+        logger.info(
+            "Finished updating Vespa documents in %s seconds", time.time() - start
+        )

     def delete(self, doc_ids: list[str]) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")