diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py
index 0d8b7da70100..08caf7181f8d 100644
--- a/backend/danswer/configs/app_configs.py
+++ b/backend/danswer/configs/app_configs.py
@@ -115,10 +115,16 @@ VESPA_HOST = os.environ.get("VESPA_HOST") or "localhost"
 VESPA_CONFIG_SERVER_HOST = os.environ.get("VESPA_CONFIG_SERVER_HOST") or VESPA_HOST
 VESPA_PORT = os.environ.get("VESPA_PORT") or "8081"
 VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071"
+
+VESPA_CLOUD_URL = os.environ.get("VESPA_CLOUD_URL", "")
+
 # The default below is for dockerized deployment
 VESPA_DEPLOYMENT_ZIP = (
     os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
 )
+VESPA_CLOUD_CERT_PATH = os.environ.get("VESPA_CLOUD_CERT_PATH")
+VESPA_CLOUD_KEY_PATH = os.environ.get("VESPA_CLOUD_KEY_PATH")
+
 # Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
 try:
     INDEX_BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE", 16))
@@ -428,6 +434,10 @@ AZURE_DALLE_DEPLOYMENT_NAME = os.environ.get("AZURE_DALLE_DEPLOYMENT_NAME")
 
 # Multi-tenancy configuration
 MULTI_TENANT = os.environ.get("MULTI_TENANT", "").lower() == "true"
+
+# Use managed Vespa (Vespa Cloud). If set, must also set VESPA_CLOUD_URL, VESPA_CLOUD_CERT_PATH and VESPA_CLOUD_KEY_PATH
+MANAGED_VESPA = os.environ.get("MANAGED_VESPA", "").lower() == "true"
+
 ENABLE_EMAIL_INVITES = os.environ.get("ENABLE_EMAIL_INVITES", "").lower() == "true"
 
 # Security and authentication
diff --git a/backend/danswer/connectors/file/connector.py b/backend/danswer/connectors/file/connector.py
index 106fed8b2afe..9992159eb356 100644
--- a/backend/danswer/connectors/file/connector.py
+++ b/backend/danswer/connectors/file/connector.py
@@ -28,6 +28,7 @@ from danswer.file_processing.extract_file_text import read_pdf_file
 from danswer.file_processing.extract_file_text import read_text_file
 from danswer.file_store.file_store import get_default_file_store
 from danswer.utils.logger import setup_logger
+from shared_configs.configs import current_tenant_id
 
 logger = setup_logger()
 
@@ -174,6 +175,8 @@ class LocalFileConnector(LoadConnector):
     def load_from_state(self) -> GenerateDocumentsOutput:
         documents: list[Document] = []
+        token = current_tenant_id.set(self.tenant_id)
+
         with get_session_with_tenant(self.tenant_id) as db_session:
             for file_path in self.file_locations:
                 current_datetime = datetime.now(timezone.utc)
 
@@ -196,6 +199,8 @@ class LocalFileConnector(LoadConnector):
         if documents:
             yield documents
 
+        current_tenant_id.reset(token)
+
 
 if __name__ == "__main__":
     connector = LocalFileConnector(file_locations=[os.environ["TEST_FILE"]])
diff --git a/backend/danswer/document_index/vespa/chunk_retrieval.py b/backend/danswer/document_index/vespa/chunk_retrieval.py
index e4b2ad83ce2c..3de64cbc8403 100644
--- a/backend/danswer/document_index/vespa/chunk_retrieval.py
+++ b/backend/danswer/document_index/vespa/chunk_retrieval.py
@@ -7,11 +7,13 @@ from datetime import timezone
 from typing import Any
 from typing import cast
 
+import httpx
 import requests
 from retry import retry
 
 from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
 from danswer.document_index.interfaces import VespaChunkRequest
+from danswer.document_index.vespa.shared_utils.utils import get_vespa_http_client
 from danswer.document_index.vespa.shared_utils.vespa_request_builders import (
     build_vespa_filters,
 )
@@ -293,13 +295,12 @@ def query_vespa(
         if LOG_VESPA_TIMING_INFORMATION
         else {},
     )
+
     try:
-        response = requests.post(
-            SEARCH_ENDPOINT,
-            json=params,
-        )
-        response.raise_for_status()
-    except requests.HTTPError as e:
+        with get_vespa_http_client() as http_client:
+            response = http_client.post(SEARCH_ENDPOINT, json=params)
+            response.raise_for_status()
+    except httpx.HTTPError as e:
         request_info = f"Headers: {response.request.headers}\nPayload: {params}"
         response_info = (
             f"Status Code: {response.status_code}\n"
@@ -312,9 +313,10 @@ def query_vespa(
             f"{response_info}\n"
             f"Exception: {e}"
         )
-        raise requests.HTTPError(error_base) from e
+        raise httpx.HTTPError(error_base) from e
 
     response_json: dict[str, Any] = response.json()
+
     if LOG_VESPA_TIMING_INFORMATION:
         logger.debug("Vespa timing info: %s", response_json.get("timing"))
     hits = response_json["root"].get("children", [])
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index d71d198aea78..86bc481e5733 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -18,7 +18,6 @@ import requests  # type: ignore
 
 from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
 from danswer.configs.app_configs import MULTI_TENANT
-from danswer.configs.app_configs import VESPA_REQUEST_TIMEOUT
 from danswer.configs.chat_configs import DOC_TIME_DECAY
 from danswer.configs.chat_configs import NUM_RETURNED_HITS
 from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
@@ -43,6 +42,7 @@ from danswer.document_index.vespa.indexing_utils import clean_chunk_id_copy
 from danswer.document_index.vespa.indexing_utils import (
     get_existing_documents_from_chunks,
 )
+from danswer.document_index.vespa.shared_utils.utils import get_vespa_http_client
 from danswer.document_index.vespa.shared_utils.utils import (
     replace_invalid_doc_id_characters,
 )
@@ -133,6 +133,7 @@ class VespaIndex(DocumentIndex):
         self.index_name = index_name
         self.secondary_index_name = secondary_index_name
         self.multitenant = multitenant
+        self.http_client = get_vespa_http_client()
 
     def ensure_indices_exist(
         self,
@@ -319,7 +320,7 @@ class VespaIndex(DocumentIndex):
         # indexing / updates / deletes since we have to make a large volume of requests.
         with (
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
-            httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client,
+            get_vespa_http_client() as http_client,
         ):
             # Check for existing documents, existing documents need to have all of their chunks deleted
             # prior to indexing as the document size (num chunks) may have shrunk
@@ -382,9 +383,10 @@ class VespaIndex(DocumentIndex):
 
         # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
         # indexing / updates / deletes since we have to make a large volume of requests.
+
        with (
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
-            httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client,
+            get_vespa_http_client() as http_client,
         ):
             for update_batch in batch_generator(updates, batch_size):
                 future_to_document_id = {
@@ -528,7 +530,7 @@ class VespaIndex(DocumentIndex):
         if self.secondary_index_name:
             index_names.append(self.secondary_index_name)
 
-        with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client:
+        with get_vespa_http_client() as http_client:
             for index_name in index_names:
                 params = httpx.QueryParams(
                     {
@@ -584,7 +586,7 @@ class VespaIndex(DocumentIndex):
         # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
         # indexing / updates / deletes since we have to make a large volume of requests.
 
-        with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client:
+        with get_vespa_http_client() as http_client:
             index_names = [self.index_name]
             if self.secondary_index_name:
                 index_names.append(self.secondary_index_name)
@@ -612,7 +614,7 @@ class VespaIndex(DocumentIndex):
         if self.secondary_index_name:
             index_names.append(self.secondary_index_name)
 
-        with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client:
+        with get_vespa_http_client() as http_client:
             for index_name in index_names:
                 params = httpx.QueryParams(
                     {
@@ -822,7 +824,7 @@ class VespaIndex(DocumentIndex):
             f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}"
         )
 
-        with httpx.Client(http2=True) as http_client:
+        with get_vespa_http_client(no_timeout=True) as http_client:
             response = http_client.get(url, params=query_params)
             response.raise_for_status()
 
@@ -871,7 +873,7 @@ class VespaIndex(DocumentIndex):
         logger.debug(f"Starting batch deletion for {len(delete_requests)} documents")
 
         with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
-            with httpx.Client(http2=True) as http_client:
+            with get_vespa_http_client(no_timeout=True) as http_client:
                 for batch_start in range(0, len(delete_requests), batch_size):
                     batch = delete_requests[batch_start : batch_start + batch_size]
 
diff --git a/backend/danswer/document_index/vespa/shared_utils/utils.py b/backend/danswer/document_index/vespa/shared_utils/utils.py
index c74afc9a6294..49fdd680198c 100644
--- a/backend/danswer/document_index/vespa/shared_utils/utils.py
+++ b/backend/danswer/document_index/vespa/shared_utils/utils.py
@@ -1,4 +1,12 @@
 import re
+from typing import cast
+
+import httpx
+
+from danswer.configs.app_configs import MANAGED_VESPA
+from danswer.configs.app_configs import VESPA_CLOUD_CERT_PATH
+from danswer.configs.app_configs import VESPA_CLOUD_KEY_PATH
+from danswer.configs.app_configs import VESPA_REQUEST_TIMEOUT
 
 # NOTE: This does not seem to be used in reality despite the Vespa Docs pointing to this code
 # See here for reference: https://docs.vespa.ai/en/documents.html
@@ -45,3 +53,19 @@ def remove_invalid_unicode_chars(text: str) -> str:
         "[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]"
     )
     return _illegal_xml_chars_RE.sub("", text)
+
+
+def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
+    """
+    Configure and return an HTTP client for communicating with Vespa,
+    including authentication if needed.
+    """
+
+    return httpx.Client(
+        cert=cast(tuple[str, str], (VESPA_CLOUD_CERT_PATH, VESPA_CLOUD_KEY_PATH))
+        if MANAGED_VESPA
+        else None,
+        verify=False if not MANAGED_VESPA else True,
+        timeout=None if no_timeout else VESPA_REQUEST_TIMEOUT,
+        http2=True,
+    )
diff --git a/backend/danswer/document_index/vespa_constants.py b/backend/danswer/document_index/vespa_constants.py
index a4d6aa52e2f5..d4a36ef97251 100644
--- a/backend/danswer/document_index/vespa_constants.py
+++ b/backend/danswer/document_index/vespa_constants.py
@@ -1,3 +1,4 @@
+from danswer.configs.app_configs import VESPA_CLOUD_URL
 from danswer.configs.app_configs import VESPA_CONFIG_SERVER_HOST
 from danswer.configs.app_configs import VESPA_HOST
 from danswer.configs.app_configs import VESPA_PORT
@@ -18,15 +19,21 @@ TENANT_ID_REPLACEMENT = """field tenant_id type string {
     attribute: fast-search
 }"""
 # config server
-VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
+
+
+VESPA_CONFIG_SERVER_URL = (
+    VESPA_CLOUD_URL or f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
+)
 VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
 
 # main search application
-VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
+VESPA_APP_CONTAINER_URL = VESPA_CLOUD_URL or f"http://{VESPA_HOST}:{VESPA_PORT}"
+
 # danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
 DOCUMENT_ID_ENDPOINT = (
     f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
 )
+
 SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
 
 NUM_THREADS = (
diff --git a/backend/danswer/setup.py b/backend/danswer/setup.py
index 84f0382f9efe..747fe8094519 100644
--- a/backend/danswer/setup.py
+++ b/backend/danswer/setup.py
@@ -4,6 +4,7 @@ from sqlalchemy.orm import Session
 
 from danswer.chat.load_yamls import load_chat_yamls
 from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
+from danswer.configs.app_configs import MANAGED_VESPA
 from danswer.configs.app_configs import MULTI_TENANT
 from danswer.configs.constants import KV_REINDEX_KEY
 from danswer.configs.constants import KV_SEARCH_SETTINGS
@@ -310,7 +311,8 @@ def update_default_multipass_indexing(db_session: Session) -> None:
 
 
 def setup_multitenant_danswer() -> None:
-    setup_vespa_multitenant(SUPPORTED_EMBEDDING_MODELS)
+    if not MANAGED_VESPA:
+        setup_vespa_multitenant(SUPPORTED_EMBEDDING_MODELS)
 
 
 def setup_vespa_multitenant(supported_indices: list[SupportedEmbeddingModel]) -> bool: