diff --git a/backend/alembic/env.py b/backend/alembic/env.py
index 556f31496..a201c0a1f 100644
--- a/backend/alembic/env.py
+++ b/backend/alembic/env.py
@@ -23,7 +23,7 @@ if config.config_file_name is not None:
# target_metadata = mymodel.Base.metadata
target_metadata = [Base.metadata, ResultModelBase.metadata]
-def get_schema_options() -> Tuple[str, bool]:
+def get_schema_options() -> str:
x_args_raw = context.get_x_argument()
x_args = {}
for arg in x_args_raw:
diff --git a/backend/danswer/auth/schemas.py b/backend/danswer/auth/schemas.py
index 31a67b47d..118cc7baf 100644
--- a/backend/danswer/auth/schemas.py
+++ b/backend/danswer/auth/schemas.py
@@ -33,7 +33,7 @@ class UserRead(schemas.BaseUser[uuid.UUID]):
class UserCreate(schemas.BaseUserCreate):
role: UserRole = UserRole.BASIC
- tenant_id: int | None = None
+ tenant_id: str | None = None
class UserUpdate(schemas.BaseUserUpdate):
diff --git a/backend/danswer/background/celery/celery_app.py b/backend/danswer/background/celery/celery_app.py
index a013fb9da..9a95e880b 100644
--- a/backend/danswer/background/celery/celery_app.py
+++ b/backend/danswer/background/celery/celery_app.py
@@ -451,14 +451,11 @@ def check_for_prune_task(tenant_id: str | None) -> None:
# Celery Beat (Periodic Tasks) Settings
#####
-def schedule_tenant_tasks():
- if MULTI_TENANT:
- tenants = get_all_tenant_ids()
- else:
- tenants = [None]
+def schedule_tenant_tasks() -> None:
+ tenants = get_all_tenant_ids()
# Filter out any invalid tenants if necessary
- valid_tenants = [tenant for tenant in tenants if not tenant.startswith('pg_')]
+ valid_tenants = [tenant for tenant in tenants if tenant is None or not tenant.startswith('pg_')]
logger.info(f"Scheduling tasks for tenants: {valid_tenants}")
for tenant_id in valid_tenants:
diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py
index d900d8d98..bc1758c21 100644
--- a/backend/danswer/background/connector_deletion.py
+++ b/backend/danswer/background/connector_deletion.py
@@ -47,7 +47,7 @@ def delete_connector_credential_pair_batch(
connector_id: int,
credential_id: int,
document_index: DocumentIndex,
- tenant_id: str = DEFAULT_SCHEMA,
+ tenant_id: str | None
) -> None:
"""
Removes a batch of documents ids from a cc-pair. If no other cc-pair uses a document anymore
@@ -126,7 +126,7 @@ def delete_connector_credential_pair(
db_session: Session,
document_index: DocumentIndex,
cc_pair: ConnectorCredentialPair,
- tenant_id: str = DEFAULT_SCHEMA,
+ tenant_id: str | None
) -> int:
connector_id = cc_pair.connector_id
credential_id = cc_pair.credential_id
diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py
index 67536e0ed..9a8521882 100644
--- a/backend/danswer/background/indexing/run_indexing.py
+++ b/backend/danswer/background/indexing/run_indexing.py
@@ -409,7 +409,7 @@ def run_indexing_entrypoint(index_attempt_id: int, tenant_id: str | None, is_ee:
_run_indexing(db_session, attempt, tenant_id)
logger.info(
- f"Indexing finished for tenant {tenant_id}: "
+ f"Indexing finished for tenant {tenant_id}: " if tenant_id is not None else "" +
f"connector='{attempt.connector_credential_pair.connector.name}' "
f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
f"credentials='{attempt.connector_credential_pair.connector_id}'"
diff --git a/backend/danswer/background/task_utils.py b/backend/danswer/background/task_utils.py
index 3a1369c94..92ef2206e 100644
--- a/backend/danswer/background/task_utils.py
+++ b/backend/danswer/background/task_utils.py
@@ -14,7 +14,7 @@ from danswer.db.tasks import mark_task_start
from danswer.db.tasks import register_task
-def name_cc_cleanup_task(connector_id: int, credential_id: int, tenant_id: str | None) -> str:
+def name_cc_cleanup_task(connector_id: int, credential_id: int, tenant_id: str | None = None) -> str:
task_name = f"cleanup_connector_credential_pair_{connector_id}_{credential_id}"
if tenant_id is not None:
task_name += f"_{tenant_id}"
diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index 695de8481..929440c53 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -183,7 +183,6 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob], tenant_id
all_connector_credential_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair in all_connector_credential_pairs:
- logger.info(f"iterating! {cc_pair.id}")
for search_settings_instance in search_settings:
# Check if there is an ongoing indexing attempt for this connector credential pair
if (cc_pair.id, search_settings_instance.id) in ongoing:
@@ -199,9 +198,8 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob], tenant_id
secondary_index_building=len(search_settings) > 1,
db_session=db_session,
):
- logger.info(f"Not creating new indexing job for {cc_pair.id} and {search_settings_instance.id}")
continue
- logger.info(f"Creating new indexing job for {cc_pair.id} and {search_settings_instance.id}")
+
create_index_attempt(
cc_pair.id, search_settings_instance.id, db_session
)
@@ -391,7 +389,9 @@ def kickoff_indexing_jobs(
return existing_jobs_copy
-def get_all_tenant_ids() -> list[str]:
+def get_all_tenant_ids() -> list[str] | list[None]:
+ if not MULTI_TENANT:
+ return [None]
with Session(get_sqlalchemy_engine(schema='public')) as session:
result = session.execute(text("""
SELECT schema_name
@@ -443,11 +443,18 @@ def update_loop(
)
try:
- tenants = get_all_tenant_ids() if MULTI_TENANT else [None]
- tenants = [tenant for tenant in tenants if not tenant.startswith('pg_')] if MULTI_TENANT else tenants
+ tenants: list[str | None] = []
+ if MULTI_TENANT:
+ tenants = [
+ tenant for tenant in tenants
+ if tenant is None or not tenant.startswith('pg_')
+ ]
+ else:
+ tenants = tenants
if MULTI_TENANT:
logger.info(f"Found valid tenants: {tenants}")
+
for tenant_id in tenants:
try:
logger.debug(f"Processing {'index attempts' if tenant_id is None else f'tenant {tenant_id}'}")
diff --git a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py
index 436576663..c8de9f71c 100644
--- a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py
+++ b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py
@@ -171,6 +171,7 @@ def handle_regular_answer(
persona=persona,
actual_user_input=query_text,
max_llm_token_override=remaining_tokens,
+ db_session=db_session,
)
else:
max_document_tokens = (
diff --git a/backend/danswer/document_index/factory.py b/backend/danswer/document_index/factory.py
index 965ee370e..23f0bb36e 100644
--- a/backend/danswer/document_index/factory.py
+++ b/backend/danswer/document_index/factory.py
@@ -1,33 +1,23 @@
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.vespa.index import VespaIndex
-from danswer.document_index.vespa.index import MultiTenantVespaIndex
-
-
-
-
-def get_default_multi_tenant_document_index(
- indices: list[str],
-) -> DocumentIndex:
- """Primary index is the index that is used for querying/updating etc.
- Secondary index is for when both the currently used index and the upcoming
- index both need to be updated, updates are applied to both indices"""
- # Currently only supporting Vespa
- return MultiTenantVespaIndex(
- indices=indices,
- )
-
def get_default_document_index(
- primary_index_name: str,
- secondary_index_name: str | None,
+ primary_index_name: str | None = None,
+ indices: list[str] | None = None,
+ secondary_index_name: str | None = None
) -> DocumentIndex:
"""Primary index is the index that is used for querying/updating etc.
Secondary index is for when both the currently used index and the upcoming
index both need to be updated, updates are applied to both indices"""
# Currently only supporting Vespa
- pass
+
+ indices = [primary_index_name] if primary_index_name is not None else indices
+ if not indices:
+ raise ValueError("No indices provided")
+
return VespaIndex(
- index_name=primary_index_name, secondary_index_name=secondary_index_name
+ indices=indices,
+ secondary_index_name=secondary_index_name
)
diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py
index 2acd09779..0a8ed14c4 100644
--- a/backend/danswer/document_index/interfaces.py
+++ b/backend/danswer/document_index/interfaces.py
@@ -77,7 +77,7 @@ class Verifiable(abc.ABC):
all valid in the schema.
Parameters:
- - index_name: The name of the primary index currently used for querying
+ - indices: The names of the primary indices currently used for querying
- secondary_index_name: The name of the secondary index being built in the background, if it
currently exists. Some functions on the document index act on both the primary and
secondary index, some act on just one.
@@ -86,20 +86,21 @@ class Verifiable(abc.ABC):
@abc.abstractmethod
def __init__(
self,
- index_name: str,
+ indices: list[str],
secondary_index_name: str | None,
*args: Any,
**kwargs: Any
) -> None:
super().__init__(*args, **kwargs)
- self.index_name = index_name
+ self.indices = indices
self.secondary_index_name = secondary_index_name
@abc.abstractmethod
def ensure_indices_exist(
self,
- index_embedding_dim: int,
- secondary_index_embedding_dim: int | None,
+ embedding_dims: list[int] | None = None,
+ index_embedding_dim: int | None = None,
+ secondary_index_embedding_dim: int | None = None
) -> None:
"""
Verify that the document index exists and is consistent with the expectations in the code.
diff --git a/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd b/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd
index 3b0808c2e..b98c9343f 100644
--- a/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd
+++ b/backend/danswer/document_index/vespa/app_config/schemas/danswer_chunk.sd
@@ -1,11 +1,7 @@
schema DANSWER_CHUNK_NAME {
document DANSWER_CHUNK_NAME {
+ TENANT_ID_REPLACEMENT
# Not to be confused with the UUID generated for this chunk which is called documentid by default
- field tenant_id type string {
- indexing: summary | attribute
- rank: filter
- attribute: fast-search
- }
field document_id type string {
indexing: summary | attribute
}
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index c5e34df7f..7f0bd2e1b 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -13,9 +13,9 @@ from typing import cast
import httpx
import requests
+from danswer.configs.app_configs import MULTI_TENANT
from danswer.configs.chat_configs import DOC_TIME_DECAY
from danswer.configs.chat_configs import NUM_RETURNED_HITS
-from danswer.configs.chat_configs import VESPA_SEARCHER_THREADS
from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
from danswer.configs.constants import KV_REINDEX_KEY
from danswer.document_index.interfaces import DocumentIndex
@@ -45,10 +45,10 @@ from danswer.document_index.vespa.shared_utils.vespa_request_builders import (
from danswer.document_index.vespa_constants import ACCESS_CONTROL_LIST
from danswer.document_index.vespa_constants import BATCH_SIZE
from danswer.document_index.vespa_constants import BOOST
-
-from danswer.document_index.vespa_constants import SEARCH_THREAD_NUMBER_PAT
from danswer.document_index.vespa_constants import CONTENT_SUMMARY
from danswer.document_index.vespa_constants import DANSWER_CHUNK_REPLACEMENT_PAT
+from danswer.document_index.vespa_constants import TENANT_ID_PAT
+from danswer.document_index.vespa_constants import TENANT_ID_REPLACEMENT
from danswer.document_index.vespa_constants import DATE_REPLACEMENT
from danswer.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
from danswer.document_index.vespa_constants import DOCUMENT_REPLACEMENT_PAT
@@ -86,7 +86,7 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
return zip_buffer
-def _create_document_xml_lines(doc_names: list[str | None]) -> str:
+def _create_document_xml_lines(doc_names: list[str]) -> str:
doc_lines = [
f''
for doc_name in doc_names
@@ -110,13 +110,30 @@ def add_ngrams_to_schema(schema_content: str) -> str:
return schema_content
-class MultiTenantVespaIndex(DocumentIndex):
- def __init__(self, indices: list[str], tenant_id: str = "public") -> None:
+class VespaIndex(DocumentIndex):
+ def __init__(self, indices: list[str], secondary_index_name: str | None) -> None:
self.indices = indices
- self.tenant_id = tenant_id
+ self.secondary_index_name = secondary_index_name
+ @property
+ def index_name(self) -> str:
+ if len(self.indices) == 0:
+ raise ValueError("No indices provided")
+ return self.indices[0]
+
+ def ensure_indices_exist(
+ self,
+ embedding_dims: list[int] | None = None,
+ index_embedding_dim: int | None = None,
+ secondary_index_embedding_dim: int | None = None
+ ) -> None:
+
+ if embedding_dims is None:
+ if index_embedding_dim is not None:
+ embedding_dims = [index_embedding_dim]
+ else:
+ raise ValueError("No embedding dimensions provided")
- def ensure_indices_exist(self, embedding_dims: list[int]) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
logger.debug(f"Sending Vespa zip to {deploy_url}")
@@ -133,7 +150,12 @@ class MultiTenantVespaIndex(DocumentIndex):
# Generate schema names from index settings
schema_names = [index_name for index_name in self.indices]
- doc_lines = _create_document_xml_lines(schema_names)
+ full_schemas = schema_names
+ if self.secondary_index_name:
+ full_schemas.append(self.secondary_index_name)
+
+ doc_lines = _create_document_xml_lines(full_schemas)
+
services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
kv_store = get_dynamic_config_store()
@@ -164,13 +186,24 @@ class MultiTenantVespaIndex(DocumentIndex):
for i, index_name in enumerate(self.indices):
embedding_dim = embedding_dims[i]
+ logger.info(f"Creating index: {index_name} with embedding dimension: {embedding_dim}")
schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
+ schema = schema.replace(TENANT_ID_PAT, TENANT_ID_REPLACEMENT if MULTI_TENANT else "")
schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
zip_dict[f"schemas/{index_name}.sd"] = schema.encode("utf-8")
+ if self.secondary_index_name:
+ logger.info("Creating secondary index:"
+ f"{self.secondary_index_name} with embedding dimension: {secondary_index_embedding_dim}")
+ upcoming_schema = schema_template.replace(
+ DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
+ ).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
+ upcoming_schema = upcoming_schema.replace(TENANT_ID_PAT, TENANT_ID_REPLACEMENT if MULTI_TENANT else "")
+ zip_dict[f"schemas/{self.secondary_index_name}.sd"] = upcoming_schema.encode("utf-8")
+
zip_file = in_memory_zip_from_file_bytes(zip_dict)
headers = {"Content-Type": "application/zip"}
@@ -181,388 +214,6 @@ class MultiTenantVespaIndex(DocumentIndex):
f"Failed to prepare Vespa Danswer Indexes. Response: {response.text}"
)
- def index(
- self,
- chunks: list[DocMetadataAwareIndexChunk],
- ) -> set[DocumentInsertionRecord]:
- """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
- with updating the associated permissions. Assumes that a document will not be split into
- multiple chunk batches calling this function multiple times, otherwise only the last set of
- chunks will be kept"""
- # IMPORTANT: This must be done one index at a time, do not use secondary index here
- cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
-
- existing_docs: set[str] = set()
-
- # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
- # indexing / updates / deletes since we have to make a large volume of requests.
- with (
- concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
- httpx.Client(http2=True) as http_client,
- ):
- # Check for existing documents, existing documents need to have all of their chunks deleted
- # prior to indexing as the document size (num chunks) may have shrunk
- first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
- for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
- existing_docs.update(
- get_existing_documents_from_chunks(
- chunks=chunk_batch,
- index_name=self.index_name,
- http_client=http_client,
- executor=executor,
- )
- )
-
- for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
- delete_vespa_docs(
- document_ids=doc_id_batch,
- index_name=self.index_name,
- http_client=http_client,
- executor=executor,
- )
-
- for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
- batch_index_vespa_chunks(
- chunks=chunk_batch,
- index_name=self.index_name,
- http_client=http_client,
- executor=executor,
- )
-
- all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
- return {
- DocumentInsertionRecord(
- document_id=doc_id,
- already_existed=doc_id in existing_docs,
- )
- for doc_id in all_doc_ids
- }
-
- @staticmethod
- def _apply_updates_batched(
- updates: list[_VespaUpdateRequest],
- batch_size: int = BATCH_SIZE,
- ) -> None:
- """Runs a batch of updates in parallel via the ThreadPoolExecutor."""
-
- def _update_chunk(
- update: _VespaUpdateRequest, http_client: httpx.Client
- ) -> httpx.Response:
- logger.debug(
- f"Updating with request to {update.url} with body {update.update_request}"
- )
- return http_client.put(
- update.url,
- headers={"Content-Type": "application/json"},
- json=update.update_request,
- )
-
- # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
- # indexing / updates / deletes since we have to make a large volume of requests.
- with (
- concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
- httpx.Client(http2=True) as http_client,
- ):
- for update_batch in batch_generator(updates, batch_size):
- future_to_document_id = {
- executor.submit(
- _update_chunk,
- update,
- http_client,
- ): update.document_id
- for update in update_batch
- }
- for future in concurrent.futures.as_completed(future_to_document_id):
- res = future.result()
- try:
- res.raise_for_status()
- except requests.HTTPError as e:
- failure_msg = f"Failed to update document: {future_to_document_id[future]}"
- raise requests.HTTPError(failure_msg) from e
-
- def update(self, update_requests: list[UpdateRequest]) -> None:
- logger.info(f"Updating {len(update_requests)} documents in Vespa")
-
- # Handle Vespa character limitations
- # Mutating update_requests but it's not used later anyway
- for update_request in update_requests:
- update_request.document_ids = [
- replace_invalid_doc_id_characters(doc_id)
- for doc_id in update_request.document_ids
- ]
-
- update_start = time.monotonic()
-
- processed_updates_requests: list[_VespaUpdateRequest] = []
- all_doc_chunk_ids: dict[str, list[str]] = {}
-
- # Fetch all chunks for each document ahead of time
- index_names = [self.index_name]
- if self.secondary_index_name:
- index_names.append(self.secondary_index_name)
-
- chunk_id_start_time = time.monotonic()
- with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
- future_to_doc_chunk_ids = {
- executor.submit(
- get_all_vespa_ids_for_document_id,
- document_id=document_id,
- index_name=index_name,
- filters=None,
- get_large_chunks=True,
- ): (document_id, index_name)
- for index_name in index_names
- for update_request in update_requests
- for document_id in update_request.document_ids
- }
- for future in concurrent.futures.as_completed(future_to_doc_chunk_ids):
- document_id, index_name = future_to_doc_chunk_ids[future]
- try:
- doc_chunk_ids = future.result()
- if document_id not in all_doc_chunk_ids:
- all_doc_chunk_ids[document_id] = []
- all_doc_chunk_ids[document_id].extend(doc_chunk_ids)
- except Exception as e:
- logger.error(
- f"Error retrieving chunk IDs for document {document_id} in index {index_name}: {e}"
- )
- logger.debug(
- f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs"
- )
-
- # Build the _VespaUpdateRequest objects
- for update_request in update_requests:
- update_dict: dict[str, dict] = {"fields": {}}
- if update_request.boost is not None:
- update_dict["fields"][BOOST] = {"assign": update_request.boost}
- if update_request.document_sets is not None:
- update_dict["fields"][DOCUMENT_SETS] = {
- "assign": {
- document_set: 1 for document_set in update_request.document_sets
- }
- }
- if update_request.access is not None:
- update_dict["fields"][ACCESS_CONTROL_LIST] = {
- "assign": {
- acl_entry: 1 for acl_entry in update_request.access.to_acl()
- }
- }
- if update_request.hidden is not None:
- update_dict["fields"][HIDDEN] = {"assign": update_request.hidden}
-
- if not update_dict["fields"]:
- logger.error("Update request received but nothing to update")
- continue
-
- for document_id in update_request.document_ids:
- for doc_chunk_id in all_doc_chunk_ids[document_id]:
- processed_updates_requests.append(
- _VespaUpdateRequest(
- document_id=document_id,
- url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/{doc_chunk_id}",
- update_request=update_dict,
- )
- )
-
- self._apply_updates_batched(processed_updates_requests)
- logger.debug(
- "Finished updating Vespa documents in %.2f seconds",
- time.monotonic() - update_start,
- )
-
- def delete(self, doc_ids: list[str]) -> None:
- logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
-
- doc_ids = [replace_invalid_doc_id_characters(doc_id) for doc_id in doc_ids]
-
- # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
- # indexing / updates / deletes since we have to make a large volume of requests.
- with httpx.Client(http2=True) as http_client:
- index_names = [self.index_name]
- if self.secondary_index_name:
- index_names.append(self.secondary_index_name)
-
- for index_name in index_names:
- delete_vespa_docs(
- document_ids=doc_ids, index_name=index_name, http_client=http_client
- )
-
- def id_based_retrieval(
- self,
- chunk_requests: list[VespaChunkRequest],
- filters: IndexFilters,
- batch_retrieval: bool = False,
- get_large_chunks: bool = False,
- ) -> list[InferenceChunkUncleaned]:
- if batch_retrieval:
- return batch_search_api_retrieval(
- index_name=self.index_name,
- chunk_requests=chunk_requests,
- filters=filters,
- get_large_chunks=get_large_chunks,
- )
- return parallel_visit_api_retrieval(
- index_name=self.index_name,
- chunk_requests=chunk_requests,
- filters=filters,
- get_large_chunks=get_large_chunks,
- )
-
- def hybrid_retrieval(
- self,
- query: str,
- query_embedding: Embedding,
- final_keywords: list[str] | None,
- filters: IndexFilters,
- hybrid_alpha: float,
- time_decay_multiplier: float,
- num_to_retrieve: int,
- offset: int = 0,
- title_content_ratio: float | None = TITLE_CONTENT_RATIO,
- ) -> list[InferenceChunkUncleaned]:
- vespa_where_clauses = build_vespa_filters(filters)
- # Needs to be at least as much as the value set in Vespa schema config
- target_hits = max(10 * num_to_retrieve, 1000)
- yql = (
- YQL_BASE.format(index_name=self.index_name)
- + vespa_where_clauses
- + f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) "
- + f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) "
- + 'or ({grammar: "weakAnd"}userInput(@query)) '
- + f'or ({{defaultIndex: "{CONTENT_SUMMARY}"}}userInput(@query)))'
- )
-
- final_query = " ".join(final_keywords) if final_keywords else query
-
- logger.debug(f"Query YQL: {yql}")
-
- params: dict[str, str | int | float] = {
- "yql": yql,
- "query": final_query,
- "input.query(query_embedding)": str(query_embedding),
- "input.query(decay_factor)": str(DOC_TIME_DECAY * time_decay_multiplier),
- "input.query(alpha)": hybrid_alpha,
- "input.query(title_content_ratio)": title_content_ratio
- if title_content_ratio is not None
- else TITLE_CONTENT_RATIO,
- "hits": num_to_retrieve,
- "offset": offset,
- "ranking.profile": f"hybrid_search{len(query_embedding)}",
- "timeout": VESPA_TIMEOUT,
- }
-
- return query_vespa(params)
-
- def admin_retrieval(
- self,
- query: str,
- filters: IndexFilters,
- num_to_retrieve: int = NUM_RETURNED_HITS,
- offset: int = 0,
- ) -> list[InferenceChunkUncleaned]:
- vespa_where_clauses = build_vespa_filters(filters, include_hidden=True)
- yql = (
- YQL_BASE.format(index_name=self.index_name)
- + vespa_where_clauses
- + '({grammar: "weakAnd"}userInput(@query) '
- # `({defaultIndex: "content_summary"}userInput(@query))` section is
- # needed for highlighting while the N-gram highlighting is broken /
- # not working as desired
- + f'or ({{defaultIndex: "{CONTENT_SUMMARY}"}}userInput(@query)))'
- )
-
- params: dict[str, str | int] = {
- "yql": yql,
- "query": query,
- "hits": num_to_retrieve,
- "offset": 0,
- "ranking.profile": "admin_search",
- "timeout": VESPA_TIMEOUT,
- }
-
- return query_vespa(params)
-
-
-
-
-
-class VespaIndex(DocumentIndex):
- def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
- self.index_name = index_name
- self.secondary_index_name = secondary_index_name
-
- def ensure_indices_exist(
- self,
- index_embedding_dim: int,
- secondary_index_embedding_dim: int | None,
- ) -> None:
- deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
- logger.info(f"Deploying Vespa application package to {deploy_url}")
-
- vespa_schema_path = os.path.join(
- os.getcwd(), "danswer", "document_index", "vespa", "app_config"
- )
- schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
- services_file = os.path.join(vespa_schema_path, "services.xml")
- overrides_file = os.path.join(vespa_schema_path, "validation-overrides.xml")
-
- with open(services_file, "r") as services_f:
- services_template = services_f.read()
-
- schema_names = [self.index_name, self.secondary_index_name]
-
- doc_lines = _create_document_xml_lines(schema_names)
- services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
- services = services.replace(
- SEARCH_THREAD_NUMBER_PAT, str(VESPA_SEARCHER_THREADS)
- )
-
- kv_store = get_dynamic_config_store()
-
- needs_reindexing = False
- try:
- needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY))
- except Exception:
- logger.debug("Could not load the reindexing flag. Using ngrams")
-
- with open(overrides_file, "r") as overrides_f:
- overrides_template = overrides_f.read()
-
- # Vespa requires an override to erase data including the indices we're no longer using
- # It also has a 30 day cap from current so we set it to 7 dynamically
- now = datetime.now()
- date_in_7_days = now + timedelta(days=7)
- formatted_date = date_in_7_days.strftime("%Y-%m-%d")
-
- overrides = overrides_template.replace(DATE_REPLACEMENT, formatted_date)
-
- zip_dict = {
- "services.xml": services.encode("utf-8"),
- "validation-overrides.xml": overrides.encode("utf-8"),
- }
-
- with open(schema_file, "r") as schema_f:
- schema_template = schema_f.read()
- schema = schema_template.replace(
- DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name
- ).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))
- schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
- zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8")
-
- if self.secondary_index_name:
- upcoming_schema = schema_template.replace(
- DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
- ).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
- zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8")
-
- zip_file = in_memory_zip_from_file_bytes(zip_dict)
-
- headers = {"Content-Type": "application/zip"}
- response = requests.post(deploy_url, headers=headers, data=zip_file)
- if response.status_code != 200:
- raise RuntimeError(
- f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
- )
def index(
self,
@@ -613,7 +264,6 @@ class VespaIndex(DocumentIndex):
)
all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
-
return {
DocumentInsertionRecord(
document_id=doc_id,
diff --git a/backend/danswer/document_index/vespa/indexing_utils.py b/backend/danswer/document_index/vespa/indexing_utils.py
index 3b06c71ce..456e37fb1 100644
--- a/backend/danswer/document_index/vespa/indexing_utils.py
+++ b/backend/danswer/document_index/vespa/indexing_utils.py
@@ -98,7 +98,7 @@ def get_existing_documents_from_chunks(
try:
chunk_existence_future = {
executor.submit(
- _does_document_exist,
+ _does_document_exist,
str(get_uuid_from_chunk(chunk)),
index_name,
http_client,
@@ -142,7 +142,6 @@ def _index_vespa_chunk(
title = document.get_title_for_document_index()
vespa_document_fields = {
- TENANT_ID: chunk.tenant_id,
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
BLURB: remove_invalid_unicode_chars(chunk.blurb),
@@ -178,6 +177,8 @@ def _index_vespa_chunk(
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
}
+ if chunk.tenant_id:
+ vespa_document_fields[TENANT_ID] = chunk.tenant_id
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}"
logger.debug(f'Indexing to URL "{vespa_url}" with TENANT ID "{chunk.tenant_id}"')
diff --git a/backend/danswer/document_index/vespa_constants.py b/backend/danswer/document_index/vespa_constants.py
index 98e83203b..10cfefe55 100644
--- a/backend/danswer/document_index/vespa_constants.py
+++ b/backend/danswer/document_index/vespa_constants.py
@@ -9,7 +9,13 @@ DANSWER_CHUNK_REPLACEMENT_PAT = "DANSWER_CHUNK_NAME"
DOCUMENT_REPLACEMENT_PAT = "DOCUMENT_REPLACEMENT"
DATE_REPLACEMENT = "DATE_REPLACEMENT"
SEARCH_THREAD_NUMBER_PAT = "SEARCH_THREAD_NUMBER"
+TENANT_ID_PAT = "TENANT_ID_REPLACEMENT"
+TENANT_ID_REPLACEMENT = """field tenant_id type string {
+ indexing: summary | attribute
+ rank: filter
+ attribute: fast-search
+ }"""
# config server
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index 8a1a2f9e7..ac47f6a58 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -1,6 +1,5 @@
-from danswer.document_index.vespa.index import MultiTenantVespaIndex
-from shared_configs.configs import SupportedEmbeddingModel
+from danswer.document_index.vespa.index import VespaIndex
import time
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@@ -16,6 +15,8 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from httpx_oauth.clients.google import GoogleOAuth2
+from danswer.document_index.interfaces import DocumentIndex
+from danswer.configs.app_configs import MULTI_TENANT
from danswer import __version__
from danswer.auth.schemas import UserCreate
from danswer.auth.schemas import UserRead
@@ -23,6 +24,7 @@ from danswer.auth.schemas import UserUpdate
from danswer.auth.users import auth_backend
from danswer.auth.users import fastapi_users
from sqlalchemy.orm import Session
+from danswer.indexing.models import IndexingSetting
from danswer.configs.app_configs import APP_API_PREFIX
from danswer.configs.app_configs import APP_HOST
from danswer.configs.app_configs import APP_PORT
@@ -52,7 +54,7 @@ from danswer.db.search_settings import get_secondary_search_settings
from danswer.db.search_settings import update_current_search_settings
from danswer.db.search_settings import update_secondary_search_settings
from danswer.db.swap_index import check_index_swap
-from danswer.document_index.factory import get_default_multi_tenant_document_index
+from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
@@ -229,15 +231,17 @@ def mark_reindex_flag(db_session: Session) -> None:
def setup_vespa(
- document_index: MultiTenantVespaIndex,
- supported_embedding_models: list[SupportedEmbeddingModel]
+ document_index: DocumentIndex,
+ embedding_dims: list[int],
+ secondary_embedding_dim: int | None = None
) -> None:
# Vespa startup is a bit slow, so give it a few seconds
wait_time = 5
for _ in range(5):
try:
document_index.ensure_indices_exist(
- embedding_dims=[model.dim for model in supported_embedding_models],
+ embedding_dims=embedding_dims,
+ secondary_index_embedding_dim=secondary_embedding_dim
)
break
except Exception:
@@ -320,14 +324,38 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
# ensure Vespa is setup correctly
logger.notice("Verifying Document Index(s) is/are available.")
- document_index = get_default_multi_tenant_document_index(
- indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS],
- )
+ # document_index = get_default_document_index(
+ # indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS]
+ # ) if MULTI_TENANT else get_default_document_index(
+ # indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS],
+ # secondary_index_name=secondary_search_settings.index_name if secondary_search_settings else None
+ # )
- setup_vespa(
- document_index,
- SUPPORTED_EMBEDDING_MODELS
- )
+ if MULTI_TENANT:
+ document_index = get_default_document_index(
+ indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS]
+ )
+ setup_vespa(
+ document_index,
+ [model.dim for model in SUPPORTED_EMBEDDING_MODELS],
+ secondary_embedding_dim=secondary_search_settings.model_dim if secondary_search_settings else None
+ )
+
+ else:
+ document_index = get_default_document_index(
+ indices=[search_settings.index_name],
+ secondary_index_name=secondary_search_settings.index_name if secondary_search_settings else None
+ )
+
+ setup_vespa(
+ document_index,
+ [IndexingSetting.from_db_model(search_settings).model_dim],
+ secondary_embedding_dim=(
+ IndexingSetting.from_db_model(secondary_search_settings).model_dim
+ if secondary_search_settings
+ else None
+ )
+ )
logger.notice(f"Model Server: http://{MODEL_SERVER_HOST}:{MODEL_SERVER_PORT}")
if search_settings.provider_type is None:
diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py
index 0ac90ba8d..dcfb1c61b 100644
--- a/backend/danswer/server/manage/administrative.py
+++ b/backend/danswer/server/manage/administrative.py
@@ -38,6 +38,7 @@ from danswer.server.manage.models import BoostUpdateRequest
from danswer.server.manage.models import HiddenUpdateRequest
from danswer.server.models import StatusResponse
from danswer.utils.logger import setup_logger
+from danswer.db.engine import current_tenant_id
router = APIRouter(prefix="/manage")
logger = setup_logger()
@@ -195,7 +196,7 @@ def create_deletion_attempt_for_connector_id(
)
# actually kick off the deletion
cleanup_connector_credential_pair_task.apply_async(
- kwargs=dict(connector_id=connector_id, credential_id=credential_id),
+ kwargs=dict(connector_id=connector_id, credential_id=credential_id, tenant_id=current_tenant_id.get()),
)
if cc_pair.connector.source == DocumentSource.FILE:
diff --git a/backend/danswer/server/manage/search_settings.py b/backend/danswer/server/manage/search_settings.py
index 7af3a1fcc..05787ed3f 100644
--- a/backend/danswer/server/manage/search_settings.py
+++ b/backend/danswer/server/manage/search_settings.py
@@ -98,6 +98,8 @@ def set_new_search_settings(
primary_index_name=search_settings.index_name,
secondary_index_name=new_search_settings.index_name,
)
+ print("ensuring indices exist")
+
document_index.ensure_indices_exist(
index_embedding_dim=search_settings.model_dim,
secondary_index_embedding_dim=new_search_settings.model_dim,
diff --git a/backend/danswer/server/tenants/api.py b/backend/danswer/server/tenants/api.py
index e3813c6ed..123d94865 100644
--- a/backend/danswer/server/tenants/api.py
+++ b/backend/danswer/server/tenants/api.py
@@ -22,7 +22,7 @@ logger = setup_logger()
basic_router = APIRouter(prefix="/tenants")
@basic_router.post("/create")
-def create_tenant(tenant_id: str, _ = Depends(control_plane_dep)) -> dict[str, str]:
+def create_tenant(tenant_id: str, _: None= Depends(control_plane_dep)) -> dict[str, str]:
if not MULTI_TENANT:
raise HTTPException(status_code=403, detail="Multi-tenant is not enabled")
diff --git a/backend/danswer/server/tenants/provisioning.py b/backend/danswer/server/tenants/provisioning.py
index 49b1805e8..32a2cdd65 100644
--- a/backend/danswer/server/tenants/provisioning.py
+++ b/backend/danswer/server/tenants/provisioning.py
@@ -50,7 +50,7 @@ def run_alembic_migrations(schema_name: str) -> None:
# Prepare the x arguments
x_arguments = [f"schema={schema_name}"]
- alembic_cfg.cmd_opts.x = x_arguments
+ alembic_cfg.cmd_opts.x = x_arguments # type: ignore
# Run migrations programmatically
command.upgrade(alembic_cfg, 'head')
diff --git a/backend/ee/danswer/auth/users.py b/backend/ee/danswer/auth/users.py
index 9edb0515b..b31726dc1 100644
--- a/backend/ee/danswer/auth/users.py
+++ b/backend/ee/danswer/auth/users.py
@@ -25,7 +25,7 @@ def verify_auth_setting() -> None:
logger.notice(f"Using Auth Type: {AUTH_TYPE.value}")
-async def control_plane_dep(request: Request):
+async def control_plane_dep(request: Request) -> None:
auth_header = request.headers.get("Authorization")
api_key = request.headers.get("X-API-KEY")
diff --git a/backend/ee/danswer/server/query_and_chat/query_backend.py b/backend/ee/danswer/server/query_and_chat/query_backend.py
index 53bbb0161..d1970da6c 100644
--- a/backend/ee/danswer/server/query_and_chat/query_backend.py
+++ b/backend/ee/danswer/server/query_and_chat/query_backend.py
@@ -154,6 +154,7 @@ def get_answer_with_quote(
persona=persona,
actual_user_input=query,
max_llm_token_override=remaining_tokens,
+ db_session=db_session,
)
answer_details = get_search_answer(
diff --git a/backend/tests/integration/common_utils/reset.py b/backend/tests/integration/common_utils/reset.py
index 7c641c017..96ab1a7bb 100644
--- a/backend/tests/integration/common_utils/reset.py
+++ b/backend/tests/integration/common_utils/reset.py
@@ -132,9 +132,9 @@ def reset_vespa() -> None:
index_name = search_settings.index_name
setup_vespa(
- document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
- index_setting=IndexingSetting.from_db_model(search_settings),
- secondary_index_setting=None,
+ document_index=VespaIndex(indices=[index_name], secondary_index_name=None),
+ embedding_dims=[search_settings.model_dim],
+ secondary_embedding_dim=None,
)
for _ in range(5):
diff --git a/web/src/app/admin/embeddings/modals/ProviderCreationModal.tsx b/web/src/app/admin/embeddings/modals/ProviderCreationModal.tsx
index 4b2ad9c51..52920bf0a 100644
--- a/web/src/app/admin/embeddings/modals/ProviderCreationModal.tsx
+++ b/web/src/app/admin/embeddings/modals/ProviderCreationModal.tsx
@@ -173,7 +173,7 @@ export function ProviderCreationModal({
-
+
{useFileUpload ? (
<>