Multiple Indices in Vespa (#1000)

Yuhong Sun 2024-01-25 13:56:29 -08:00 committed by GitHub
parent d6d83e79f1
commit d0fa02c8dc
16 changed files with 258 additions and 64 deletions

View File

@ -28,6 +28,7 @@ from danswer.db.engine import SYNC_DB_API
from danswer.db.models import DocumentSet
from danswer.db.tasks import check_live_task_not_timed_out
from danswer.db.tasks import get_latest_task
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
@ -114,15 +115,17 @@ def sync_document_set_task(document_set_id: int) -> None:
}
# update Vespa
document_index.update(
update_requests=[
for index_name in get_both_index_names():
update_requests = [
UpdateRequest(
document_ids=[document_id],
document_sets=set(document_set_map.get(document_id, [])),
)
for document_id in document_ids
]
)
document_index.update(
update_requests=update_requests, index_name=index_name
)
with Session(get_sqlalchemy_engine()) as db_session:
try:

View File

@ -31,6 +31,7 @@ from danswer.db.document_set import (
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import delete_index_attempts
from danswer.db.models import ConnectorCredentialPair
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
@ -61,7 +62,10 @@ def _delete_connector_credential_pair_batch(
document_id for document_id, cnt in document_connector_cnts if cnt == 1
]
logger.debug(f"Deleting documents: {document_ids_to_delete}")
document_index.delete(doc_ids=document_ids_to_delete)
for index_name in get_both_index_names():
document_index.delete(doc_ids=document_ids_to_delete, index_name=index_name)
delete_documents_complete(
db_session=db_session,
document_ids=document_ids_to_delete,
@ -87,7 +91,12 @@ def _delete_connector_credential_pair_batch(
for document_id, access in access_for_documents.items()
]
logger.debug(f"Updating documents: {document_ids_to_update}")
document_index.update(update_requests=update_requests)
for index_name in get_both_index_names():
document_index.update(
update_requests=update_requests, index_name=index_name
)
delete_document_by_connector_credential_pair(
db_session=db_session,
document_ids=document_ids_to_update,

View File

@ -26,6 +26,7 @@ from danswer.db.index_attempt import mark_attempt_succeeded
from danswer.db.index_attempt import update_docs_indexed
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.document_index.document_index_utils import get_index_name
from danswer.indexing.indexing_pipeline import build_indexing_pipeline
from danswer.utils.logger import IndexAttemptSingleton
from danswer.utils.logger import setup_logger
@ -102,7 +103,9 @@ def _run_indexing(
attempt_status=IndexingStatus.IN_PROGRESS,
)
indexing_pipeline = build_indexing_pipeline()
# TODO UPDATE THIS FOR SECONDARY INDEXING
indexing_pipeline = build_indexing_pipeline(index_name=get_index_name())
db_connector = index_attempt.connector
db_credential = index_attempt.credential
last_successful_index_time = get_last_successful_attempt_time(

View File

@ -47,6 +47,12 @@ SECTION_SEPARATOR = "\n\n"
# For combining attributes, doesn't have to be unique/perfect to work
INDEX_SEPARATOR = "==="
# Index Related
CURRENT_EMBEDDING_MODEL = "CURRENT_EMBEDDING_MODEL"
CURRENT_EMBEDDING_DIM = "CURRENT_EMBEDDING_DIM"
UPCOMING_EMBEDDING_MODEL = "UPCOMING_EMBEDDING_MODEL"
UPCOMING_EMBEDDING_DIM = "UPCOMING_EMBEDDING_DIM"
# Messages
DISABLED_GEN_AI_MSG = (
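
The four new keys live in the dynamic config (key-value) store and determine which Vespa schemas exist and which one serves queries. As an illustration only, not something this commit does, a migration to a new embedding model would start by registering the upcoming model and its dimension; the model name and dimension below are example values.

    from danswer.configs.constants import UPCOMING_EMBEDDING_DIM
    from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
    from danswer.dynamic_configs import get_dynamic_config_store

    # register an upcoming model so the next Vespa deployment creates a secondary index
    kv_store = get_dynamic_config_store()
    kv_store.store(UPCOMING_EMBEDDING_MODEL, "intfloat/e5-base-v2")  # example model name
    kv_store.store(UPCOMING_EMBEDDING_DIM, 768)  # example dimension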

View File

@ -12,6 +12,7 @@ from danswer.db.chat import get_chat_message
from danswer.db.models import ChatMessageFeedback
from danswer.db.models import Document as DbDocument
from danswer.db.models import DocumentRetrievalFeedback
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
@ -57,7 +58,8 @@ def update_document_boost(
boost=boost,
)
document_index.update([update])
for index_name in get_both_index_names():
document_index.update(update_requests=[update], index_name=index_name)
db_session.commit()
@ -77,7 +79,8 @@ def update_document_hidden(
hidden=hidden,
)
document_index.update([update])
for index_name in get_both_index_names():
document_index.update(update_requests=[update], index_name=index_name)
db_session.commit()
@ -123,7 +126,8 @@ def create_doc_retrieval_feedback(
document_ids=[document_id], boost=db_doc.boost, hidden=db_doc.hidden
)
# Updates are generally batched for efficiency, this case only 1 doc/value is updated
document_index.update([update])
for index_name in get_both_index_names():
document_index.update(update_requests=[update], index_name=index_name)
db_session.add(retrieval_feedback)
db_session.commit()

View File

@ -1,6 +1,11 @@
import math
import uuid
from typing import cast
from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.models import IndexChunk
from danswer.indexing.models import InferenceChunk
@ -8,6 +13,40 @@ from danswer.indexing.models import InferenceChunk
DEFAULT_BATCH_SIZE = 30
def clean_model_name(model_str: str) -> str:
return model_str.replace("/", "_").replace("-", "_").replace(".", "_")
def get_index_name(secondary_index: bool = False) -> str:
# TODO make this more efficient if needed
kv_store = get_dynamic_config_store()
if not secondary_index:
try:
embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
return f"danswer_chunk_{clean_model_name(embed_model)}"
except ConfigNotFoundError:
return "danswer_chunk"
embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
return f"danswer_chunk_{clean_model_name(embed_model)}"
def get_both_index_names() -> list[str]:
kv_store = get_dynamic_config_store()
try:
embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
indices = [f"danswer_chunk_{clean_model_name(embed_model)}"]
except ConfigNotFoundError:
indices = ["danswer_chunk"]
try:
embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
indices.append(f"danswer_chunk_{clean_model_name(embed_model)}")
return indices
except ConfigNotFoundError:
return indices
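
Together, get_index_name and get_both_index_names encode the convention this commit applies across the codebase: reads always target the current index, while writes fan out to every live index. A sketch of the intended call pattern, assuming the default document index factory from this repo; the empty update_requests and doc_ids lists stand in for real data.

    from danswer.document_index.document_index_utils import get_both_index_names
    from danswer.document_index.document_index_utils import get_index_name
    from danswer.document_index.factory import get_default_document_index

    document_index = get_default_document_index()
    update_requests = []  # list[UpdateRequest] in real usage
    doc_ids = []  # document ids to delete in real usage

    # clean_model_name("intfloat/e5-base-v2") -> "intfloat_e5_base_v2", so the index
    # name would be "danswer_chunk_intfloat_e5_base_v2"; with no model registered in
    # the key-value store, the legacy "danswer_chunk" name is used instead.

    # writes go to the current and, if one is registered, the upcoming index
    for index_name in get_both_index_names():
        document_index.update(update_requests=update_requests, index_name=index_name)
        document_index.delete(doc_ids=doc_ids, index_name=index_name)

    # reads only ever use the current index
    primary_index_name = get_index_name()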
def translate_boost_count_to_multiplier(boost: int) -> float:
"""Mapping boost integer values to a multiplier according to a sigmoid curve
Piecewise such that at many downvotes, its 0.5x the score and with many upvotes

View File

@ -58,7 +58,9 @@ class Verifiable(abc.ABC):
class Indexable(abc.ABC):
@abc.abstractmethod
def index(
self, chunks: list[DocMetadataAwareIndexChunk]
self,
chunks: list[DocMetadataAwareIndexChunk],
index_name: str,
) -> set[DocumentInsertionRecord]:
"""Indexes document chunks into the Document Index and return the IDs of all the documents indexed"""
raise NotImplementedError
@ -66,14 +68,14 @@ class Indexable(abc.ABC):
class Deletable(abc.ABC):
@abc.abstractmethod
def delete(self, doc_ids: list[str]) -> None:
def delete(self, doc_ids: list[str], index_name: str) -> None:
"""Removes the specified documents from the Index"""
raise NotImplementedError
class Updatable(abc.ABC):
@abc.abstractmethod
def update(self, update_requests: list[UpdateRequest]) -> None:
def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
"""Updates metadata for the specified documents sets in the Index"""
raise NotImplementedError
@ -85,6 +87,7 @@ class IdRetrievalCapable(abc.ABC):
document_id: str,
chunk_ind: int | None,
filters: IndexFilters,
index_name: str,
) -> list[InferenceChunk]:
raise NotImplementedError
@ -96,6 +99,7 @@ class KeywordCapable(abc.ABC):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
index_name: str,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:
@ -109,6 +113,7 @@ class VectorCapable(abc.ABC):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
index_name: str,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:
@ -123,6 +128,7 @@ class HybridCapable(abc.ABC):
filters: IndexFilters,
time_decay_multiplier: float,
num_to_retrieve: int,
index_name: str,
offset: int = 0,
hybrid_alpha: float | None = None,
) -> list[InferenceChunk]:
@ -135,6 +141,7 @@ class AdminCapable(abc.ABC):
self,
query: str,
filters: IndexFilters,
index_name: str,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:

View File

@ -1,5 +1,5 @@
schema danswer_chunk {
document danswer_chunk {
schema DANSWER_CHUNK_NAME {
document DANSWER_CHUNK_NAME {
# Not to be confused with the UUID generated for this chunk which is called documentid by default
field document_id type string {
indexing: summary | attribute

View File

@ -13,7 +13,8 @@
<content id="danswer_index" version="1.0">
<redundancy>1</redundancy>
<documents>
<document type="danswer_chunk" mode="index" />
<!-- <document type="danswer_chunk" mode="index" /> -->
DOCUMENT_REPLACEMENT
</documents>
<nodes>
<node hostalias="danswer-node" distribution-key="0" />
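
The single hard-coded danswer_chunk document type gives way to the DOCUMENT_REPLACEMENT placeholder, which the deploy step in vespa/index.py fills with one <document> element per live schema (see _create_document_xml_lines further down in this commit). Roughly, with example schema names:

    # illustrative substitution only; the schema names are example values
    services_template = "<documents>\nDOCUMENT_REPLACEMENT\n</documents>"  # trimmed-down template
    doc_names = ["danswer_chunk_intfloat_e5_base_v2", "danswer_chunk_nomic_embed_text_v1"]
    doc_lines = "\n".join(
        f'<document type="{doc_name}" mode="index" />' for doc_name in doc_names
    )
    services = services_template.replace("DOCUMENT_REPLACEMENT", doc_lines)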

View File

@ -19,7 +19,6 @@ import httpx
import requests
from retry import retry
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
from danswer.configs.app_configs import VESPA_DEPLOYMENT_ZIP
from danswer.configs.app_configs import VESPA_HOST
@ -35,6 +34,8 @@ from danswer.configs.constants import BLURB
from danswer.configs.constants import BOOST
from danswer.configs.constants import CHUNK_ID
from danswer.configs.constants import CONTENT
from danswer.configs.constants import CURRENT_EMBEDDING_DIM
from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
from danswer.configs.constants import DOC_UPDATED_AT
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import DOCUMENT_SETS
@ -54,16 +55,21 @@ from danswer.configs.constants import SOURCE_TYPE
from danswer.configs.constants import TITLE
from danswer.configs.constants import TITLE_EMBEDDING
from danswer.configs.constants import TITLE_SEPARATOR
from danswer.configs.constants import UPCOMING_EMBEDDING_DIM
from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
get_experts_stores_representations,
)
from danswer.document_index.document_index_utils import clean_model_name
from danswer.document_index.document_index_utils import get_uuid_from_chunk
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import DocumentInsertionRecord
from danswer.document_index.interfaces import UpdateRequest
from danswer.document_index.vespa.utils import remove_invalid_unicode_chars
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.indexing.models import InferenceChunk
from danswer.search.models import IndexFilters
@ -77,12 +83,14 @@ from danswer.utils.threadpool_concurrency import run_functions_tuples_in_parallel
logger = setup_logger()
VESPA_DIM_REPLACEMENT_PAT = "VARIABLE_DIM"
DANSWER_CHUNK_REPLACEMENT_PAT = "DANSWER_CHUNK_NAME"
DOCUMENT_REPLACEMENT_PAT = "DOCUMENT_REPLACEMENT"
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_HOST}:{VESPA_TENANT_PORT}"
VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
# danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
DOCUMENT_ID_ENDPOINT = (
f"{VESPA_APP_CONTAINER_URL}/document/v1/default/danswer_chunk/docid"
f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
)
SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
_BATCH_SIZE = 100 # Specific to Vespa
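
The document type segment of Vespa's /document/v1 path has to match the schema being written to, so the docid endpoint is now a format template rather than a fixed URL. For instance, with an example index name:

    # illustrative only; the index name is an example value
    url = DOCUMENT_ID_ENDPOINT.format(index_name="danswer_chunk_intfloat_e5_base_v2")
    # -> http://<VESPA_HOST>:<VESPA_PORT>/document/v1/default/danswer_chunk_intfloat_e5_base_v2/docid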
@ -107,12 +115,15 @@ class _VespaUpdateRequest:
@retry(tries=3, delay=1, backoff=2)
def _does_document_exist(
doc_chunk_id: str,
index_name: str,
http_client: httpx.Client,
) -> bool:
"""Returns whether the document already exists and the users/group whitelists
Specifically in this case, document refers to a vespa document which is equivalent to a Danswer
chunk. This checks for whether the chunk exists already in the index"""
doc_fetch_response = http_client.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}")
doc_fetch_response = http_client.get(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
)
if doc_fetch_response.status_code == 404:
return False
@ -136,6 +147,7 @@ def _vespa_get_updated_at_attribute(t: datetime | None) -> int | None:
def _get_vespa_chunk_ids_by_document_id(
document_id: str,
index_name: str,
hits_per_page: int = _BATCH_SIZE,
index_filters: IndexFilters | None = None,
) -> list[str]:
@ -148,7 +160,7 @@ def _get_vespa_chunk_ids_by_document_id(
offset = 0
doc_chunk_ids = []
params: dict[str, int | str] = {
"yql": f"select documentid from {DOCUMENT_INDEX_NAME} where {filters_str}document_id contains '{document_id}'",
"yql": f"select documentid from {index_name} where {filters_str}document_id contains '{document_id}'",
"timeout": "10s",
"offset": offset,
"hits": hits_per_page,
@ -168,16 +180,23 @@ def _get_vespa_chunk_ids_by_document_id(
@retry(tries=3, delay=1, backoff=2)
def _delete_vespa_doc_chunks(document_id: str, http_client: httpx.Client) -> None:
doc_chunk_ids = _get_vespa_chunk_ids_by_document_id(document_id)
def _delete_vespa_doc_chunks(
document_id: str, index_name: str, http_client: httpx.Client
) -> None:
doc_chunk_ids = _get_vespa_chunk_ids_by_document_id(
document_id=document_id, index_name=index_name
)
for chunk_id in doc_chunk_ids:
res = http_client.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
res = http_client.delete(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{chunk_id}"
)
res.raise_for_status()
def _delete_vespa_docs(
document_ids: list[str],
index_name: str,
http_client: httpx.Client,
executor: concurrent.futures.ThreadPoolExecutor | None = None,
) -> None:
@ -189,7 +208,9 @@ def _delete_vespa_docs(
try:
doc_deletion_future = {
executor.submit(_delete_vespa_doc_chunks, doc_id, http_client): doc_id
executor.submit(
_delete_vespa_doc_chunks, doc_id, index_name, http_client
): doc_id
for doc_id in document_ids
}
for future in concurrent.futures.as_completed(doc_deletion_future):
@ -203,6 +224,7 @@ def _delete_vespa_docs(
def _get_existing_documents_from_chunks(
chunks: list[DocMetadataAwareIndexChunk],
index_name: str,
http_client: httpx.Client,
executor: concurrent.futures.ThreadPoolExecutor | None = None,
) -> set[str]:
@ -216,7 +238,10 @@ def _get_existing_documents_from_chunks(
try:
chunk_existence_future = {
executor.submit(
_does_document_exist, str(get_uuid_from_chunk(chunk)), http_client
_does_document_exist,
str(get_uuid_from_chunk(chunk)),
index_name,
http_client,
): chunk
for chunk in chunks
}
@ -235,7 +260,7 @@ def _get_existing_documents_from_chunks(
@retry(tries=3, delay=1, backoff=2)
def _index_vespa_chunk(
chunk: DocMetadataAwareIndexChunk, http_client: httpx.Client
chunk: DocMetadataAwareIndexChunk, index_name: str, http_client: httpx.Client
) -> None:
json_header = {
"Content-Type": "application/json",
@ -280,7 +305,7 @@ def _index_vespa_chunk(
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
}
vespa_url = f"{DOCUMENT_ID_ENDPOINT}/{vespa_chunk_id}"
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}"
logger.debug(f'Indexing to URL "{vespa_url}"')
res = http_client.post(
vespa_url, headers=json_header, json={"fields": vespa_document_fields}
@ -296,6 +321,7 @@ def _index_vespa_chunk(
def _batch_index_vespa_chunks(
chunks: list[DocMetadataAwareIndexChunk],
index_name: str,
http_client: httpx.Client,
executor: concurrent.futures.ThreadPoolExecutor | None = None,
) -> None:
@ -307,7 +333,7 @@ def _batch_index_vespa_chunks(
try:
chunk_index_future = {
executor.submit(_index_vespa_chunk, chunk, http_client): chunk
executor.submit(_index_vespa_chunk, chunk, index_name, http_client): chunk
for chunk in chunks
}
for future in concurrent.futures.as_completed(chunk_index_future):
@ -321,6 +347,7 @@ def _batch_index_vespa_chunks(
def _clear_and_index_vespa_chunks(
chunks: list[DocMetadataAwareIndexChunk],
index_name: str,
) -> set[DocumentInsertionRecord]:
"""Receive a list of chunks from a batch of documents and index the chunks into Vespa along
with updating the associated permissions. Assumes that a document will not be split into
@ -328,7 +355,7 @@ def _clear_and_index_vespa_chunks(
chunks will be kept"""
existing_docs: set[str] = set()
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor,
@ -340,18 +367,27 @@ def _clear_and_index_vespa_chunks(
for chunk_batch in batch_generator(first_chunks, _BATCH_SIZE):
existing_docs.update(
_get_existing_documents_from_chunks(
chunks=chunk_batch, http_client=http_client, executor=executor
chunks=chunk_batch,
index_name=index_name,
http_client=http_client,
executor=executor,
)
)
for doc_id_batch in batch_generator(existing_docs, _BATCH_SIZE):
_delete_vespa_docs(
document_ids=doc_id_batch, http_client=http_client, executor=executor
document_ids=doc_id_batch,
index_name=index_name,
http_client=http_client,
executor=executor,
)
for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
_batch_index_vespa_chunks(
chunks=chunk_batch, http_client=http_client, executor=executor
chunks=chunk_batch,
index_name=index_name,
http_client=http_client,
executor=executor,
)
all_doc_ids = {chunk.source_document.id for chunk in chunks}
@ -567,8 +603,10 @@ def _query_vespa(query_params: Mapping[str, str | int | float]) -> list[InferenceChunk]:
@retry(tries=3, delay=1, backoff=2)
def _inference_chunk_by_vespa_id(vespa_id: str) -> InferenceChunk:
res = requests.get(f"{DOCUMENT_ID_ENDPOINT}/{vespa_id}")
def _inference_chunk_by_vespa_id(vespa_id: str, index_name: str) -> InferenceChunk:
res = requests.get(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_id}"
)
res.raise_for_status()
return _vespa_hit_to_inference_chunk(res.json())
@ -583,6 +621,13 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
return zip_buffer
def _create_document_xml_lines(doc_names: list[str]) -> str:
doc_lines = [
f'<document type="{doc_name}" mode="index" />' for doc_name in doc_names
]
return "\n".join(doc_lines)
class VespaIndex(DocumentIndex):
yql_base = (
f"select "
@ -602,7 +647,7 @@ class VespaIndex(DocumentIndex):
f"{SECONDARY_OWNERS}, "
f"{METADATA}, "
f"{CONTENT_SUMMARY} "
f"from {DOCUMENT_INDEX_NAME} where "
f"from {{index_name}} where "
)
def __init__(self, deployment_zip: str = VESPA_DEPLOYMENT_ZIP) -> None:
@ -625,19 +670,52 @@ class VespaIndex(DocumentIndex):
schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
services_file = os.path.join(vespa_schema_path, "services.xml")
with open(schema_file, "r") as schema_f:
schema = schema_f.read()
schema = schema.replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
schema_bytes = schema.encode("utf-8")
kv_store = get_dynamic_config_store()
try:
curr_embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
schema_name = f"danswer_chunk_{clean_model_name(curr_embed_model)}"
embedding_dim = cast(int, kv_store.load(CURRENT_EMBEDDING_DIM))
except ConfigNotFoundError:
schema_name = "danswer_chunk"
with open(services_file, "rb") as services_f:
services_bytes = services_f.read()
doc_names = [schema_name]
try:
upcoming_embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
upcoming_schema_name = (
f"danswer_chunk_{clean_model_name(upcoming_embed_model)}"
)
upcoming_embedding_dim = cast(int, kv_store.load(UPCOMING_EMBEDDING_DIM))
doc_names.append(upcoming_schema_name)
except ConfigNotFoundError:
upcoming_schema_name = None
with open(services_file, "r") as services_f:
services_template = services_f.read()
doc_lines = _create_document_xml_lines(doc_names)
services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
zip_dict = {
"schemas/danswer_chunk.sd": schema_bytes,
"services.xml": services_bytes,
"services.xml": services.encode("utf-8"),
}
with open(schema_file, "r") as schema_f:
schema_template = schema_f.read()
schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, schema_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
zip_dict[f"schemas/{schema_name}.sd"] = schema.encode("utf-8")
if upcoming_schema_name:
upcoming_schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, upcoming_schema_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(upcoming_embedding_dim))
zip_dict[f"schemas/{upcoming_schema_name}.sd"] = upcoming_schema.encode(
"utf-8"
)
zip_file = in_memory_zip_from_file_bytes(zip_dict)
headers = {"Content-Type": "application/zip"}
@ -650,8 +728,9 @@ class VespaIndex(DocumentIndex):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
index_name: str,
) -> set[DocumentInsertionRecord]:
return _clear_and_index_vespa_chunks(chunks=chunks)
return _clear_and_index_vespa_chunks(chunks=chunks, index_name=index_name)
@staticmethod
def _apply_updates_batched(
@ -695,7 +774,7 @@ class VespaIndex(DocumentIndex):
failure_msg = f"Failed to update document: {future_to_document_id[future]}"
raise requests.HTTPError(failure_msg) from e
def update(self, update_requests: list[UpdateRequest]) -> None:
def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
logger.info(f"Updating {len(update_requests)} documents in Vespa")
start = time.time()
@ -724,11 +803,13 @@ class VespaIndex(DocumentIndex):
continue
for document_id in update_request.document_ids:
for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(document_id):
for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
document_id=document_id, index_name=index_name
):
processed_updates_requests.append(
_VespaUpdateRequest(
document_id=document_id,
url=f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}",
url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
update_request=update_dict,
)
)
@ -738,27 +819,33 @@ class VespaIndex(DocumentIndex):
"Finished updating Vespa documents in %s seconds", time.time() - start
)
def delete(self, doc_ids: list[str]) -> None:
def delete(self, doc_ids: list[str], index_name: str) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
# indexing / updates / deletes since we have to make a large volume of requests.
with httpx.Client(http2=True) as http_client:
_delete_vespa_docs(document_ids=doc_ids, http_client=http_client)
_delete_vespa_docs(
document_ids=doc_ids, index_name=index_name, http_client=http_client
)
def id_based_retrieval(
self, document_id: str, chunk_ind: int | None, filters: IndexFilters
self,
document_id: str,
chunk_ind: int | None,
filters: IndexFilters,
index_name: str,
) -> list[InferenceChunk]:
if chunk_ind is None:
vespa_chunk_ids = _get_vespa_chunk_ids_by_document_id(
document_id=document_id, index_filters=filters
document_id=document_id, index_name=index_name, index_filters=filters
)
if not vespa_chunk_ids:
return []
functions_with_args: list[tuple[Callable, tuple]] = [
(_inference_chunk_by_vespa_id, (vespa_chunk_id,))
(_inference_chunk_by_vespa_id, (vespa_chunk_id, index_name))
for vespa_chunk_id in vespa_chunk_ids
]
@ -774,7 +861,7 @@ class VespaIndex(DocumentIndex):
else:
filters_str = _build_vespa_filters(filters=filters, include_hidden=True)
yql = (
VespaIndex.yql_base
VespaIndex.yql_base.format(index_name=index_name)
+ filters_str
+ f"({DOCUMENT_ID} contains '{document_id}' and {CHUNK_ID} contains '{chunk_ind}')"
)
@ -785,6 +872,7 @@ class VespaIndex(DocumentIndex):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
index_name: str,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
edit_keyword_query: bool = EDIT_KEYWORD_QUERY,
@ -792,7 +880,7 @@ class VespaIndex(DocumentIndex):
# IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
vespa_where_clauses = _build_vespa_filters(filters)
yql = (
VespaIndex.yql_base
VespaIndex.yql_base.format(index_name=index_name)
+ vespa_where_clauses
# `({defaultIndex: "content_summary"}userInput(@query))` section is
# needed for highlighting while the N-gram highlighting is broken /
@ -820,6 +908,7 @@ class VespaIndex(DocumentIndex):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
index_name: str,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
@ -828,7 +917,7 @@ class VespaIndex(DocumentIndex):
# IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
vespa_where_clauses = _build_vespa_filters(filters)
yql = (
VespaIndex.yql_base
VespaIndex.yql_base.format(index_name=index_name)
+ vespa_where_clauses
+ f"(({{targetHits: {10 * num_to_retrieve}}}nearestNeighbor(embeddings, query_embedding)) "
# `({defaultIndex: "content_summary"}userInput(@query))` section is
@ -864,6 +953,7 @@ class VespaIndex(DocumentIndex):
filters: IndexFilters,
time_decay_multiplier: float,
num_to_retrieve: int,
index_name: str,
offset: int = 0,
hybrid_alpha: float | None = HYBRID_ALPHA,
title_content_ratio: float | None = TITLE_CONTENT_RATIO,
@ -874,7 +964,7 @@ class VespaIndex(DocumentIndex):
# Needs to be at least as much as the value set in Vespa schema config
target_hits = max(10 * num_to_retrieve, 1000)
yql = (
VespaIndex.yql_base
VespaIndex.yql_base.format(index_name=index_name)
+ vespa_where_clauses
+ f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) "
+ f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) "
@ -913,12 +1003,13 @@ class VespaIndex(DocumentIndex):
self,
query: str,
filters: IndexFilters,
index_name: str,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
) -> list[InferenceChunk]:
vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True)
yql = (
VespaIndex.yql_base
VespaIndex.yql_base.format(index_name=index_name)
+ vespa_where_clauses
+ '({grammar: "weakAnd"}userInput(@query) '
# `({defaultIndex: "content_summary"}userInput(@query))` section is

View File

@ -96,6 +96,7 @@ def index_doc_batch(
chunker: Chunker,
embedder: Embedder,
document_index: DocumentIndex,
index_name: str,
documents: list[Document],
index_attempt_metadata: IndexAttemptMetadata,
ignore_time_skip: bool = False,
@ -188,7 +189,7 @@ def index_doc_batch(
# documents with chunks in this set, are fully represented by the chunks
# in this set
insertion_records = document_index.index(
chunks=access_aware_chunks,
chunks=access_aware_chunks, index_name=index_name
)
successful_doc_ids = [record.document_id for record in insertion_records]
@ -217,6 +218,7 @@ def build_indexing_pipeline(
chunker: Chunker | None = None,
embedder: Embedder | None = None,
document_index: DocumentIndex | None = None,
index_name: str,
ignore_time_skip: bool = False,
) -> IndexingPipelineProtocol:
"""Builds a pipline which takes in a list (batch) of docs and indexes them."""
@ -231,5 +233,6 @@ def build_indexing_pipeline(
chunker=chunker,
embedder=embedder,
document_index=document_index,
index_name=index_name,
ignore_time_skip=ignore_time_skip,
)

View File

@ -17,6 +17,7 @@ from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MAX
from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MIN
from danswer.configs.model_configs import SIM_SCORE_RANGE_HIGH
from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW
from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.document_index_utils import (
translate_boost_count_to_multiplier,
)
@ -142,12 +143,14 @@ def doc_index_retrieval(
document_index: DocumentIndex,
hybrid_alpha: float = HYBRID_ALPHA,
) -> list[InferenceChunk]:
index = get_index_name()
if query.search_type == SearchType.KEYWORD:
top_chunks = document_index.keyword_retrieval(
query=query.query,
filters=query.filters,
time_decay_multiplier=query.recency_bias_multiplier,
num_to_retrieve=query.num_hits,
index_name=index,
)
elif query.search_type == SearchType.SEMANTIC:
@ -156,6 +159,7 @@ def doc_index_retrieval(
filters=query.filters,
time_decay_multiplier=query.recency_bias_multiplier,
num_to_retrieve=query.num_hits,
index_name=index,
)
elif query.search_type == SearchType.HYBRID:
@ -166,6 +170,7 @@ def doc_index_retrieval(
num_to_retrieve=query.num_hits,
offset=query.offset,
hybrid_alpha=hybrid_alpha,
index_name=index,
)
else:

View File

@ -15,6 +15,7 @@ from danswer.db.connector import fetch_ingestion_connector_by_name
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.engine import get_session
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.indexing_pipeline import build_indexing_pipeline
@ -141,7 +142,11 @@ def document_ingestion(
if document.source == DocumentSource.INGESTION_API:
document.source = DocumentSource.FILE
indexing_pipeline = build_indexing_pipeline(ignore_time_skip=True)
index_names = get_both_index_names()
indexing_pipeline = build_indexing_pipeline(
ignore_time_skip=True, index_name=index_names[0]
)
new_doc, chunks = indexing_pipeline(
documents=[document],
@ -151,4 +156,14 @@ def document_ingestion(
),
)
# If there's a secondary index being built, index the doc but don't use it for return here
if len(index_names) > 1:
indexing_pipeline(
documents=[document],
index_attempt_metadata=IndexAttemptMetadata(
connector_id=connector_id,
credential_id=credential_id,
),
)
return IngestionResult(document_id=document.id, already_existed=not bool(new_doc))

View File

@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
from danswer.auth.users import current_user
from danswer.db.engine import get_session
from danswer.db.models import User
from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.llm.utils import get_default_llm_token_encode
from danswer.search.access_filters import build_access_filters_for_user
@ -35,6 +36,7 @@ def get_document_info(
document_id=document_id,
chunk_ind=None,
filters=filters,
index_name=get_index_name(),
)
if not inference_chunks:
@ -67,6 +69,7 @@ def get_chunk_info(
document_id=document_id,
chunk_ind=chunk_id,
filters=filters,
index_name=get_index_name(),
)
if not inference_chunks:

View File

@ -10,6 +10,7 @@ from danswer.configs.constants import DocumentSource
from danswer.db.engine import get_session
from danswer.db.models import User
from danswer.db.tag import get_tags_by_value_prefix_for_source_types
from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.vespa.index import VespaIndex
from danswer.one_shot_answer.answer_question import stream_search_answer
@ -59,7 +60,9 @@ def admin_search(
detail="Cannot use admin-search when using a non-Vespa document index",
)
matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters)
matching_chunks = document_index.admin_retrieval(
query=query, filters=final_filters, index_name=get_index_name()
)
documents = chunks_to_search_docs(matching_chunks)

View File

@ -7,6 +7,7 @@ from danswer.access.models import DocumentAccess
from danswer.db.document import get_acccess_info_for_documents
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.models import Document
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import UpdateRequest
from danswer.document_index.vespa.index import VespaIndex
@ -37,22 +38,23 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None:
logger.info("Populating Access Control List fields in Vespa")
with Session(get_sqlalchemy_engine()) as db_session:
# for all documents, set the `access_control_list` field apporpriately
# for all documents, set the `access_control_list` field appropriately
# based on the state of Postgres
documents = db_session.scalars(select(Document)).all()
document_access_info = get_acccess_info_for_documents(
db_session=db_session,
document_ids=[document.id for document in documents],
)
vespa_index.update(
update_requests=[
for index_name in get_both_index_names():
update_requests = [
UpdateRequest(
document_ids=[document_id],
access=DocumentAccess.build(user_ids, is_public),
)
for document_id, user_ids, is_public in document_access_info
],
)
]
vespa_index.update(update_requests=update_requests, index_name=index_name)
dynamic_config_store.store(_COMPLETED_ACL_UPDATE_KEY, True)