fix typing issues / build issues + backwards-compatibility for indexing

This commit is contained in:
pablodanswer 2024-09-27 12:48:21 -07:00
parent e5f3f2d73a
commit 7419bf6b06
23 changed files with 144 additions and 463 deletions

View File

@ -23,7 +23,7 @@ if config.config_file_name is not None:
# target_metadata = mymodel.Base.metadata
target_metadata = [Base.metadata, ResultModelBase.metadata]
def get_schema_options() -> Tuple[str, bool]:
def get_schema_options() -> str:
x_args_raw = context.get_x_argument()
x_args = {}
for arg in x_args_raw:

View File

@ -33,7 +33,7 @@ class UserRead(schemas.BaseUser[uuid.UUID]):
class UserCreate(schemas.BaseUserCreate):
role: UserRole = UserRole.BASIC
tenant_id: int | None = None
tenant_id: str | None = None
class UserUpdate(schemas.BaseUserUpdate):

View File

@ -451,14 +451,11 @@ def check_for_prune_task(tenant_id: str | None) -> None:
# Celery Beat (Periodic Tasks) Settings
#####
def schedule_tenant_tasks():
if MULTI_TENANT:
tenants = get_all_tenant_ids()
else:
tenants = [None]
def schedule_tenant_tasks() -> None:
tenants = get_all_tenant_ids()
# Filter out any invalid tenants if necessary
valid_tenants = [tenant for tenant in tenants if not tenant.startswith('pg_')]
valid_tenants = [tenant for tenant in tenants if tenant is None or not tenant.startswith('pg_')]
logger.info(f"Scheduling tasks for tenants: {valid_tenants}")
for tenant_id in valid_tenants:

View File

@ -47,7 +47,7 @@ def delete_connector_credential_pair_batch(
connector_id: int,
credential_id: int,
document_index: DocumentIndex,
tenant_id: str = DEFAULT_SCHEMA,
tenant_id: str | None
) -> None:
"""
Removes a batch of documents ids from a cc-pair. If no other cc-pair uses a document anymore
@ -126,7 +126,7 @@ def delete_connector_credential_pair(
db_session: Session,
document_index: DocumentIndex,
cc_pair: ConnectorCredentialPair,
tenant_id: str = DEFAULT_SCHEMA,
tenant_id: str | None
) -> int:
connector_id = cc_pair.connector_id
credential_id = cc_pair.credential_id

View File

@ -409,7 +409,7 @@ def run_indexing_entrypoint(index_attempt_id: int, tenant_id: str | None, is_ee:
_run_indexing(db_session, attempt, tenant_id)
logger.info(
f"Indexing finished for tenant {tenant_id}: "
f"Indexing finished for tenant {tenant_id}: " if tenant_id is not None else "" +
f"connector='{attempt.connector_credential_pair.connector.name}' "
f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
f"credentials='{attempt.connector_credential_pair.connector_id}'"

View File

@ -14,7 +14,7 @@ from danswer.db.tasks import mark_task_start
from danswer.db.tasks import register_task
def name_cc_cleanup_task(connector_id: int, credential_id: int, tenant_id: str | None) -> str:
def name_cc_cleanup_task(connector_id: int, credential_id: int, tenant_id: str | None = None) -> str:
task_name = f"cleanup_connector_credential_pair_{connector_id}_{credential_id}"
if tenant_id is not None:
task_name += f"_{tenant_id}"

View File

@ -183,7 +183,6 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob], tenant_id
all_connector_credential_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair in all_connector_credential_pairs:
logger.info(f"iterating! {cc_pair.id}")
for search_settings_instance in search_settings:
# Check if there is an ongoing indexing attempt for this connector credential pair
if (cc_pair.id, search_settings_instance.id) in ongoing:
@ -199,9 +198,8 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob], tenant_id
secondary_index_building=len(search_settings) > 1,
db_session=db_session,
):
logger.info(f"Not creating new indexing job for {cc_pair.id} and {search_settings_instance.id}")
continue
logger.info(f"Creating new indexing job for {cc_pair.id} and {search_settings_instance.id}")
create_index_attempt(
cc_pair.id, search_settings_instance.id, db_session
)
@ -391,7 +389,9 @@ def kickoff_indexing_jobs(
return existing_jobs_copy
def get_all_tenant_ids() -> list[str]:
def get_all_tenant_ids() -> list[str] | list[None]:
if not MULTI_TENANT:
return [None]
with Session(get_sqlalchemy_engine(schema='public')) as session:
result = session.execute(text("""
SELECT schema_name
@ -443,11 +443,18 @@ def update_loop(
)
try:
tenants = get_all_tenant_ids() if MULTI_TENANT else [None]
tenants = [tenant for tenant in tenants if not tenant.startswith('pg_')] if MULTI_TENANT else tenants
tenants: list[str | None] = []
if MULTI_TENANT:
tenants = [
tenant for tenant in tenants
if tenant is None or not tenant.startswith('pg_')
]
else:
tenants = tenants
if MULTI_TENANT:
logger.info(f"Found valid tenants: {tenants}")
for tenant_id in tenants:
try:
logger.debug(f"Processing {'index attempts' if tenant_id is None else f'tenant {tenant_id}'}")

View File

@ -171,6 +171,7 @@ def handle_regular_answer(
persona=persona,
actual_user_input=query_text,
max_llm_token_override=remaining_tokens,
db_session=db_session,
)
else:
max_document_tokens = (

View File

@ -1,33 +1,23 @@
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.vespa.index import VespaIndex
from danswer.document_index.vespa.index import MultiTenantVespaIndex
def get_default_multi_tenant_document_index(
indices: list[str],
) -> DocumentIndex:
"""Primary index is the index that is used for querying/updating etc.
Secondary index is for when both the currently used index and the upcoming
index both need to be updated, updates are applied to both indices"""
# Currently only supporting Vespa
return MultiTenantVespaIndex(
indices=indices,
)
def get_default_document_index(
primary_index_name: str,
secondary_index_name: str | None,
primary_index_name: str | None = None,
indices: list[str] | None = None,
secondary_index_name: str | None = None
) -> DocumentIndex:
"""Primary index is the index that is used for querying/updating etc.
Secondary index is for when both the currently used index and the upcoming
index both need to be updated, updates are applied to both indices"""
# Currently only supporting Vespa
pass
indices = [primary_index_name] if primary_index_name is not None else indices
if not indices:
raise ValueError("No indices provided")
return VespaIndex(
index_name=primary_index_name, secondary_index_name=secondary_index_name
indices=indices,
secondary_index_name=secondary_index_name
)

View File

@ -77,7 +77,7 @@ class Verifiable(abc.ABC):
all valid in the schema.
Parameters:
- index_name: The name of the primary index currently used for querying
- indices: The names of the primary indices currently used for querying
- secondary_index_name: The name of the secondary index being built in the background, if it
currently exists. Some functions on the document index act on both the primary and
secondary index, some act on just one.
@ -86,20 +86,21 @@ class Verifiable(abc.ABC):
@abc.abstractmethod
def __init__(
self,
index_name: str,
indices: list[str],
secondary_index_name: str | None,
*args: Any,
**kwargs: Any
) -> None:
super().__init__(*args, **kwargs)
self.index_name = index_name
self.indices = indices
self.secondary_index_name = secondary_index_name
@abc.abstractmethod
def ensure_indices_exist(
self,
index_embedding_dim: int,
secondary_index_embedding_dim: int | None,
embedding_dims: list[int] | None = None,
index_embedding_dim: int | None = None,
secondary_index_embedding_dim: int | None = None
) -> None:
"""
Verify that the document index exists and is consistent with the expectations in the code.

View File

@ -1,11 +1,7 @@
schema DANSWER_CHUNK_NAME {
document DANSWER_CHUNK_NAME {
TENANT_ID_REPLACEMENT
# Not to be confused with the UUID generated for this chunk which is called documentid by default
field tenant_id type string {
indexing: summary | attribute
rank: filter
attribute: fast-search
}
field document_id type string {
indexing: summary | attribute
}

View File

@ -13,9 +13,9 @@ from typing import cast
import httpx
import requests
from danswer.configs.app_configs import MULTI_TENANT
from danswer.configs.chat_configs import DOC_TIME_DECAY
from danswer.configs.chat_configs import NUM_RETURNED_HITS
from danswer.configs.chat_configs import VESPA_SEARCHER_THREADS
from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
from danswer.configs.constants import KV_REINDEX_KEY
from danswer.document_index.interfaces import DocumentIndex
@ -45,10 +45,10 @@ from danswer.document_index.vespa.shared_utils.vespa_request_builders import (
from danswer.document_index.vespa_constants import ACCESS_CONTROL_LIST
from danswer.document_index.vespa_constants import BATCH_SIZE
from danswer.document_index.vespa_constants import BOOST
from danswer.document_index.vespa_constants import SEARCH_THREAD_NUMBER_PAT
from danswer.document_index.vespa_constants import CONTENT_SUMMARY
from danswer.document_index.vespa_constants import DANSWER_CHUNK_REPLACEMENT_PAT
from danswer.document_index.vespa_constants import TENANT_ID_PAT
from danswer.document_index.vespa_constants import TENANT_ID_REPLACEMENT
from danswer.document_index.vespa_constants import DATE_REPLACEMENT
from danswer.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
from danswer.document_index.vespa_constants import DOCUMENT_REPLACEMENT_PAT
@ -86,7 +86,7 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
return zip_buffer
def _create_document_xml_lines(doc_names: list[str | None]) -> str:
def _create_document_xml_lines(doc_names: list[str]) -> str:
doc_lines = [
f'<document type="{doc_name}" mode="index" />'
for doc_name in doc_names
@ -110,13 +110,30 @@ def add_ngrams_to_schema(schema_content: str) -> str:
return schema_content
class MultiTenantVespaIndex(DocumentIndex):
def __init__(self, indices: list[str], tenant_id: str = "public") -> None:
class VespaIndex(DocumentIndex):
def __init__(self, indices: list[str], secondary_index_name: str | None) -> None:
self.indices = indices
self.tenant_id = tenant_id
self.secondary_index_name = secondary_index_name
@property
def index_name(self) -> str:
if len(self.indices) == 0:
raise ValueError("No indices provided")
return self.indices[0]
def ensure_indices_exist(
self,
embedding_dims: list[int] | None = None,
index_embedding_dim: int | None = None,
secondary_index_embedding_dim: int | None = None
) -> None:
if embedding_dims is None:
if index_embedding_dim is not None:
embedding_dims = [index_embedding_dim]
else:
raise ValueError("No embedding dimensions provided")
def ensure_indices_exist(self, embedding_dims: list[int]) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
logger.debug(f"Sending Vespa zip to {deploy_url}")
@ -133,7 +150,12 @@ class MultiTenantVespaIndex(DocumentIndex):
# Generate schema names from index settings
schema_names = [index_name for index_name in self.indices]
doc_lines = _create_document_xml_lines(schema_names)
full_schemas = schema_names
if self.secondary_index_name:
full_schemas.append(self.secondary_index_name)
doc_lines = _create_document_xml_lines(full_schemas)
services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
kv_store = get_dynamic_config_store()
@ -164,13 +186,24 @@ class MultiTenantVespaIndex(DocumentIndex):
for i, index_name in enumerate(self.indices):
embedding_dim = embedding_dims[i]
logger.info(f"Creating index: {index_name} with embedding dimension: {embedding_dim}")
schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
schema = schema.replace(TENANT_ID_PAT, TENANT_ID_REPLACEMENT if MULTI_TENANT else "")
schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
zip_dict[f"schemas/{index_name}.sd"] = schema.encode("utf-8")
if self.secondary_index_name:
logger.info("Creating secondary index:"
f"{self.secondary_index_name} with embedding dimension: {secondary_index_embedding_dim}")
upcoming_schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
upcoming_schema = upcoming_schema.replace(TENANT_ID_PAT, TENANT_ID_REPLACEMENT if MULTI_TENANT else "")
zip_dict[f"schemas/{self.secondary_index_name}.sd"] = upcoming_schema.encode("utf-8")
zip_file = in_memory_zip_from_file_bytes(zip_dict)
headers = {"Content-Type": "application/zip"}
@ -181,388 +214,6 @@ class MultiTenantVespaIndex(DocumentIndex):
f"Failed to prepare Vespa Danswer Indexes. Response: {response.text}"
)
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
) -> set[DocumentInsertionRecord]:
"""Receive a list of chunks from a batch of documents and index the chunks into Vespa along
with updating the associated permissions. Assumes that a document will not be split into
multiple chunk batches calling this function multiple times, otherwise only the last set of
chunks will be kept"""
# IMPORTANT: This must be done one index at a time, do not use secondary index here
cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
existing_docs: set[str] = set()
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True) as http_client,
):
# Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk
first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
existing_docs.update(
get_existing_documents_from_chunks(
chunks=chunk_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
)
for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
delete_vespa_docs(
document_ids=doc_id_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
batch_index_vespa_chunks(
chunks=chunk_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
return {
DocumentInsertionRecord(
document_id=doc_id,
already_existed=doc_id in existing_docs,
)
for doc_id in all_doc_ids
}
@staticmethod
def _apply_updates_batched(
updates: list[_VespaUpdateRequest],
batch_size: int = BATCH_SIZE,
) -> None:
"""Runs a batch of updates in parallel via the ThreadPoolExecutor."""
def _update_chunk(
update: _VespaUpdateRequest, http_client: httpx.Client
) -> httpx.Response:
logger.debug(
f"Updating with request to {update.url} with body {update.update_request}"
)
return http_client.put(
update.url,
headers={"Content-Type": "application/json"},
json=update.update_request,
)
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True) as http_client,
):
for update_batch in batch_generator(updates, batch_size):
future_to_document_id = {
executor.submit(
_update_chunk,
update,
http_client,
): update.document_id
for update in update_batch
}
for future in concurrent.futures.as_completed(future_to_document_id):
res = future.result()
try:
res.raise_for_status()
except requests.HTTPError as e:
failure_msg = f"Failed to update document: {future_to_document_id[future]}"
raise requests.HTTPError(failure_msg) from e
def update(self, update_requests: list[UpdateRequest]) -> None:
logger.info(f"Updating {len(update_requests)} documents in Vespa")
# Handle Vespa character limitations
# Mutating update_requests but it's not used later anyway
for update_request in update_requests:
update_request.document_ids = [
replace_invalid_doc_id_characters(doc_id)
for doc_id in update_request.document_ids
]
update_start = time.monotonic()
processed_updates_requests: list[_VespaUpdateRequest] = []
all_doc_chunk_ids: dict[str, list[str]] = {}
# Fetch all chunks for each document ahead of time
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
chunk_id_start_time = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
future_to_doc_chunk_ids = {
executor.submit(
get_all_vespa_ids_for_document_id,
document_id=document_id,
index_name=index_name,
filters=None,
get_large_chunks=True,
): (document_id, index_name)
for index_name in index_names
for update_request in update_requests
for document_id in update_request.document_ids
}
for future in concurrent.futures.as_completed(future_to_doc_chunk_ids):
document_id, index_name = future_to_doc_chunk_ids[future]
try:
doc_chunk_ids = future.result()
if document_id not in all_doc_chunk_ids:
all_doc_chunk_ids[document_id] = []
all_doc_chunk_ids[document_id].extend(doc_chunk_ids)
except Exception as e:
logger.error(
f"Error retrieving chunk IDs for document {document_id} in index {index_name}: {e}"
)
logger.debug(
f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs"
)
# Build the _VespaUpdateRequest objects
for update_request in update_requests:
update_dict: dict[str, dict] = {"fields": {}}
if update_request.boost is not None:
update_dict["fields"][BOOST] = {"assign": update_request.boost}
if update_request.document_sets is not None:
update_dict["fields"][DOCUMENT_SETS] = {
"assign": {
document_set: 1 for document_set in update_request.document_sets
}
}
if update_request.access is not None:
update_dict["fields"][ACCESS_CONTROL_LIST] = {
"assign": {
acl_entry: 1 for acl_entry in update_request.access.to_acl()
}
}
if update_request.hidden is not None:
update_dict["fields"][HIDDEN] = {"assign": update_request.hidden}
if not update_dict["fields"]:
logger.error("Update request received but nothing to update")
continue
for document_id in update_request.document_ids:
for doc_chunk_id in all_doc_chunk_ids[document_id]:
processed_updates_requests.append(
_VespaUpdateRequest(
document_id=document_id,
url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/{doc_chunk_id}",
update_request=update_dict,
)
)
self._apply_updates_batched(processed_updates_requests)
logger.debug(
"Finished updating Vespa documents in %.2f seconds",
time.monotonic() - update_start,
)
def delete(self, doc_ids: list[str]) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
doc_ids = [replace_invalid_doc_id_characters(doc_id) for doc_id in doc_ids]
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with httpx.Client(http2=True) as http_client:
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
for index_name in index_names:
delete_vespa_docs(
document_ids=doc_ids, index_name=index_name, http_client=http_client
)
def id_based_retrieval(
self,
chunk_requests: list[VespaChunkRequest],
filters: IndexFilters,
batch_retrieval: bool = False,
get_large_chunks: bool = False,
) -> list[InferenceChunkUncleaned]:
if batch_retrieval:
return batch_search_api_retrieval(
index_name=self.index_name,
chunk_requests=chunk_requests,
filters=filters,
get_large_chunks=get_large_chunks,
)
return parallel_visit_api_retrieval(
index_name=self.index_name,
chunk_requests=chunk_requests,
filters=filters,
get_large_chunks=get_large_chunks,
)
def hybrid_retrieval(
self,
query: str,
query_embedding: Embedding,
final_keywords: list[str] | None,
filters: IndexFilters,
hybrid_alpha: float,
time_decay_multiplier: float,
num_to_retrieve: int,
offset: int = 0,
title_content_ratio: float | None = TITLE_CONTENT_RATIO,
) -> list[InferenceChunkUncleaned]:
vespa_where_clauses = build_vespa_filters(filters)
# Needs to be at least as much as the value set in Vespa schema config
target_hits = max(10 * num_to_retrieve, 1000)
yql = (
YQL_BASE.format(index_name=self.index_name)
+ vespa_where_clauses
+ f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) "
+ f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) "
+ 'or ({grammar: "weakAnd"}userInput(@query)) '
+ f'or ({{defaultIndex: "{CONTENT_SUMMARY}"}}userInput(@query)))'
)
final_query = " ".join(final_keywords) if final_keywords else query
logger.debug(f"Query YQL: {yql}")
params: dict[str, str | int | float] = {
"yql": yql,
"query": final_query,
"input.query(query_embedding)": str(query_embedding),
"input.query(decay_factor)": str(DOC_TIME_DECAY * time_decay_multiplier),
"input.query(alpha)": hybrid_alpha,
"input.query(title_content_ratio)": title_content_ratio
if title_content_ratio is not None
else TITLE_CONTENT_RATIO,
"hits": num_to_retrieve,
"offset": offset,
"ranking.profile": f"hybrid_search{len(query_embedding)}",
"timeout": VESPA_TIMEOUT,
}
return query_vespa(params)
def admin_retrieval(
self,
query: str,
filters: IndexFilters,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
) -> list[InferenceChunkUncleaned]:
vespa_where_clauses = build_vespa_filters(filters, include_hidden=True)
yql = (
YQL_BASE.format(index_name=self.index_name)
+ vespa_where_clauses
+ '({grammar: "weakAnd"}userInput(@query) '
# `({defaultIndex: "content_summary"}userInput(@query))` section is
# needed for highlighting while the N-gram highlighting is broken /
# not working as desired
+ f'or ({{defaultIndex: "{CONTENT_SUMMARY}"}}userInput(@query)))'
)
params: dict[str, str | int] = {
"yql": yql,
"query": query,
"hits": num_to_retrieve,
"offset": 0,
"ranking.profile": "admin_search",
"timeout": VESPA_TIMEOUT,
}
return query_vespa(params)
class VespaIndex(DocumentIndex):
def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
self.index_name = index_name
self.secondary_index_name = secondary_index_name
def ensure_indices_exist(
self,
index_embedding_dim: int,
secondary_index_embedding_dim: int | None,
) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
logger.info(f"Deploying Vespa application package to {deploy_url}")
vespa_schema_path = os.path.join(
os.getcwd(), "danswer", "document_index", "vespa", "app_config"
)
schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
services_file = os.path.join(vespa_schema_path, "services.xml")
overrides_file = os.path.join(vespa_schema_path, "validation-overrides.xml")
with open(services_file, "r") as services_f:
services_template = services_f.read()
schema_names = [self.index_name, self.secondary_index_name]
doc_lines = _create_document_xml_lines(schema_names)
services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
services = services.replace(
SEARCH_THREAD_NUMBER_PAT, str(VESPA_SEARCHER_THREADS)
)
kv_store = get_dynamic_config_store()
needs_reindexing = False
try:
needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY))
except Exception:
logger.debug("Could not load the reindexing flag. Using ngrams")
with open(overrides_file, "r") as overrides_f:
overrides_template = overrides_f.read()
# Vespa requires an override to erase data including the indices we're no longer using
# It also has a 30 day cap from current so we set it to 7 dynamically
now = datetime.now()
date_in_7_days = now + timedelta(days=7)
formatted_date = date_in_7_days.strftime("%Y-%m-%d")
overrides = overrides_template.replace(DATE_REPLACEMENT, formatted_date)
zip_dict = {
"services.xml": services.encode("utf-8"),
"validation-overrides.xml": overrides.encode("utf-8"),
}
with open(schema_file, "r") as schema_f:
schema_template = schema_f.read()
schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))
schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8")
if self.secondary_index_name:
upcoming_schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8")
zip_file = in_memory_zip_from_file_bytes(zip_dict)
headers = {"Content-Type": "application/zip"}
response = requests.post(deploy_url, headers=headers, data=zip_file)
if response.status_code != 200:
raise RuntimeError(
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
)
def index(
self,
@ -613,7 +264,6 @@ class VespaIndex(DocumentIndex):
)
all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
return {
DocumentInsertionRecord(
document_id=doc_id,

View File

@ -98,7 +98,7 @@ def get_existing_documents_from_chunks(
try:
chunk_existence_future = {
executor.submit(
_does_document_exist,
_does_document_exist,
str(get_uuid_from_chunk(chunk)),
index_name,
http_client,
@ -142,7 +142,6 @@ def _index_vespa_chunk(
title = document.get_title_for_document_index()
vespa_document_fields = {
TENANT_ID: chunk.tenant_id,
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
BLURB: remove_invalid_unicode_chars(chunk.blurb),
@ -178,6 +177,8 @@ def _index_vespa_chunk(
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
}
if chunk.tenant_id:
vespa_document_fields[TENANT_ID] = chunk.tenant_id
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}"
logger.debug(f'Indexing to URL "{vespa_url}" with TENANT ID "{chunk.tenant_id}"')

View File

@ -9,7 +9,13 @@ DANSWER_CHUNK_REPLACEMENT_PAT = "DANSWER_CHUNK_NAME"
DOCUMENT_REPLACEMENT_PAT = "DOCUMENT_REPLACEMENT"
DATE_REPLACEMENT = "DATE_REPLACEMENT"
SEARCH_THREAD_NUMBER_PAT = "SEARCH_THREAD_NUMBER"
TENANT_ID_PAT = "TENANT_ID_REPLACEMENT"
TENANT_ID_REPLACEMENT = """field tenant_id type string {
indexing: summary | attribute
rank: filter
attribute: fast-search
}"""
# config server
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"

View File

@ -1,6 +1,5 @@
from danswer.document_index.vespa.index import MultiTenantVespaIndex
from shared_configs.configs import SupportedEmbeddingModel
from danswer.document_index.vespa.index import VespaIndex
import time
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@ -16,6 +15,8 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from httpx_oauth.clients.google import GoogleOAuth2
from danswer.document_index.interfaces import DocumentIndex
from danswer.configs.app_configs import MULTI_TENANT
from danswer import __version__
from danswer.auth.schemas import UserCreate
from danswer.auth.schemas import UserRead
@ -23,6 +24,7 @@ from danswer.auth.schemas import UserUpdate
from danswer.auth.users import auth_backend
from danswer.auth.users import fastapi_users
from sqlalchemy.orm import Session
from danswer.indexing.models import IndexingSetting
from danswer.configs.app_configs import APP_API_PREFIX
from danswer.configs.app_configs import APP_HOST
from danswer.configs.app_configs import APP_PORT
@ -52,7 +54,7 @@ from danswer.db.search_settings import get_secondary_search_settings
from danswer.db.search_settings import update_current_search_settings
from danswer.db.search_settings import update_secondary_search_settings
from danswer.db.swap_index import check_index_swap
from danswer.document_index.factory import get_default_multi_tenant_document_index
from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
@ -229,15 +231,17 @@ def mark_reindex_flag(db_session: Session) -> None:
def setup_vespa(
document_index: MultiTenantVespaIndex,
supported_embedding_models: list[SupportedEmbeddingModel]
document_index: DocumentIndex,
embedding_dims: list[int],
secondary_embedding_dim: int | None = None
) -> None:
# Vespa startup is a bit slow, so give it a few seconds
wait_time = 5
for _ in range(5):
try:
document_index.ensure_indices_exist(
embedding_dims=[model.dim for model in supported_embedding_models],
embedding_dims=embedding_dims,
secondary_index_embedding_dim=secondary_embedding_dim
)
break
except Exception:
@ -320,14 +324,38 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
# ensure Vespa is setup correctly
logger.notice("Verifying Document Index(s) is/are available.")
document_index = get_default_multi_tenant_document_index(
indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS],
)
# document_index = get_default_document_index(
# indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS]
# ) if MULTI_TENANT else get_default_document_index(
# indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS],
# secondary_index_name=secondary_search_settings.index_name if secondary_search_settings else None
# )
setup_vespa(
document_index,
SUPPORTED_EMBEDDING_MODELS
)
if MULTI_TENANT:
document_index = get_default_document_index(
indices=[model.index_name for model in SUPPORTED_EMBEDDING_MODELS]
)
setup_vespa(
document_index,
[model.dim for model in SUPPORTED_EMBEDDING_MODELS],
secondary_embedding_dim=secondary_search_settings.model_dim if secondary_search_settings else None
)
else:
document_index = get_default_document_index(
indices=[search_settings.index_name],
secondary_index_name=secondary_search_settings.index_name if secondary_search_settings else None
)
setup_vespa(
document_index,
[IndexingSetting.from_db_model(search_settings).model_dim],
secondary_embedding_dim=(
IndexingSetting.from_db_model(secondary_search_settings).model_dim
if secondary_search_settings
else None
)
)
logger.notice(f"Model Server: http://{MODEL_SERVER_HOST}:{MODEL_SERVER_PORT}")
if search_settings.provider_type is None:

View File

@ -38,6 +38,7 @@ from danswer.server.manage.models import BoostUpdateRequest
from danswer.server.manage.models import HiddenUpdateRequest
from danswer.server.models import StatusResponse
from danswer.utils.logger import setup_logger
from danswer.db.engine import current_tenant_id
router = APIRouter(prefix="/manage")
logger = setup_logger()
@ -195,7 +196,7 @@ def create_deletion_attempt_for_connector_id(
)
# actually kick off the deletion
cleanup_connector_credential_pair_task.apply_async(
kwargs=dict(connector_id=connector_id, credential_id=credential_id),
kwargs=dict(connector_id=connector_id, credential_id=credential_id, tenant_id=current_tenant_id.get()),
)
if cc_pair.connector.source == DocumentSource.FILE:

View File

@ -98,6 +98,8 @@ def set_new_search_settings(
primary_index_name=search_settings.index_name,
secondary_index_name=new_search_settings.index_name,
)
print("ensuring indices exist")
document_index.ensure_indices_exist(
index_embedding_dim=search_settings.model_dim,
secondary_index_embedding_dim=new_search_settings.model_dim,

View File

@ -22,7 +22,7 @@ logger = setup_logger()
basic_router = APIRouter(prefix="/tenants")
@basic_router.post("/create")
def create_tenant(tenant_id: str, _ = Depends(control_plane_dep)) -> dict[str, str]:
def create_tenant(tenant_id: str, _: None= Depends(control_plane_dep)) -> dict[str, str]:
if not MULTI_TENANT:
raise HTTPException(status_code=403, detail="Multi-tenant is not enabled")

View File

@ -50,7 +50,7 @@ def run_alembic_migrations(schema_name: str) -> None:
# Prepare the x arguments
x_arguments = [f"schema={schema_name}"]
alembic_cfg.cmd_opts.x = x_arguments
alembic_cfg.cmd_opts.x = x_arguments # type: ignore
# Run migrations programmatically
command.upgrade(alembic_cfg, 'head')

View File

@ -25,7 +25,7 @@ def verify_auth_setting() -> None:
logger.notice(f"Using Auth Type: {AUTH_TYPE.value}")
async def control_plane_dep(request: Request):
async def control_plane_dep(request: Request) -> None:
auth_header = request.headers.get("Authorization")
api_key = request.headers.get("X-API-KEY")

View File

@ -154,6 +154,7 @@ def get_answer_with_quote(
persona=persona,
actual_user_input=query,
max_llm_token_override=remaining_tokens,
db_session=db_session,
)
answer_details = get_search_answer(

View File

@ -132,9 +132,9 @@ def reset_vespa() -> None:
index_name = search_settings.index_name
setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
index_setting=IndexingSetting.from_db_model(search_settings),
secondary_index_setting=None,
document_index=VespaIndex(indices=[index_name], secondary_index_name=None),
embedding_dims=[search_settings.model_dim],
secondary_embedding_dim=None,
)
for _ in range(5):

View File

@ -173,7 +173,7 @@ export function ProviderCreationModal({
</a>
</Text>
<div className="flex flex-col gap-y-2">
<div className="flex w-full flex-col gap-y-2">
{useFileUpload ? (
<>
<Label>Upload JSON File</Label>