From 61b5bd569be006a3babc896333216bcddf928ee0 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Wed, 14 Aug 2024 22:18:53 -0700 Subject: [PATCH] Reworked chunking to support mega chunks (#2032) --- backend/danswer/configs/app_configs.py | 4 + backend/danswer/configs/constants.py | 32 +- backend/danswer/configs/model_configs.py | 2 + .../document_index/document_index_utils.py | 7 + backend/danswer/document_index/interfaces.py | 34 +- .../vespa/app_config/schemas/danswer_chunk.sd | 4 + .../document_index/vespa/chunk_retrieval.py | 424 +++++++++ .../danswer/document_index/vespa/deletion.py | 65 ++ backend/danswer/document_index/vespa/index.py | 842 +++--------------- .../document_index/vespa/indexing_utils.py | 227 +++++ .../vespa/{ => shared_utils}/utils.py | 0 .../shared_utils/vespa_request_builders.py | 96 ++ .../danswer/document_index/vespa_constants.py | 85 ++ backend/danswer/indexing/chunker.py | 450 +++++----- backend/danswer/indexing/embedder.py | 37 +- backend/danswer/indexing/indexing_pipeline.py | 25 +- backend/danswer/indexing/models.py | 2 + .../search_nlp_models.py | 79 +- backend/danswer/search/models.py | 1 + backend/danswer/search/pipeline.py | 83 +- .../danswer/search/retrieval/search_runner.py | 123 ++- backend/danswer/server/documents/document.py | 16 +- backend/model_server/custom_models.py | 1 + .../tests/regression/answer_quality/README.md | 13 +- .../regression/answer_quality/api_utils.py | 1 - .../answer_quality/file_uploader.py | 15 +- .../unit/danswer/indexing/test_chunker.py | 10 +- 27 files changed, 1522 insertions(+), 1156 deletions(-) create mode 100644 backend/danswer/document_index/vespa/chunk_retrieval.py create mode 100644 backend/danswer/document_index/vespa/deletion.py create mode 100644 backend/danswer/document_index/vespa/indexing_utils.py rename backend/danswer/document_index/vespa/{ => shared_utils}/utils.py (100%) create mode 100644 backend/danswer/document_index/vespa/shared_utils/vespa_request_builders.py create mode 100644 backend/danswer/document_index/vespa_constants.py diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index 50f7fc5ad5..eeb8a1a7a5 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -276,6 +276,10 @@ ENABLE_MULTIPASS_INDEXING = ( # Slightly larger since the sentence aware split is a max cutoff so most minichunks will be under MINI_CHUNK_SIZE # tokens. But we need it to be at least as big as 1/4th chunk size to avoid having a tiny mini-chunk at the end MINI_CHUNK_SIZE = 150 + +# This is the number of regular chunks per large chunk +LARGE_CHUNK_RATIO = 4 + # Include the document level metadata in each chunk. 
If the metadata is too long, then it is thrown out # We don't want the metadata to overwhelm the actual contents of the chunk SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true" diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index dfbb0d0e67..408ffb2f18 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -1,26 +1,6 @@ from enum import Enum -DOCUMENT_ID = "document_id" -CHUNK_ID = "chunk_id" -BLURB = "blurb" -CONTENT = "content" SOURCE_TYPE = "source_type" -SOURCE_LINKS = "source_links" -SOURCE_LINK = "link" -SEMANTIC_IDENTIFIER = "semantic_identifier" -TITLE = "title" -SKIP_TITLE_EMBEDDING = "skip_title" -SECTION_CONTINUATION = "section_continuation" -EMBEDDINGS = "embeddings" -TITLE_EMBEDDING = "title_embedding" -ALLOWED_USERS = "allowed_users" -ACCESS_CONTROL_LIST = "access_control_list" -DOCUMENT_SETS = "document_sets" -TIME_FILTER = "time_filter" -METADATA = "metadata" -METADATA_LIST = "metadata_list" -METADATA_SUFFIX = "metadata_suffix" -MATCH_HIGHLIGHTS = "match_highlights" # stored in the `metadata` of a chunk. Used to signify that this chunk should # not be used for QA. For example, Google Drive file types which can't be parsed # are still useful as a search result but not for QA. @@ -28,20 +8,10 @@ IGNORE_FOR_QA = "ignore_for_qa" # NOTE: deprecated, only used for porting key from old system GEN_AI_API_KEY_STORAGE_KEY = "genai_api_key" PUBLIC_DOC_PAT = "PUBLIC" -PUBLIC_DOCUMENT_SET = "__PUBLIC" -QUOTE = "quote" -BOOST = "boost" -DOC_UPDATED_AT = "doc_updated_at" # Indexed as seconds since epoch -PRIMARY_OWNERS = "primary_owners" -SECONDARY_OWNERS = "secondary_owners" -RECENCY_BIAS = "recency_bias" -HIDDEN = "hidden" -SCORE = "score" ID_SEPARATOR = ":;:" DEFAULT_BOOST = 0 SESSION_KEY = "session" -QUERY_EVENT_ID = "query_event_id" -LLM_CHUNKS = "llm_chunks" + # For chunking/processing chunks RETURN_SEPARATOR = "\n\r\n" diff --git a/backend/danswer/configs/model_configs.py b/backend/danswer/configs/model_configs.py index ec2b418ce2..e2b8ee7f62 100644 --- a/backend/danswer/configs/model_configs.py +++ b/backend/danswer/configs/model_configs.py @@ -19,6 +19,8 @@ DOCUMENT_ENCODER_MODEL = ( # If the below is changed, Vespa deployment must also be changed DOC_EMBEDDING_DIM = int(os.environ.get("DOC_EMBEDDING_DIM") or 768) # Model should be chosen with 512 context size, ideally don't change this +# If multipass_indexing is enabled, the max context size would be set to +# DOC_EMBEDDING_CONTEXT_SIZE * LARGE_CHUNK_RATIO DOC_EMBEDDING_CONTEXT_SIZE = 512 NORMALIZE_EMBEDDINGS = ( os.environ.get("NORMALIZE_EMBEDDINGS") or "true" diff --git a/backend/danswer/document_index/document_index_utils.py b/backend/danswer/document_index/document_index_utils.py index 271fd0cc2e..21959041bd 100644 --- a/backend/danswer/document_index/document_index_utils.py +++ b/backend/danswer/document_index/document_index_utils.py @@ -50,4 +50,11 @@ def get_uuid_from_chunk( unique_identifier_string = "_".join( [doc_str, str(chunk.chunk_id), str(mini_chunk_ind)] ) + if chunk.large_chunk_reference_ids: + unique_identifier_string += "_large" + "_".join( + [ + str(referenced_chunk_id) + for referenced_chunk_id in chunk.large_chunk_reference_ids + ] + ) return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string) diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py index 6c5ad20322..2acd097795 100644 --- a/backend/danswer/document_index/interfaces.py 
+++ b/backend/danswer/document_index/interfaces.py @@ -16,6 +16,25 @@ class DocumentInsertionRecord: already_existed: bool +@dataclass(frozen=True) +class VespaChunkRequest: + document_id: str + min_chunk_ind: int | None = None + max_chunk_ind: int | None = None + + @property + def is_capped(self) -> bool: + # If the max chunk index is not None, then the chunk request is capped + # If the min chunk index is None, we can assume the min is 0 + return self.max_chunk_ind is not None + + @property + def range(self) -> int | None: + if self.max_chunk_ind is not None: + return (self.max_chunk_ind - (self.min_chunk_ind or 0)) + 1 + return None + + @dataclass class DocumentMetadata: """ @@ -183,10 +202,9 @@ class IdRetrievalCapable(abc.ABC): @abc.abstractmethod def id_based_retrieval( self, - document_id: str, - min_chunk_ind: int | None, - max_chunk_ind: int | None, - user_access_control_list: list[str] | None = None, + chunk_requests: list[VespaChunkRequest], + filters: IndexFilters, + batch_retrieval: bool = False, ) -> list[InferenceChunkUncleaned]: """ Fetch chunk(s) based on document id @@ -197,11 +215,9 @@ class IdRetrievalCapable(abc.ABC): or extended section will have duplicate segments. Parameters: - - document_id: document id for which to retrieve the chunk(s) - - min_chunk_ind: if None then fetch from the start of doc - - max_chunk_ind: - - filters: standard filters object, in this case only the access filter is applied as a - permission check + - chunk_requests: requests containing the document id and the chunk range to retrieve + - filters: Filters to apply to retrieval + - batch_retrieval: If True, perform a batch retrieval Returns: list of chunks for the document id or the specific chunk by the specified chunk index diff --git a/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd b/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd index 27d8aafd65..bf7772e154 100644 --- a/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd +++ b/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd @@ -88,6 +88,10 @@ schema DANSWER_CHUNK_NAME { rank:filter attribute: fast-search } + # If chunk is a large chunk, this will contain the ids of the smaller chunks + field large_chunk_reference_ids type array { + indexing: summary | attribute + } field metadata type string { indexing: summary | attribute } diff --git a/backend/danswer/document_index/vespa/chunk_retrieval.py b/backend/danswer/document_index/vespa/chunk_retrieval.py new file mode 100644 index 0000000000..12f6de0c1f --- /dev/null +++ b/backend/danswer/document_index/vespa/chunk_retrieval.py @@ -0,0 +1,424 @@ +import json +import string +from collections.abc import Callable +from collections.abc import Mapping +from datetime import datetime +from datetime import timezone +from typing import Any +from typing import cast + +import requests +from retry import retry + +from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION +from danswer.document_index.interfaces import VespaChunkRequest +from danswer.document_index.vespa.shared_utils.vespa_request_builders import ( + build_vespa_filters, +) +from danswer.document_index.vespa.shared_utils.vespa_request_builders import ( + build_vespa_id_based_retrieval_yql, +) +from danswer.document_index.vespa_constants import ACCESS_CONTROL_LIST +from danswer.document_index.vespa_constants import BLURB +from danswer.document_index.vespa_constants import BOOST +from danswer.document_index.vespa_constants import 
CHUNK_ID +from danswer.document_index.vespa_constants import CONTENT +from danswer.document_index.vespa_constants import CONTENT_SUMMARY +from danswer.document_index.vespa_constants import DOC_UPDATED_AT +from danswer.document_index.vespa_constants import DOCUMENT_ID +from danswer.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT +from danswer.document_index.vespa_constants import HIDDEN +from danswer.document_index.vespa_constants import LARGE_CHUNK_REFERENCE_IDS +from danswer.document_index.vespa_constants import MAX_ID_SEARCH_QUERY_SIZE +from danswer.document_index.vespa_constants import METADATA +from danswer.document_index.vespa_constants import METADATA_SUFFIX +from danswer.document_index.vespa_constants import PRIMARY_OWNERS +from danswer.document_index.vespa_constants import RECENCY_BIAS +from danswer.document_index.vespa_constants import SEARCH_ENDPOINT +from danswer.document_index.vespa_constants import SECONDARY_OWNERS +from danswer.document_index.vespa_constants import SECTION_CONTINUATION +from danswer.document_index.vespa_constants import SEMANTIC_IDENTIFIER +from danswer.document_index.vespa_constants import SOURCE_LINKS +from danswer.document_index.vespa_constants import SOURCE_TYPE +from danswer.document_index.vespa_constants import TITLE +from danswer.document_index.vespa_constants import YQL_BASE +from danswer.search.models import IndexFilters +from danswer.search.models import InferenceChunkUncleaned +from danswer.utils.logger import setup_logger +from danswer.utils.threadpool_concurrency import run_functions_tuples_in_parallel + +logger = setup_logger() + + +def _process_dynamic_summary( + dynamic_summary: str, max_summary_length: int = 400 +) -> list[str]: + if not dynamic_summary: + return [] + + current_length = 0 + processed_summary: list[str] = [] + for summary_section in dynamic_summary.split(""): + # if we're past the desired max length, break at the last word + if current_length + len(summary_section) >= max_summary_length: + summary_section = summary_section[: max_summary_length - current_length] + summary_section = summary_section.lstrip() # remove any leading whitespace + + # handle the case where the truncated section is either just a + # single (partial) word or if it's empty + first_space = summary_section.find(" ") + if first_space == -1: + # add ``...`` to previous section + if processed_summary: + processed_summary[-1] += "..." + break + + # handle the valid truncated section case + summary_section = summary_section.rsplit(" ", 1)[0] + if summary_section[-1] in string.punctuation: + summary_section = summary_section[:-1] + summary_section += "..." 
+ processed_summary.append(summary_section) + break + + processed_summary.append(summary_section) + current_length += len(summary_section) + + return processed_summary + + +def _vespa_hit_to_inference_chunk( + hit: dict[str, Any], null_score: bool = False +) -> InferenceChunkUncleaned: + fields = cast(dict[str, Any], hit["fields"]) + + # parse fields that are stored as strings, but are really json / datetime + metadata = json.loads(fields[METADATA]) if METADATA in fields else {} + updated_at = ( + datetime.fromtimestamp(fields[DOC_UPDATED_AT], tz=timezone.utc) + if DOC_UPDATED_AT in fields + else None + ) + + match_highlights = _process_dynamic_summary( + # fallback to regular `content` if the `content_summary` field + # isn't present + dynamic_summary=hit["fields"].get(CONTENT_SUMMARY, hit["fields"][CONTENT]), + ) + semantic_identifier = fields.get(SEMANTIC_IDENTIFIER, "") + if not semantic_identifier: + logger.error( + f"Chunk with blurb: {fields.get(BLURB, 'Unknown')[:50]}... has no Semantic Identifier" + ) + + source_links = fields.get(SOURCE_LINKS, {}) + source_links_dict_unprocessed = ( + json.loads(source_links) if isinstance(source_links, str) else source_links + ) + source_links_dict = { + int(k): v + for k, v in cast(dict[str, str], source_links_dict_unprocessed).items() + } + + return InferenceChunkUncleaned( + chunk_id=fields[CHUNK_ID], + blurb=fields.get(BLURB, ""), # Unused + content=fields[CONTENT], # Includes extra title prefix and metadata suffix + source_links=source_links_dict or {0: ""}, + section_continuation=fields[SECTION_CONTINUATION], + document_id=fields[DOCUMENT_ID], + source_type=fields[SOURCE_TYPE], + title=fields.get(TITLE), + semantic_identifier=fields[SEMANTIC_IDENTIFIER], + boost=fields.get(BOOST, 1), + recency_bias=fields.get("matchfeatures", {}).get(RECENCY_BIAS, 1.0), + score=None if null_score else hit.get("relevance", 0), + hidden=fields.get(HIDDEN, False), + primary_owners=fields.get(PRIMARY_OWNERS), + secondary_owners=fields.get(SECONDARY_OWNERS), + large_chunk_reference_ids=fields.get(LARGE_CHUNK_REFERENCE_IDS, []), + metadata=metadata, + metadata_suffix=fields.get(METADATA_SUFFIX), + match_highlights=match_highlights, + updated_at=updated_at, + ) + + +def _get_chunks_via_visit_api( + chunk_request: VespaChunkRequest, + index_name: str, + filters: IndexFilters, + field_names: list[str] | None = None, + get_large_chunks: bool = False, +) -> list[dict]: + # Constructing the URL for the Visit API + # NOTE: visit API uses the same URL as the document API, but with different params + url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name) + + # build the list of fields to retrieve + field_set_list = ( + None + if not field_names + else [f"{index_name}:{field_name}" for field_name in field_names] + ) + acl_fieldset_entry = f"{index_name}:{ACCESS_CONTROL_LIST}" + if ( + field_set_list + and filters.access_control_list + and acl_fieldset_entry not in field_set_list + ): + field_set_list.append(acl_fieldset_entry) + field_set = ",".join(field_set_list) if field_set_list else None + + # build filters + selection = f"{index_name}.document_id=='{chunk_request.document_id}'" + + if chunk_request.is_capped: + selection += f" and {index_name}.chunk_id>={chunk_request.min_chunk_ind or 0}" + selection += f" and {index_name}.chunk_id<={chunk_request.max_chunk_ind}" + if not get_large_chunks: + selection += f" and {index_name}.large_chunk_reference_ids == null" + + # Setting up the selection criteria in the query parameters + params = { + # NOTE: Document Selector 
Language doesn't allow `contains`, so we can't check + # for the ACL in the selection. Instead, we have to check as a postfilter + "selection": selection, + "continuation": None, + "wantedDocumentCount": 1_000, + "fieldSet": field_set, + } + + document_chunks: list[dict] = [] + while True: + response = requests.get(url, params=params) + try: + response.raise_for_status() + except requests.HTTPError as e: + request_info = f"Headers: {response.request.headers}\nPayload: {params}" + response_info = f"Status Code: {response.status_code}\nResponse Content: {response.text}" + error_base = f"Error occurred getting chunk by Document ID {chunk_request.document_id}" + logger.error( + f"{error_base}:\n" + f"{request_info}\n" + f"{response_info}\n" + f"Exception: {e}" + ) + raise requests.HTTPError(error_base) from e + + # Check if the response contains any documents + response_data = response.json() + if "documents" in response_data: + for document in response_data["documents"]: + if filters.access_control_list: + document_acl = document["fields"].get(ACCESS_CONTROL_LIST) + if not document_acl or not any( + user_acl_entry in document_acl + for user_acl_entry in filters.access_control_list + ): + continue + document_chunks.append(document) + + # Check for continuation token to handle pagination + if "continuation" in response_data and response_data["continuation"]: + params["continuation"] = response_data["continuation"] + else: + break # Exit loop if no continuation token + + return document_chunks + + +def get_all_vespa_ids_for_document_id( + document_id: str, + index_name: str, + filters: IndexFilters | None = None, + get_large_chunks: bool = False, +) -> list[str]: + document_chunks = _get_chunks_via_visit_api( + chunk_request=VespaChunkRequest(document_id=document_id), + index_name=index_name, + filters=filters or IndexFilters(access_control_list=None), + field_names=[DOCUMENT_ID], + get_large_chunks=get_large_chunks, + ) + return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks] + + +def parallel_visit_api_retrieval( + index_name: str, + chunk_requests: list[VespaChunkRequest], + filters: IndexFilters, + get_large_chunks: bool = False, +) -> list[InferenceChunkUncleaned]: + functions_with_args: list[tuple[Callable, tuple]] = [ + ( + _get_chunks_via_visit_api, + (chunk_request, index_name, filters, get_large_chunks), + ) + for chunk_request in chunk_requests + ] + + parallel_results = run_functions_tuples_in_parallel( + functions_with_args, allow_failures=True + ) + + # Any failures to retrieve would give a None, drop the Nones and empty lists + vespa_chunk_sets = [res for res in parallel_results if res] + + flattened_vespa_chunks = [] + for chunk_set in vespa_chunk_sets: + flattened_vespa_chunks.extend(chunk_set) + + inference_chunks = [ + _vespa_hit_to_inference_chunk(chunk, null_score=True) + for chunk in flattened_vespa_chunks + ] + + return inference_chunks + + +@retry(tries=3, delay=1, backoff=2) +def query_vespa( + query_params: Mapping[str, str | int | float] +) -> list[InferenceChunkUncleaned]: + if "query" in query_params and not cast(str, query_params["query"]).strip(): + raise ValueError("No/empty query received") + + params = dict( + **query_params, + **{ + "presentation.timing": True, + } + if LOG_VESPA_TIMING_INFORMATION + else {}, + ) + + response = requests.post( + SEARCH_ENDPOINT, + json=params, + ) + try: + response.raise_for_status() + except requests.HTTPError as e: + request_info = f"Headers: {response.request.headers}\nPayload: {params}" + response_info = ( + 
f"Status Code: {response.status_code}\n" + f"Response Content: {response.text}" + ) + error_base = "Failed to query Vespa" + logger.error( + f"{error_base}:\n" + f"{request_info}\n" + f"{response_info}\n" + f"Exception: {e}" + ) + raise requests.HTTPError(error_base) from e + + response_json: dict[str, Any] = response.json() + if LOG_VESPA_TIMING_INFORMATION: + logger.info("Vespa timing info: %s", response_json.get("timing")) + hits = response_json["root"].get("children", []) + + for hit in hits: + if hit["fields"].get(CONTENT) is None: + identifier = hit["fields"].get("documentid") or hit["id"] + logger.error( + f"Vespa Index with Vespa ID {identifier} has no contents. " + f"This is invalid because the vector is not meaningful and keywordsearch cannot " + f"fetch this document" + ) + + filtered_hits = [hit for hit in hits if hit["fields"].get(CONTENT) is not None] + + inference_chunks = [_vespa_hit_to_inference_chunk(hit) for hit in filtered_hits] + # Good Debugging Spot + return inference_chunks + + +def _get_chunks_via_batch_search( + index_name: str, + chunk_requests: list[VespaChunkRequest], + filters: IndexFilters, + get_large_chunks: bool = False, +) -> list[InferenceChunkUncleaned]: + if not chunk_requests: + return [] + + filters_str = build_vespa_filters(filters=filters, include_hidden=True) + + yql = ( + YQL_BASE.format(index_name=index_name) + + filters_str + + build_vespa_id_based_retrieval_yql(chunk_requests[0]) + ) + chunk_requests.pop(0) + + for request in chunk_requests: + yql += " or " + build_vespa_id_based_retrieval_yql(request) + params: dict[str, str | int | float] = { + "yql": yql, + "hits": MAX_ID_SEARCH_QUERY_SIZE, + } + + inference_chunks = query_vespa(params) + if not get_large_chunks: + inference_chunks = [ + chunk for chunk in inference_chunks if not chunk.large_chunk_reference_ids + ] + inference_chunks.sort(key=lambda chunk: chunk.chunk_id) + return inference_chunks + + +def batch_search_api_retrieval( + index_name: str, + chunk_requests: list[VespaChunkRequest], + filters: IndexFilters, + get_large_chunks: bool = False, +) -> list[InferenceChunkUncleaned]: + retrieved_chunks: list[InferenceChunkUncleaned] = [] + capped_requests: list[VespaChunkRequest] = [] + uncapped_requests: list[VespaChunkRequest] = [] + chunk_count = 0 + for request in chunk_requests: + # All requests without a chunk range are uncapped + # Uncapped requests are retrieved using the Visit API + range = request.range + if range is None: + uncapped_requests.append(request) + continue + + # If adding the range to the chunk count is greater than the + # max query size, we need to perform a retrieval to avoid hitting the limit + if chunk_count + range > MAX_ID_SEARCH_QUERY_SIZE: + retrieved_chunks.extend( + _get_chunks_via_batch_search( + index_name=index_name, + chunk_requests=capped_requests, + filters=filters, + get_large_chunks=get_large_chunks, + ) + ) + capped_requests = [] + chunk_count = 0 + capped_requests.append(request) + chunk_count += range + + if capped_requests: + retrieved_chunks.extend( + _get_chunks_via_batch_search( + index_name=index_name, + chunk_requests=capped_requests, + filters=filters, + get_large_chunks=get_large_chunks, + ) + ) + + if uncapped_requests: + logger.debug(f"Retrieving {len(uncapped_requests)} uncapped requests") + retrieved_chunks.extend( + parallel_visit_api_retrieval( + index_name, uncapped_requests, filters, get_large_chunks + ) + ) + + return retrieved_chunks diff --git a/backend/danswer/document_index/vespa/deletion.py 
b/backend/danswer/document_index/vespa/deletion.py new file mode 100644 index 0000000000..3c8b7b97f1 --- /dev/null +++ b/backend/danswer/document_index/vespa/deletion.py @@ -0,0 +1,65 @@ +import concurrent.futures + +import httpx +from retry import retry + +from danswer.document_index.vespa.chunk_retrieval import ( + get_all_vespa_ids_for_document_id, +) +from danswer.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT +from danswer.document_index.vespa_constants import NUM_THREADS +from danswer.utils.logger import setup_logger + +logger = setup_logger() + + +CONTENT_SUMMARY = "content_summary" + + +@retry(tries=3, delay=1, backoff=2) +def _delete_vespa_doc_chunks( + document_id: str, index_name: str, http_client: httpx.Client +) -> None: + doc_chunk_ids = get_all_vespa_ids_for_document_id( + document_id=document_id, + index_name=index_name, + get_large_chunks=True, + ) + + for chunk_id in doc_chunk_ids: + try: + res = http_client.delete( + f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{chunk_id}" + ) + res.raise_for_status() + except httpx.HTTPStatusError as e: + logger.error(f"Failed to delete chunk, details: {e.response.text}") + raise + + +def delete_vespa_docs( + document_ids: list[str], + index_name: str, + http_client: httpx.Client, + executor: concurrent.futures.ThreadPoolExecutor | None = None, +) -> None: + external_executor = True + + if not executor: + external_executor = False + executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) + + try: + doc_deletion_future = { + executor.submit( + _delete_vespa_doc_chunks, doc_id, index_name, http_client + ): doc_id + for doc_id in document_ids + } + for future in concurrent.futures.as_completed(doc_deletion_future): + # Will raise exception if the deletion raised an exception + future.result() + + finally: + if not external_executor: + executor.shutdown(wait=True) diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py index 468e1e0c86..f380d53c9f 100644 --- a/backend/danswer/document_index/vespa/index.py +++ b/backend/danswer/document_index/vespa/index.py @@ -1,64 +1,58 @@ import concurrent.futures import io -import json import os -import string import time import zipfile -from collections.abc import Mapping from dataclasses import dataclass from datetime import datetime from datetime import timedelta -from datetime import timezone -from typing import Any from typing import BinaryIO -from typing import cast import httpx import requests -from retry import retry -from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION -from danswer.configs.app_configs import VESPA_CONFIG_SERVER_HOST -from danswer.configs.app_configs import VESPA_HOST -from danswer.configs.app_configs import VESPA_PORT -from danswer.configs.app_configs import VESPA_TENANT_PORT from danswer.configs.chat_configs import DOC_TIME_DECAY from danswer.configs.chat_configs import NUM_RETURNED_HITS from danswer.configs.chat_configs import TITLE_CONTENT_RATIO -from danswer.configs.constants import ACCESS_CONTROL_LIST -from danswer.configs.constants import BLURB -from danswer.configs.constants import BOOST -from danswer.configs.constants import CHUNK_ID -from danswer.configs.constants import CONTENT -from danswer.configs.constants import DOC_UPDATED_AT -from danswer.configs.constants import DOCUMENT_ID -from danswer.configs.constants import DOCUMENT_SETS -from danswer.configs.constants import EMBEDDINGS -from danswer.configs.constants import HIDDEN -from danswer.configs.constants 
import INDEX_SEPARATOR -from danswer.configs.constants import METADATA -from danswer.configs.constants import METADATA_LIST -from danswer.configs.constants import METADATA_SUFFIX -from danswer.configs.constants import PRIMARY_OWNERS -from danswer.configs.constants import RECENCY_BIAS -from danswer.configs.constants import SECONDARY_OWNERS -from danswer.configs.constants import SECTION_CONTINUATION -from danswer.configs.constants import SEMANTIC_IDENTIFIER -from danswer.configs.constants import SKIP_TITLE_EMBEDDING -from danswer.configs.constants import SOURCE_LINKS -from danswer.configs.constants import SOURCE_TYPE -from danswer.configs.constants import TITLE -from danswer.configs.constants import TITLE_EMBEDDING -from danswer.connectors.cross_connector_utils.miscellaneous_utils import ( - get_experts_stores_representations, -) -from danswer.document_index.document_index_utils import get_uuid_from_chunk from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import DocumentInsertionRecord from danswer.document_index.interfaces import UpdateRequest -from danswer.document_index.vespa.utils import remove_invalid_unicode_chars -from danswer.document_index.vespa.utils import replace_invalid_doc_id_characters +from danswer.document_index.interfaces import VespaChunkRequest +from danswer.document_index.vespa.chunk_retrieval import batch_search_api_retrieval +from danswer.document_index.vespa.chunk_retrieval import ( + get_all_vespa_ids_for_document_id, +) +from danswer.document_index.vespa.chunk_retrieval import ( + parallel_visit_api_retrieval, +) +from danswer.document_index.vespa.chunk_retrieval import query_vespa +from danswer.document_index.vespa.deletion import delete_vespa_docs +from danswer.document_index.vespa.indexing_utils import batch_index_vespa_chunks +from danswer.document_index.vespa.indexing_utils import clean_chunk_id_copy +from danswer.document_index.vespa.indexing_utils import ( + get_existing_documents_from_chunks, +) +from danswer.document_index.vespa.shared_utils.utils import ( + replace_invalid_doc_id_characters, +) +from danswer.document_index.vespa.shared_utils.vespa_request_builders import ( + build_vespa_filters, +) +from danswer.document_index.vespa_constants import ACCESS_CONTROL_LIST +from danswer.document_index.vespa_constants import BATCH_SIZE +from danswer.document_index.vespa_constants import BOOST +from danswer.document_index.vespa_constants import CONTENT_SUMMARY +from danswer.document_index.vespa_constants import DANSWER_CHUNK_REPLACEMENT_PAT +from danswer.document_index.vespa_constants import DATE_REPLACEMENT +from danswer.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT +from danswer.document_index.vespa_constants import DOCUMENT_REPLACEMENT_PAT +from danswer.document_index.vespa_constants import DOCUMENT_SETS +from danswer.document_index.vespa_constants import HIDDEN +from danswer.document_index.vespa_constants import NUM_THREADS +from danswer.document_index.vespa_constants import VESPA_APPLICATION_ENDPOINT +from danswer.document_index.vespa_constants import VESPA_DIM_REPLACEMENT_PAT +from danswer.document_index.vespa_constants import VESPA_TIMEOUT +from danswer.document_index.vespa_constants import YQL_BASE from danswer.indexing.models import DocMetadataAwareIndexChunk from danswer.search.models import IndexFilters from danswer.search.models import InferenceChunkUncleaned @@ -68,34 +62,6 @@ from shared_configs.model_server_models import Embedding logger = setup_logger() -VESPA_DIM_REPLACEMENT_PAT = 
"VARIABLE_DIM" -DANSWER_CHUNK_REPLACEMENT_PAT = "DANSWER_CHUNK_NAME" -DOCUMENT_REPLACEMENT_PAT = "DOCUMENT_REPLACEMENT" -DATE_REPLACEMENT = "DATE_REPLACEMENT" - -# config server -VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}" -VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2" - -# main search application -VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}" -# danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd -DOCUMENT_ID_ENDPOINT = ( - f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid" -) -SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/" - -_BATCH_SIZE = 128 # Specific to Vespa -_NUM_THREADS = ( - 32 # since Vespa doesn't allow batching of inserts / updates, we use threads -) -# up from 500ms for now, since we've seen quite a few timeouts -# in the long term, we are looking to improve the performance of Vespa -# so that we can bring this back to default -_VESPA_TIMEOUT = "3s" -# Specific to Vespa, needed for highlighting matching keywords / section -CONTENT_SUMMARY = "content_summary" - @dataclass class _VespaUpdateRequest: @@ -104,582 +70,6 @@ class _VespaUpdateRequest: update_request: dict[str, dict] -@retry(tries=3, delay=1, backoff=2) -def _does_document_exist( - doc_chunk_id: str, - index_name: str, - http_client: httpx.Client, -) -> bool: - """Returns whether the document already exists and the users/group whitelists - Specifically in this case, document refers to a vespa document which is equivalent to a Danswer - chunk. This checks for whether the chunk exists already in the index""" - doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}" - doc_fetch_response = http_client.get(doc_url) - - if doc_fetch_response.status_code == 404: - return False - - if doc_fetch_response.status_code != 200: - logger.debug(f"Failed to check for document with URL {doc_url}") - raise RuntimeError( - f"Unexpected fetch document by ID value from Vespa " - f"with error {doc_fetch_response.status_code}" - ) - return True - - -def _vespa_get_updated_at_attribute(t: datetime | None) -> int | None: - if not t: - return None - - if t.tzinfo != timezone.utc: - raise ValueError("Connectors must provide document update time in UTC") - - return int(t.timestamp()) - - -def _get_vespa_chunks_by_document_id( - document_id: str, - index_name: str, - user_access_control_list: list[str] | None = None, - min_chunk_ind: int | None = None, - max_chunk_ind: int | None = None, - field_names: list[str] | None = None, -) -> list[dict]: - # Constructing the URL for the Visit API - # NOTE: visit API uses the same URL as the document API, but with different params - url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name) - - # build the list of fields to retrieve - field_set_list = ( - None - if not field_names - else [f"{index_name}:{field_name}" for field_name in field_names] - ) - acl_fieldset_entry = f"{index_name}:{ACCESS_CONTROL_LIST}" - if ( - field_set_list - and user_access_control_list - and acl_fieldset_entry not in field_set_list - ): - field_set_list.append(acl_fieldset_entry) - field_set = ",".join(field_set_list) if field_set_list else None - - # build filters - selection = f"{index_name}.document_id=='{document_id}'" - if min_chunk_ind is not None: - selection += f" and {index_name}.chunk_id>={min_chunk_ind}" - if max_chunk_ind is not None: - selection += f" and {index_name}.chunk_id<={max_chunk_ind}" - - # Setting up the selection criteria in the query parameters - 
params = { - # NOTE: Document Selector Language doesn't allow `contains`, so we can't check - # for the ACL in the selection. Instead, we have to check as a postfilter - "selection": selection, - "continuation": None, - "wantedDocumentCount": 1_000, - "fieldSet": field_set, - } - - document_chunks: list[dict] = [] - while True: - response = requests.get(url, params=params) - try: - response.raise_for_status() - except requests.HTTPError as e: - request_info = f"Headers: {response.request.headers}\nPayload: {params}" - response_info = f"Status Code: {response.status_code}\nResponse Content: {response.text}" - error_base = f"Error occurred getting chunk by Document ID {document_id}" - logger.error( - f"{error_base}:\n" - f"{request_info}\n" - f"{response_info}\n" - f"Exception: {e}" - ) - raise requests.HTTPError(error_base) from e - - # Check if the response contains any documents - response_data = response.json() - if "documents" in response_data: - for document in response_data["documents"]: - if user_access_control_list: - document_acl = document["fields"].get(ACCESS_CONTROL_LIST) - if not document_acl or not any( - user_acl_entry in document_acl - for user_acl_entry in user_access_control_list - ): - continue - document_chunks.append(document) - document_chunks.extend(response_data["documents"]) - - # Check for continuation token to handle pagination - if "continuation" in response_data and response_data["continuation"]: - params["continuation"] = response_data["continuation"] - else: - break # Exit loop if no continuation token - - return document_chunks - - -def _get_vespa_chunk_ids_by_document_id( - document_id: str, index_name: str, user_access_control_list: list[str] | None = None -) -> list[str]: - document_chunks = _get_vespa_chunks_by_document_id( - document_id=document_id, - index_name=index_name, - user_access_control_list=user_access_control_list, - field_names=[DOCUMENT_ID], - ) - return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks] - - -@retry(tries=3, delay=1, backoff=2) -def _delete_vespa_doc_chunks( - document_id: str, index_name: str, http_client: httpx.Client -) -> None: - doc_chunk_ids = _get_vespa_chunk_ids_by_document_id( - document_id=document_id, index_name=index_name - ) - - for chunk_id in doc_chunk_ids: - try: - res = http_client.delete( - f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{chunk_id}" - ) - res.raise_for_status() - except httpx.HTTPStatusError as e: - logger.error(f"Failed to delete chunk, details: {e.response.text}") - raise - - -def _delete_vespa_docs( - document_ids: list[str], - index_name: str, - http_client: httpx.Client, - executor: concurrent.futures.ThreadPoolExecutor | None = None, -) -> None: - external_executor = True - - if not executor: - external_executor = False - executor = concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) - - try: - doc_deletion_future = { - executor.submit( - _delete_vespa_doc_chunks, doc_id, index_name, http_client - ): doc_id - for doc_id in document_ids - } - for future in concurrent.futures.as_completed(doc_deletion_future): - # Will raise exception if the deletion raised an exception - future.result() - - finally: - if not external_executor: - executor.shutdown(wait=True) - - -def _get_existing_documents_from_chunks( - chunks: list[DocMetadataAwareIndexChunk], - index_name: str, - http_client: httpx.Client, - executor: concurrent.futures.ThreadPoolExecutor | None = None, -) -> set[str]: - external_executor = True - - if not executor: - external_executor = False - 
executor = concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) - - document_ids: set[str] = set() - try: - chunk_existence_future = { - executor.submit( - _does_document_exist, - str(get_uuid_from_chunk(chunk)), - index_name, - http_client, - ): chunk - for chunk in chunks - } - for future in concurrent.futures.as_completed(chunk_existence_future): - chunk = chunk_existence_future[future] - chunk_already_existed = future.result() - if chunk_already_existed: - document_ids.add(chunk.source_document.id) - - finally: - if not external_executor: - executor.shutdown(wait=True) - - return document_ids - - -@retry(tries=3, delay=1, backoff=2) -def _index_vespa_chunk( - chunk: DocMetadataAwareIndexChunk, index_name: str, http_client: httpx.Client -) -> None: - json_header = { - "Content-Type": "application/json", - } - document = chunk.source_document - - # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself - vespa_chunk_id = str(get_uuid_from_chunk(chunk)) - embeddings = chunk.embeddings - - embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding} - - if embeddings.mini_chunk_embeddings: - for ind, m_c_embed in enumerate(embeddings.mini_chunk_embeddings): - embeddings_name_vector_map[f"mini_chunk_{ind}"] = m_c_embed - - title = document.get_title_for_document_index() - - vespa_document_fields = { - DOCUMENT_ID: document.id, - CHUNK_ID: chunk.chunk_id, - BLURB: remove_invalid_unicode_chars(chunk.blurb), - TITLE: remove_invalid_unicode_chars(title) if title else None, - SKIP_TITLE_EMBEDDING: not title, - # For the BM25 index, the keyword suffix is used, the vector is already generated with the more - # natural language representation of the metadata section - CONTENT: remove_invalid_unicode_chars( - f"{chunk.title_prefix}{chunk.content}{chunk.metadata_suffix_keyword}" - ), - # This duplication of `content` is needed for keyword highlighting - # Note that it's not exactly the same as the actual content - # which contains the title prefix and metadata suffix - CONTENT_SUMMARY: remove_invalid_unicode_chars(chunk.content), - SOURCE_TYPE: str(document.source.value), - SOURCE_LINKS: json.dumps(chunk.source_links), - SEMANTIC_IDENTIFIER: remove_invalid_unicode_chars(document.semantic_identifier), - SECTION_CONTINUATION: chunk.section_continuation, - METADATA: json.dumps(document.metadata), - # Save as a list for efficient extraction as an Attribute - METADATA_LIST: chunk.source_document.get_metadata_str_attributes(), - METADATA_SUFFIX: chunk.metadata_suffix_keyword, - EMBEDDINGS: embeddings_name_vector_map, - TITLE_EMBEDDING: chunk.title_embedding, - BOOST: chunk.boost, - DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at), - PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners), - SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners), - # the only `set` vespa has is `weightedset`, so we have to give each - # element an arbitrary weight - ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()}, - DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets}, - } - - vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}" - logger.debug(f'Indexing to URL "{vespa_url}"') - res = http_client.post( - vespa_url, headers=json_header, json={"fields": vespa_document_fields} - ) - try: - res.raise_for_status() - except Exception as e: - logger.exception( - f"Failed to index document: '{document.id}'. 
Got response: '{res.text}'" - ) - raise e - - -def _batch_index_vespa_chunks( - chunks: list[DocMetadataAwareIndexChunk], - index_name: str, - http_client: httpx.Client, - executor: concurrent.futures.ThreadPoolExecutor | None = None, -) -> None: - external_executor = True - - if not executor: - external_executor = False - executor = concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) - - try: - chunk_index_future = { - executor.submit(_index_vespa_chunk, chunk, index_name, http_client): chunk - for chunk in chunks - } - for future in concurrent.futures.as_completed(chunk_index_future): - # Will raise exception if any indexing raised an exception - future.result() - - finally: - if not external_executor: - executor.shutdown(wait=True) - - -def _clear_and_index_vespa_chunks( - chunks: list[DocMetadataAwareIndexChunk], - index_name: str, -) -> set[DocumentInsertionRecord]: - """Receive a list of chunks from a batch of documents and index the chunks into Vespa along - with updating the associated permissions. Assumes that a document will not be split into - multiple chunk batches calling this function multiple times, otherwise only the last set of - chunks will be kept""" - existing_docs: set[str] = set() - - # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for - # indexing / updates / deletes since we have to make a large volume of requests. - with ( - concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor, - httpx.Client(http2=True) as http_client, - ): - # Check for existing documents, existing documents need to have all of their chunks deleted - # prior to indexing as the document size (num chunks) may have shrunk - first_chunks = [chunk for chunk in chunks if chunk.chunk_id == 0] - for chunk_batch in batch_generator(first_chunks, _BATCH_SIZE): - existing_docs.update( - _get_existing_documents_from_chunks( - chunks=chunk_batch, - index_name=index_name, - http_client=http_client, - executor=executor, - ) - ) - - for doc_id_batch in batch_generator(existing_docs, _BATCH_SIZE): - _delete_vespa_docs( - document_ids=doc_id_batch, - index_name=index_name, - http_client=http_client, - executor=executor, - ) - - for chunk_batch in batch_generator(chunks, _BATCH_SIZE): - _batch_index_vespa_chunks( - chunks=chunk_batch, - index_name=index_name, - http_client=http_client, - executor=executor, - ) - - all_doc_ids = {chunk.source_document.id for chunk in chunks} - - return { - DocumentInsertionRecord( - document_id=doc_id, - already_existed=doc_id in existing_docs, - ) - for doc_id in all_doc_ids - } - - -def _build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) -> str: - def _build_or_filters(key: str, vals: list[str] | None) -> str: - if vals is None: - return "" - - valid_vals = [val for val in vals if val] - if not key or not valid_vals: - return "" - - eq_elems = [f'{key} contains "{elem}"' for elem in valid_vals] - or_clause = " or ".join(eq_elems) - return f"({or_clause}) and " - - def _build_time_filter( - cutoff: datetime | None, - # Slightly over 3 Months, approximately 1 fiscal quarter - untimed_doc_cutoff: timedelta = timedelta(days=92), - ) -> str: - if not cutoff: - return "" - - # For Documents that don't have an updated at, filter them out for queries asking for - # very recent documents (3 months) default. 
Documents that don't have an updated at - # time are assigned 3 months for time decay value - include_untimed = datetime.now(timezone.utc) - untimed_doc_cutoff > cutoff - cutoff_secs = int(cutoff.timestamp()) - - if include_untimed: - # Documents without updated_at are assigned -1 as their date - return f"!({DOC_UPDATED_AT} < {cutoff_secs}) and " - - return f"({DOC_UPDATED_AT} >= {cutoff_secs}) and " - - filter_str = f"!({HIDDEN}=true) and " if not include_hidden else "" - - # CAREFUL touching this one, currently there is no second ACL double-check post retrieval - if filters.access_control_list is not None: - filter_str += _build_or_filters( - ACCESS_CONTROL_LIST, filters.access_control_list - ) - - source_strs = ( - [s.value for s in filters.source_type] if filters.source_type else None - ) - filter_str += _build_or_filters(SOURCE_TYPE, source_strs) - - tag_attributes = None - tags = filters.tags - if tags: - tag_attributes = [tag.tag_key + INDEX_SEPARATOR + tag.tag_value for tag in tags] - filter_str += _build_or_filters(METADATA_LIST, tag_attributes) - - filter_str += _build_or_filters(DOCUMENT_SETS, filters.document_set) - - filter_str += _build_time_filter(filters.time_cutoff) - - return filter_str - - -def _process_dynamic_summary( - dynamic_summary: str, max_summary_length: int = 400 -) -> list[str]: - if not dynamic_summary: - return [] - - current_length = 0 - processed_summary: list[str] = [] - for summary_section in dynamic_summary.split(""): - # if we're past the desired max length, break at the last word - if current_length + len(summary_section) >= max_summary_length: - summary_section = summary_section[: max_summary_length - current_length] - summary_section = summary_section.lstrip() # remove any leading whitespace - - # handle the case where the truncated section is either just a - # single (partial) word or if it's empty - first_space = summary_section.find(" ") - if first_space == -1: - # add ``...`` to previous section - if processed_summary: - processed_summary[-1] += "..." - break - - # handle the valid truncated section case - summary_section = summary_section.rsplit(" ", 1)[0] - if summary_section[-1] in string.punctuation: - summary_section = summary_section[:-1] - summary_section += "..." - processed_summary.append(summary_section) - break - - processed_summary.append(summary_section) - current_length += len(summary_section) - - return processed_summary - - -def _vespa_hit_to_inference_chunk( - hit: dict[str, Any], null_score: bool = False -) -> InferenceChunkUncleaned: - fields = cast(dict[str, Any], hit["fields"]) - - # parse fields that are stored as strings, but are really json / datetime - metadata = json.loads(fields[METADATA]) if METADATA in fields else {} - updated_at = ( - datetime.fromtimestamp(fields[DOC_UPDATED_AT], tz=timezone.utc) - if DOC_UPDATED_AT in fields - else None - ) - - # The highlights might include the title but this is the best way we have so far to show the highlighting - match_highlights = _process_dynamic_summary( - # fallback to regular `content` if the `content_summary` field - # isn't present - dynamic_summary=hit["fields"].get(CONTENT_SUMMARY, hit["fields"][CONTENT]), - ) - semantic_identifier = fields.get(SEMANTIC_IDENTIFIER, "") - if not semantic_identifier: - logger.error( - f"Chunk with blurb: {fields.get(BLURB, 'Unknown')[:50]}... 
has no Semantic Identifier" - ) - - source_links = fields.get(SOURCE_LINKS, {}) - source_links_dict_unprocessed = ( - json.loads(source_links) if isinstance(source_links, str) else source_links - ) - source_links_dict = { - int(k): v - for k, v in cast(dict[str, str], source_links_dict_unprocessed).items() - } - - return InferenceChunkUncleaned( - chunk_id=fields[CHUNK_ID], - blurb=fields.get(BLURB, ""), # Unused - content=fields[CONTENT], # Includes extra title prefix and metadata suffix - source_links=source_links_dict or {0: ""}, - section_continuation=fields[SECTION_CONTINUATION], - document_id=fields[DOCUMENT_ID], - source_type=fields[SOURCE_TYPE], - title=fields.get(TITLE), - semantic_identifier=fields[SEMANTIC_IDENTIFIER], - boost=fields.get(BOOST, 1), - recency_bias=fields.get("matchfeatures", {}).get(RECENCY_BIAS, 1.0), - score=None if null_score else hit.get("relevance", 0), - hidden=fields.get(HIDDEN, False), - primary_owners=fields.get(PRIMARY_OWNERS), - secondary_owners=fields.get(SECONDARY_OWNERS), - metadata=metadata, - metadata_suffix=fields.get(METADATA_SUFFIX), - match_highlights=match_highlights, - updated_at=updated_at, - ) - - -@retry(tries=3, delay=1, backoff=2) -def _query_vespa( - query_params: Mapping[str, str | int | float] -) -> list[InferenceChunkUncleaned]: - if "query" in query_params and not cast(str, query_params["query"]).strip(): - raise ValueError("No/empty query received") - - params = dict( - **query_params, - **{ - "presentation.timing": True, - } - if LOG_VESPA_TIMING_INFORMATION - else {}, - ) - - response = requests.post( - SEARCH_ENDPOINT, - json=params, - ) - try: - response.raise_for_status() - except requests.HTTPError as e: - request_info = f"Headers: {response.request.headers}\nPayload: {params}" - response_info = ( - f"Status Code: {response.status_code}\n" - f"Response Content: {response.text}" - ) - error_base = "Failed to query Vespa" - logger.error( - f"{error_base}:\n" - f"{request_info}\n" - f"{response_info}\n" - f"Exception: {e}" - ) - raise requests.HTTPError(error_base) from e - - response_json: dict[str, Any] = response.json() - if LOG_VESPA_TIMING_INFORMATION: - logger.info("Vespa timing info: %s", response_json.get("timing")) - hits = response_json["root"].get("children", []) - - for hit in hits: - if hit["fields"].get(CONTENT) is None: - identifier = hit["fields"].get("documentid") or hit["id"] - logger.error( - f"Vespa Index with Vespa ID {identifier} has no contents. 
" - f"This is invalid because the vector is not meaningful and keywordsearch cannot " - f"fetch this document" - ) - - filtered_hits = [hit for hit in hits if hit["fields"].get(CONTENT) is not None] - - inference_chunks = [_vespa_hit_to_inference_chunk(hit) for hit in filtered_hits] - # Good Debugging Spot - return inference_chunks - - def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO: zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf: @@ -698,45 +88,7 @@ def _create_document_xml_lines(doc_names: list[str | None]) -> str: return "\n".join(doc_lines) -def _clean_chunk_id_copy( - chunk: DocMetadataAwareIndexChunk, -) -> DocMetadataAwareIndexChunk: - clean_chunk = chunk.copy( - update={ - "source_document": chunk.source_document.copy( - update={ - "id": replace_invalid_doc_id_characters(chunk.source_document.id) - } - ) - } - ) - return clean_chunk - - class VespaIndex(DocumentIndex): - yql_base = ( - f"select " - f"documentid, " - f"{DOCUMENT_ID}, " - f"{CHUNK_ID}, " - f"{BLURB}, " - f"{CONTENT}, " - f"{SOURCE_TYPE}, " - f"{SOURCE_LINKS}, " - f"{SEMANTIC_IDENTIFIER}, " - f"{TITLE}, " - f"{SECTION_CONTINUATION}, " - f"{BOOST}, " - f"{HIDDEN}, " - f"{DOC_UPDATED_AT}, " - f"{PRIMARY_OWNERS}, " - f"{SECONDARY_OWNERS}, " - f"{METADATA}, " - f"{METADATA_SUFFIX}, " - f"{CONTENT_SUMMARY} " - f"from {{index_name}} where " - ) - def __init__(self, index_name: str, secondary_index_name: str | None) -> None: self.index_name = index_name self.secondary_index_name = secondary_index_name @@ -807,16 +159,64 @@ class VespaIndex(DocumentIndex): self, chunks: list[DocMetadataAwareIndexChunk], ) -> set[DocumentInsertionRecord]: + """Receive a list of chunks from a batch of documents and index the chunks into Vespa along + with updating the associated permissions. Assumes that a document will not be split into + multiple chunk batches calling this function multiple times, otherwise only the last set of + chunks will be kept""" # IMPORTANT: This must be done one index at a time, do not use secondary index here - cleaned_chunks = [_clean_chunk_id_copy(chunk) for chunk in chunks] - return _clear_and_index_vespa_chunks( - chunks=cleaned_chunks, index_name=self.index_name - ) + cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks] + + existing_docs: set[str] = set() + + # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for + # indexing / updates / deletes since we have to make a large volume of requests. 
+ with ( + concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, + httpx.Client(http2=True) as http_client, + ): + # Check for existing documents, existing documents need to have all of their chunks deleted + # prior to indexing as the document size (num chunks) may have shrunk + first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0] + for chunk_batch in batch_generator(first_chunks, BATCH_SIZE): + existing_docs.update( + get_existing_documents_from_chunks( + chunks=chunk_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) + ) + + for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE): + delete_vespa_docs( + document_ids=doc_id_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) + + for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE): + batch_index_vespa_chunks( + chunks=chunk_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) + + all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks} + + return { + DocumentInsertionRecord( + document_id=doc_id, + already_existed=doc_id in existing_docs, + ) + for doc_id in all_doc_ids + } @staticmethod def _apply_updates_batched( updates: list[_VespaUpdateRequest], - batch_size: int = _BATCH_SIZE, + batch_size: int = BATCH_SIZE, ) -> None: """Runs a batch of updates in parallel via the ThreadPoolExecutor.""" @@ -835,7 +235,7 @@ class VespaIndex(DocumentIndex): # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for # indexing / updates / deletes since we have to make a large volume of requests. with ( - concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor, + concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, httpx.Client(http2=True) as http_client, ): for update_batch in batch_generator(updates, batch_size): @@ -877,14 +277,14 @@ class VespaIndex(DocumentIndex): index_names.append(self.secondary_index_name) chunk_id_start_time = time.monotonic() - with concurrent.futures.ThreadPoolExecutor( - max_workers=_NUM_THREADS - ) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: future_to_doc_chunk_ids = { executor.submit( - _get_vespa_chunk_ids_by_document_id, + get_all_vespa_ids_for_document_id, document_id=document_id, index_name=index_name, + filters=None, + get_large_chunks=True, ): (document_id, index_name) for index_name in index_names for update_request in update_requests @@ -958,37 +358,31 @@ class VespaIndex(DocumentIndex): index_names.append(self.secondary_index_name) for index_name in index_names: - _delete_vespa_docs( + delete_vespa_docs( document_ids=doc_ids, index_name=index_name, http_client=http_client ) def id_based_retrieval( self, - document_id: str, - min_chunk_ind: int | None, - max_chunk_ind: int | None, - user_access_control_list: list[str] | None = None, + chunk_requests: list[VespaChunkRequest], + filters: IndexFilters, + batch_retrieval: bool = False, + get_large_chunks: bool = False, ) -> list[InferenceChunkUncleaned]: - document_id = replace_invalid_doc_id_characters(document_id) - - vespa_chunks = _get_vespa_chunks_by_document_id( - document_id=document_id, + if batch_retrieval: + return batch_search_api_retrieval( + index_name=self.index_name, + chunk_requests=chunk_requests, + filters=filters, + get_large_chunks=get_large_chunks, + ) + return parallel_visit_api_retrieval( index_name=self.index_name, - 
user_access_control_list=user_access_control_list, - min_chunk_ind=min_chunk_ind, - max_chunk_ind=max_chunk_ind, + chunk_requests=chunk_requests, + filters=filters, + get_large_chunks=get_large_chunks, ) - if not vespa_chunks: - return [] - - inference_chunks = [ - _vespa_hit_to_inference_chunk(chunk, null_score=True) - for chunk in vespa_chunks - ] - inference_chunks.sort(key=lambda chunk: chunk.chunk_id) - return inference_chunks - def hybrid_retrieval( self, query: str, @@ -1001,11 +395,11 @@ class VespaIndex(DocumentIndex): offset: int = 0, title_content_ratio: float | None = TITLE_CONTENT_RATIO, ) -> list[InferenceChunkUncleaned]: - vespa_where_clauses = _build_vespa_filters(filters) + vespa_where_clauses = build_vespa_filters(filters) # Needs to be at least as much as the value set in Vespa schema config target_hits = max(10 * num_to_retrieve, 1000) yql = ( - VespaIndex.yql_base.format(index_name=self.index_name) + YQL_BASE.format(index_name=self.index_name) + vespa_where_clauses + f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) " + f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) " @@ -1027,10 +421,10 @@ class VespaIndex(DocumentIndex): "hits": num_to_retrieve, "offset": offset, "ranking.profile": f"hybrid_search{len(query_embedding)}", - "timeout": _VESPA_TIMEOUT, + "timeout": VESPA_TIMEOUT, } - return _query_vespa(params) + return query_vespa(params) def admin_retrieval( self, @@ -1039,9 +433,9 @@ class VespaIndex(DocumentIndex): num_to_retrieve: int = NUM_RETURNED_HITS, offset: int = 0, ) -> list[InferenceChunkUncleaned]: - vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True) + vespa_where_clauses = build_vespa_filters(filters, include_hidden=True) yql = ( - VespaIndex.yql_base.format(index_name=self.index_name) + YQL_BASE.format(index_name=self.index_name) + vespa_where_clauses + '({grammar: "weakAnd"}userInput(@query) ' # `({defaultIndex: "content_summary"}userInput(@query))` section is @@ -1056,7 +450,7 @@ class VespaIndex(DocumentIndex): "hits": num_to_retrieve, "offset": 0, "ranking.profile": "admin_search", - "timeout": _VESPA_TIMEOUT, + "timeout": VESPA_TIMEOUT, } - return _query_vespa(params) + return query_vespa(params) diff --git a/backend/danswer/document_index/vespa/indexing_utils.py b/backend/danswer/document_index/vespa/indexing_utils.py new file mode 100644 index 0000000000..1b16cfc494 --- /dev/null +++ b/backend/danswer/document_index/vespa/indexing_utils.py @@ -0,0 +1,227 @@ +import concurrent.futures +import json +from datetime import datetime +from datetime import timezone + +import httpx +from retry import retry + +from danswer.connectors.cross_connector_utils.miscellaneous_utils import ( + get_experts_stores_representations, +) +from danswer.document_index.document_index_utils import get_uuid_from_chunk +from danswer.document_index.vespa.shared_utils.utils import remove_invalid_unicode_chars +from danswer.document_index.vespa.shared_utils.utils import ( + replace_invalid_doc_id_characters, +) +from danswer.document_index.vespa_constants import ACCESS_CONTROL_LIST +from danswer.document_index.vespa_constants import BLURB +from danswer.document_index.vespa_constants import BOOST +from danswer.document_index.vespa_constants import CHUNK_ID +from danswer.document_index.vespa_constants import CONTENT +from danswer.document_index.vespa_constants import CONTENT_SUMMARY +from danswer.document_index.vespa_constants import DOC_UPDATED_AT +from danswer.document_index.vespa_constants 
import DOCUMENT_ID +from danswer.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT +from danswer.document_index.vespa_constants import DOCUMENT_SETS +from danswer.document_index.vespa_constants import EMBEDDINGS +from danswer.document_index.vespa_constants import LARGE_CHUNK_REFERENCE_IDS +from danswer.document_index.vespa_constants import METADATA +from danswer.document_index.vespa_constants import METADATA_LIST +from danswer.document_index.vespa_constants import METADATA_SUFFIX +from danswer.document_index.vespa_constants import NUM_THREADS +from danswer.document_index.vespa_constants import PRIMARY_OWNERS +from danswer.document_index.vespa_constants import SECONDARY_OWNERS +from danswer.document_index.vespa_constants import SECTION_CONTINUATION +from danswer.document_index.vespa_constants import SEMANTIC_IDENTIFIER +from danswer.document_index.vespa_constants import SKIP_TITLE_EMBEDDING +from danswer.document_index.vespa_constants import SOURCE_LINKS +from danswer.document_index.vespa_constants import SOURCE_TYPE +from danswer.document_index.vespa_constants import TITLE +from danswer.document_index.vespa_constants import TITLE_EMBEDDING +from danswer.indexing.models import DocMetadataAwareIndexChunk +from danswer.utils.logger import setup_logger + +logger = setup_logger() + + +@retry(tries=3, delay=1, backoff=2) +def _does_document_exist( + doc_chunk_id: str, + index_name: str, + http_client: httpx.Client, +) -> bool: + """Returns whether the document already exists and the users/group whitelists + Specifically in this case, document refers to a vespa document which is equivalent to a Danswer + chunk. This checks for whether the chunk exists already in the index""" + doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}" + doc_fetch_response = http_client.get(doc_url) + + if doc_fetch_response.status_code == 404: + return False + + if doc_fetch_response.status_code != 200: + logger.debug(f"Failed to check for document with URL {doc_url}") + raise RuntimeError( + f"Unexpected fetch document by ID value from Vespa " + f"with error {doc_fetch_response.status_code}" + ) + return True + + +def _vespa_get_updated_at_attribute(t: datetime | None) -> int | None: + if not t: + return None + + if t.tzinfo != timezone.utc: + raise ValueError("Connectors must provide document update time in UTC") + + return int(t.timestamp()) + + +def get_existing_documents_from_chunks( + chunks: list[DocMetadataAwareIndexChunk], + index_name: str, + http_client: httpx.Client, + executor: concurrent.futures.ThreadPoolExecutor | None = None, +) -> set[str]: + external_executor = True + + if not executor: + external_executor = False + executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) + + document_ids: set[str] = set() + try: + chunk_existence_future = { + executor.submit( + _does_document_exist, + str(get_uuid_from_chunk(chunk)), + index_name, + http_client, + ): chunk + for chunk in chunks + } + for future in concurrent.futures.as_completed(chunk_existence_future): + chunk = chunk_existence_future[future] + chunk_already_existed = future.result() + if chunk_already_existed: + document_ids.add(chunk.source_document.id) + + finally: + if not external_executor: + executor.shutdown(wait=True) + + return document_ids + + +@retry(tries=3, delay=1, backoff=2) +def _index_vespa_chunk( + chunk: DocMetadataAwareIndexChunk, index_name: str, http_client: httpx.Client +) -> None: + json_header = { + "Content-Type": "application/json", + } + document = 
chunk.source_document + + # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself + vespa_chunk_id = str(get_uuid_from_chunk(chunk)) + embeddings = chunk.embeddings + + embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding} + + if embeddings.mini_chunk_embeddings: + for ind, m_c_embed in enumerate(embeddings.mini_chunk_embeddings): + embeddings_name_vector_map[f"mini_chunk_{ind}"] = m_c_embed + + title = document.get_title_for_document_index() + + vespa_document_fields = { + DOCUMENT_ID: document.id, + CHUNK_ID: chunk.chunk_id, + BLURB: remove_invalid_unicode_chars(chunk.blurb), + TITLE: remove_invalid_unicode_chars(title) if title else None, + SKIP_TITLE_EMBEDDING: not title, + # For the BM25 index, the keyword suffix is used, the vector is already generated with the more + # natural language representation of the metadata section + CONTENT: remove_invalid_unicode_chars( + f"{chunk.title_prefix}{chunk.content}{chunk.metadata_suffix_keyword}" + ), + # This duplication of `content` is needed for keyword highlighting + # Note that it's not exactly the same as the actual content + # which contains the title prefix and metadata suffix + CONTENT_SUMMARY: remove_invalid_unicode_chars(chunk.content), + SOURCE_TYPE: str(document.source.value), + SOURCE_LINKS: json.dumps(chunk.source_links), + SEMANTIC_IDENTIFIER: remove_invalid_unicode_chars(document.semantic_identifier), + SECTION_CONTINUATION: chunk.section_continuation, + LARGE_CHUNK_REFERENCE_IDS: chunk.large_chunk_reference_ids, + METADATA: json.dumps(document.metadata), + # Save as a list for efficient extraction as an Attribute + METADATA_LIST: chunk.source_document.get_metadata_str_attributes(), + METADATA_SUFFIX: chunk.metadata_suffix_keyword, + EMBEDDINGS: embeddings_name_vector_map, + TITLE_EMBEDDING: chunk.title_embedding, + BOOST: chunk.boost, + DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at), + PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners), + SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners), + # the only `set` vespa has is `weightedset`, so we have to give each + # element an arbitrary weight + ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()}, + DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets}, + } + + vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}" + logger.debug(f'Indexing to URL "{vespa_url}"') + res = http_client.post( + vespa_url, headers=json_header, json={"fields": vespa_document_fields} + ) + try: + res.raise_for_status() + except Exception as e: + logger.exception( + f"Failed to index document: '{document.id}'. 
Got response: '{res.text}'" + ) + raise e + + +def batch_index_vespa_chunks( + chunks: list[DocMetadataAwareIndexChunk], + index_name: str, + http_client: httpx.Client, + executor: concurrent.futures.ThreadPoolExecutor | None = None, +) -> None: + external_executor = True + + if not executor: + external_executor = False + executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) + + try: + chunk_index_future = { + executor.submit(_index_vespa_chunk, chunk, index_name, http_client): chunk + for chunk in chunks + } + for future in concurrent.futures.as_completed(chunk_index_future): + # Will raise exception if any indexing raised an exception + future.result() + + finally: + if not external_executor: + executor.shutdown(wait=True) + + +def clean_chunk_id_copy( + chunk: DocMetadataAwareIndexChunk, +) -> DocMetadataAwareIndexChunk: + clean_chunk = chunk.copy( + update={ + "source_document": chunk.source_document.copy( + update={ + "id": replace_invalid_doc_id_characters(chunk.source_document.id) + } + ) + } + ) + return clean_chunk diff --git a/backend/danswer/document_index/vespa/utils.py b/backend/danswer/document_index/vespa/shared_utils/utils.py similarity index 100% rename from backend/danswer/document_index/vespa/utils.py rename to backend/danswer/document_index/vespa/shared_utils/utils.py diff --git a/backend/danswer/document_index/vespa/shared_utils/vespa_request_builders.py b/backend/danswer/document_index/vespa/shared_utils/vespa_request_builders.py new file mode 100644 index 0000000000..65752aa09c --- /dev/null +++ b/backend/danswer/document_index/vespa/shared_utils/vespa_request_builders.py @@ -0,0 +1,96 @@ +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +from danswer.configs.constants import INDEX_SEPARATOR +from danswer.document_index.interfaces import VespaChunkRequest +from danswer.document_index.vespa_constants import ACCESS_CONTROL_LIST +from danswer.document_index.vespa_constants import CHUNK_ID +from danswer.document_index.vespa_constants import DOC_UPDATED_AT +from danswer.document_index.vespa_constants import DOCUMENT_ID +from danswer.document_index.vespa_constants import DOCUMENT_SETS +from danswer.document_index.vespa_constants import HIDDEN +from danswer.document_index.vespa_constants import METADATA_LIST +from danswer.document_index.vespa_constants import SOURCE_TYPE +from danswer.search.models import IndexFilters +from danswer.utils.logger import setup_logger + +logger = setup_logger() + + +def build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) -> str: + def _build_or_filters(key: str, vals: list[str] | None) -> str: + if vals is None: + return "" + + valid_vals = [val for val in vals if val] + if not key or not valid_vals: + return "" + + eq_elems = [f'{key} contains "{elem}"' for elem in valid_vals] + or_clause = " or ".join(eq_elems) + return f"({or_clause}) and " + + def _build_time_filter( + cutoff: datetime | None, + # Slightly over 3 Months, approximately 1 fiscal quarter + untimed_doc_cutoff: timedelta = timedelta(days=92), + ) -> str: + if not cutoff: + return "" + + # For Documents that don't have an updated at, filter them out for queries asking for + # very recent documents (3 months) default. 
Documents that don't have an updated at + # time are assigned 3 months for time decay value + include_untimed = datetime.now(timezone.utc) - untimed_doc_cutoff > cutoff + cutoff_secs = int(cutoff.timestamp()) + + if include_untimed: + # Documents without updated_at are assigned -1 as their date + return f"!({DOC_UPDATED_AT} < {cutoff_secs}) and " + + return f"({DOC_UPDATED_AT} >= {cutoff_secs}) and " + + filter_str = f"!({HIDDEN}=true) and " if not include_hidden else "" + + # CAREFUL touching this one, currently there is no second ACL double-check post retrieval + if filters.access_control_list is not None: + filter_str += _build_or_filters( + ACCESS_CONTROL_LIST, filters.access_control_list + ) + + source_strs = ( + [s.value for s in filters.source_type] if filters.source_type else None + ) + filter_str += _build_or_filters(SOURCE_TYPE, source_strs) + + tag_attributes = None + tags = filters.tags + if tags: + tag_attributes = [tag.tag_key + INDEX_SEPARATOR + tag.tag_value for tag in tags] + filter_str += _build_or_filters(METADATA_LIST, tag_attributes) + + filter_str += _build_or_filters(DOCUMENT_SETS, filters.document_set) + + filter_str += _build_time_filter(filters.time_cutoff) + + return filter_str + + +def build_vespa_id_based_retrieval_yql( + chunk_request: VespaChunkRequest, +) -> str: + id_based_retrieval_yql_section = ( + f'({DOCUMENT_ID} contains "{chunk_request.document_id}"' + ) + + if chunk_request.is_capped: + id_based_retrieval_yql_section += ( + f" and {CHUNK_ID} >= {chunk_request.min_chunk_ind or 0}" + ) + id_based_retrieval_yql_section += ( + f" and {CHUNK_ID} <= {chunk_request.max_chunk_ind}" + ) + + id_based_retrieval_yql_section += ")" + return id_based_retrieval_yql_section diff --git a/backend/danswer/document_index/vespa_constants.py b/backend/danswer/document_index/vespa_constants.py new file mode 100644 index 0000000000..0b8949b426 --- /dev/null +++ b/backend/danswer/document_index/vespa_constants.py @@ -0,0 +1,85 @@ +from danswer.configs.app_configs import VESPA_CONFIG_SERVER_HOST +from danswer.configs.app_configs import VESPA_HOST +from danswer.configs.app_configs import VESPA_PORT +from danswer.configs.app_configs import VESPA_TENANT_PORT +from danswer.configs.constants import SOURCE_TYPE + +VESPA_DIM_REPLACEMENT_PAT = "VARIABLE_DIM" +DANSWER_CHUNK_REPLACEMENT_PAT = "DANSWER_CHUNK_NAME" +DOCUMENT_REPLACEMENT_PAT = "DOCUMENT_REPLACEMENT" +DATE_REPLACEMENT = "DATE_REPLACEMENT" + +# config server +VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}" +VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2" + +# main search application +VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}" +# danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd +DOCUMENT_ID_ENDPOINT = ( + f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid" +) +SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/" + +NUM_THREADS = ( + 32 # since Vespa doesn't allow batching of inserts / updates, we use threads +) +MAX_ID_SEARCH_QUERY_SIZE = 400 +# up from 500ms for now, since we've seen quite a few timeouts +# in the long term, we are looking to improve the performance of Vespa +# so that we can bring this back to default +VESPA_TIMEOUT = "3s" +BATCH_SIZE = 128 # Specific to Vespa + + +DOCUMENT_ID = "document_id" +CHUNK_ID = "chunk_id" +BLURB = "blurb" +CONTENT = "content" +SOURCE_LINKS = "source_links" +SEMANTIC_IDENTIFIER = "semantic_identifier" +TITLE = "title" +SKIP_TITLE_EMBEDDING = "skip_title" 
+SECTION_CONTINUATION = "section_continuation" +EMBEDDINGS = "embeddings" +TITLE_EMBEDDING = "title_embedding" +ACCESS_CONTROL_LIST = "access_control_list" +DOCUMENT_SETS = "document_sets" +LARGE_CHUNK_REFERENCE_IDS = "large_chunk_reference_ids" +METADATA = "metadata" +METADATA_LIST = "metadata_list" +METADATA_SUFFIX = "metadata_suffix" +BOOST = "boost" +DOC_UPDATED_AT = "doc_updated_at" # Indexed as seconds since epoch +PRIMARY_OWNERS = "primary_owners" +SECONDARY_OWNERS = "secondary_owners" +RECENCY_BIAS = "recency_bias" +HIDDEN = "hidden" + +# Specific to Vespa, needed for highlighting matching keywords / section +CONTENT_SUMMARY = "content_summary" + + +YQL_BASE = ( + f"select " + f"documentid, " + f"{DOCUMENT_ID}, " + f"{CHUNK_ID}, " + f"{BLURB}, " + f"{CONTENT}, " + f"{SOURCE_TYPE}, " + f"{SOURCE_LINKS}, " + f"{SEMANTIC_IDENTIFIER}, " + f"{TITLE}, " + f"{SECTION_CONTINUATION}, " + f"{BOOST}, " + f"{HIDDEN}, " + f"{DOC_UPDATED_AT}, " + f"{PRIMARY_OWNERS}, " + f"{SECONDARY_OWNERS}, " + f"{LARGE_CHUNK_REFERENCE_IDS}, " + f"{METADATA}, " + f"{METADATA_SUFFIX}, " + f"{CONTENT_SUMMARY} " + f"from {{index_name}} where " +) diff --git a/backend/danswer/indexing/chunker.py b/backend/danswer/indexing/chunker.py index d52e9f5577..03a03f30f4 100644 --- a/backend/danswer/indexing/chunker.py +++ b/backend/danswer/indexing/chunker.py @@ -1,9 +1,5 @@ -import abc -from collections.abc import Callable -from typing import Optional -from typing import TYPE_CHECKING - from danswer.configs.app_configs import BLURB_SIZE +from danswer.configs.app_configs import LARGE_CHUNK_RATIO from danswer.configs.app_configs import MINI_CHUNK_SIZE from danswer.configs.app_configs import SKIP_METADATA_IN_CHUNK from danswer.configs.constants import DocumentSource @@ -15,13 +11,9 @@ from danswer.connectors.cross_connector_utils.miscellaneous_utils import ( ) from danswer.connectors.models import Document from danswer.indexing.models import DocAwareChunk -from danswer.natural_language_processing.utils import get_tokenizer +from danswer.natural_language_processing.utils import BaseTokenizer from danswer.utils.logger import setup_logger from danswer.utils.text_processing import shared_precompare_cleanup -from shared_configs.enums import EmbeddingProvider - -if TYPE_CHECKING: - from llama_index.text_splitter import SentenceSplitter # type:ignore # Not supporting overlaps, we need a clean combination of chunks and it is unclear if overlaps @@ -36,49 +28,6 @@ CHUNK_MIN_CONTENT = 256 logger = setup_logger() -ChunkFunc = Callable[[Document], list[DocAwareChunk]] - - -def extract_blurb(text: str, blurb_splitter: "SentenceSplitter") -> str: - texts = blurb_splitter.split_text(text) - if not texts: - return "" - return texts[0] - - -def chunk_large_section( - section_text: str, - section_link_text: str, - document: Document, - start_chunk_id: int, - blurb: str, - chunk_splitter: "SentenceSplitter", - mini_chunk_splitter: Optional["SentenceSplitter"], - title_prefix: str, - metadata_suffix_semantic: str, - metadata_suffix_keyword: str, -) -> list[DocAwareChunk]: - split_texts = chunk_splitter.split_text(section_text) - - chunks = [ - DocAwareChunk( - source_document=document, - chunk_id=start_chunk_id + chunk_ind, - blurb=blurb, - content=chunk_text, - source_links={0: section_link_text}, - section_continuation=(chunk_ind != 0), - title_prefix=title_prefix, - metadata_suffix_semantic=metadata_suffix_semantic, - metadata_suffix_keyword=metadata_suffix_keyword, - mini_chunk_texts=mini_chunk_splitter.split_text(chunk_text) - if 
mini_chunk_splitter and chunk_text.strip() - else None, - ) - for chunk_ind, chunk_text in enumerate(split_texts) - ] - return chunks - def _get_metadata_suffix_for_document_index( metadata: dict[str, str | list[str]], include_separator: bool = False @@ -121,206 +70,235 @@ def _get_metadata_suffix_for_document_index( return metadata_semantic, metadata_keyword -def chunk_document( - document: Document, - model_name: str, - provider_type: EmbeddingProvider | None, - enable_multipass: bool, - chunk_tok_size: int = DOC_EMBEDDING_CONTEXT_SIZE, - subsection_overlap: int = CHUNK_OVERLAP, - blurb_size: int = BLURB_SIZE, # Used for both title and content - include_metadata: bool = not SKIP_METADATA_IN_CHUNK, - mini_chunk_size: int = MINI_CHUNK_SIZE, -) -> list[DocAwareChunk]: - from llama_index.text_splitter import SentenceSplitter - - tokenizer = get_tokenizer( - model_name=model_name, - provider_type=provider_type, +def _combine_chunks(chunks: list[DocAwareChunk], index: int) -> DocAwareChunk: + merged_chunk = DocAwareChunk( + source_document=chunks[0].source_document, + chunk_id=index, + blurb=chunks[0].blurb, + content=chunks[0].content, + source_links=chunks[0].source_links or {}, + section_continuation=(index > 0), + title_prefix=chunks[0].title_prefix, + metadata_suffix_semantic=chunks[0].metadata_suffix_semantic, + metadata_suffix_keyword=chunks[0].metadata_suffix_keyword, + large_chunk_reference_ids=[chunks[0].chunk_id], + mini_chunk_texts=None, ) - blurb_splitter = SentenceSplitter( - tokenizer=tokenizer.tokenize, chunk_size=blurb_size, chunk_overlap=0 - ) + offset = 0 + for i in range(1, len(chunks)): + merged_chunk.content += SECTION_SEPARATOR + chunks[i].content + merged_chunk.large_chunk_reference_ids.append(chunks[i].chunk_id) - chunk_splitter = SentenceSplitter( - tokenizer=tokenizer.tokenize, - chunk_size=chunk_tok_size, - chunk_overlap=subsection_overlap, - ) + offset += len(SECTION_SEPARATOR) + len(chunks[i - 1].content) + for link_offset, link_text in (chunks[i].source_links or {}).items(): + if merged_chunk.source_links is None: + merged_chunk.source_links = {} + merged_chunk.source_links[link_offset + offset] = link_text - mini_chunk_splitter = SentenceSplitter( - tokenizer=tokenizer.tokenize, - chunk_size=mini_chunk_size, - chunk_overlap=0, - ) + return merged_chunk - title = extract_blurb(document.get_title_for_document_index() or "", blurb_splitter) - title_prefix = title + RETURN_SEPARATOR if title else "" - title_tokens = len(tokenizer.tokenize(title_prefix)) - metadata_suffix_semantic = "" - metadata_suffix_keyword = "" - metadata_tokens = 0 - if include_metadata: - ( - metadata_suffix_semantic, - metadata_suffix_keyword, - ) = _get_metadata_suffix_for_document_index( - document.metadata, include_separator=True - ) - metadata_tokens = len(tokenizer.tokenize(metadata_suffix_semantic)) - - if metadata_tokens >= chunk_tok_size * MAX_METADATA_PERCENTAGE: - # Note: we can keep the keyword suffix even if the semantic suffix is too long to fit in the model - # context, there is no limit for the keyword component - metadata_suffix_semantic = "" - metadata_tokens = 0 - - content_token_limit = chunk_tok_size - title_tokens - metadata_tokens - - # If there is not enough context remaining then just index the chunk with no prefix/suffix - if content_token_limit <= CHUNK_MIN_CONTENT: - content_token_limit = chunk_tok_size - title_prefix = "" - metadata_suffix_semantic = "" - - chunks: list[DocAwareChunk] = [] - link_offsets: dict[int, str] = {} - chunk_text = "" - for section in 
document.sections: - section_text = section.text - section_link_text = section.link or "" - - section_tok_length = len(tokenizer.tokenize(section_text)) - current_tok_length = len(tokenizer.tokenize(chunk_text)) - curr_offset_len = len(shared_precompare_cleanup(chunk_text)) - - # Large sections are considered self-contained/unique therefore they start a new chunk and are not concatenated - # at the end by other sections - if section_tok_length > content_token_limit: - if chunk_text: - chunks.append( - DocAwareChunk( - source_document=document, - chunk_id=len(chunks), - blurb=extract_blurb(chunk_text, blurb_splitter), - content=chunk_text, - source_links=link_offsets, - section_continuation=False, - title_prefix=title_prefix, - metadata_suffix_semantic=metadata_suffix_semantic, - metadata_suffix_keyword=metadata_suffix_keyword, - mini_chunk_texts=mini_chunk_splitter.split_text(chunk_text) - if enable_multipass and chunk_text.strip() - else None, - ) - ) - link_offsets = {} - chunk_text = "" - - large_section_chunks = chunk_large_section( - section_text=section_text, - section_link_text=section_link_text, - document=document, - start_chunk_id=len(chunks), - chunk_splitter=chunk_splitter, - mini_chunk_splitter=mini_chunk_splitter - if enable_multipass and chunk_text.strip() - else None, - blurb=extract_blurb(section_text, blurb_splitter), - title_prefix=title_prefix, - metadata_suffix_semantic=metadata_suffix_semantic, - metadata_suffix_keyword=metadata_suffix_keyword, - ) - chunks.extend(large_section_chunks) - continue - - # In the case where the whole section is shorter than a chunk, either adding to chunk or start a new one - if ( - current_tok_length - + len(tokenizer.tokenize(SECTION_SEPARATOR)) - + section_tok_length - <= content_token_limit - ): - chunk_text += ( - SECTION_SEPARATOR + section_text if chunk_text else section_text - ) - link_offsets[curr_offset_len] = section_link_text - else: - chunks.append( - DocAwareChunk( - source_document=document, - chunk_id=len(chunks), - blurb=extract_blurb(chunk_text, blurb_splitter), - content=chunk_text, - source_links=link_offsets, - section_continuation=False, - title_prefix=title_prefix, - metadata_suffix_semantic=metadata_suffix_semantic, - metadata_suffix_keyword=metadata_suffix_keyword, - mini_chunk_texts=mini_chunk_splitter.split_text(chunk_text) - if enable_multipass and chunk_text.strip() - else None, - ) - ) - link_offsets = {0: section_link_text} - chunk_text = section_text - - # Once we hit the end, if we're still in the process of building a chunk, add what we have. If there is only whitespace left - # then don't include it. If there are no chunks at all from the doc, we can just create a single chunk with the title. 
- if chunk_text.strip() or not chunks: - chunks.append( - DocAwareChunk( - source_document=document, - chunk_id=len(chunks), - blurb=extract_blurb(chunk_text, blurb_splitter), - content=chunk_text, - source_links=link_offsets or {0: section_link_text}, - section_continuation=False, - title_prefix=title_prefix, - metadata_suffix_semantic=metadata_suffix_semantic, - metadata_suffix_keyword=metadata_suffix_keyword, - mini_chunk_texts=mini_chunk_splitter.split_text(chunk_text) - if enable_multipass and chunk_text.strip() - else None, - ) - ) - - # If the chunk does not have any useable content, it will not be indexed - return chunks +def generate_large_chunks(chunks: list[DocAwareChunk]) -> list[DocAwareChunk]: + large_chunks = [ + _combine_chunks(chunks[i : i + LARGE_CHUNK_RATIO], idx) + for idx, i in enumerate(range(0, len(chunks), LARGE_CHUNK_RATIO)) + if len(chunks[i : i + LARGE_CHUNK_RATIO]) > 1 + ] + return large_chunks class Chunker: - @abc.abstractmethod - def chunk( - self, - document: Document, - ) -> list[DocAwareChunk]: - raise NotImplementedError + """ + Chunks documents into smaller chunks for indexing. + """ - -class DefaultChunker(Chunker): def __init__( self, - model_name: str, - provider_type: EmbeddingProvider | None, - enable_multipass: bool, - ): - self.model_name = model_name - self.provider_type = provider_type - self.enable_multipass = enable_multipass + tokenizer: BaseTokenizer, + enable_multipass: bool = False, + enable_large_chunks: bool = False, + blurb_size: int = BLURB_SIZE, + include_metadata: bool = not SKIP_METADATA_IN_CHUNK, + chunk_token_limit: int = DOC_EMBEDDING_CONTEXT_SIZE, + chunk_overlap: int = CHUNK_OVERLAP, + mini_chunk_size: int = MINI_CHUNK_SIZE, + ) -> None: + from llama_index.text_splitter import SentenceSplitter - def chunk( + self.include_metadata = include_metadata + self.chunk_token_limit = chunk_token_limit + self.enable_multipass = enable_multipass + self.enable_large_chunks = enable_large_chunks + self.tokenizer = tokenizer + + self.blurb_splitter = SentenceSplitter( + tokenizer=tokenizer.tokenize, + chunk_size=blurb_size, + chunk_overlap=0, + ) + + self.chunk_splitter = SentenceSplitter( + tokenizer=tokenizer.tokenize, + chunk_size=chunk_token_limit, + chunk_overlap=chunk_overlap, + ) + + self.mini_chunk_splitter = ( + SentenceSplitter( + tokenizer=tokenizer.tokenize, + chunk_size=mini_chunk_size, + chunk_overlap=0, + ) + if enable_multipass + else None + ) + + def _extract_blurb(self, text: str) -> str: + texts = self.blurb_splitter.split_text(text) + if not texts: + return "" + return texts[0] + + def _get_mini_chunk_texts(self, chunk_text: str) -> list[str] | None: + if self.mini_chunk_splitter and chunk_text.strip(): + return self.mini_chunk_splitter.split_text(chunk_text) + return None + + def _chunk_document( self, document: Document, + title_prefix: str, + metadata_suffix_semantic: str, + metadata_suffix_keyword: str, + content_token_limit: int, ) -> list[DocAwareChunk]: + """ + Loops through sections of the document, adds metadata and converts them into chunks. 
+ """ + chunks: list[DocAwareChunk] = [] + link_offsets: dict[int, str] = {} + chunk_text = "" + + def _create_chunk( + text: str, + links: dict[int, str], + is_continuation: bool = False, + ) -> DocAwareChunk: + return DocAwareChunk( + source_document=document, + chunk_id=len(chunks), + blurb=self._extract_blurb(text), + content=text, + source_links=links or {0: ""}, + section_continuation=is_continuation, + title_prefix=title_prefix, + metadata_suffix_semantic=metadata_suffix_semantic, + metadata_suffix_keyword=metadata_suffix_keyword, + mini_chunk_texts=self._get_mini_chunk_texts(text), + ) + + for section in document.sections: + section_text = section.text + section_link_text = section.link or "" + + section_token_count = len(self.tokenizer.tokenize(section_text)) + + # Large sections are considered self-contained/unique + # Therefore, they start a new chunk and are not concatenated + # at the end by other sections + if section_token_count > content_token_limit: + if chunk_text: + chunks.append(_create_chunk(chunk_text, link_offsets)) + link_offsets = {} + chunk_text = "" + + split_texts = self.chunk_splitter.split_text(section_text) + for i, split_text in enumerate(split_texts): + chunks.append( + _create_chunk( + text=split_text, + links={0: section_link_text}, + is_continuation=(i != 0), + ) + ) + continue + + current_token_count = len(self.tokenizer.tokenize(chunk_text)) + current_offset = len(shared_precompare_cleanup(chunk_text)) + # In the case where the whole section is shorter than a chunk, either add + # to chunk or start a new one + next_section_tokens = ( + len(self.tokenizer.tokenize(SECTION_SEPARATOR)) + section_token_count + ) + if next_section_tokens + current_token_count <= content_token_limit: + if chunk_text: + chunk_text += SECTION_SEPARATOR + chunk_text += section_text + link_offsets[current_offset] = section_link_text + else: + chunks.append(_create_chunk(chunk_text, link_offsets)) + link_offsets = {0: section_link_text} + chunk_text = section_text + + # Once we hit the end, if we're still in the process of building a chunk, add what we have. + # If there is only whitespace left then don't include it. If there are no chunks at all + # from the doc, we can just create a single chunk with the title. 
+ if chunk_text.strip() or not chunks: + chunks.append( + _create_chunk( + chunk_text, + link_offsets or {0: section_link_text}, + ) + ) + + # If the chunk does not have any useable content, it will not be indexed + return chunks + + def chunk(self, document: Document) -> list[DocAwareChunk]: # Specifically for reproducing an issue with gmail if document.source == DocumentSource.GMAIL: logger.debug(f"Chunking {document.semantic_identifier}") - return chunk_document( - document=document, - model_name=self.model_name, - provider_type=self.provider_type, - enable_multipass=self.enable_multipass, + + title = self._extract_blurb(document.get_title_for_document_index() or "") + title_prefix = title + RETURN_SEPARATOR if title else "" + title_tokens = len(self.tokenizer.tokenize(title_prefix)) + + metadata_suffix_semantic = "" + metadata_suffix_keyword = "" + metadata_tokens = 0 + if self.include_metadata: + ( + metadata_suffix_semantic, + metadata_suffix_keyword, + ) = _get_metadata_suffix_for_document_index( + document.metadata, include_separator=True + ) + metadata_tokens = len(self.tokenizer.tokenize(metadata_suffix_semantic)) + + if metadata_tokens >= self.chunk_token_limit * MAX_METADATA_PERCENTAGE: + # Note: we can keep the keyword suffix even if the semantic suffix is too long to fit in the model + # context, there is no limit for the keyword component + metadata_suffix_semantic = "" + metadata_tokens = 0 + + content_token_limit = self.chunk_token_limit - title_tokens - metadata_tokens + # If there is not enough context remaining then just index the chunk with no prefix/suffix + if content_token_limit <= CHUNK_MIN_CONTENT: + content_token_limit = self.chunk_token_limit + title_prefix = "" + metadata_suffix_semantic = "" + + normal_chunks = self._chunk_document( + document, + title_prefix, + metadata_suffix_semantic, + metadata_suffix_keyword, + content_token_limit, ) + + if self.enable_multipass and self.enable_large_chunks: + large_chunks = generate_large_chunks(normal_chunks) + normal_chunks.extend(large_chunks) + + return normal_chunks diff --git a/backend/danswer/indexing/embedder.py b/backend/danswer/indexing/embedder.py index aafc9bf6d9..b140f87920 100644 --- a/backend/danswer/indexing/embedder.py +++ b/backend/danswer/indexing/embedder.py @@ -3,7 +3,6 @@ from abc import abstractmethod from sqlalchemy.orm import Session -from danswer.configs.model_configs import DOC_EMBEDDING_CONTEXT_SIZE from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.models import EmbeddingModel as DbEmbeddingModel @@ -13,6 +12,7 @@ from danswer.indexing.models import DocAwareChunk from danswer.indexing.models import IndexChunk from danswer.natural_language_processing.search_nlp_models import EmbeddingModel from danswer.utils.logger import setup_logger +from danswer.utils.timing import log_function_time from shared_configs.configs import INDEXING_MODEL_SERVER_HOST from shared_configs.configs import INDEXING_MODEL_SERVER_PORT from shared_configs.enums import EmbeddingProvider @@ -40,6 +40,19 @@ class IndexingEmbedder(ABC): self.provider_type = provider_type self.api_key = api_key + self.embedding_model = EmbeddingModel( + model_name=model_name, + query_prefix=query_prefix, + passage_prefix=passage_prefix, + normalize=normalize, + api_key=api_key, + provider_type=provider_type, + # The below are globally set, this flow always uses the indexing one + server_host=INDEXING_MODEL_SERVER_HOST, + 
server_port=INDEXING_MODEL_SERVER_PORT, + retrim_content=True, + ) + @abstractmethod def embed_chunks( self, @@ -61,28 +74,18 @@ class DefaultIndexingEmbedder(IndexingEmbedder): super().__init__( model_name, normalize, query_prefix, passage_prefix, provider_type, api_key ) - self.max_seq_length = DOC_EMBEDDING_CONTEXT_SIZE # Currently not customizable - - self.embedding_model = EmbeddingModel( - model_name=model_name, - query_prefix=query_prefix, - passage_prefix=passage_prefix, - normalize=normalize, - api_key=api_key, - provider_type=provider_type, - # The below are globally set, this flow always uses the indexing one - server_host=INDEXING_MODEL_SERVER_HOST, - server_port=INDEXING_MODEL_SERVER_PORT, - retrim_content=True, - ) + @log_function_time() def embed_chunks( self, chunks: list[DocAwareChunk], ) -> list[IndexChunk]: # All chunks at this point must have some non-empty content flat_chunk_texts: list[str] = [] + large_chunks_present = False for chunk in chunks: + if chunk.large_chunk_reference_ids: + large_chunks_present = True chunk_text = ( f"{chunk.title_prefix}{chunk.content}{chunk.metadata_suffix_semantic}" ) or chunk.source_document.get_title_for_document_index() @@ -98,7 +101,9 @@ class DefaultIndexingEmbedder(IndexingEmbedder): flat_chunk_texts.extend(chunk.mini_chunk_texts) embeddings = self.embedding_model.encode( - flat_chunk_texts, text_type=EmbedTextType.PASSAGE + texts=flat_chunk_texts, + text_type=EmbedTextType.PASSAGE, + large_chunks_present=large_chunks_present, ) chunk_titles = { diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py index 8d73eb79b7..dd0607985a 100644 --- a/backend/danswer/indexing/indexing_pipeline.py +++ b/backend/danswer/indexing/indexing_pipeline.py @@ -22,7 +22,6 @@ from danswer.db.tag import create_or_add_document_tag_list from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import DocumentMetadata from danswer.indexing.chunker import Chunker -from danswer.indexing.chunker import DefaultChunker from danswer.indexing.embedder import IndexingEmbedder from danswer.indexing.models import DocAwareChunk from danswer.indexing.models import DocMetadataAwareIndexChunk @@ -183,11 +182,9 @@ def index_doc_batch( ) logger.debug("Starting chunking") - chunks: list[DocAwareChunk] = [ - chunk - for document in updatable_docs - for chunk in chunker.chunk(document=document) - ] + chunks: list[DocAwareChunk] = [] + for document in updatable_docs: + chunks.extend(chunker.chunk(document=document)) logger.debug("Starting embedding") chunks_with_embeddings = ( @@ -274,10 +271,20 @@ def build_indexing_pipeline( if search_settings else ENABLE_MULTIPASS_INDEXING ) - chunker = chunker or DefaultChunker( - model_name=embedder.model_name, - provider_type=embedder.provider_type, + enable_large_chunks = multipass and ( + embedder.provider_type is not None or embedder.model_name.startswith("nomic-ai") + ) + + if multipass and not enable_large_chunks: + logger.warning( + "Multipass indexing is enabled, but the provider type or model name" + " is not supported. Only mini chunks will be indexed." 
+ ) + + chunker = chunker or Chunker( + tokenizer=embedder.embedding_model.tokenizer, enable_multipass=multipass, + enable_large_chunks=enable_large_chunks, ) return partial( diff --git a/backend/danswer/indexing/models.py b/backend/danswer/indexing/models.py index 0714b28d81..317a4404a5 100644 --- a/backend/danswer/indexing/models.py +++ b/backend/danswer/indexing/models.py @@ -46,6 +46,8 @@ class DocAwareChunk(BaseChunk): mini_chunk_texts: list[str] | None + large_chunk_reference_ids: list[int] = [] + def to_short_descriptor(self) -> str: """Used when logging the identity of a chunk""" return ( diff --git a/backend/danswer/natural_language_processing/search_nlp_models.py b/backend/danswer/natural_language_processing/search_nlp_models.py index c34d57cb81..a5ae704dd7 100644 --- a/backend/danswer/natural_language_processing/search_nlp_models.py +++ b/backend/danswer/natural_language_processing/search_nlp_models.py @@ -8,6 +8,7 @@ import requests from httpx import HTTPError from retry import retry +from danswer.configs.app_configs import LARGE_CHUNK_RATIO from danswer.configs.model_configs import BATCH_SIZE_ENCODE_CHUNKS from danswer.configs.model_configs import ( BATCH_SIZE_ENCODE_CHUNKS_FOR_API_EMBEDDING_SERVICES, @@ -89,18 +90,18 @@ class EmbeddingModel: passage_prefix: str | None, api_key: str | None, provider_type: EmbeddingProvider | None, - # The following are globals are currently not configurable - max_seq_length: int = DOC_EMBEDDING_CONTEXT_SIZE, retrim_content: bool = False, ) -> None: self.api_key = api_key self.provider_type = provider_type - self.max_seq_length = max_seq_length self.query_prefix = query_prefix self.passage_prefix = passage_prefix self.normalize = normalize self.model_name = model_name self.retrim_content = retrim_content + self.tokenizer = get_tokenizer( + model_name=model_name, provider_type=provider_type + ) model_server_url = build_model_server_url(server_host, server_port) self.embed_server_endpoint = f"{model_server_url}/encoder/bi-encoder-embed" @@ -129,50 +130,26 @@ class EmbeddingModel: else: return _make_request() - def _encode_api_model( - self, texts: list[str], text_type: EmbedTextType, batch_size: int - ) -> list[Embedding]: - if not self.provider_type: - raise ValueError("Provider type is not set for API embedding") - - embeddings: list[Embedding] = [] - - text_batches = batch_list(texts, batch_size) - for idx, text_batch in enumerate(text_batches, start=1): - logger.debug(f"Encoding batch {idx} of {len(text_batches)}") - embed_request = EmbedRequest( - model_name=self.model_name, - texts=text_batch, - max_context_length=self.max_seq_length, - normalize_embeddings=self.normalize, - api_key=self.api_key, - provider_type=self.provider_type, - text_type=text_type, - manual_query_prefix=self.query_prefix, - manual_passage_prefix=self.passage_prefix, - ) - response = self._make_model_server_request(embed_request) - embeddings.extend(response.embeddings) - - return embeddings - - def _encode_local_model( + def _batch_encode_texts( self, texts: list[str], text_type: EmbedTextType, batch_size: int, + max_seq_length: int, ) -> list[Embedding]: text_batches = batch_list(texts, batch_size) - embeddings: list[Embedding] = [] + logger.debug( f"Encoding {len(texts)} texts in {len(text_batches)} batches for local model" ) + + embeddings: list[Embedding] = [] for idx, text_batch in enumerate(text_batches, start=1): logger.debug(f"Encoding batch {idx} of {len(text_batches)}") embed_request = EmbedRequest( model_name=self.model_name, texts=text_batch, - 
max_context_length=self.max_seq_length, + max_context_length=max_seq_length, normalize_embeddings=self.normalize, api_key=self.api_key, provider_type=self.provider_type, @@ -189,12 +166,17 @@ class EmbeddingModel: self, texts: list[str], text_type: EmbedTextType, + large_chunks_present: bool = False, local_embedding_batch_size: int = BATCH_SIZE_ENCODE_CHUNKS, api_embedding_batch_size: int = BATCH_SIZE_ENCODE_CHUNKS_FOR_API_EMBEDDING_SERVICES, + max_seq_length: int = DOC_EMBEDDING_CONTEXT_SIZE, ) -> list[Embedding]: if not texts or not all(texts): raise ValueError(f"Empty or missing text for embedding: {texts}") + if large_chunks_present: + max_seq_length *= LARGE_CHUNK_RATIO + if self.retrim_content: # This is applied during indexing as a catchall for overly long titles (or other uncapped fields) # Note that this uses just the default tokenizer which may also lead to very minor miscountings @@ -202,25 +184,28 @@ class EmbeddingModel: texts = [ tokenizer_trim_content( content=text, - desired_length=self.max_seq_length, - tokenizer=get_tokenizer( - model_name=self.model_name, - provider_type=self.provider_type, - ), + desired_length=max_seq_length, + tokenizer=self.tokenizer, ) for text in texts ] - if self.provider_type: - if self.provider_type == "openai": - texts = [clean_openai_text(text) for text in texts] - return self._encode_api_model( - texts=texts, text_type=text_type, batch_size=api_embedding_batch_size - ) + if self.provider_type == EmbeddingProvider.OPENAI: + # If the provider is openai, we need to clean the text + # as a temporary workaround for the openai API + texts = [clean_openai_text(text) for text in texts] - # if no provider, use local model - return self._encode_local_model( - texts=texts, text_type=text_type, batch_size=local_embedding_batch_size + batch_size = ( + api_embedding_batch_size + if self.provider_type + else local_embedding_batch_size + ) + + return self._batch_encode_texts( + texts=texts, + text_type=text_type, + batch_size=batch_size, + max_seq_length=max_seq_length, ) diff --git a/backend/danswer/search/models.py b/backend/danswer/search/models.py index 8c3138cfd0..d7027b929a 100644 --- a/backend/danswer/search/models.py +++ b/backend/danswer/search/models.py @@ -172,6 +172,7 @@ class InferenceChunk(BaseChunk): updated_at: datetime | None primary_owners: list[str] | None = None secondary_owners: list[str] | None = None + large_chunk_reference_ids: list[int] = [] @property def unique_id(self) -> str: diff --git a/backend/danswer/search/pipeline.py b/backend/danswer/search/pipeline.py index b5b5e84aea..52e046153b 100644 --- a/backend/danswer/search/pipeline.py +++ b/backend/danswer/search/pipeline.py @@ -10,6 +10,7 @@ from danswer.configs.chat_configs import DISABLE_LLM_DOC_RELEVANCE from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.models import User from danswer.document_index.factory import get_default_document_index +from danswer.document_index.interfaces import VespaChunkRequest from danswer.llm.answering.models import DocumentPruningConfig from danswer.llm.answering.models import PromptConfig from danswer.llm.answering.prune_and_merge import _merge_sections @@ -26,6 +27,7 @@ from danswer.search.models import RerankMetricsContainer from danswer.search.models import RetrievalMetricsContainer from danswer.search.models import SearchQuery from danswer.search.models import SearchRequest +from danswer.search.postprocessing.postprocessing import cleanup_chunks from danswer.search.postprocessing.postprocessing import 
search_postprocessing from danswer.search.preprocessing.preprocessing import retrieval_preprocessing from danswer.search.retrieval.search_runner import retrieve_chunks @@ -35,7 +37,6 @@ from danswer.secondary_llm_flows.agentic_evaluation import evaluate_inference_se from danswer.utils.logger import setup_logger from danswer.utils.threadpool_concurrency import FunctionCall from danswer.utils.threadpool_concurrency import run_functions_in_parallel -from danswer.utils.threadpool_concurrency import run_functions_tuples_in_parallel from danswer.utils.timing import log_function_time logger = setup_logger() @@ -145,6 +146,7 @@ class SearchPipeline: if self._retrieved_chunks is not None: return self._retrieved_chunks + # These chunks do not include large chunks and have been deduped self._retrieved_chunks = retrieve_chunks( query=self.search_query, document_index=self.document_index, @@ -165,49 +167,51 @@ class SearchPipeline: if self._retrieved_sections is not None: return self._retrieved_sections + # These chunks are ordered, deduped, and contain no large chunks retrieved_chunks = self._get_chunks() above = self.search_query.chunks_above below = self.search_query.chunks_below - functions_with_args: list[tuple[Callable, tuple]] = [] expanded_inference_sections = [] + inference_chunks: list[InferenceChunk] = [] + chunk_requests: list[VespaChunkRequest] = [] # Full doc setting takes priority if self.search_query.full_doc: seen_document_ids = set() - unique_chunks = [] # This preserves the ordering since the chunks are retrieved in score order for chunk in retrieved_chunks: if chunk.document_id not in seen_document_ids: seen_document_ids.add(chunk.document_id) - unique_chunks.append(chunk) - - functions_with_args.append( - ( - self.document_index.id_based_retrieval, - ( - chunk.document_id, - None, # Start chunk ind - None, # End chunk ind - # There is no chunk level permissioning, this expansion around chunks - # can be assumed to be safe - IndexFilters(access_control_list=None), - ), + chunk_requests.append( + VespaChunkRequest( + document_id=chunk.document_id, ) ) - list_inference_chunks = run_functions_tuples_in_parallel( - functions_with_args, allow_failures=False + + inference_chunks.extend( + cleanup_chunks( + self.document_index.id_based_retrieval( + chunk_requests=chunk_requests, + filters=IndexFilters(access_control_list=None), + ) + ) ) - for ind, chunk in enumerate(unique_chunks): - inf_chunks = list_inference_chunks[ind] + # Create a dictionary to group chunks by document_id + grouped_inference_chunks: dict[str, list[InferenceChunk]] = {} + for chunk in inference_chunks: + if chunk.document_id not in grouped_inference_chunks: + grouped_inference_chunks[chunk.document_id] = [] + grouped_inference_chunks[chunk.document_id].append(chunk) + for chunk_group in grouped_inference_chunks.values(): inference_section = inference_section_from_chunks( - center_chunk=chunk, - chunks=inf_chunks, + center_chunk=chunk_group[0], + chunks=chunk_group, ) if inference_section is not None: @@ -244,37 +248,34 @@ class SearchPipeline: ] flat_ranges: list[ChunkRange] = [r for ranges in merged_ranges for r in ranges] - flattened_inference_chunks: list[InferenceChunk] = [] - parallel_functions_with_args = [] for chunk_range in flat_ranges: # Don't need to fetch chunks within range for merging if chunk_above / below are 0. 
if above == below == 0: - flattened_inference_chunks.extend(chunk_range.chunks) + inference_chunks.extend(chunk_range.chunks) else: - parallel_functions_with_args.append( - ( - self.document_index.id_based_retrieval, - ( - chunk_range.chunks[0].document_id, - chunk_range.start, - chunk_range.end, - IndexFilters(access_control_list=None), - ), + chunk_requests.append( + VespaChunkRequest( + document_id=chunk_range.chunks[0].document_id, + min_chunk_ind=chunk_range.start, + max_chunk_ind=chunk_range.end, ) ) - if parallel_functions_with_args: - list_inference_chunks = run_functions_tuples_in_parallel( - parallel_functions_with_args, allow_failures=False + if chunk_requests: + inference_chunks.extend( + cleanup_chunks( + self.document_index.id_based_retrieval( + chunk_requests=chunk_requests, + filters=IndexFilters(access_control_list=None), + batch_retrieval=True, + ) + ) ) - for inference_chunks in list_inference_chunks: - flattened_inference_chunks.extend(inference_chunks) doc_chunk_ind_to_chunk = { - (chunk.document_id, chunk.chunk_id): chunk - for chunk in flattened_inference_chunks + (chunk.document_id, chunk.chunk_id): chunk for chunk in inference_chunks } # Build the surroundings for all of the initial retrieved chunks diff --git a/backend/danswer/search/retrieval/search_runner.py b/backend/danswer/search/retrieval/search_runner.py index 15eca79147..8da889b4b9 100644 --- a/backend/danswer/search/retrieval/search_runner.py +++ b/backend/danswer/search/retrieval/search_runner.py @@ -9,10 +9,15 @@ from sqlalchemy.orm import Session from danswer.db.embedding_model import get_current_db_embedding_model from danswer.document_index.interfaces import DocumentIndex +from danswer.document_index.interfaces import VespaChunkRequest +from danswer.document_index.vespa.shared_utils.utils import ( + replace_invalid_doc_id_characters, +) from danswer.natural_language_processing.search_nlp_models import EmbeddingModel from danswer.search.models import ChunkMetric from danswer.search.models import IndexFilters from danswer.search.models import InferenceChunk +from danswer.search.models import InferenceChunkUncleaned from danswer.search.models import InferenceSection from danswer.search.models import MAX_METRICS_CONTENT from danswer.search.models import RetrievalMetricsContainer @@ -110,6 +115,12 @@ def doc_index_retrieval( document_index: DocumentIndex, db_session: Session, ) -> list[InferenceChunk]: + """ + This function performs the search to retrieve the chunks, + extracts chunks from the large chunks, persists the scores + from the large chunks to the referenced chunks, + dedupes the chunks, and cleans the chunks. 
+ """ db_embedding_model = get_current_db_embedding_model(db_session) model = EmbeddingModel( @@ -137,7 +148,71 @@ def doc_index_retrieval( offset=query.offset, ) - return cleanup_chunks(top_chunks) + retrieval_requests: list[VespaChunkRequest] = [] + normal_chunks: list[InferenceChunkUncleaned] = [] + referenced_chunk_scores: dict[tuple[str, int], float] = {} + for chunk in top_chunks: + if chunk.large_chunk_reference_ids: + retrieval_requests.append( + VespaChunkRequest( + document_id=replace_invalid_doc_id_characters(chunk.document_id), + min_chunk_ind=chunk.large_chunk_reference_ids[0], + max_chunk_ind=chunk.large_chunk_reference_ids[-1], + ) + ) + # for each referenced chunk, persist the + # highest score to the referenced chunk + for chunk_id in chunk.large_chunk_reference_ids: + key = (chunk.document_id, chunk_id) + referenced_chunk_scores[key] = max( + referenced_chunk_scores.get(key, 0), chunk.score or 0 + ) + else: + normal_chunks.append(chunk) + + # If there are no large chunks, just return the normal chunks + if not retrieval_requests: + return cleanup_chunks(normal_chunks) + + # Retrieve and return the referenced normal chunks from the large chunks + retrieved_inference_chunks = document_index.id_based_retrieval( + chunk_requests=retrieval_requests, + filters=query.filters, + batch_retrieval=True, + ) + + # Apply the scores from the large chunks to the chunks referenced + # by each large chunk + for chunk in retrieved_inference_chunks: + if (chunk.document_id, chunk.chunk_id) in referenced_chunk_scores: + chunk.score = referenced_chunk_scores[(chunk.document_id, chunk.chunk_id)] + referenced_chunk_scores.pop((chunk.document_id, chunk.chunk_id)) + else: + logger.error( + f"Chunk {chunk.document_id} {chunk.chunk_id} not found in referenced chunk scores" + ) + + # Log any chunks that were not found in the retrieved chunks + for reference in referenced_chunk_scores.keys(): + logger.error(f"Chunk {reference} not found in retrieved chunks") + + unique_chunks: dict[tuple[str, int], InferenceChunkUncleaned] = { + (chunk.document_id, chunk.chunk_id): chunk for chunk in normal_chunks + } + + # persist the highest score of each deduped chunk + for chunk in retrieved_inference_chunks: + key = (chunk.document_id, chunk.chunk_id) + # For duplicates, keep the highest score + if key not in unique_chunks or (chunk.score or 0) > ( + unique_chunks[key].score or 0 + ): + unique_chunks[key] = chunk + + # Deduplicate the chunks + deduped_chunks = list(unique_chunks.values()) + deduped_chunks.sort(key=lambda chunk: chunk.score or 0, reverse=True) + return cleanup_chunks(deduped_chunks) def _simplify_text(text: str) -> str: @@ -220,34 +295,42 @@ def inference_sections_from_ids( document_index: DocumentIndex, ) -> list[InferenceSection]: # Currently only fetches whole docs - doc_ids_set = set(doc_id for doc_id, chunk_id in doc_identifiers) + doc_ids_set = set(doc_id for doc_id, _ in doc_identifiers) + + chunk_requests: list[VespaChunkRequest] = [ + VespaChunkRequest(document_id=doc_id) for doc_id in doc_ids_set + ] # No need for ACL here because the doc ids were validated beforehand filters = IndexFilters(access_control_list=None) - functions_with_args: list[tuple[Callable, tuple]] = [ - (document_index.id_based_retrieval, (doc_id, None, None, filters)) - for doc_id in doc_ids_set - ] - - parallel_results = run_functions_tuples_in_parallel( - functions_with_args, allow_failures=True + retrieved_chunks = document_index.id_based_retrieval( + chunk_requests=chunk_requests, + filters=filters, ) - # Any 
failures to retrieve would give a None, drop the Nones and empty lists - inference_chunks_sets = [res for res in parallel_results if res] + cleaned_chunks = cleanup_chunks(retrieved_chunks) + if not cleaned_chunks: + return [] - return [ - inference_section - for inference_section in [ - inference_section_from_chunks( + # Group chunks by document ID + chunks_by_doc_id: dict[str, list[InferenceChunk]] = {} + for chunk in cleaned_chunks: + chunks_by_doc_id.setdefault(chunk.document_id, []).append(chunk) + + inference_sections = [ + section + for chunks in chunks_by_doc_id.values() + if chunks + and ( + section := inference_section_from_chunks( # The scores will always be 0 because the fetching by id gives back # no search scores. This is not needed though if the user is explicitly # selecting a document. - center_chunk=chunk_set[0], - chunks=chunk_set, + center_chunk=chunks[0], + chunks=chunks, ) - for chunk_set in inference_chunks_sets - ] - if inference_section is not None + ) ] + + return inference_sections diff --git a/backend/danswer/server/documents/document.py b/backend/danswer/server/documents/document.py index 6d46b72993..fefd49900b 100644 --- a/backend/danswer/server/documents/document.py +++ b/backend/danswer/server/documents/document.py @@ -9,8 +9,10 @@ from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.engine import get_session from danswer.db.models import User from danswer.document_index.factory import get_default_document_index +from danswer.document_index.interfaces import VespaChunkRequest from danswer.natural_language_processing.utils import get_tokenizer from danswer.prompts.prompt_utils import build_doc_context_str +from danswer.search.models import IndexFilters from danswer.search.preprocessing.access_filters import build_access_filters_for_user from danswer.server.documents.models import ChunkInfo from danswer.server.documents.models import DocumentInfo @@ -35,10 +37,8 @@ def get_document_info( user_acl_filters = build_access_filters_for_user(user, db_session) inference_chunks = document_index.id_based_retrieval( - document_id=document_id, - min_chunk_ind=None, - max_chunk_ind=None, - user_access_control_list=user_acl_filters, + chunk_requests=[VespaChunkRequest(document_id=document_id)], + filters=IndexFilters(access_control_list=user_acl_filters), ) if not inference_chunks: @@ -83,11 +83,15 @@ def get_chunk_info( ) user_acl_filters = build_access_filters_for_user(user, db_session) - inference_chunks = document_index.id_based_retrieval( + chunk_request = VespaChunkRequest( document_id=document_id, min_chunk_ind=chunk_id, max_chunk_ind=chunk_id, - user_access_control_list=user_acl_filters, + ) + inference_chunks = document_index.id_based_retrieval( + chunk_requests=[chunk_request], + filters=IndexFilters(access_control_list=user_acl_filters), + batch_retrieval=True, ) if not inference_chunks: diff --git a/backend/model_server/custom_models.py b/backend/model_server/custom_models.py index fa0fa60a3e..adddadd89c 100644 --- a/backend/model_server/custom_models.py +++ b/backend/model_server/custom_models.py @@ -65,6 +65,7 @@ def warm_up_intent_model() -> None: tokens = intent_tokenizer( MODEL_WARM_UP_STRING, return_tensors="pt", truncation=True, padding=True ) + intent_model = get_local_intent_model() device = intent_model.device intent_model( diff --git a/backend/tests/regression/answer_quality/README.md b/backend/tests/regression/answer_quality/README.md index 244a778c55..27a0bd5ae9 100644 --- 
a/backend/tests/regression/answer_quality/README.md +++ b/backend/tests/regression/answer_quality/README.md @@ -17,14 +17,17 @@ This Python script automates the process of running search quality tests for a b 1. Ensure you have the required dependencies installed. 2. Configure the `search_test_config.yaml` file based on the `search_test_config.yaml.template` file. 3. Configure the `.env_eval` file in `deployment/docker_compose` with the correct environment variables. -4. Navigate to Danswer repo: +4. Set up the PYTHONPATH permanently: + Add the following line to your shell configuration file (e.g., `~/.bashrc`, `~/.zshrc`, or `~/.bash_profile`): + ``` + export PYTHONPATH=$PYTHONPATH:/path/to/danswer/backend + ``` + Replace `/path/to/danswer` with the actual path to your Danswer repository. + After adding this line, restart your terminal or run `source ~/.bashrc` (or the appropriate config file) to apply the changes. +5. Navigate to Danswer repo: ``` cd path/to/danswer ``` -5. Set Python Path variable: -``` -export PYTHONPATH=$PYTHONPATH:$PWD/backend -``` 6. Navigate to the answer_quality folder: ``` cd backend/tests/regression/answer_quality diff --git a/backend/tests/regression/answer_quality/api_utils.py b/backend/tests/regression/answer_quality/api_utils.py index 5d74b7a5a9..92b3b96a08 100644 --- a/backend/tests/regression/answer_quality/api_utils.py +++ b/backend/tests/regression/answer_quality/api_utils.py @@ -66,7 +66,6 @@ def get_answer_from_query( except Exception as e: print("Failed to answer the questions:") print(f"\t {str(e)}") - print("Try restarting vespa container and trying agian") raise e return context_data_list, answer diff --git a/backend/tests/regression/answer_quality/file_uploader.py b/backend/tests/regression/answer_quality/file_uploader.py index bab78629d4..8cbc632b5b 100644 --- a/backend/tests/regression/answer_quality/file_uploader.py +++ b/backend/tests/regression/answer_quality/file_uploader.py @@ -52,6 +52,7 @@ def upload_test_files(zip_file_path: str, env_name: str) -> None: def manage_file_upload(zip_file_path: str, env_name: str) -> None: + start_time = time.time() unzipped_file_paths = unzip_and_get_file_paths(zip_file_path) total_file_count = len(unzipped_file_paths) problem_file_list: list[str] = [] @@ -84,15 +85,17 @@ def manage_file_upload(zip_file_path: str, env_name: str) -> None: time.sleep(10) - problem_file_csv_path = os.path.join(current_dir, "problem_files.csv") - with open(problem_file_csv_path, "w", newline="") as csvfile: - csvwriter = csv.writer(csvfile) - csvwriter.writerow(["Problematic File Paths"]) - for problem_file in problem_file_list: - csvwriter.writerow([problem_file]) + if problem_file_list: + problem_file_csv_path = os.path.join(current_dir, "problem_files.csv") + with open(problem_file_csv_path, "w", newline="") as csvfile: + csvwriter = csv.writer(csvfile) + csvwriter.writerow(["Problematic File Paths"]) + for problem_file in problem_file_list: + csvwriter.writerow([problem_file]) for file in unzipped_file_paths: os.unlink(file) + print(f"Total time taken: {(time.time() - start_time)/60} minutes") if __name__ == "__main__": diff --git a/backend/tests/unit/danswer/indexing/test_chunker.py b/backend/tests/unit/danswer/indexing/test_chunker.py index 30221e84ec..f3a72fe17a 100644 --- a/backend/tests/unit/danswer/indexing/test_chunker.py +++ b/backend/tests/unit/danswer/indexing/test_chunker.py @@ -1,7 +1,7 @@ from danswer.configs.constants import DocumentSource from danswer.connectors.models import Document from 
danswer.connectors.models import Section -from danswer.indexing.chunker import chunk_document +from danswer.indexing.chunker import Chunker from danswer.indexing.embedder import DefaultIndexingEmbedder @@ -37,12 +37,12 @@ def test_chunk_document() -> None: passage_prefix=None, ) - chunks = chunk_document( - document=document, - model_name=embedder.model_name, - provider_type=embedder.provider_type, + chunker = Chunker( + tokenizer=embedder.embedding_model.tokenizer, enable_multipass=False, ) + chunks = chunker.chunk(document) + assert len(chunks) == 5 assert short_section_1 in chunks[0].content assert short_section_3 in chunks[-1].content
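
For readers skimming the patch, a minimal standalone sketch of the clause that `build_vespa_id_based_retrieval_yql` (in the new `vespa_request_builders.py`) produces. The helper below re-renders the string construction outside the codebase; the document id and chunk bounds are made-up values, and treating a request as capped whenever an upper bound is set is an assumption about `VespaChunkRequest.is_capped`, which is defined elsewhere in this patch.

```
# Standalone re-rendering of the id-based retrieval YQL fragment; the field names
# mirror DOCUMENT_ID / CHUNK_ID from vespa_constants.py, the values are made up.
def id_retrieval_yql(
    document_id: str,
    min_chunk_ind: int | None = None,
    max_chunk_ind: int | None = None,
) -> str:
    clause = f'(document_id contains "{document_id}"'
    # Assumption: a request counts as capped when an upper chunk bound is set
    if max_chunk_ind is not None:
        clause += f" and chunk_id >= {min_chunk_ind or 0}"
        clause += f" and chunk_id <= {max_chunk_ind}"
    return clause + ")"


print(id_retrieval_yql("web_connector_doc_42", 4, 7))
# (document_id contains "web_connector_doc_42" and chunk_id >= 4 and chunk_id <= 7)
print(id_retrieval_yql("web_connector_doc_42"))
# (document_id contains "web_connector_doc_42")
```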
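
The large-chunk generation in the reworked chunker can be summarized with a small self-contained sketch. It mirrors the grouping in `generate_large_chunks` (groups of `LARGE_CHUNK_RATIO` regular chunks, singleton groups skipped, merged content joined with the section separator) but uses a toy `SimpleChunk` stand-in for `DocAwareChunk` and omits the source-link offset bookkeeping done in `_combine_chunks`.

```
# Toy illustration of large-chunk grouping; SimpleChunk and the separator value
# are stand-ins, not the real DocAwareChunk model or configured constants.
from dataclasses import dataclass, field

LARGE_CHUNK_RATIO = 4  # number of regular chunks folded into one large chunk
SECTION_SEPARATOR = "\n\n"


@dataclass
class SimpleChunk:
    chunk_id: int
    content: str
    large_chunk_reference_ids: list[int] = field(default_factory=list)


def combine(group: list[SimpleChunk], index: int) -> SimpleChunk:
    # Merge the group's text and record which regular chunks it came from
    return SimpleChunk(
        chunk_id=index,
        content=SECTION_SEPARATOR.join(chunk.content for chunk in group),
        large_chunk_reference_ids=[chunk.chunk_id for chunk in group],
    )


def generate_large_chunks(chunks: list[SimpleChunk]) -> list[SimpleChunk]:
    # A group with a single member is not worth a large chunk, so it is skipped
    return [
        combine(chunks[i : i + LARGE_CHUNK_RATIO], idx)
        for idx, i in enumerate(range(0, len(chunks), LARGE_CHUNK_RATIO))
        if len(chunks[i : i + LARGE_CHUNK_RATIO]) > 1
    ]


regular_chunks = [SimpleChunk(chunk_id=i, content=f"chunk {i}") for i in range(9)]
print([c.large_chunk_reference_ids for c in generate_large_chunks(regular_chunks)])
# [[0, 1, 2, 3], [4, 5, 6, 7]]  -- the lone trailing chunk 8 gets no large chunk
```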
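
Finally, the score handling added to `doc_index_retrieval` can be approximated with plain dictionaries: a retrieved large chunk passes its score down to each regular chunk it references (keeping the maximum), and the combined result is deduplicated per `(document_id, chunk_id)` keeping the highest score before sorting. The tuples below stand in for the real inference-chunk models, and the Vespa fetch of the referenced chunks is left out.

```
# Simplified stand-in for the large-chunk score propagation and dedup in
# doc_index_retrieval; hits are (document_id, chunk_id, score, reference_ids)
# tuples instead of InferenceChunkUncleaned objects.
Hit = tuple[str, int, float, list[int]]


def resolve_scores(top_hits: list[Hit]) -> list[tuple[tuple[str, int], float]]:
    merged: dict[tuple[str, int], float] = {}

    for doc_id, chunk_id, score, reference_ids in top_hits:
        if reference_ids:
            # Large chunk: persist its score to every regular chunk it references
            keys = [(doc_id, ref_id) for ref_id in reference_ids]
        else:
            # Regular chunk: scored directly
            keys = [(doc_id, chunk_id)]
        for key in keys:
            merged[key] = max(merged.get(key, 0.0), score)

    # Highest score first, mirroring the final sort before cleanup
    return sorted(merged.items(), key=lambda item: item[1], reverse=True)


hits: list[Hit] = [
    ("doc-a", 2, 0.61, []),            # direct hit on regular chunk 2
    ("doc-a", 0, 0.74, [0, 1, 2, 3]),  # large chunk covering regular chunks 0-3
]
for key, score in resolve_scores(hits):
    print(key, score)
# every chunk of doc-a ends up with 0.74; chunk 2 keeps the higher of its two scores
```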