push vespa managed service configs (#2857)

* push vespa managed service configs

* organize

* k

* k

* k

* nit

* k

* minor cleanup

* ensure no unnecessary timeout
This commit is contained in:
pablodanswer
2024-10-19 16:43:26 -07:00
committed by GitHub
parent 457e7992a4
commit eaaa135f90
7 changed files with 70 additions and 18 deletions

View File

@@ -115,10 +115,16 @@ VESPA_HOST = os.environ.get("VESPA_HOST") or "localhost"
VESPA_CONFIG_SERVER_HOST = os.environ.get("VESPA_CONFIG_SERVER_HOST") or VESPA_HOST VESPA_CONFIG_SERVER_HOST = os.environ.get("VESPA_CONFIG_SERVER_HOST") or VESPA_HOST
VESPA_PORT = os.environ.get("VESPA_PORT") or "8081" VESPA_PORT = os.environ.get("VESPA_PORT") or "8081"
VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071" VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071"
VESPA_CLOUD_URL = os.environ.get("VESPA_CLOUD_URL", "")
# The default below is for dockerized deployment # The default below is for dockerized deployment
VESPA_DEPLOYMENT_ZIP = ( VESPA_DEPLOYMENT_ZIP = (
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip" os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
) )
VESPA_CLOUD_CERT_PATH = os.environ.get("VESPA_CLOUD_CERT_PATH")
VESPA_CLOUD_KEY_PATH = os.environ.get("VESPA_CLOUD_KEY_PATH")
# Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder) # Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
try: try:
INDEX_BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE", 16)) INDEX_BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE", 16))
@@ -428,6 +434,10 @@ AZURE_DALLE_DEPLOYMENT_NAME = os.environ.get("AZURE_DALLE_DEPLOYMENT_NAME")
# Multi-tenancy configuration # Multi-tenancy configuration
MULTI_TENANT = os.environ.get("MULTI_TENANT", "").lower() == "true" MULTI_TENANT = os.environ.get("MULTI_TENANT", "").lower() == "true"
# Use managed Vespa (Vespa Cloud). If set, must also set VESPA_CLOUD_URL, VESPA_CLOUD_CERT_PATH and VESPA_CLOUD_KEY_PATH
MANAGED_VESPA = os.environ.get("MANAGED_VESPA", "").lower() == "true"
ENABLE_EMAIL_INVITES = os.environ.get("ENABLE_EMAIL_INVITES", "").lower() == "true" ENABLE_EMAIL_INVITES = os.environ.get("ENABLE_EMAIL_INVITES", "").lower() == "true"
# Security and authentication # Security and authentication

View File

@@ -28,6 +28,7 @@ from danswer.file_processing.extract_file_text import read_pdf_file
from danswer.file_processing.extract_file_text import read_text_file from danswer.file_processing.extract_file_text import read_text_file
from danswer.file_store.file_store import get_default_file_store from danswer.file_store.file_store import get_default_file_store
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
from shared_configs.configs import current_tenant_id
logger = setup_logger() logger = setup_logger()
@@ -174,6 +175,8 @@ class LocalFileConnector(LoadConnector):
def load_from_state(self) -> GenerateDocumentsOutput: def load_from_state(self) -> GenerateDocumentsOutput:
documents: list[Document] = [] documents: list[Document] = []
token = current_tenant_id.set(self.tenant_id)
with get_session_with_tenant(self.tenant_id) as db_session: with get_session_with_tenant(self.tenant_id) as db_session:
for file_path in self.file_locations: for file_path in self.file_locations:
current_datetime = datetime.now(timezone.utc) current_datetime = datetime.now(timezone.utc)
@@ -196,6 +199,8 @@ class LocalFileConnector(LoadConnector):
if documents: if documents:
yield documents yield documents
current_tenant_id.reset(token)
if __name__ == "__main__": if __name__ == "__main__":
connector = LocalFileConnector(file_locations=[os.environ["TEST_FILE"]]) connector = LocalFileConnector(file_locations=[os.environ["TEST_FILE"]])

View File

@@ -7,11 +7,13 @@ from datetime import timezone
from typing import Any from typing import Any
from typing import cast from typing import cast
import httpx
import requests import requests
from retry import retry from retry import retry
from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
from danswer.document_index.interfaces import VespaChunkRequest from danswer.document_index.interfaces import VespaChunkRequest
from danswer.document_index.vespa.shared_utils.utils import get_vespa_http_client
from danswer.document_index.vespa.shared_utils.vespa_request_builders import ( from danswer.document_index.vespa.shared_utils.vespa_request_builders import (
build_vespa_filters, build_vespa_filters,
) )
@@ -293,13 +295,12 @@ def query_vespa(
if LOG_VESPA_TIMING_INFORMATION if LOG_VESPA_TIMING_INFORMATION
else {}, else {},
) )
try: try:
response = requests.post( with get_vespa_http_client() as http_client:
SEARCH_ENDPOINT, response = http_client.post(SEARCH_ENDPOINT, json=params)
json=params, response.raise_for_status()
) except httpx.HTTPError as e:
response.raise_for_status()
except requests.HTTPError as e:
request_info = f"Headers: {response.request.headers}\nPayload: {params}" request_info = f"Headers: {response.request.headers}\nPayload: {params}"
response_info = ( response_info = (
f"Status Code: {response.status_code}\n" f"Status Code: {response.status_code}\n"
@@ -312,9 +313,10 @@ def query_vespa(
f"{response_info}\n" f"{response_info}\n"
f"Exception: {e}" f"Exception: {e}"
) )
raise requests.HTTPError(error_base) from e raise httpx.HTTPError(error_base) from e
response_json: dict[str, Any] = response.json() response_json: dict[str, Any] = response.json()
if LOG_VESPA_TIMING_INFORMATION: if LOG_VESPA_TIMING_INFORMATION:
logger.debug("Vespa timing info: %s", response_json.get("timing")) logger.debug("Vespa timing info: %s", response_json.get("timing"))
hits = response_json["root"].get("children", []) hits = response_json["root"].get("children", [])

View File

@@ -18,7 +18,6 @@ import requests # type: ignore
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import MULTI_TENANT from danswer.configs.app_configs import MULTI_TENANT
from danswer.configs.app_configs import VESPA_REQUEST_TIMEOUT
from danswer.configs.chat_configs import DOC_TIME_DECAY from danswer.configs.chat_configs import DOC_TIME_DECAY
from danswer.configs.chat_configs import NUM_RETURNED_HITS from danswer.configs.chat_configs import NUM_RETURNED_HITS
from danswer.configs.chat_configs import TITLE_CONTENT_RATIO from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
@@ -43,6 +42,7 @@ from danswer.document_index.vespa.indexing_utils import clean_chunk_id_copy
from danswer.document_index.vespa.indexing_utils import ( from danswer.document_index.vespa.indexing_utils import (
get_existing_documents_from_chunks, get_existing_documents_from_chunks,
) )
from danswer.document_index.vespa.shared_utils.utils import get_vespa_http_client
from danswer.document_index.vespa.shared_utils.utils import ( from danswer.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters, replace_invalid_doc_id_characters,
) )
@@ -133,6 +133,7 @@ class VespaIndex(DocumentIndex):
self.index_name = index_name self.index_name = index_name
self.secondary_index_name = secondary_index_name self.secondary_index_name = secondary_index_name
self.multitenant = multitenant self.multitenant = multitenant
self.http_client = get_vespa_http_client()
def ensure_indices_exist( def ensure_indices_exist(
self, self,
@@ -319,7 +320,7 @@ class VespaIndex(DocumentIndex):
# indexing / updates / deletes since we have to make a large volume of requests. # indexing / updates / deletes since we have to make a large volume of requests.
with ( with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client, get_vespa_http_client() as http_client,
): ):
# Check for existing documents, existing documents need to have all of their chunks deleted # Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk # prior to indexing as the document size (num chunks) may have shrunk
@@ -382,9 +383,10 @@ class VespaIndex(DocumentIndex):
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests. # indexing / updates / deletes since we have to make a large volume of requests.
with ( with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client, get_vespa_http_client() as http_client,
): ):
for update_batch in batch_generator(updates, batch_size): for update_batch in batch_generator(updates, batch_size):
future_to_document_id = { future_to_document_id = {
@@ -528,7 +530,7 @@ class VespaIndex(DocumentIndex):
if self.secondary_index_name: if self.secondary_index_name:
index_names.append(self.secondary_index_name) index_names.append(self.secondary_index_name)
with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client: with get_vespa_http_client() as http_client:
for index_name in index_names: for index_name in index_names:
params = httpx.QueryParams( params = httpx.QueryParams(
{ {
@@ -584,7 +586,7 @@ class VespaIndex(DocumentIndex):
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests. # indexing / updates / deletes since we have to make a large volume of requests.
with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client: with get_vespa_http_client() as http_client:
index_names = [self.index_name] index_names = [self.index_name]
if self.secondary_index_name: if self.secondary_index_name:
index_names.append(self.secondary_index_name) index_names.append(self.secondary_index_name)
@@ -612,7 +614,7 @@ class VespaIndex(DocumentIndex):
if self.secondary_index_name: if self.secondary_index_name:
index_names.append(self.secondary_index_name) index_names.append(self.secondary_index_name)
with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client: with get_vespa_http_client() as http_client:
for index_name in index_names: for index_name in index_names:
params = httpx.QueryParams( params = httpx.QueryParams(
{ {
@@ -822,7 +824,7 @@ class VespaIndex(DocumentIndex):
f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}" f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}"
) )
with httpx.Client(http2=True) as http_client: with get_vespa_http_client(no_timeout=True) as http_client:
response = http_client.get(url, params=query_params) response = http_client.get(url, params=query_params)
response.raise_for_status() response.raise_for_status()
@@ -871,7 +873,7 @@ class VespaIndex(DocumentIndex):
logger.debug(f"Starting batch deletion for {len(delete_requests)} documents") logger.debug(f"Starting batch deletion for {len(delete_requests)} documents")
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
with httpx.Client(http2=True) as http_client: with get_vespa_http_client(no_timeout=True) as http_client:
for batch_start in range(0, len(delete_requests), batch_size): for batch_start in range(0, len(delete_requests), batch_size):
batch = delete_requests[batch_start : batch_start + batch_size] batch = delete_requests[batch_start : batch_start + batch_size]

View File

@@ -1,4 +1,12 @@
import re import re
from typing import cast
import httpx
from danswer.configs.app_configs import MANAGED_VESPA
from danswer.configs.app_configs import VESPA_CLOUD_CERT_PATH
from danswer.configs.app_configs import VESPA_CLOUD_KEY_PATH
from danswer.configs.app_configs import VESPA_REQUEST_TIMEOUT
# NOTE: This does not seem to be used in reality despite the Vespa Docs pointing to this code # NOTE: This does not seem to be used in reality despite the Vespa Docs pointing to this code
# See here for reference: https://docs.vespa.ai/en/documents.html # See here for reference: https://docs.vespa.ai/en/documents.html
@@ -45,3 +53,19 @@ def remove_invalid_unicode_chars(text: str) -> str:
"[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]" "[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]"
) )
return _illegal_xml_chars_RE.sub("", text) return _illegal_xml_chars_RE.sub("", text)
def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
    """
    Configure and return an HTTP/2-capable client for communicating with Vespa.

    When MANAGED_VESPA is set (Vespa Cloud), the client authenticates with the
    mutual-TLS cert/key pair from VESPA_CLOUD_CERT_PATH / VESPA_CLOUD_KEY_PATH
    and verifies the server certificate. For self-hosted Vespa, no client cert
    is sent and server-certificate verification is disabled.

    Args:
        no_timeout: If True, disable the request timeout entirely (intended for
            long-running bulk operations); otherwise use VESPA_REQUEST_TIMEOUT.

    Returns:
        A configured httpx.Client. Callers are responsible for closing it
        (typically via a `with` block).
    """
    return httpx.Client(
        # cast: the config values are Optional[str]; when MANAGED_VESPA is set
        # they are expected to be populated — TODO confirm they are validated
        # at startup rather than failing here at request time.
        cert=cast(tuple[str, str], (VESPA_CLOUD_CERT_PATH, VESPA_CLOUD_KEY_PATH))
        if MANAGED_VESPA
        else None,
        # Simplified from `False if not MANAGED_VESPA else True`.
        # NOTE(review): verify=False disables TLS certificate verification for
        # self-hosted deployments; acceptable only for trusted in-cluster traffic.
        verify=MANAGED_VESPA,
        timeout=None if no_timeout else VESPA_REQUEST_TIMEOUT,
        http2=True,
    )

View File

@@ -1,3 +1,4 @@
from danswer.configs.app_configs import VESPA_CLOUD_URL
from danswer.configs.app_configs import VESPA_CONFIG_SERVER_HOST from danswer.configs.app_configs import VESPA_CONFIG_SERVER_HOST
from danswer.configs.app_configs import VESPA_HOST from danswer.configs.app_configs import VESPA_HOST
from danswer.configs.app_configs import VESPA_PORT from danswer.configs.app_configs import VESPA_PORT
@@ -18,15 +19,21 @@ TENANT_ID_REPLACEMENT = """field tenant_id type string {
attribute: fast-search attribute: fast-search
}""" }"""
# config server # config server
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
VESPA_CONFIG_SERVER_URL = (
VESPA_CLOUD_URL or f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
)
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2" VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
# main search application # main search application
VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}" VESPA_APP_CONTAINER_URL = VESPA_CLOUD_URL or f"http://{VESPA_HOST}:{VESPA_PORT}"
# danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd # danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
DOCUMENT_ID_ENDPOINT = ( DOCUMENT_ID_ENDPOINT = (
f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid" f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
) )
SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/" SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
NUM_THREADS = ( NUM_THREADS = (

View File

@@ -4,6 +4,7 @@ from sqlalchemy.orm import Session
from danswer.chat.load_yamls import load_chat_yamls from danswer.chat.load_yamls import load_chat_yamls
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from danswer.configs.app_configs import MANAGED_VESPA
from danswer.configs.app_configs import MULTI_TENANT from danswer.configs.app_configs import MULTI_TENANT
from danswer.configs.constants import KV_REINDEX_KEY from danswer.configs.constants import KV_REINDEX_KEY
from danswer.configs.constants import KV_SEARCH_SETTINGS from danswer.configs.constants import KV_SEARCH_SETTINGS
@@ -310,7 +311,8 @@ def update_default_multipass_indexing(db_session: Session) -> None:
def setup_multitenant_danswer() -> None: def setup_multitenant_danswer() -> None:
setup_vespa_multitenant(SUPPORTED_EMBEDDING_MODELS) if not MANAGED_VESPA:
setup_vespa_multitenant(SUPPORTED_EMBEDDING_MODELS)
def setup_vespa_multitenant(supported_indices: list[SupportedEmbeddingModel]) -> bool: def setup_vespa_multitenant(supported_indices: list[SupportedEmbeddingModel]) -> bool: