Merge pull request #3567 from onyx-dot-app/bugfix/revert_vespa

Revert "More efficient Vespa indexing (#3552)"
This commit is contained in:
rkuo-danswer 2024-12-31 09:47:00 -08:00 committed by GitHub
commit 2643782e30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 50 additions and 180 deletions

View File

@ -359,6 +359,7 @@ class WebConnector(LoadConnector):
continue
parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
doc_batch.append(
Document(
id=current_url,

View File

@ -504,7 +504,6 @@ class Document(Base):
last_synced: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, index=True
)
# The following are not attached to User because the account/email may not be known
# within Onyx
# Something like the document creator

View File

@ -148,6 +148,7 @@ class Indexable(abc.ABC):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
fresh_index: bool = False,
) -> set[DocumentInsertionRecord]:
"""
Takes a list of document chunks and indexes them in the document index
@ -165,9 +166,15 @@ class Indexable(abc.ABC):
only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
it is done automatically outside of this code.
NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
indexed for the given index/tenant. This can be used to optimize the indexing process for
new or empty indices.
Parameters:
- chunks: Document chunks with all of the information needed for indexing to the document
index.
- fresh_index: Boolean indicating whether this is a fresh index with no existing documents.
Returns:
List of document ids which map to unique documents and are used for deduping chunks
when updating, as well as if the document is newly indexed or already existed and

View File

@ -10,9 +10,6 @@ schema DANSWER_CHUNK_NAME {
field chunk_id type int {
indexing: summary | attribute
}
field current_index_time type int {
indexing: summary | attribute
}
# Displayed in the UI as the main identifier for the doc
field semantic_identifier type string {
indexing: summary | attribute

View File

@ -42,15 +42,12 @@ from onyx.document_index.vespa.deletion import delete_vespa_docs
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
from onyx.document_index.vespa.indexing_utils import (
find_existing_docs_in_vespa_by_doc_id,
get_existing_documents_from_chunks,
)
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
)
from onyx.document_index.vespa.shared_utils.vespa_request_builders import (
build_deletion_selection_query,
)
from onyx.document_index.vespa.shared_utils.vespa_request_builders import (
build_vespa_filters,
)
@ -310,35 +307,47 @@ class VespaIndex(DocumentIndex):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
fresh_index: bool = False,
) -> set[DocumentInsertionRecord]:
"""
Index a list of chunks into Vespa. We rely on 'current_index_time'
to keep track of when each chunk was added/updated in the index. We also raise a ValueError
if any chunk is missing a 'current_index_time' timestamp.
"""
# Clean chunks if needed (remove invalid chars, etc.)
"""Receive a list of chunks from a batch of documents and index the chunks into Vespa along
with updating the associated permissions. Assumes that a document will not be split into
multiple chunk batches calling this function multiple times, otherwise only the last set of
chunks will be kept"""
# IMPORTANT: This must be done one index at a time, do not use secondary index here
cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
# We will store the set of doc_ids that previously existed in Vespa
doc_ids_to_current_index_time = {
chunk.source_document.id: chunk.current_index_time
for chunk in cleaned_chunks
}
existing_doc_ids = set()
existing_docs: set[str] = set()
with get_vespa_http_client() as http_client, concurrent.futures.ThreadPoolExecutor(
max_workers=NUM_THREADS
) as executor:
# a) Find which docs already exist in Vespa
existing_doc_ids = find_existing_docs_in_vespa_by_doc_id(
doc_ids=list(doc_ids_to_current_index_time.keys()),
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
get_vespa_http_client() as http_client,
):
if not fresh_index:
# Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk
first_chunks = [
chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
]
for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
existing_docs.update(
get_existing_documents_from_chunks(
chunks=chunk_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
)
for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
delete_vespa_docs(
document_ids=doc_id_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
# b) Feed new/updated chunks in batches
for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
batch_index_vespa_chunks(
chunks=chunk_batch,
@ -348,34 +357,14 @@ class VespaIndex(DocumentIndex):
executor=executor,
)
# c) Remove chunks with using versioning scheme 'current_index_time'
for doc_id in existing_doc_ids:
version_cutoff = int(doc_ids_to_current_index_time[doc_id].timestamp())
query_str = build_deletion_selection_query(
doc_id=doc_id,
version_cutoff=version_cutoff,
doc_type=self.index_name,
)
delete_url = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/"
f"?{query_str}&cluster={DOCUMENT_INDEX_NAME}"
)
try:
resp = http_client.delete(delete_url)
resp.raise_for_status()
except httpx.HTTPStatusError:
logger.exception(
f"Selection-based delete failed for doc_id='{doc_id}'"
)
raise
all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
# Produce insertion records specifying which documents existed prior
return {
DocumentInsertionRecord(
document_id=doc_id,
already_existed=(doc_id in existing_doc_ids),
already_existed=doc_id in existing_docs,
)
for doc_id in doc_ids_to_current_index_time
for doc_id in all_doc_ids
}
@staticmethod

View File

@ -1,11 +1,8 @@
import concurrent.futures
import json
import urllib.parse
from datetime import datetime
from datetime import timezone
from http import HTTPStatus
from typing import List
from typing import Set
import httpx
from retry import retry
@ -24,7 +21,6 @@ from onyx.document_index.vespa_constants import BOOST
from onyx.document_index.vespa_constants import CHUNK_ID
from onyx.document_index.vespa_constants import CONTENT
from onyx.document_index.vespa_constants import CONTENT_SUMMARY
from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME
from onyx.document_index.vespa_constants import DOC_UPDATED_AT
from onyx.document_index.vespa_constants import DOCUMENT_ID
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
@ -36,7 +32,6 @@ from onyx.document_index.vespa_constants import METADATA_LIST
from onyx.document_index.vespa_constants import METADATA_SUFFIX
from onyx.document_index.vespa_constants import NUM_THREADS
from onyx.document_index.vespa_constants import PRIMARY_OWNERS
from onyx.document_index.vespa_constants import SEARCH_ENDPOINT
from onyx.document_index.vespa_constants import SECONDARY_OWNERS
from onyx.document_index.vespa_constants import SECTION_CONTINUATION
from onyx.document_index.vespa_constants import SEMANTIC_IDENTIFIER
@ -173,7 +168,6 @@ def _index_vespa_chunk(
METADATA_SUFFIX: chunk.metadata_suffix_keyword,
EMBEDDINGS: embeddings_name_vector_map,
TITLE_EMBEDDING: chunk.title_embedding,
CURRENT_INDEX_TIME: _vespa_get_updated_at_attribute(chunk.current_index_time),
DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at),
PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners),
SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners),
@ -254,85 +248,3 @@ def clean_chunk_id_copy(
}
)
return clean_chunk
def _does_doc_exist_in_vespa(
doc_id: str,
index_name: str,
http_client: httpx.Client,
) -> bool:
"""
Checks whether there's a chunk/doc matching doc_id in Vespa using YQL.
"""
encoded_doc_id = urllib.parse.quote(doc_id)
# Construct the URL with YQL query
url = (
f"{SEARCH_ENDPOINT}"
f'?yql=select+*+from+sources+{index_name}+where+document_id+contains+"{encoded_doc_id}"'
"&hits=0"
)
logger.debug(f"Checking existence for doc_id={doc_id} with URL={url}")
resp = http_client.get(url)
if resp.status_code == 200:
data = resp.json()
try:
total_count = data["root"]["fields"]["totalCount"]
return total_count > 0
except (KeyError, TypeError):
logger.exception(f"Unexpected JSON structure from {url}: {data}")
raise
elif resp.status_code == 404:
return False
else:
logger.warning(
f"Unexpected HTTP {resp.status_code} checking doc existence for doc_id={doc_id}"
)
return False
def find_existing_docs_in_vespa_by_doc_id(
doc_ids: List[str],
index_name: str,
http_client: httpx.Client,
executor: concurrent.futures.ThreadPoolExecutor | None = None,
) -> Set[str]:
"""
For each doc_id in doc_ids, returns whether it already exists in Vespa.
We do this concurrently for performance if doc_ids is large.
"""
if not doc_ids:
return set()
external_executor = True
if executor is None:
# Create our own if not given
external_executor = False
executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS)
existing_doc_ids = set()
try:
future_map = {
executor.submit(
_does_doc_exist_in_vespa, doc_id, index_name, http_client
): doc_id
for doc_id in doc_ids
}
for future in concurrent.futures.as_completed(future_map):
doc_id = future_map[future]
try:
if future.result():
existing_doc_ids.add(doc_id)
except Exception:
logger.exception(f"Error checking doc existence for doc_id={doc_id}")
raise
finally:
if not external_executor:
executor.shutdown(wait=True)
return existing_doc_ids

View File

@ -1,14 +1,12 @@
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from urllib.parse import urlencode
from onyx.configs.constants import INDEX_SEPARATOR
from onyx.context.search.models import IndexFilters
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.document_index.vespa_constants import ACCESS_CONTROL_LIST
from onyx.document_index.vespa_constants import CHUNK_ID
from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME
from onyx.document_index.vespa_constants import DOC_UPDATED_AT
from onyx.document_index.vespa_constants import DOCUMENT_ID
from onyx.document_index.vespa_constants import DOCUMENT_SETS
@ -108,24 +106,3 @@ def build_vespa_id_based_retrieval_yql(
id_based_retrieval_yql_section += ")"
return id_based_retrieval_yql_section
def build_deletion_selection_query(
doc_id: str, version_cutoff: int, doc_type: str
) -> str:
"""
Build a Vespa selection expression that includes:
- {doc_type}.document_id == <doc_id>
- {doc_type}.current_index_time < version_cutoff
Returns the URL-encoded selection query parameter.
"""
# Escape single quotes by doubling them for Vespa selection expressions
escaped_doc_id = doc_id.replace("'", "''")
filter_str = (
f"({doc_type}.document_id=='{escaped_doc_id}') and "
f"({doc_type}.{CURRENT_INDEX_TIME} < {version_cutoff})"
)
return urlencode({"selection": filter_str})

View File

@ -52,7 +52,6 @@ BATCH_SIZE = 128 # Specific to Vespa
TENANT_ID = "tenant_id"
DOCUMENT_ID = "document_id"
CURRENT_INDEX_TIME = "current_index_time"
CHUNK_ID = "chunk_id"
BLURB = "blurb"
CONTENT = "content"

View File

@ -1,7 +1,5 @@
import traceback
from collections.abc import Callable
from datetime import datetime
from datetime import timezone
from functools import partial
from http import HTTPStatus
from typing import Protocol
@ -402,8 +400,6 @@ def index_doc_batch(
else DEFAULT_BOOST
),
tenant_id=tenant_id,
# Use a timezone-aware datetime, here we set to current UTC time
current_index_time=datetime.now(tz=timezone.utc),
)
for chunk in chunks_with_embeddings
]

View File

@ -1,4 +1,3 @@
import datetime
from typing import TYPE_CHECKING
from pydantic import BaseModel
@ -74,14 +73,12 @@ class DocMetadataAwareIndexChunk(IndexChunk):
of. This is used for filtering / personas.
boost: influences the ranking of this chunk at query time. Positive -> ranked higher,
negative -> ranked lower.
current_index_time: the timestamp of when this chunk is being indexed.
"""
tenant_id: str | None = None
access: "DocumentAccess"
document_sets: set[str]
boost: int
current_index_time: datetime.datetime
@classmethod
def from_index_chunk(
@ -91,7 +88,6 @@ class DocMetadataAwareIndexChunk(IndexChunk):
document_sets: set[str],
boost: int,
tenant_id: str | None,
current_index_time: datetime.datetime,
) -> "DocMetadataAwareIndexChunk":
index_chunk_data = index_chunk.model_dump()
return cls(
@ -100,7 +96,6 @@ class DocMetadataAwareIndexChunk(IndexChunk):
document_sets=document_sets,
boost=boost,
tenant_id=tenant_id,
current_index_time=current_index_time,
)

View File

@ -86,7 +86,6 @@ def _create_indexable_chunks(
access=default_public_access,
document_sets=set(),
boost=DEFAULT_BOOST,
current_index_time=datetime.datetime.now(datetime.timezone.utc),
)
chunks.append(chunk)
@ -218,7 +217,7 @@ def seed_initial_documents(
# as we just sent over the Vespa schema and there is a slight delay
index_with_retries = retry_builder(tries=15)(document_index.index)
index_with_retries(chunks=chunks)
index_with_retries(chunks=chunks, fresh_index=cohere_enabled)
# Mock a run for the UI even though it did not actually call out to anything
mock_successful_index_attempt(

View File

@ -10,7 +10,6 @@ Then run test_query_times.py to test query times.
"""
import random
from datetime import datetime
from datetime import timezone
from onyx.access.models import DocumentAccess
from onyx.configs.constants import DocumentSource
@ -97,7 +96,6 @@ def generate_dummy_chunk(
document_sets={document_set for document_set in document_set_names},
boost=random.randint(-1, 1),
tenant_id=POSTGRES_DEFAULT_SCHEMA,
current_index_time=datetime.now(tz=timezone.utc),
)

View File

@ -81,6 +81,7 @@ export const ChatFilters = forwardRef<HTMLDivElement, ChatFiltersProps>(
const dedupedDocuments = removeDuplicateDocs(currentDocuments || []);
const tokenLimitReached = selectedDocumentTokens > maxTokens - 75;
console.log("SELECTED MESSAGE is", selectedMessage);
const hasSelectedDocuments = selectedDocumentIds.length > 0;