diff --git a/backend/danswer/background/celery/celery.py b/backend/danswer/background/celery/celery.py
index 83e55aff8..90706feb6 100644
--- a/backend/danswer/background/celery/celery.py
+++ b/backend/danswer/background/celery/celery.py
@@ -28,6 +28,7 @@ from danswer.db.engine import SYNC_DB_API
 from danswer.db.models import DocumentSet
 from danswer.db.tasks import check_live_task_not_timed_out
 from danswer.db.tasks import get_latest_task
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import UpdateRequest
@@ -114,15 +115,17 @@ def sync_document_set_task(document_set_id: int) -> None:
         }

         # update Vespa
-        document_index.update(
-            update_requests=[
+        for index_name in get_both_index_names():
+            update_requests = [
                 UpdateRequest(
                     document_ids=[document_id],
                     document_sets=set(document_set_map.get(document_id, [])),
                 )
                 for document_id in document_ids
             ]
-        )
+            document_index.update(
+                update_requests=update_requests, index_name=index_name
+            )

     with Session(get_sqlalchemy_engine()) as db_session:
         try:
diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py
index 21bfbbfd7..cd7f35bcf 100644
--- a/backend/danswer/background/connector_deletion.py
+++ b/backend/danswer/background/connector_deletion.py
@@ -31,6 +31,7 @@ from danswer.db.document_set import (
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.index_attempt import delete_index_attempts
 from danswer.db.models import ConnectorCredentialPair
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.server.documents.models import ConnectorCredentialPairIdentifier
@@ -61,7 +62,10 @@ def _delete_connector_credential_pair_batch(
         document_id for document_id, cnt in document_connector_cnts if cnt == 1
     ]
     logger.debug(f"Deleting documents: {document_ids_to_delete}")
-    document_index.delete(doc_ids=document_ids_to_delete)
+
+    for index_name in get_both_index_names():
+        document_index.delete(doc_ids=document_ids_to_delete, index_name=index_name)
+
     delete_documents_complete(
         db_session=db_session,
         document_ids=document_ids_to_delete,
@@ -87,7 +91,12 @@ def _delete_connector_credential_pair_batch(
         for document_id, access in access_for_documents.items()
     ]
     logger.debug(f"Updating documents: {document_ids_to_update}")
-    document_index.update(update_requests=update_requests)
+
+    for index_name in get_both_index_names():
+        document_index.update(
+            update_requests=update_requests, index_name=index_name
+        )
+
     delete_document_by_connector_credential_pair(
         db_session=db_session,
         document_ids=document_ids_to_update,
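Note: every write path touched above now fans out to all live indices via get_both_index_names() (the current index plus, during an embedding-model migration, the upcoming one), while the read paths later in this diff resolve only the current index. A minimal sketch of the pattern these call sites follow, assuming a document_index obtained from the existing factory and the helpers introduced further down in document_index_utils.py; the hide_document_everywhere helper itself is hypothetical:

from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import UpdateRequest


def hide_document_everywhere(document_id: str) -> None:
    # Apply the same metadata update to every live index so the current and
    # (if present) upcoming index never drift apart during a migration.
    document_index = get_default_document_index()
    update = UpdateRequest(document_ids=[document_id], hidden=True)
    for index_name in get_both_index_names():
        document_index.update(update_requests=[update], index_name=index_name)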
diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py
index 0d7960159..3bcb367b4 100644
--- a/backend/danswer/background/indexing/run_indexing.py
+++ b/backend/danswer/background/indexing/run_indexing.py
@@ -26,6 +26,7 @@ from danswer.db.index_attempt import mark_attempt_succeeded
 from danswer.db.index_attempt import update_docs_indexed
 from danswer.db.models import IndexAttempt
 from danswer.db.models import IndexingStatus
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.indexing.indexing_pipeline import build_indexing_pipeline
 from danswer.utils.logger import IndexAttemptSingleton
 from danswer.utils.logger import setup_logger
@@ -102,7 +103,9 @@ def _run_indexing(
         attempt_status=IndexingStatus.IN_PROGRESS,
     )

-    indexing_pipeline = build_indexing_pipeline()
+    # TODO UPDATE THIS FOR SECONDARY INDEXING
+    indexing_pipeline = build_indexing_pipeline(index_name=get_index_name())
+
     db_connector = index_attempt.connector
     db_credential = index_attempt.credential
     last_successful_index_time = get_last_successful_attempt_time(
diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py
index 0151634ed..ea5628fa1 100644
--- a/backend/danswer/configs/constants.py
+++ b/backend/danswer/configs/constants.py
@@ -47,6 +47,12 @@ SECTION_SEPARATOR = "\n\n"
 # For combining attributes, doesn't have to be unique/perfect to work
 INDEX_SEPARATOR = "==="

+# Index Related
+CURRENT_EMBEDDING_MODEL = "CURRENT_EMBEDDING_MODEL"
+CURRENT_EMBEDDING_DIM = "CURRENT_EMBEDDING_DIM"
+UPCOMING_EMBEDDING_MODEL = "UPCOMING_EMBEDDING_MODEL"
+UPCOMING_EMBEDDING_DIM = "UPCOMING_EMBEDDING_DIM"
+
 # Messages
 DISABLED_GEN_AI_MSG = (
diff --git a/backend/danswer/db/feedback.py b/backend/danswer/db/feedback.py
index a1e104770..ad7381375 100644
--- a/backend/danswer/db/feedback.py
+++ b/backend/danswer/db/feedback.py
@@ -12,6 +12,7 @@ from danswer.db.chat import get_chat_message
 from danswer.db.models import ChatMessageFeedback
 from danswer.db.models import Document as DbDocument
 from danswer.db.models import DocumentRetrievalFeedback
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import UpdateRequest
@@ -57,7 +58,8 @@ def update_document_boost(
         boost=boost,
     )

-    document_index.update([update])
+    for index_name in get_both_index_names():
+        document_index.update(update_requests=[update], index_name=index_name)

     db_session.commit()
@@ -77,7 +79,8 @@ def update_document_hidden(
         hidden=hidden,
     )

-    document_index.update([update])
+    for index_name in get_both_index_names():
+        document_index.update(update_requests=[update], index_name=index_name)

     db_session.commit()
@@ -123,7 +126,8 @@ def create_doc_retrieval_feedback(
         document_ids=[document_id], boost=db_doc.boost, hidden=db_doc.hidden
     )
     # Updates are generally batched for efficiency, this case only 1 doc/value is updated
-    document_index.update([update])
+    for index_name in get_both_index_names():
+        document_index.update(update_requests=[update], index_name=index_name)

     db_session.add(retrieval_feedback)
     db_session.commit()
diff --git a/backend/danswer/document_index/document_index_utils.py b/backend/danswer/document_index/document_index_utils.py
index 47986aa5b..ae10bccf9 100644
--- a/backend/danswer/document_index/document_index_utils.py
+++ b/backend/danswer/document_index/document_index_utils.py
@@ -1,6 +1,11 @@
 import math
 import uuid
+from typing import cast

+from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
+from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
+from danswer.dynamic_configs import get_dynamic_config_store
+from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.indexing.models import IndexChunk
 from danswer.indexing.models import InferenceChunk
@@ -8,6 +13,40 @@ from danswer.indexing.models import InferenceChunk
 DEFAULT_BATCH_SIZE = 30


+def clean_model_name(model_str: str) -> str:
+    return model_str.replace("/", "_").replace("-", "_").replace(".", "_")
+
+
+def get_index_name(secondary_index: bool = False) -> str:
+    # TODO make this more efficient if needed
+    kv_store = get_dynamic_config_store()
+    if not secondary_index:
+        try:
+            embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
+            return f"danswer_chunk_{clean_model_name(embed_model)}"
+        except ConfigNotFoundError:
+            return "danswer_chunk"
+
+    embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
+    return f"danswer_chunk_{clean_model_name(embed_model)}"
+
+
+def get_both_index_names() -> list[str]:
+    kv_store = get_dynamic_config_store()
+    try:
+        embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
+        indices = [f"danswer_chunk_{clean_model_name(embed_model)}"]
+    except ConfigNotFoundError:
+        indices = ["danswer_chunk"]
+
+    try:
+        embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
+        indices.append(f"danswer_chunk_{clean_model_name(embed_model)}")
+        return indices
+    except ConfigNotFoundError:
+        return indices
+
+
 def translate_boost_count_to_multiplier(boost: int) -> float:
     """Mapping boost integer values to a multiplier according to a sigmoid curve
     Piecewise such that at many downvotes, its 0.5x the score and with many upvotes
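The index name is derived from the embedding model recorded in the dynamic config (KV) store; clean_model_name only rewrites characters that are not valid in a Vespa schema name. A worked example, assuming the KV store holds a Hugging Face style model id (the model id below is illustrative, not from this diff):

# Assuming kv_store.load(CURRENT_EMBEDDING_MODEL) == "intfloat/e5-base-v2"
clean_model_name("intfloat/e5-base-v2")  # -> "intfloat_e5_base_v2"
get_index_name()                         # -> "danswer_chunk_intfloat_e5_base_v2"
get_both_index_names()                   # -> ["danswer_chunk_intfloat_e5_base_v2"]
# With UPCOMING_EMBEDDING_MODEL also set, get_both_index_names() returns both names
# and get_index_name(secondary_index=True) returns the upcoming one; if the current
# model key is missing entirely, the legacy name "danswer_chunk" is used.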
diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py
index c6200d2cc..1943f2369 100644
--- a/backend/danswer/document_index/interfaces.py
+++ b/backend/danswer/document_index/interfaces.py
@@ -58,7 +58,9 @@ class Verifiable(abc.ABC):
 class Indexable(abc.ABC):
     @abc.abstractmethod
     def index(
-        self, chunks: list[DocMetadataAwareIndexChunk]
+        self,
+        chunks: list[DocMetadataAwareIndexChunk],
+        index_name: str,
     ) -> set[DocumentInsertionRecord]:
         """Indexes document chunks into the Document Index and return the IDs of all the documents indexed"""
         raise NotImplementedError
@@ -66,14 +68,14 @@ class Indexable(abc.ABC):
 class Deletable(abc.ABC):
     @abc.abstractmethod
-    def delete(self, doc_ids: list[str]) -> None:
+    def delete(self, doc_ids: list[str], index_name: str) -> None:
         """Removes the specified documents from the Index"""
         raise NotImplementedError


 class Updatable(abc.ABC):
     @abc.abstractmethod
-    def update(self, update_requests: list[UpdateRequest]) -> None:
+    def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
         """Updates metadata for the specified documents sets in the Index"""
         raise NotImplementedError
@@ -85,6 +87,7 @@ class IdRetrievalCapable(abc.ABC):
         document_id: str,
         chunk_ind: int | None,
         filters: IndexFilters,
+        index_name: str,
     ) -> list[InferenceChunk]:
         raise NotImplementedError
@@ -96,6 +99,7 @@ class KeywordCapable(abc.ABC):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
+        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -109,6 +113,7 @@ class VectorCapable(abc.ABC):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
+        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -123,6 +128,7 @@ class HybridCapable(abc.ABC):
         filters: IndexFilters,
         time_decay_multiplier: float,
         num_to_retrieve: int,
+        index_name: str,
         offset: int = 0,
         hybrid_alpha: float | None = None,
     ) -> list[InferenceChunk]:
@@ -135,6 +141,7 @@ class AdminCapable(abc.ABC):
         self,
         query: str,
         filters: IndexFilters,
+        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
diff --git a/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd b/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd
index 3b6a2ec43..2e545530d 100644
--- a/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd
+++ b/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd
@@ -1,5 +1,5 @@
-schema danswer_chunk {
-    document danswer_chunk {
+schema DANSWER_CHUNK_NAME {
+    document DANSWER_CHUNK_NAME {
         # Not to be confused with the UUID generated for this chunk which is called documentid by default
         field document_id type string {
             indexing: summary | attribute
diff --git a/backend/danswer/document_index/vespa/app_config/services.xml b/backend/danswer/document_index/vespa/app_config/services.xml
index 492eb225e..01f2c191a 100644
--- a/backend/danswer/document_index/vespa/app_config/services.xml
+++ b/backend/danswer/document_index/vespa/app_config/services.xml
@@ -13,7 +13,8 @@
         <redundancy>1</redundancy>
-            <document type="danswer_chunk" mode="index" />
+
+            DOCUMENT_REPLACEMENT
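The schema and services files are now templates: DANSWER_CHUNK_NAME and VARIABLE_DIM are substituted per embedding model, and DOCUMENT_REPLACEMENT expands to one <document> element per live schema. A rough sketch of that substitution (the real logic lives in VespaIndex further down; the schema name and dimension here are assumed example values):

schema_template = open("danswer_chunk.sd").read()
services_template = open("services.xml").read()

schema_name = "danswer_chunk_intfloat_e5_base_v2"  # assumed current index name
embedding_dim = 768                                # assumed CURRENT_EMBEDDING_DIM

schema = schema_template.replace("DANSWER_CHUNK_NAME", schema_name).replace(
    "VARIABLE_DIM", str(embedding_dim)
)
services = services_template.replace(
    "DOCUMENT_REPLACEMENT", f'<document type="{schema_name}" mode="index" />'
)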
f"{VESPA_CONFIG_SERVER_URL}/application/v2" # danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd DOCUMENT_ID_ENDPOINT = ( - f"{VESPA_APP_CONTAINER_URL}/document/v1/default/danswer_chunk/docid" + f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid" ) SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/" _BATCH_SIZE = 100 # Specific to Vespa @@ -107,12 +115,15 @@ class _VespaUpdateRequest: @retry(tries=3, delay=1, backoff=2) def _does_document_exist( doc_chunk_id: str, + index_name: str, http_client: httpx.Client, ) -> bool: """Returns whether the document already exists and the users/group whitelists Specifically in this case, document refers to a vespa document which is equivalent to a Danswer chunk. This checks for whether the chunk exists already in the index""" - doc_fetch_response = http_client.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}") + doc_fetch_response = http_client.get( + f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}" + ) if doc_fetch_response.status_code == 404: return False @@ -136,6 +147,7 @@ def _vespa_get_updated_at_attribute(t: datetime | None) -> int | None: def _get_vespa_chunk_ids_by_document_id( document_id: str, + index_name: str, hits_per_page: int = _BATCH_SIZE, index_filters: IndexFilters | None = None, ) -> list[str]: @@ -148,7 +160,7 @@ def _get_vespa_chunk_ids_by_document_id( offset = 0 doc_chunk_ids = [] params: dict[str, int | str] = { - "yql": f"select documentid from {DOCUMENT_INDEX_NAME} where {filters_str}document_id contains '{document_id}'", + "yql": f"select documentid from {index_name} where {filters_str}document_id contains '{document_id}'", "timeout": "10s", "offset": offset, "hits": hits_per_page, @@ -168,16 +180,23 @@ def _get_vespa_chunk_ids_by_document_id( @retry(tries=3, delay=1, backoff=2) -def _delete_vespa_doc_chunks(document_id: str, http_client: httpx.Client) -> None: - doc_chunk_ids = _get_vespa_chunk_ids_by_document_id(document_id) +def _delete_vespa_doc_chunks( + document_id: str, index_name: str, http_client: httpx.Client +) -> None: + doc_chunk_ids = _get_vespa_chunk_ids_by_document_id( + document_id=document_id, index_name=index_name + ) for chunk_id in doc_chunk_ids: - res = http_client.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}") + res = http_client.delete( + f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{chunk_id}" + ) res.raise_for_status() def _delete_vespa_docs( document_ids: list[str], + index_name: str, http_client: httpx.Client, executor: concurrent.futures.ThreadPoolExecutor | None = None, ) -> None: @@ -189,7 +208,9 @@ def _delete_vespa_docs( try: doc_deletion_future = { - executor.submit(_delete_vespa_doc_chunks, doc_id, http_client): doc_id + executor.submit( + _delete_vespa_doc_chunks, doc_id, index_name, http_client + ): doc_id for doc_id in document_ids } for future in concurrent.futures.as_completed(doc_deletion_future): @@ -203,6 +224,7 @@ def _delete_vespa_docs( def _get_existing_documents_from_chunks( chunks: list[DocMetadataAwareIndexChunk], + index_name: str, http_client: httpx.Client, executor: concurrent.futures.ThreadPoolExecutor | None = None, ) -> set[str]: @@ -216,7 +238,10 @@ def _get_existing_documents_from_chunks( try: chunk_existence_future = { executor.submit( - _does_document_exist, str(get_uuid_from_chunk(chunk)), http_client + _does_document_exist, + str(get_uuid_from_chunk(chunk)), + index_name, + http_client, ): chunk for chunk in chunks } @@ -235,7 +260,7 @@ def _get_existing_documents_from_chunks( @retry(tries=3, 
 @retry(tries=3, delay=1, backoff=2)
 def _index_vespa_chunk(
-    chunk: DocMetadataAwareIndexChunk, http_client: httpx.Client
+    chunk: DocMetadataAwareIndexChunk, index_name: str, http_client: httpx.Client
 ) -> None:
     json_header = {
         "Content-Type": "application/json",
     }
@@ -280,7 +305,7 @@ def _index_vespa_chunk(
         DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
     }

-    vespa_url = f"{DOCUMENT_ID_ENDPOINT}/{vespa_chunk_id}"
+    vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}"
     logger.debug(f'Indexing to URL "{vespa_url}"')
     res = http_client.post(
         vespa_url, headers=json_header, json={"fields": vespa_document_fields}
@@ -296,6 +321,7 @@ def _index_vespa_chunk(

 def _batch_index_vespa_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
+    index_name: str,
     http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
@@ -307,7 +333,7 @@ def _batch_index_vespa_chunks(
     try:
         chunk_index_future = {
-            executor.submit(_index_vespa_chunk, chunk, http_client): chunk
+            executor.submit(_index_vespa_chunk, chunk, index_name, http_client): chunk
             for chunk in chunks
         }
         for future in concurrent.futures.as_completed(chunk_index_future):
@@ -321,6 +347,7 @@ def _batch_index_vespa_chunks(

 def _clear_and_index_vespa_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
+    index_name: str,
 ) -> set[DocumentInsertionRecord]:
     """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
     with updating the associated permissions. Assumes that a document will not be split into
@@ -328,7 +355,7 @@ def _clear_and_index_vespa_chunks(
     chunks will be kept"""
     existing_docs: set[str] = set()

-    # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
+    # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
     # indexing / updates / deletes since we have to make a large volume of requests.
     with (
         concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor,
@@ -340,18 +367,27 @@ def _clear_and_index_vespa_chunks(
         for chunk_batch in batch_generator(first_chunks, _BATCH_SIZE):
             existing_docs.update(
                 _get_existing_documents_from_chunks(
-                    chunks=chunk_batch, http_client=http_client, executor=executor
+                    chunks=chunk_batch,
+                    index_name=index_name,
+                    http_client=http_client,
+                    executor=executor,
                 )
             )
         for doc_id_batch in batch_generator(existing_docs, _BATCH_SIZE):
             _delete_vespa_docs(
-                document_ids=doc_id_batch, http_client=http_client, executor=executor
+                document_ids=doc_id_batch,
+                index_name=index_name,
+                http_client=http_client,
+                executor=executor,
             )

         for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
             _batch_index_vespa_chunks(
-                chunks=chunk_batch, http_client=http_client, executor=executor
+                chunks=chunk_batch,
+                index_name=index_name,
+                http_client=http_client,
+                executor=executor,
             )

     all_doc_ids = {chunk.source_document.id for chunk in chunks}
@@ -567,8 +603,10 @@ def _query_vespa(query_params: Mapping[str, str | int | float]) -> list[InferenceChunk]:

 @retry(tries=3, delay=1, backoff=2)
-def _inference_chunk_by_vespa_id(vespa_id: str) -> InferenceChunk:
-    res = requests.get(f"{DOCUMENT_ID_ENDPOINT}/{vespa_id}")
+def _inference_chunk_by_vespa_id(vespa_id: str, index_name: str) -> InferenceChunk:
+    res = requests.get(
+        f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_id}"
+    )
     res.raise_for_status()

     return _vespa_hit_to_inference_chunk(res.json())
@@ -583,6 +621,13 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
     return zip_buffer


+def _create_document_xml_lines(doc_names: list[str]) -> str:
+    doc_lines = [
+        f'<document type="{doc_name}" mode="index" />' for doc_name in doc_names
+    ]
+    return "\n".join(doc_lines)
+
+
 class VespaIndex(DocumentIndex):
     yql_base = (
         f"select "
         f"documentid, "
         f"{DOCUMENT_ID}, "
         f"{CHUNK_ID}, "
         f"{BLURB}, "
         f"{CONTENT}, "
         f"{SOURCE_TYPE}, "
         f"{SOURCE_LINKS}, "
         f"{SEMANTIC_IDENTIFIER}, "
         f"{SECTION_CONTINUATION}, "
         f"{BOOST}, "
         f"{HIDDEN}, "
         f"{DOC_UPDATED_AT}, "
         f"{PRIMARY_OWNERS}, "
         f"{SECONDARY_OWNERS}, "
         f"{METADATA}, "
         f"{CONTENT_SUMMARY} "
-        f"from {DOCUMENT_INDEX_NAME} where "
+        f"from {{index_name}} where "
     )

     def __init__(self, deployment_zip: str = VESPA_DEPLOYMENT_ZIP) -> None:
@@ -625,19 +670,52 @@
         schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
         services_file = os.path.join(vespa_schema_path, "services.xml")

-        with open(schema_file, "r") as schema_f:
-            schema = schema_f.read()
-        schema = schema.replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
-        schema_bytes = schema.encode("utf-8")
+        kv_store = get_dynamic_config_store()
+        try:
+            curr_embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
+            schema_name = f"danswer_chunk_{clean_model_name(curr_embed_model)}"
+            embedding_dim = cast(int, kv_store.load(CURRENT_EMBEDDING_DIM))
+        except ConfigNotFoundError:
+            schema_name = "danswer_chunk"

-        with open(services_file, "rb") as services_f:
-            services_bytes = services_f.read()
+        doc_names = [schema_name]
+
+        try:
+            upcoming_embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
+            upcoming_schema_name = (
+                f"danswer_chunk_{clean_model_name(upcoming_embed_model)}"
+            )
+            upcoming_embedding_dim = cast(int, kv_store.load(UPCOMING_EMBEDDING_DIM))
+            doc_names.append(upcoming_schema_name)
+        except ConfigNotFoundError:
+            upcoming_schema_name = None
+
+        with open(services_file, "r") as services_f:
+            services_template = services_f.read()
+
+        doc_lines = _create_document_xml_lines(doc_names)
+        services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)

         zip_dict = {
-            "schemas/danswer_chunk.sd": schema_bytes,
-            "services.xml": services_bytes,
+            "services.xml": services.encode("utf-8"),
         }

+        with open(schema_file, "r") as schema_f:
+            schema_template = schema_f.read()
+
+        schema = schema_template.replace(
+            DANSWER_CHUNK_REPLACEMENT_PAT, schema_name
+        ).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
+        zip_dict[f"schemas/{schema_name}.sd"] = schema.encode("utf-8")
+
+        if upcoming_schema_name:
+            upcoming_schema = schema_template.replace(
+                DANSWER_CHUNK_REPLACEMENT_PAT, upcoming_schema_name
+            ).replace(VESPA_DIM_REPLACEMENT_PAT, str(upcoming_embedding_dim))
+            zip_dict[f"schemas/{upcoming_schema_name}.sd"] = upcoming_schema.encode(
+                "utf-8"
+            )
+
         zip_file = in_memory_zip_from_file_bytes(zip_dict)

         headers = {"Content-Type": "application/zip"}
@@ -650,8 +728,9 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        index_name: str,
     ) -> set[DocumentInsertionRecord]:
-        return _clear_and_index_vespa_chunks(chunks=chunks)
+        return _clear_and_index_vespa_chunks(chunks=chunks, index_name=index_name)

     @staticmethod
     def _apply_updates_batched(
@@ -695,7 +774,7 @@ class VespaIndex(DocumentIndex):
                 failure_msg = f"Failed to update document: {future_to_document_id[future]}"
                 raise requests.HTTPError(failure_msg) from e

-    def update(self, update_requests: list[UpdateRequest]) -> None:
+    def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
         logger.info(f"Updating {len(update_requests)} documents in Vespa")
         start = time.time()
@@ -724,11 +803,13 @@ class VespaIndex(DocumentIndex):
                 continue

             for document_id in update_request.document_ids:
-                for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(document_id):
+                for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
+                    document_id=document_id, index_name=index_name
+                ):
                     processed_updates_requests.append(
                         _VespaUpdateRequest(
                             document_id=document_id,
-                            url=f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}",
+                            url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
                             update_request=update_dict,
                         )
                     )
@@ -738,27 +819,33 @@ class VespaIndex(DocumentIndex):
             "Finished updating Vespa documents in %s seconds", time.time() - start
         )

-    def delete(self, doc_ids: list[str]) -> None:
+    def delete(self, doc_ids: list[str], index_name: str) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")

         # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
         # indexing / updates / deletes since we have to make a large volume of requests.
         with httpx.Client(http2=True) as http_client:
-            _delete_vespa_docs(document_ids=doc_ids, http_client=http_client)
+            _delete_vespa_docs(
+                document_ids=doc_ids, index_name=index_name, http_client=http_client
+            )

     def id_based_retrieval(
-        self, document_id: str, chunk_ind: int | None, filters: IndexFilters
+        self,
+        document_id: str,
+        chunk_ind: int | None,
+        filters: IndexFilters,
+        index_name: str,
     ) -> list[InferenceChunk]:
         if chunk_ind is None:
             vespa_chunk_ids = _get_vespa_chunk_ids_by_document_id(
-                document_id=document_id, index_filters=filters
+                document_id=document_id, index_name=index_name, index_filters=filters
             )

             if not vespa_chunk_ids:
                 return []

             functions_with_args: list[tuple[Callable, tuple]] = [
-                (_inference_chunk_by_vespa_id, (vespa_chunk_id,))
+                (_inference_chunk_by_vespa_id, (vespa_chunk_id, index_name))
                 for vespa_chunk_id in vespa_chunk_ids
             ]
@@ -774,7 +861,7 @@ class VespaIndex(DocumentIndex):
         else:
             filters_str = _build_vespa_filters(filters=filters, include_hidden=True)
             yql = (
-                VespaIndex.yql_base
+                VespaIndex.yql_base.format(index_name=index_name)
                 + filters_str
                 + f"({DOCUMENT_ID} contains '{document_id}' and {CHUNK_ID} contains '{chunk_ind}')"
             )
@@ -785,6 +872,7 @@ class VespaIndex(DocumentIndex):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
+        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
         edit_keyword_query: bool = EDIT_KEYWORD_QUERY,
     ) -> list[InferenceChunk]:
@@ -792,7 +880,7 @@ class VespaIndex(DocumentIndex):
         # IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
         vespa_where_clauses = _build_vespa_filters(filters)
         yql = (
-            VespaIndex.yql_base
+            VespaIndex.yql_base.format(index_name=index_name)
             + vespa_where_clauses
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
             # needed for highlighting while the N-gram highlighting is broken /
@@ -820,6 +908,7 @@ class VespaIndex(DocumentIndex):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
+        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
         distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
     ) -> list[InferenceChunk]:
@@ -828,7 +917,7 @@ class VespaIndex(DocumentIndex):
         # IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
         vespa_where_clauses = _build_vespa_filters(filters)
         yql = (
-            VespaIndex.yql_base
+            VespaIndex.yql_base.format(index_name=index_name)
             + vespa_where_clauses
             + f"(({{targetHits: {10 * num_to_retrieve}}}nearestNeighbor(embeddings, query_embedding)) "
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
@@ -864,6 +953,7 @@ class VespaIndex(DocumentIndex):
         filters: IndexFilters,
         time_decay_multiplier: float,
         num_to_retrieve: int,
+        index_name: str,
         offset: int = 0,
         hybrid_alpha: float | None = HYBRID_ALPHA,
         title_content_ratio: float | None = TITLE_CONTENT_RATIO,
     ) -> list[InferenceChunk]:
@@ -874,7 +964,7 @@ class VespaIndex(DocumentIndex):
         vespa_where_clauses = _build_vespa_filters(filters)
         # Needs to be at least as much as the value set in Vespa schema config
         target_hits = max(10 * num_to_retrieve, 1000)
         yql = (
-            VespaIndex.yql_base
+            VespaIndex.yql_base.format(index_name=index_name)
             + vespa_where_clauses
             + f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) "
             + f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) "
@@ -913,12 +1003,13 @@ class VespaIndex(DocumentIndex):
         self,
         query: str,
         filters: IndexFilters,
+        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
     ) -> list[InferenceChunk]:
         vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True)
         yql = (
-            VespaIndex.yql_base
+            VespaIndex.yql_base.format(index_name=index_name)
             + vespa_where_clauses
             + '({grammar: "weakAnd"}userInput(@query) '
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py
index be676e035..23fa2bd86 100644
--- a/backend/danswer/indexing/indexing_pipeline.py
+++ b/backend/danswer/indexing/indexing_pipeline.py
@@ -96,6 +96,7 @@ def index_doc_batch(
     chunker: Chunker,
     embedder: Embedder,
     document_index: DocumentIndex,
+    index_name: str,
     documents: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
     ignore_time_skip: bool = False,
@@ -188,7 +189,7 @@ def index_doc_batch(
     # documents with chunks in this set, are fully represented by the chunks
     # in this set
     insertion_records = document_index.index(
-        chunks=access_aware_chunks,
+        chunks=access_aware_chunks, index_name=index_name
     )

     successful_doc_ids = [record.document_id for record in insertion_records]
@@ -217,6 +218,7 @@ def build_indexing_pipeline(
     chunker: Chunker | None = None,
     embedder: Embedder | None = None,
     document_index: DocumentIndex | None = None,
+    index_name: str,
     ignore_time_skip: bool = False,
 ) -> IndexingPipelineProtocol:
     """Builds a pipline which takes in a list (batch) of docs and indexes them."""
@@ -231,5 +233,6 @@ def build_indexing_pipeline(
         chunker=chunker,
         embedder=embedder,
         document_index=document_index,
+        index_name=index_name,
         ignore_time_skip=ignore_time_skip,
     )
diff --git a/backend/danswer/search/search_runner.py b/backend/danswer/search/search_runner.py
index 54e69c31e..c920d59b6 100644
--- a/backend/danswer/search/search_runner.py
+++ b/backend/danswer/search/search_runner.py
@@ -17,6 +17,7 @@ from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MAX
 from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MIN
 from danswer.configs.model_configs import SIM_SCORE_RANGE_HIGH
 from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.document_index_utils import (
     translate_boost_count_to_multiplier,
 )
@@ -142,12 +143,14 @@ def doc_index_retrieval(
     document_index: DocumentIndex,
     hybrid_alpha: float = HYBRID_ALPHA,
 ) -> list[InferenceChunk]:
+    index = get_index_name()
     if query.search_type == SearchType.KEYWORD:
         top_chunks = document_index.keyword_retrieval(
             query=query.query,
             filters=query.filters,
             time_decay_multiplier=query.recency_bias_multiplier,
             num_to_retrieve=query.num_hits,
+            index_name=index,
         )

     elif query.search_type == SearchType.SEMANTIC:
@@ -156,6 +159,7 @@ def doc_index_retrieval(
             filters=query.filters,
             time_decay_multiplier=query.recency_bias_multiplier,
             num_to_retrieve=query.num_hits,
+            index_name=index,
         )

     elif query.search_type == SearchType.HYBRID:
@@ -166,6 +170,7 @@ def doc_index_retrieval(
             num_to_retrieve=query.num_hits,
             offset=query.offset,
             hybrid_alpha=hybrid_alpha,
+            index_name=index,
         )

     else:
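Unlike the write paths, retrieval resolves a single index per request via get_index_name(), so queries keep hitting the current model's index even while a secondary index is being backfilled. The equivalent explicit call for the hybrid path, as a sketch that assumes a query object and document_index like the ones used in doc_index_retrieval above:

index = get_index_name()  # current index only; the upcoming index is never queried
top_chunks = document_index.hybrid_retrieval(
    query=query.query,
    filters=query.filters,
    time_decay_multiplier=query.recency_bias_multiplier,
    num_to_retrieve=query.num_hits,
    offset=query.offset,
    hybrid_alpha=hybrid_alpha,
    index_name=index,
)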
diff --git a/backend/danswer/server/danswer_api/ingestion.py b/backend/danswer/server/danswer_api/ingestion.py
index 80511ef0d..712310b34 100644
--- a/backend/danswer/server/danswer_api/ingestion.py
+++ b/backend/danswer/server/danswer_api/ingestion.py
@@ -15,6 +15,7 @@ from danswer.db.connector import fetch_ingestion_connector_by_name
 from danswer.db.connector_credential_pair import get_connector_credential_pair
 from danswer.db.credentials import fetch_credential_by_id
 from danswer.db.engine import get_session
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.dynamic_configs import get_dynamic_config_store
 from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.indexing.indexing_pipeline import build_indexing_pipeline
@@ -141,7 +142,11 @@ def document_ingestion(
     if document.source == DocumentSource.INGESTION_API:
         document.source = DocumentSource.FILE

-    indexing_pipeline = build_indexing_pipeline(ignore_time_skip=True)
+    index_names = get_both_index_names()
+
+    indexing_pipeline = build_indexing_pipeline(
+        ignore_time_skip=True, index_name=index_names[0]
+    )

     new_doc, chunks = indexing_pipeline(
         documents=[document],
@@ -151,4 +156,18 @@ def document_ingestion(
         ),
     )

+    # If there's a secondary index being built, index the doc into it as well,
+    # but don't use that run for the return value here
+    if len(index_names) > 1:
+        secondary_indexing_pipeline = build_indexing_pipeline(
+            ignore_time_skip=True, index_name=index_names[1]
+        )
+        secondary_indexing_pipeline(
+            documents=[document],
+            index_attempt_metadata=IndexAttemptMetadata(
+                connector_id=connector_id,
+                credential_id=credential_id,
+            ),
+        )
+
     return IngestionResult(document_id=document.id, already_existed=not bool(new_doc))
diff --git a/backend/danswer/server/documents/document.py b/backend/danswer/server/documents/document.py
index 35784142b..7552322f2 100644
--- a/backend/danswer/server/documents/document.py
+++ b/backend/danswer/server/documents/document.py
@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
 from danswer.auth.users import current_user
 from danswer.db.engine import get_session
 from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.llm.utils import get_default_llm_token_encode
 from danswer.search.access_filters import build_access_filters_for_user
@@ -35,6 +36,7 @@ def get_document_info(
         document_id=document_id,
         chunk_ind=None,
         filters=filters,
+        index_name=get_index_name(),
     )

     if not inference_chunks:
@@ -67,6 +69,7 @@ def get_chunk_info(
         document_id=document_id,
         chunk_ind=chunk_id,
         filters=filters,
+        index_name=get_index_name(),
     )

     if not inference_chunks:
diff --git a/backend/danswer/server/query_and_chat/query_backend.py b/backend/danswer/server/query_and_chat/query_backend.py
index 8d2c264b6..27d02973d 100644
--- a/backend/danswer/server/query_and_chat/query_backend.py
+++ b/backend/danswer/server/query_and_chat/query_backend.py
@@ -10,6 +10,7 @@ from danswer.configs.constants import DocumentSource
 from danswer.db.engine import get_session
 from danswer.db.models import User
 from danswer.db.tag import get_tags_by_value_prefix_for_source_types
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.document_index.vespa.index import VespaIndex
 from danswer.one_shot_answer.answer_question import stream_search_answer
@@ -59,7 +60,9 @@ def admin_search(
             detail="Cannot use admin-search when using a non-Vespa document index",
         )

-    matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters)
+    matching_chunks = document_index.admin_retrieval(
+        query=query, filters=final_filters, index_name=get_index_name()
+    )

     documents = chunks_to_search_docs(matching_chunks)
diff --git a/backend/danswer/utils/acl.py b/backend/danswer/utils/acl.py
index 3cc31ccf2..01256a813 100644
--- a/backend/danswer/utils/acl.py
+++ b/backend/danswer/utils/acl.py
@@ -7,6 +7,7 @@ from danswer.access.models import DocumentAccess
 from danswer.db.document import get_acccess_info_for_documents
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.models import Document
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.document_index.vespa.index import VespaIndex
@@ -37,22 +38,23 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None:
     logger.info("Populating Access Control List fields in Vespa")
     with Session(get_sqlalchemy_engine()) as db_session:
-        # for all documents, set the `access_control_list` field apporpriately
+        # for all documents, set the `access_control_list` field appropriately
         # based on the state of Postgres
         documents = db_session.scalars(select(Document)).all()
         document_access_info = get_acccess_info_for_documents(
             db_session=db_session,
             document_ids=[document.id for document in documents],
         )
-        vespa_index.update(
-            update_requests=[
+
+        for index_name in get_both_index_names():
+            update_requests = [
                 UpdateRequest(
                     document_ids=[document_id],
                     access=DocumentAccess.build(user_ids, is_public),
                 )
                 for document_id, user_ids, is_public in document_access_info
-            ],
-        )
+            ]
+            vespa_index.update(update_requests=update_requests, index_name=index_name)

     dynamic_config_store.store(_COMPLETED_ACL_UPDATE_KEY, True)
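The new KV keys are only read in this diff; something outside it has to seed them, and set the UPCOMING_* pair when a model migration starts. A hypothetical bootstrap, purely to illustrate the expected key/value shapes — the model ids and dimensions below are example values, not taken from this change:

from danswer.configs.constants import CURRENT_EMBEDDING_DIM
from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
from danswer.configs.constants import UPCOMING_EMBEDDING_DIM
from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
from danswer.dynamic_configs import get_dynamic_config_store

kv_store = get_dynamic_config_store()
# Active embedding model and its vector dimension; these drive the index name
# (danswer_chunk_<cleaned model name>) and the VARIABLE_DIM schema substitution.
kv_store.store(CURRENT_EMBEDDING_MODEL, "intfloat/e5-base-v2")
kv_store.store(CURRENT_EMBEDDING_DIM, 768)
# Only while migrating to a new embedding model:
kv_store.store(UPCOMING_EMBEDDING_MODEL, "intfloat/e5-large-v2")
kv_store.store(UPCOMING_EMBEDDING_DIM, 1024)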