Adding HTTP/2 support to Vespa

Weves 2023-12-23 16:13:38 -08:00 committed by Chris Weaver
parent 535dc265c5
commit dca4f7a72b
2 changed files with 49 additions and 23 deletions


@@ -11,6 +11,7 @@ from datetime import timezone
 from typing import Any
 from typing import cast
 
+import httpx
 import requests
 from requests import HTTPError
 from requests import Response
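
Aside (not part of the diff): httpx only speaks HTTP/2 when its optional `h2` dependency is present, typically installed as `pip install httpx[http2]`. A minimal sketch for confirming the negotiated protocol, assuming a hypothetical TLS endpoint (HTTP/2 is negotiated via ALPN):

    import httpx

    # Requires the optional HTTP/2 extra: pip install httpx[http2]
    with httpx.Client(http2=True) as client:
        # Hypothetical HTTPS endpoint standing in for a real Vespa host.
        response = client.get("https://vespa.example.com/ApplicationStatus")
        # http_version reports the negotiated protocol: "HTTP/2" or "HTTP/1.1".
        print(response.http_version)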
@@ -97,11 +98,12 @@ class _VespaUpdateRequest:
 @retry(tries=3, delay=1, backoff=2)
 def _does_document_exist(
     doc_chunk_id: str,
+    http_client: httpx.Client,
 ) -> bool:
     """Returns whether the document already exists and the users/group whitelists
     Specifically in this case, document refers to a vespa document which is equivalent to a Danswer
     chunk. This checks for whether the chunk exists already in the index"""
-    doc_fetch_response = requests.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}")
+    doc_fetch_response = http_client.get(f"{DOCUMENT_ID_ENDPOINT}/{doc_chunk_id}")
     if doc_fetch_response.status_code == 404:
         return False
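
Aside (not part of the diff): the `@retry(tries=3, delay=1, backoff=2)` decorator isn't imported in this hunk; assuming it comes from the `retry` PyPI package, whose signature this matches, a failing call is attempted up to three times with waits of 1s and then 2s (backoff=2 doubles the delay each attempt). A sketch under that assumption:

    from retry import retry  # assumed source of the decorator


    @retry(tries=3, delay=1, backoff=2)
    def ping_vespa() -> None:
        # A hypothetical transient failure here would be retried after 1s,
        # then 2s; a third failure propagates to the caller.
        ...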
@@ -157,16 +159,17 @@ def _get_vespa_chunk_ids_by_document_id(
 @retry(tries=3, delay=1, backoff=2)
-def _delete_vespa_doc_chunks(document_id: str) -> None:
+def _delete_vespa_doc_chunks(document_id: str, http_client: httpx.Client) -> None:
     doc_chunk_ids = _get_vespa_chunk_ids_by_document_id(document_id)
 
     for chunk_id in doc_chunk_ids:
-        res = requests.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
+        res = http_client.delete(f"{DOCUMENT_ID_ENDPOINT}/{chunk_id}")
         res.raise_for_status()
 
 
 def _delete_vespa_docs(
     document_ids: list[str],
+    http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
     external_executor = True
@@ -177,7 +180,7 @@ def _delete_vespa_docs(
     try:
         doc_deletion_future = {
-            executor.submit(_delete_vespa_doc_chunks, doc_id): doc_id
+            executor.submit(_delete_vespa_doc_chunks, doc_id, http_client): doc_id
            for doc_id in document_ids
         }
         for future in concurrent.futures.as_completed(doc_deletion_future):
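
Aside (not part of the diff): threading one `http_client` through every worker works because `httpx.Client` is documented as thread-safe, so all ThreadPoolExecutor workers share one connection pool (and, over HTTP/2, can multiplex concurrent requests on a single connection). A self-contained sketch of the same pattern with hypothetical URLs:

    import concurrent.futures

    import httpx


    def check(url: str, http_client: httpx.Client) -> int:
        # Every worker reuses the shared client's pooled connections.
        return http_client.get(url).status_code


    urls = [f"https://vespa.example.com/doc/{i}" for i in range(8)]  # hypothetical
    with (
        concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor,
        httpx.Client(http2=True) as http_client,
    ):
        futures = {executor.submit(check, url, http_client): url for url in urls}
        for future in concurrent.futures.as_completed(futures):
            print(futures[future], future.result())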
@@ -191,6 +194,7 @@ def _delete_vespa_docs(
 def _get_existing_documents_from_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
+    http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> set[str]:
     external_executor = True
@@ -203,7 +207,7 @@ def _get_existing_documents_from_chunks(
     try:
         chunk_existence_future = {
             executor.submit(
-                _does_document_exist, str(get_uuid_from_chunk(chunk))
+                _does_document_exist, str(get_uuid_from_chunk(chunk)), http_client
             ): chunk
             for chunk in chunks
         }
@@ -221,7 +225,9 @@ def _get_existing_documents_from_chunks(
 @retry(tries=3, delay=1, backoff=2)
-def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
+def _index_vespa_chunk(
+    chunk: DocMetadataAwareIndexChunk, http_client: httpx.Client
+) -> None:
     json_header = {
         "Content-Type": "application/json",
     }
@@ -264,9 +270,9 @@ def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
         headers: dict[str, str],
         fields: dict[str, Any],
         log_error: bool = True,
-    ) -> Response:
+    ) -> httpx.Response:
         logger.debug(f'Indexing to URL "{url}"')
-        res = requests.post(url, headers=headers, json={"fields": fields})
+        res = http_client.post(url, headers=headers, json={"fields": fields})
         try:
             res.raise_for_status()
             return res
@@ -314,6 +320,7 @@ def _index_vespa_chunk(chunk: DocMetadataAwareIndexChunk) -> None:
 def _batch_index_vespa_chunks(
     chunks: list[DocMetadataAwareIndexChunk],
+    http_client: httpx.Client,
     executor: concurrent.futures.ThreadPoolExecutor | None = None,
 ) -> None:
     external_executor = True
@@ -324,7 +331,8 @@ def _batch_index_vespa_chunks(
     try:
         chunk_index_future = {
-            executor.submit(_index_vespa_chunk, chunk): chunk for chunk in chunks
+            executor.submit(_index_vespa_chunk, chunk, http_client): chunk
+            for chunk in chunks
         }
         for future in concurrent.futures.as_completed(chunk_index_future):
             # Will raise exception if any indexing raised an exception
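
Aside (not part of the diff): the "# Will raise exception" comment relies on `Future.result()` re-raising whatever the worker raised, so a single failed chunk aborts the whole batch. A stdlib-only sketch of that behavior:

    import concurrent.futures


    def work(n: int) -> int:
        if n == 3:
            raise ValueError("boom")  # simulates one failed chunk
        return n * 2


    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(work, n): n for n in range(5)}
        for future in concurrent.futures.as_completed(futures):
            # result() re-raises the worker's exception here, failing the batch.
            print(futures[future], future.result())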
@@ -344,22 +352,31 @@ def _clear_and_index_vespa_chunks(
     chunks will be kept"""
     existing_docs: set[str] = set()
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor:
+    # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
+    # indexing / updates / deletes since we have to make a large volume of requests.
+    with (
+        concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor,
+        httpx.Client(http2=True) as http_client,
+    ):
         # Check for existing documents, existing documents need to have all of their chunks deleted
         # prior to indexing as the document size (num chunks) may have shrunk
         first_chunks = [chunk for chunk in chunks if chunk.chunk_id == 0]
         for chunk_batch in batch_generator(first_chunks, _BATCH_SIZE):
             existing_docs.update(
                 _get_existing_documents_from_chunks(
-                    chunks=chunk_batch, executor=executor
+                    chunks=chunk_batch, http_client=http_client, executor=executor
                 )
             )
         for doc_id_batch in batch_generator(existing_docs, _BATCH_SIZE):
-            _delete_vespa_docs(document_ids=doc_id_batch, executor=executor)
+            _delete_vespa_docs(
+                document_ids=doc_id_batch, http_client=http_client, executor=executor
+            )
 
         for chunk_batch in batch_generator(chunks, _BATCH_SIZE):
-            _batch_index_vespa_chunks(chunks=chunk_batch, executor=executor)
+            _batch_index_vespa_chunks(
+                chunks=chunk_batch, http_client=http_client, executor=executor
+            )
 
     all_doc_ids = {chunk.source_document.id for chunk in chunks}
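
Aside (not part of the diff): the parenthesized multi-manager `with` used above became official syntax in Python 3.10. On older interpreters the same pairing can be written with `contextlib.ExitStack`, sketched here (`_NUM_THREADS` is a stand-in for the module's constant):

    import concurrent.futures
    import contextlib

    import httpx

    _NUM_THREADS = 16  # placeholder value

    with contextlib.ExitStack() as stack:
        executor = stack.enter_context(
            concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS)
        )
        http_client = stack.enter_context(httpx.Client(http2=True))
        # ... submit work exactly as in the diff ...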
@@ -621,25 +638,30 @@ class VespaIndex(DocumentIndex):
     ) -> None:
         """Runs a batch of updates in parallel via the ThreadPoolExecutor."""
 
-        def _update_chunk(update: _VespaUpdateRequest) -> Response:
-            update_body = json.dumps(update.update_request)
+        def _update_chunk(
+            update: _VespaUpdateRequest, http_client: httpx.Client
+        ) -> httpx.Response:
             logger.debug(
-                f"Updating with request to {update.url} with body {update_body}"
+                f"Updating with request to {update.url} with body {update.update_request}"
             )
-            return requests.put(
+            return http_client.put(
                 update.url,
                 headers={"Content-Type": "application/json"},
-                data=update_body,
+                json=update.update_request,
             )
 
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=_NUM_THREADS
-        ) as executor:
+        # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
+        # indexing / updates / deletes since we have to make a large volume of requests.
+        with (
+            concurrent.futures.ThreadPoolExecutor(max_workers=_NUM_THREADS) as executor,
+            httpx.Client(http2=True) as http_client,
+        ):
             for update_batch in batch_generator(updates, batch_size):
                 future_to_document_id = {
                     executor.submit(
                         _update_chunk,
                         update,
+                        http_client,
                     ): update.document_id
                     for update in update_batch
                 }
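
Aside (not part of the diff): the switch from `data=json.dumps(...)` to `json=update.update_request` lets httpx serialize the body and set `Content-Type: application/json` itself, which makes the explicit header above redundant but harmless. An equivalent one-off call, with a hypothetical endpoint and a Vespa-style partial-update body:

    import httpx

    with httpx.Client(http2=True) as client:
        res = client.put(
            "https://vespa.example.com/document/v1/ns/doc/docid/1",  # hypothetical
            json={"fields": {"boost": {"assign": 1.5}}},
        )
        res.raise_for_status()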
@@ -696,7 +718,11 @@ class VespaIndex(DocumentIndex):
     def delete(self, doc_ids: list[str]) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
-        _delete_vespa_docs(doc_ids)
+
+        # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
+        # indexing / updates / deletes since we have to make a large volume of requests.
+        with httpx.Client(http2=True) as http_client:
+            _delete_vespa_docs(document_ids=doc_ids, http_client=http_client)
 
     def id_based_retrieval(
         self, document_id: str, chunk_ind: int | None, filters: IndexFilters


@@ -42,7 +42,7 @@ function Main({ ccPairId }: { ccPairId: number }) {
     return (
       <ErrorCallout
         errorTitle={`Failed to fetch info on Connector with ID ${ccPairId}`}
-        errorMsg={error.toString()}
+        errorMsg={error?.info?.detail || error.toString()}
       />
     );
   }
} }