Fix Indexing Concurrency (#767)

Yuhong Sun · 2023-11-25 21:40:36 -08:00 · committed by GitHub
parent ac2ed31726
commit 8391d89bea
3 changed files with 123 additions and 63 deletions


@@ -91,10 +91,13 @@ class _VespaUpdateRequest:
     update_request: dict[str, dict]
+@retry(tries=3, delay=1, backoff=2)
 def _does_document_exist(
     doc_chunk_id: str,
 ) -> bool:
-    """Returns whether the document already exists and the users/group whitelists"""
+    """Returns whether the document already exists and the users/group whitelists
+    Specifically in this case, document refers to a vespa document which is equivalent to a Danswer
+    chunk. This checks for whether the chunk exists already in the index"""
     doc_fetch_response = requests.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}")
     if doc_fetch_response.status_code == 404:
         return False
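The hunk above makes the chunk-existence check retryable. A minimal sketch of how that decorator behaves, assuming the `retry` PyPI package (the helper name and URL below are illustrative, not part of the commit): with tries=3, delay=1, backoff=2, a failing call is attempted up to three times, waiting roughly 1s and then 2s between attempts.

from retry import retry
import requests

@retry(tries=3, delay=1, backoff=2)
def fetch_doc(url: str) -> requests.Response:
    # any exception raised here (connection error, non-2xx via raise_for_status)
    # triggers another attempt until the three tries are exhausted
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    return resp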
@@ -142,25 +145,72 @@ def _get_vespa_chunk_ids_by_document_id(
     return doc_chunk_ids
-def _delete_vespa_doc_chunks(document_id: str) -> bool:
+@retry(tries=3, delay=1, backoff=2)
+def _delete_vespa_doc_chunks(document_id: str) -> None:
     doc_chunk_ids = _get_vespa_chunk_ids_by_document_id(document_id)
-    failed = False
     for chunk_id in doc_chunk_ids:
-        success = (
-            requests.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}").status_code == 200
-        )
-        if not success:
-            failed = True
-            logger.error(f"Failed to delete chunk: {chunk_id}")
+        res = requests.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
+        res.raise_for_status()
-    return not failed
+def _delete_vespa_docs(
+    document_ids: list[str],
+    executor: concurrent.futures.ThreadPoolExecutor | None = None,
+) -> None:
+    external_executor = True
+    if not executor:
+        external_executor = False
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS)
+    try:
+        doc_deletion_future = {
+            executor.submit(_delete_vespa_doc_chunks, doc_id): doc_id
+            for doc_id in document_ids
+        }
+        for future in concurrent.futures.as_completed(doc_deletion_future):
+            # Will raise exception if the deletion raised an exception
+            future.result()
+    finally:
+        if not external_executor:
+            executor.shutdown(wait=True)
+def _get_existing_documents_from_chunks(
+    chunks: list[DocMetadataAwareIndexChunk],
+    executor: concurrent.futures.ThreadPoolExecutor | None = None,
+) -> set[str]:
+    external_executor = True
+    if not executor:
+        external_executor = False
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS)
+    document_ids: set[str] = set()
+    try:
+        chunk_existence_future = {
+            executor.submit(
+                _does_document_exist, str(get_uuid_from_chunk(chunk))
+            ): chunk
+            for chunk in chunks
+        }
+        for future in concurrent.futures.as_completed(chunk_existence_future):
+            chunk = chunk_existence_future[future]
+            chunk_already_existed = future.result()
+            if chunk_already_existed:
+                document_ids.add(chunk.source_document.id)
+    finally:
+        if not external_executor:
+            executor.shutdown(wait=True)
+    return document_ids
 @retry(tries=3, delay=1, backoff=2)
-def _index_vespa_chunk(
-    chunk: DocMetadataAwareIndexChunk, already_existing_documents: set[str]
-) -> bool:
+def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
     json_header = {
         "Content-Type": "application/json",
     }
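The new helpers introduced here (_delete_vespa_docs and _get_existing_documents_from_chunks, plus _batch_index_vespa_chunks further below) share one pattern: each accepts an optional ThreadPoolExecutor so a caller can reuse a single pool across many batches, and each shuts the pool down only if it created the pool itself. A generic, hedged sketch of that pattern (names and max_workers are illustrative):

import concurrent.futures
from collections.abc import Callable

def run_in_pool(
    items: list[str],
    worker: Callable[[str], None],
    executor: concurrent.futures.ThreadPoolExecutor | None = None,
) -> None:
    external_executor = executor is not None
    if executor is None:
        # no pool was supplied, so this helper owns one and must clean it up
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    try:
        futures = {executor.submit(worker, item): item for item in items}
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raises any exception from the worker thread
    finally:
        if not external_executor:
            executor.shutdown(wait=True)

Letting future.result() raise is what replaces the old boolean returns: a failed deletion or existence check now surfaces as an exception once its retries are exhausted, instead of being logged and swallowed.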
@@ -168,16 +218,6 @@ def _index_vespa_chunk(
     # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself
     vespa_chunk_id = str(get_uuid_from_chunk(chunk))
-    # Delete all chunks related to the document if (1) it already exists and
-    # (2) this is our first time running into it during this indexing attempt
-    chunk_exists = _does_document_exist(vespa_chunk_id)
-    if chunk_exists and document.id not in already_existing_documents:
-        deletion_success = _delete_vespa_doc_chunks(document.id)
-        if not deletion_success:
-            raise RuntimeError(
-                f"Failed to delete pre-existing chunks for with document with id: {document.id}"
-            )
     embeddings = chunk.embeddings
     embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding}
     if embeddings.mini_chunk_embeddings:
@@ -260,47 +300,65 @@ def _index_vespa_chunk(
         log_error=True,
     )
-    return chunk_exists
+def _batch_index_vespa_chunks(
+    chunks: list[DocMetadataAwareIndexChunk],
+    executor: concurrent.futures.ThreadPoolExecutor | None = None,
+) -> None:
+    external_executor = True
+    if not executor:
+        external_executor = False
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS)
+    try:
+        chunk_index_future = {
+            executor.submit(_index_vespa_chunk, chunk): chunk for chunk in chunks
+        }
+        for future in concurrent.futures.as_completed(chunk_index_future):
+            # Will raise exception if any indexing raised an exception
+            future.result()
+    finally:
+        if not external_executor:
+            executor.shutdown(wait=True)
-def _index_vespa_chunks(
+def _clear_and_index_vespa_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
 ) -> set[DocumentInsertionRecord]:
-    insertion_records: set[DocumentInsertionRecord] = set()
-    # document ids of documents that existed BEFORE this indexing
-    already_existing_documents: set[str] = set()
+    """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
+    with updating the associated permissions. Assumes that a document will not be split into
+    multiple chunk batches calling this function multiple times, otherwise only the last set of
+    chunks will be kept"""
+    existing_docs: set[str] = set()
     # use threads to parallelize since Vespa doesn't allow batching of updates
     with concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor:
-        for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
-            future_to_chunk = {
-                executor.submit(
-                    _index_vespa_chunk, chunk, already_existing_documents
-                ): chunk
-                for chunk in chunk_batch
-            }
-            for future in concurrent.futures.as_completed(future_to_chunk):
-                chunk = future_to_chunk[future]
-                chunk_already_existed = future.result()
-                if chunk_already_existed:
-                    already_existing_documents.add(chunk.source_document.id)
-                # In the logic below, we check if the chunk comes from a doc that has already been
-                # added to already_existing_document. This works because the chunks are ordered
-                # and because the Document chunks are not separated into different batches.
-                # The first chunk is processed first and if it exists, then its entire document
-                # is marked as already existing, so if the document length increases and new chunks
-                # are added, they must come last in processing and the doc would already be in
-                # already existing documents.
-                insertion_records.add(
-                    DocumentInsertionRecord(
-                        document_id=chunk.source_document.id,
-                        already_existed=chunk.source_document.id
-                        in already_existing_documents,
-                    )
+        # Check for existing documents, existing documents need to have all of their chunks deleted
+        # prior to indexing as the document size (num chunks) may have shrunk
+        first_chunks = [chunk for chunk in chunks if chunk.chunk_id == 0]
+        for chunk_batch in batch_generator(first_chunks, _BATCH_SIZE):
+            existing_docs.update(
+                _get_existing_documents_from_chunks(
+                    chunks=chunk_batch, executor=executor
                 )
+            )
-    return insertion_records
+        for doc_id_batch in batch_generator(existing_docs, _BATCH_SIZE):
+            _delete_vespa_docs(document_ids=doc_id_batch, executor=executor)
+        for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
+            _batch_index_vespa_chunks(chunks=chunk_batch, executor=executor)
+    all_doc_ids = {chunk.source_document.id for chunk in chunks}
+    return {
+        DocumentInsertionRecord(
+            document_id=doc_id,
+            already_existed=doc_id in existing_docs,
+        )
+        for doc_id in all_doc_ids
+    }
 def _build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) -> str:
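_clear_and_index_vespa_chunks feeds the executor in fixed-size slices via batch_generator, which is imported from elsewhere and not shown in this diff. A hedged sketch of what it is assumed to do (implementation illustrative):

from collections.abc import Generator, Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")

def batch_generator(items: Iterable[T], batch_size: int) -> Generator[list[T], None, None]:
    # yield consecutive slices of at most batch_size items until the iterable is exhausted
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        yield batch

Note also why only the chunk_id == 0 chunks are probed for existence: any document that was indexed before necessarily has its first chunk in Vespa, so checking one chunk per document is enough to decide whether that document's old chunks must be cleared.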
@@ -523,7 +581,7 @@ class VespaIndex(DocumentIndex):
         self,
         chunks: list[DocMetadataAwareIndexChunk],
     ) -> set[DocumentInsertionRecord]:
-        return _index_vespa_chunks(chunks=chunks)
+        return _clear_and_index_vespa_chunks(chunks=chunks)
     @staticmethod
     def _apply_updates_batched(
@@ -607,12 +665,7 @@ class VespaIndex(DocumentIndex):
     def delete(self, doc_ids: list[str]) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
-        for doc_id in doc_ids:
-            success = _delete_vespa_doc_chunks(doc_id)
-            if not success:
-                raise RuntimeError(
-                    f"Unable to delete document with document id: {doc_id}"
-                )
+        _delete_vespa_docs(doc_ids)
     def keyword_retrieval(
         self,
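DOCUMENT_ID_ENDPOINT is defined earlier in this file and is assumed to point at Vespa's Document V1 REST API, where single documents are fetched and removed by ID; that is what the GET/DELETE calls in the helpers above rely on. A hedged sketch of those calls (host, namespace, document type, and the chunk ID are illustrative):

import requests

DOCUMENT_ID_ENDPOINT = "http://localhost:8080/document/v1/default/danswer_chunk/docid"
chunk_uuid = "11111111-2222-3333-4444-555555555555"

# GET returns the document JSON, or a 404 status if the chunk is not indexed
exists = requests.get(f"{DOCUMENT_ID_ENDPOINT}/{chunk_uuid}").status_code != 404

# DELETE removes one document; raise_for_status surfaces any failure to the caller
requests.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_uuid}").raise_for_status()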


@@ -46,6 +46,9 @@ def embed_chunks(
         # Normalize embeddings is only configured via model_configs.py, be sure to use right value for the set loss
         embeddings.extend(embedding_model.encode(text_batch))
+        # Replace line above with the line below for easy debugging of indexing flow, skipping the actual model
+        # embeddings.extend([[0.0] * 384 for _ in range(len(text_batch))])
     embedding_ind_start = 0
     for chunk_ind, chunk in enumerate(chunks):
         num_embeddings = chunk_mini_chunks_count[chunk_ind]
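The trailing context above starts mapping the flat embeddings list back onto chunks. A hedged sketch of the assumed bookkeeping, where each chunk contributed one full-chunk vector plus one vector per mini-chunk and chunk_mini_chunks_count records that total per chunk (the unpacking at the end is an assumption, not shown in the diff):

embedding_ind_start = 0
for chunk_ind, chunk in enumerate(chunks):
    num_embeddings = chunk_mini_chunks_count[chunk_ind]
    chunk_vectors = embeddings[embedding_ind_start : embedding_ind_start + num_embeddings]
    # the first vector is assumed to be the full-chunk embedding, the rest belong to mini-chunks
    full_embedding, mini_chunk_embeddings = chunk_vectors[0], chunk_vectors[1:]
    embedding_ind_start += num_embeddings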


@@ -1,3 +1,5 @@
+import os
 import numpy as np
 import requests
 import tensorflow as tf # type: ignore
@@ -42,6 +44,8 @@ def get_default_tokenizer() -> AutoTokenizer:
     global _TOKENIZER
     if _TOKENIZER is None:
         _TOKENIZER = AutoTokenizer.from_pretrained(DOCUMENT_ENCODER_MODEL)
+        if hasattr(_TOKENIZER, "is_fast") and _TOKENIZER.is_fast:
+            os.environ["TOKENIZERS_PARALLELISM"] = "false"
     return _TOKENIZER
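The new guard above turns off the HuggingFace fast tokenizer's internal Rust thread pool. With indexing work now fanned out across Python threads (and run inside forked worker processes), leaving that pool enabled tends to produce the "huggingface/tokenizers: The current process just got forked, after parallelism has already been used" warning and can deadlock; setting the environment variable before the tokenizer does any work avoids it. A minimal hedged sketch (the model name is illustrative):

import os

from transformers import AutoTokenizer

# must be set before the fast (Rust-backed) tokenizer does any parallel work
os.environ["TOKENIZERS_PARALLELISM"] = "false"

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
print(tokenizer.is_fast)  # True for Rust-backed tokenizers, which expose this toggle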