mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-30 01:30:45 +02:00
Variable Embedding Dim for Vespa (#985)
This commit is contained in:
@ -42,11 +42,6 @@ COPY ./alembic /app/alembic
|
|||||||
COPY ./alembic.ini /app/alembic.ini
|
COPY ./alembic.ini /app/alembic.ini
|
||||||
COPY supervisord.conf /usr/etc/supervisord.conf
|
COPY supervisord.conf /usr/etc/supervisord.conf
|
||||||
|
|
||||||
# Create Vespa app zip
|
|
||||||
WORKDIR /app/danswer/document_index/vespa/app_config
|
|
||||||
RUN zip -r /app/danswer/vespa-app.zip .
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
ENV PYTHONPATH /app
|
ENV PYTHONPATH /app
|
||||||
|
|
||||||
# Default command which does nothing
|
# Default command which does nothing
|
||||||
|
@ -4,6 +4,7 @@ from datetime import datetime
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from danswer.access.models import DocumentAccess
|
from danswer.access.models import DocumentAccess
|
||||||
|
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
|
||||||
from danswer.indexing.models import DocMetadataAwareIndexChunk
|
from danswer.indexing.models import DocMetadataAwareIndexChunk
|
||||||
from danswer.indexing.models import InferenceChunk
|
from danswer.indexing.models import InferenceChunk
|
||||||
from danswer.search.models import IndexFilters
|
from danswer.search.models import IndexFilters
|
||||||
@ -50,7 +51,7 @@ class Verifiable(abc.ABC):
|
|||||||
self.index_name = index_name
|
self.index_name = index_name
|
||||||
|
|
||||||
@abc.abstractmethod
def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
    """Make sure the backing document index exists; implemented by subclasses.

    Args:
        embedding_dim: Embedding dimension the index should be set up for.
            Defaults to DOC_EMBEDDING_DIM, so callers that pass no argument
            keep working unchanged.

    Raises:
        NotImplementedError: Always, in this abstract declaration.
    """
    raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ schema danswer_chunk {
|
|||||||
summary: dynamic
|
summary: dynamic
|
||||||
}
|
}
|
||||||
# Title embedding (x1)
|
# Title embedding (x1)
|
||||||
field title_embedding type tensor<float>(x[384]) {
|
field title_embedding type tensor<float>(x[VARIABLE_DIM]) {
|
||||||
indexing: attribute
|
indexing: attribute
|
||||||
attribute {
|
attribute {
|
||||||
distance-metric: angular
|
distance-metric: angular
|
||||||
@ -45,7 +45,7 @@ schema danswer_chunk {
|
|||||||
}
|
}
|
||||||
# Content embeddings (chunk + optional mini chunks embeddings)
|
# Content embeddings (chunk + optional mini chunks embeddings)
|
||||||
# "t" and "x" are arbitrary names, not special keywords
|
# "t" and "x" are arbitrary names, not special keywords
|
||||||
field embeddings type tensor<float>(t{},x[384]) {
|
field embeddings type tensor<float>(t{},x[VARIABLE_DIM]) {
|
||||||
indexing: attribute
|
indexing: attribute
|
||||||
attribute {
|
attribute {
|
||||||
distance-metric: angular
|
distance-metric: angular
|
||||||
@ -143,7 +143,7 @@ schema danswer_chunk {
|
|||||||
|
|
||||||
rank-profile hybrid_search inherits default, default_rank {
|
rank-profile hybrid_search inherits default, default_rank {
|
||||||
inputs {
|
inputs {
|
||||||
query(query_embedding) tensor<float>(x[384])
|
query(query_embedding) tensor<float>(x[VARIABLE_DIM])
|
||||||
}
|
}
|
||||||
|
|
||||||
# This must be separate function for normalize_linear to work
|
# This must be separate function for normalize_linear to work
|
||||||
@ -224,7 +224,7 @@ schema danswer_chunk {
|
|||||||
|
|
||||||
rank-profile semantic_search inherits default, default_rank {
|
rank-profile semantic_search inherits default, default_rank {
|
||||||
inputs {
|
inputs {
|
||||||
query(query_embedding) tensor<float>(x[384])
|
query(query_embedding) tensor<float>(x[VARIABLE_DIM])
|
||||||
}
|
}
|
||||||
|
|
||||||
first-phase {
|
first-phase {
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
|
import io
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import string
|
import string
|
||||||
import time
|
import time
|
||||||
|
import zipfile
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from collections.abc import Mapping
|
from collections.abc import Mapping
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
@ -9,6 +12,7 @@ from datetime import datetime
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from datetime import timezone
|
from datetime import timezone
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
from typing import BinaryIO
|
||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
@ -49,6 +53,7 @@ from danswer.configs.constants import SOURCE_TYPE
|
|||||||
from danswer.configs.constants import TITLE
|
from danswer.configs.constants import TITLE
|
||||||
from danswer.configs.constants import TITLE_EMBEDDING
|
from danswer.configs.constants import TITLE_EMBEDDING
|
||||||
from danswer.configs.constants import TITLE_SEPARATOR
|
from danswer.configs.constants import TITLE_SEPARATOR
|
||||||
|
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
|
||||||
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
|
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
|
||||||
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
|
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||||
get_experts_stores_representations,
|
get_experts_stores_representations,
|
||||||
@ -70,7 +75,7 @@ from danswer.utils.threadpool_concurrency import run_functions_tuples_in_paralle
|
|||||||
|
|
||||||
logger = setup_logger()
|
logger = setup_logger()
|
||||||
|
|
||||||
|
VESPA_DIM_REPLACEMENT_PAT = "VARIABLE_DIM"
|
||||||
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_HOST}:{VESPA_TENANT_PORT}"
|
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_HOST}:{VESPA_TENANT_PORT}"
|
||||||
VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
|
VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
|
||||||
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
|
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
|
||||||
@ -566,6 +571,15 @@ def _inference_chunk_by_vespa_id(vespa_id: str) -> InferenceChunk:
|
|||||||
return _vespa_hit_to_inference_chunk(res.json())
|
return _vespa_hit_to_inference_chunk(res.json())
|
||||||
|
|
||||||
|
|
||||||
|
def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
    """Build a DEFLATE-compressed zip archive entirely in memory.

    Args:
        file_contents: Mapping of archive member path (for example
            "schemas/danswer_chunk.sd") to the raw bytes stored under it.

    Returns:
        A binary buffer holding the finished archive, rewound to offset 0
        so it can be read or uploaded directly.
    """
    zip_buffer = io.BytesIO()
    # The archive has to be closed (end of the with-block) before the buffer
    # is handed out: the zip central directory is only written on close.
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        for filename, content in file_contents.items():
            zipf.writestr(filename, content)
    # Rewind so consumers start reading at the beginning of the archive.
    zip_buffer.seek(0)
    return zip_buffer
|
||||||
|
|
||||||
|
|
||||||
class VespaIndex(DocumentIndex):
|
class VespaIndex(DocumentIndex):
|
||||||
yql_base = (
|
yql_base = (
|
||||||
f"select "
|
f"select "
|
||||||
@ -593,7 +607,7 @@ class VespaIndex(DocumentIndex):
|
|||||||
# to be updated + zipped + deployed, not supporting the option for simplicity
|
# to be updated + zipped + deployed, not supporting the option for simplicity
|
||||||
self.deployment_zip = deployment_zip
|
self.deployment_zip = deployment_zip
|
||||||
|
|
||||||
def ensure_indices_exist(self) -> None:
|
def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
|
||||||
"""Verifying indices is more involved as there is no good way to
|
"""Verifying indices is more involved as there is no good way to
|
||||||
verify the deployed app against the zip locally. But deploying the latest app.zip will ensure that
|
verify the deployed app against the zip locally. But deploying the latest app.zip will ensure that
|
||||||
the index is up-to-date with the expected schema and this does not erase the existing index.
|
the index is up-to-date with the expected schema and this does not erase the existing index.
|
||||||
@ -601,13 +615,34 @@ class VespaIndex(DocumentIndex):
|
|||||||
"""
|
"""
|
||||||
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
|
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
|
||||||
logger.debug(f"Sending Vespa zip to {deploy_url}")
|
logger.debug(f"Sending Vespa zip to {deploy_url}")
|
||||||
|
|
||||||
|
vespa_schema_path = os.path.join(
|
||||||
|
os.getcwd(), "danswer", "document_index", "vespa", "app_config"
|
||||||
|
)
|
||||||
|
schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
|
||||||
|
services_file = os.path.join(vespa_schema_path, "services.xml")
|
||||||
|
|
||||||
|
with open(schema_file, "r") as schema_f:
|
||||||
|
schema = schema_f.read()
|
||||||
|
schema = schema.replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
|
||||||
|
schema_bytes = schema.encode("utf-8")
|
||||||
|
|
||||||
|
with open(services_file, "rb") as services_f:
|
||||||
|
services_bytes = services_f.read()
|
||||||
|
|
||||||
|
zip_dict = {
|
||||||
|
"schemas/danswer_chunk.sd": schema_bytes,
|
||||||
|
"services.xml": services_bytes,
|
||||||
|
}
|
||||||
|
|
||||||
|
zip_file = in_memory_zip_from_file_bytes(zip_dict)
|
||||||
|
|
||||||
headers = {"Content-Type": "application/zip"}
|
headers = {"Content-Type": "application/zip"}
|
||||||
with open(self.deployment_zip, "rb") as f:
|
response = requests.post(deploy_url, headers=headers, data=zip_file)
|
||||||
response = requests.post(deploy_url, headers=headers, data=f)
|
if response.status_code != 200:
|
||||||
if response.status_code != 200:
|
raise RuntimeError(
|
||||||
raise RuntimeError(
|
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
|
||||||
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
|
)
|
||||||
)
|
|
||||||
|
|
||||||
def index(
|
def index(
|
||||||
self,
|
self,
|
||||||
|
Reference in New Issue
Block a user