diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index 6e05c97fe..90751a309 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -11,6 +11,7 @@ from datetime import timezone
 from typing import Any
 from typing import cast
 
+import httpx
 import requests
 from requests import HTTPError
 from requests import Response
@@ -97,11 +98,12 @@ class _VespaUpdateRequest:
 
 @retry(tries=3, delay=1, backoff=2)
 def _does_document_exist(
     doc_chunk_id: str,
+    http_client: httpx.Client,
 ) -> bool:
     """Returns whether the document already exists and the users/group whitelists
     Specifically in this case, document refers to a vespa document which is equivalent to a Danswer
     chunk. This checks for whether the chunk exists already in the index"""
-    doc_fetch_response = requests.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}")
+    doc_fetch_response = http_client.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}")
     if doc_fetch_response.status_code == 404:
         return False
@@ -157,16 +159,17 @@ def _get_vespa_chunk_ids_by_document_id(
 
 
 @retry(tries=3, delay=1, backoff=2)
-def _delete_vespa_doc_chunks(document_id: str) -> None:
+def _delete_vespa_doc_chunks(document_id: str, http_client: httpx.Client) -> None:
     doc_chunk_ids = _get_vespa_chunk_ids_by_document_id(document_id)
 
     for chunk_id in doc_chunk_ids:
-        res = requests.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
+        res = http_client.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
         res.raise_for_status()
 
 
 def _delete_vespa_docs(
     document_ids: list[str],
+    http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
     external_executor = True
@@ -177,7 +180,7 @@ def _delete_vespa_docs(
 
     try:
         doc_deletion_future = {
-            executor.submit(_delete_vespa_doc_chunks, doc_id): doc_id
+            executor.submit(_delete_vespa_doc_chunks, doc_id, http_client): doc_id
             for doc_id in document_ids
         }
         for future in concurrent.futures.as_completed(doc_deletion_future):
@@ -191,6 +194,7 @@ def _delete_vespa_docs(
 
 def _get_existing_documents_from_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
+    http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> set[str]:
     external_executor = True
@@ -203,7 +207,7 @@ def _get_existing_documents_from_chunks(
     try:
         chunk_existence_future = {
             executor.submit(
-                _does_document_exist, str(get_uuid_from_chunk(chunk))
+                _does_document_exist, str(get_uuid_from_chunk(chunk)), http_client
             ): chunk
             for chunk in chunks
         }
@@ -221,7 +225,9 @@ def _get_existing_documents_from_chunks(
 
 
 @retry(tries=3, delay=1, backoff=2)
-def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
+def _index_vespa_chunk(
+    chunk: DocMetadataAwareIndexChunk, http_client: httpx.Client
+) -> None:
     json_header = {
         "Content-Type": "application/json",
     }
@@ -264,9 +270,9 @@ def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
         headers: dict[str, str],
         fields: dict[str, Any],
         log_error: bool = True,
-    ) -> Response:
+    ) -> httpx.Response:
         logger.debug(f'Indexing to URL "{url}"')
-        res = requests.post(url, headers=headers, json={"fields": fields})
+        res = http_client.post(url, headers=headers, json={"fields": fields})
         try:
             res.raise_for_status()
             return res
@@ -314,6 +320,7 @@ def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
 
 def _batch_index_vespa_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
+    http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
     external_executor = True
@@ -324,7 +331,8 @@ def _batch_index_vespa_chunks(
 
     try:
         chunk_index_future = {
-            executor.submit(_index_vespa_chunk, chunk): chunk for chunk in chunks
+            executor.submit(_index_vespa_chunk, chunk, http_client): chunk
+            for chunk in chunks
         }
         for future in concurrent.futures.as_completed(chunk_index_future):
             # Will raise exception if any indexing raised an exception
@@ -344,22 +352,31 @@ def _clear_and_index_vespa_chunks(
     chunks will be kept"""
     existing_docs: set[str] = set()
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor:
+    # NOTE: using `httpx` here since `requests` doesn't support HTTP/2. This is beneficial for
+    # indexing / updates / deletes since we have to make a large volume of requests.
+    with (
+        concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor,
+        httpx.Client(http2=True) as http_client,
+    ):
         # Check for existing documents, existing documents need to have all of their chunks deleted
         # prior to indexing as the document size (num chunks) may have shrunk
         first_chunks = [chunk for chunk in chunks if chunk.chunk_id == 0]
         for chunk_batch in batch_generator(first_chunks, _BATCH_SIZE):
             existing_docs.update(
                 _get_existing_documents_from_chunks(
-                    chunks=chunk_batch, executor=executor
+                    chunks=chunk_batch, http_client=http_client, executor=executor
                 )
             )
 
         for doc_id_batch in batch_generator(existing_docs, _BATCH_SIZE):
-            _delete_vespa_docs(document_ids=doc_id_batch, executor=executor)
+            _delete_vespa_docs(
+                document_ids=doc_id_batch, http_client=http_client, executor=executor
+            )
 
         for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
-            _batch_index_vespa_chunks(chunks=chunk_batch, executor=executor)
+            _batch_index_vespa_chunks(
+                chunks=chunk_batch, http_client=http_client, executor=executor
+            )
 
     all_doc_ids = {chunk.source_document.id for chunk in chunks}
 
@@ -621,25 +638,30 @@ class VespaIndex(DocumentIndex):
     ) -> None:
         """Runs a batch of updates in parallel via the ThreadPoolExecutor."""
 
-        def _update_chunk(update: _VespaUpdateRequest) -> Response:
-            update_body = json.dumps(update.update_request)
+        def _update_chunk(
+            update: _VespaUpdateRequest, http_client: httpx.Client
+        ) -> httpx.Response:
             logger.debug(
-                f"Updating with request to {update.url} with body {update_body}"
+                f"Updating with request to {update.url} with body {update.update_request}"
             )
-            return requests.put(
+            return http_client.put(
                 update.url,
                 headers={"Content-Type": "application/json"},
-                data=update_body,
+                json=update.update_request,
             )
 
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=_NUM_THREADS
-        ) as executor:
+        # NOTE: using `httpx` here since `requests` doesn't support HTTP/2. This is beneficial for
+        # indexing / updates / deletes since we have to make a large volume of requests.
+        with (
+            concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor,
+            httpx.Client(http2=True) as http_client,
+        ):
             for update_batch in batch_generator(updates, batch_size):
                 future_to_document_id = {
                     executor.submit(
                         _update_chunk,
                         update,
+                        http_client,
                     ): update.document_id
                     for update in update_batch
                 }
@@ -696,7 +718,11 @@ class VespaIndex(DocumentIndex):
 
     def delete(self, doc_ids: list[str]) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
-        _delete_vespa_docs(doc_ids)
+
+        # NOTE: using `httpx` here since `requests` doesn't support HTTP/2. This is beneficial for
+        # indexing / updates / deletes since we have to make a large volume of requests.
+        with httpx.Client(http2=True) as http_client:
+            _delete_vespa_docs(document_ids=doc_ids, http_client=http_client)
 
     def id_based_retrieval(
         self, document_id: str, chunk_ind: int | None, filters: IndexFilters
diff --git a/web/src/app/admin/connector/[ccPairId]/page.tsx b/web/src/app/admin/connector/[ccPairId]/page.tsx
index 3af0ff7bd..96020d060 100644
--- a/web/src/app/admin/connector/[ccPairId]/page.tsx
+++ b/web/src/app/admin/connector/[ccPairId]/page.tsx
@@ -42,7 +42,7 @@ function Main({ ccPairId }: { ccPairId: number }) {
 
   return (
   );
 }
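
Note: the core pattern this diff introduces is a single `httpx.Client(http2=True)` shared by every ThreadPoolExecutor worker, so the large volume of per-chunk requests is multiplexed over one HTTP/2 connection instead of each thread opening its own. Below is a minimal sketch of that pattern; the endpoint URL, worker count, and helper names are illustrative assumptions, not Danswer's actual values. `httpx.Client` is thread-safe, and `http2=True` requires the optional `h2` dependency (`pip install httpx[http2]`).

import concurrent.futures

import httpx

# Assumed placeholder; Danswer derives DOCUMENT_ID_ENDPOINT from its Vespa config.
DOCUMENT_ID_ENDPOINT = "http://localhost:8081/document/v1/default/danswer_chunk/docid"


def _delete_chunk(chunk_id: str, http_client: httpx.Client) -> None:
    # All workers reuse the same client, so requests share one multiplexed
    # HTTP/2 connection instead of opening a TCP connection per thread.
    res = http_client.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
    res.raise_for_status()


def delete_chunks(chunk_ids: list[str]) -> None:
    with (
        concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor,
        httpx.Client(http2=True) as http_client,
    ):
        futures = [
            executor.submit(_delete_chunk, chunk_id, http_client)
            for chunk_id in chunk_ids
        ]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any per-request failure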