diff --git a/backend/onyx/connectors/web/connector.py b/backend/onyx/connectors/web/connector.py index 2c3ea064d..f15632b10 100644 --- a/backend/onyx/connectors/web/connector.py +++ b/backend/onyx/connectors/web/connector.py @@ -359,6 +359,7 @@ class WebConnector(LoadConnector): continue parsed_html = web_html_cleanup(soup, self.mintlify_cleanup) + doc_batch.append( Document( id=current_url, diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 730a69de2..47170f93b 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -504,7 +504,6 @@ class Document(Base): last_synced: Mapped[datetime.datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, index=True ) - # The following are not attached to User because the account/email may not be known # within Onyx # Something like the document creator diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py index 3d27415a0..1f6386b09 100644 --- a/backend/onyx/document_index/interfaces.py +++ b/backend/onyx/document_index/interfaces.py @@ -148,6 +148,7 @@ class Indexable(abc.ABC): def index( self, chunks: list[DocMetadataAwareIndexChunk], + fresh_index: bool = False, ) -> set[DocumentInsertionRecord]: """ Takes a list of document chunks and indexes them in the document index @@ -165,9 +166,15 @@ class Indexable(abc.ABC): only needs to index chunks into the PRIMARY index. Do not update the secondary index here, it is done automatically outside of this code. + NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously + indexed for the given index/tenant. This can be used to optimize the indexing process for + new or empty indices. + Parameters: - chunks: Document chunks with all of the information needed for indexing to the document index. + - fresh_index: Boolean indicating whether this is a fresh index with no existing documents. + Returns: List of document ids which map to unique documents and are used for deduping chunks when updating, as well as if the document is newly indexed or already existed and diff --git a/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd b/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd index 4f2aaa9a7..2fd861b77 100644 --- a/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd +++ b/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd @@ -10,9 +10,6 @@ schema DANSWER_CHUNK_NAME { field chunk_id type int { indexing: summary | attribute } - field current_index_time type int { - indexing: summary | attribute - } # Displayed in the UI as the main identifier for the doc field semantic_identifier type string { indexing: summary | attribute diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index abdd37d1b..1b7478f8c 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -42,15 +42,12 @@ from onyx.document_index.vespa.deletion import delete_vespa_docs from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy from onyx.document_index.vespa.indexing_utils import ( - find_existing_docs_in_vespa_by_doc_id, + get_existing_documents_from_chunks, ) from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client from onyx.document_index.vespa.shared_utils.utils import ( replace_invalid_doc_id_characters, ) -from onyx.document_index.vespa.shared_utils.vespa_request_builders import ( - build_deletion_selection_query, -) from onyx.document_index.vespa.shared_utils.vespa_request_builders import ( build_vespa_filters, ) @@ -310,35 +307,47 @@ class VespaIndex(DocumentIndex): def index( self, chunks: list[DocMetadataAwareIndexChunk], + fresh_index: bool = False, ) -> set[DocumentInsertionRecord]: - """ - Index a list of chunks into Vespa. We rely on 'current_index_time' - to keep track of when each chunk was added/updated in the index. We also raise a ValueError - if any chunk is missing a 'current_index_time' timestamp. - """ - - # Clean chunks if needed (remove invalid chars, etc.) + """Receive a list of chunks from a batch of documents and index the chunks into Vespa along + with updating the associated permissions. Assumes that a document will not be split into + multiple chunk batches calling this function multiple times, otherwise only the last set of + chunks will be kept""" + # IMPORTANT: This must be done one index at a time, do not use secondary index here cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks] - # We will store the set of doc_ids that previously existed in Vespa - doc_ids_to_current_index_time = { - chunk.source_document.id: chunk.current_index_time - for chunk in cleaned_chunks - } - existing_doc_ids = set() + existing_docs: set[str] = set() - with get_vespa_http_client() as http_client, concurrent.futures.ThreadPoolExecutor( - max_workers=NUM_THREADS - ) as executor: - # a) Find which docs already exist in Vespa - existing_doc_ids = find_existing_docs_in_vespa_by_doc_id( - doc_ids=list(doc_ids_to_current_index_time.keys()), - index_name=self.index_name, - http_client=http_client, - executor=executor, - ) + # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for + # indexing / updates / deletes since we have to make a large volume of requests. + with ( + concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, + get_vespa_http_client() as http_client, + ): + if not fresh_index: + # Check for existing documents, existing documents need to have all of their chunks deleted + # prior to indexing as the document size (num chunks) may have shrunk + first_chunks = [ + chunk for chunk in cleaned_chunks if chunk.chunk_id == 0 + ] + for chunk_batch in batch_generator(first_chunks, BATCH_SIZE): + existing_docs.update( + get_existing_documents_from_chunks( + chunks=chunk_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) + ) + + for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE): + delete_vespa_docs( + document_ids=doc_id_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) - # b) Feed new/updated chunks in batches for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE): batch_index_vespa_chunks( chunks=chunk_batch, @@ -348,34 +357,14 @@ class VespaIndex(DocumentIndex): executor=executor, ) - # c) Remove chunks with using versioning scheme 'current_index_time' - for doc_id in existing_doc_ids: - version_cutoff = int(doc_ids_to_current_index_time[doc_id].timestamp()) - query_str = build_deletion_selection_query( - doc_id=doc_id, - version_cutoff=version_cutoff, - doc_type=self.index_name, - ) - delete_url = ( - f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/" - f"?{query_str}&cluster={DOCUMENT_INDEX_NAME}" - ) - try: - resp = http_client.delete(delete_url) - resp.raise_for_status() - except httpx.HTTPStatusError: - logger.exception( - f"Selection-based delete failed for doc_id='{doc_id}'" - ) - raise + all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks} - # Produce insertion records specifying which documents existed prior return { DocumentInsertionRecord( document_id=doc_id, - already_existed=(doc_id in existing_doc_ids), + already_existed=doc_id in existing_docs, ) - for doc_id in doc_ids_to_current_index_time + for doc_id in all_doc_ids } @staticmethod diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py index 1d52c2e67..bfb0bd941 100644 --- a/backend/onyx/document_index/vespa/indexing_utils.py +++ b/backend/onyx/document_index/vespa/indexing_utils.py @@ -1,11 +1,8 @@ import concurrent.futures import json -import urllib.parse from datetime import datetime from datetime import timezone from http import HTTPStatus -from typing import List -from typing import Set import httpx from retry import retry @@ -24,7 +21,6 @@ from onyx.document_index.vespa_constants import BOOST from onyx.document_index.vespa_constants import CHUNK_ID from onyx.document_index.vespa_constants import CONTENT from onyx.document_index.vespa_constants import CONTENT_SUMMARY -from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME from onyx.document_index.vespa_constants import DOC_UPDATED_AT from onyx.document_index.vespa_constants import DOCUMENT_ID from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT @@ -36,7 +32,6 @@ from onyx.document_index.vespa_constants import METADATA_LIST from onyx.document_index.vespa_constants import METADATA_SUFFIX from onyx.document_index.vespa_constants import NUM_THREADS from onyx.document_index.vespa_constants import PRIMARY_OWNERS -from onyx.document_index.vespa_constants import SEARCH_ENDPOINT from onyx.document_index.vespa_constants import SECONDARY_OWNERS from onyx.document_index.vespa_constants import SECTION_CONTINUATION from onyx.document_index.vespa_constants import SEMANTIC_IDENTIFIER @@ -173,7 +168,6 @@ def _index_vespa_chunk( METADATA_SUFFIX: chunk.metadata_suffix_keyword, EMBEDDINGS: embeddings_name_vector_map, TITLE_EMBEDDING: chunk.title_embedding, - CURRENT_INDEX_TIME: _vespa_get_updated_at_attribute(chunk.current_index_time), DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at), PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners), SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners), @@ -254,85 +248,3 @@ def clean_chunk_id_copy( } ) return clean_chunk - - -def _does_doc_exist_in_vespa( - doc_id: str, - index_name: str, - http_client: httpx.Client, -) -> bool: - """ - Checks whether there's a chunk/doc matching doc_id in Vespa using YQL. - """ - encoded_doc_id = urllib.parse.quote(doc_id) - - # Construct the URL with YQL query - url = ( - f"{SEARCH_ENDPOINT}" - f'?yql=select+*+from+sources+{index_name}+where+document_id+contains+"{encoded_doc_id}"' - "&hits=0" - ) - - logger.debug(f"Checking existence for doc_id={doc_id} with URL={url}") - resp = http_client.get(url) - - if resp.status_code == 200: - data = resp.json() - try: - total_count = data["root"]["fields"]["totalCount"] - return total_count > 0 - except (KeyError, TypeError): - logger.exception(f"Unexpected JSON structure from {url}: {data}") - raise - - elif resp.status_code == 404: - return False - - else: - logger.warning( - f"Unexpected HTTP {resp.status_code} checking doc existence for doc_id={doc_id}" - ) - return False - - -def find_existing_docs_in_vespa_by_doc_id( - doc_ids: List[str], - index_name: str, - http_client: httpx.Client, - executor: concurrent.futures.ThreadPoolExecutor | None = None, -) -> Set[str]: - """ - For each doc_id in doc_ids, returns whether it already exists in Vespa. - We do this concurrently for performance if doc_ids is large. - """ - if not doc_ids: - return set() - - external_executor = True - if executor is None: - # Create our own if not given - external_executor = False - executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) - - existing_doc_ids = set() - - try: - future_map = { - executor.submit( - _does_doc_exist_in_vespa, doc_id, index_name, http_client - ): doc_id - for doc_id in doc_ids - } - for future in concurrent.futures.as_completed(future_map): - doc_id = future_map[future] - try: - if future.result(): - existing_doc_ids.add(doc_id) - except Exception: - logger.exception(f"Error checking doc existence for doc_id={doc_id}") - raise - - finally: - if not external_executor: - executor.shutdown(wait=True) - return existing_doc_ids diff --git a/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py b/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py index 85ce5e48e..dda75c853 100644 --- a/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py +++ b/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py @@ -1,14 +1,12 @@ from datetime import datetime from datetime import timedelta from datetime import timezone -from urllib.parse import urlencode from onyx.configs.constants import INDEX_SEPARATOR from onyx.context.search.models import IndexFilters from onyx.document_index.interfaces import VespaChunkRequest from onyx.document_index.vespa_constants import ACCESS_CONTROL_LIST from onyx.document_index.vespa_constants import CHUNK_ID -from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME from onyx.document_index.vespa_constants import DOC_UPDATED_AT from onyx.document_index.vespa_constants import DOCUMENT_ID from onyx.document_index.vespa_constants import DOCUMENT_SETS @@ -108,24 +106,3 @@ def build_vespa_id_based_retrieval_yql( id_based_retrieval_yql_section += ")" return id_based_retrieval_yql_section - - -def build_deletion_selection_query( - doc_id: str, version_cutoff: int, doc_type: str -) -> str: - """ - Build a Vespa selection expression that includes: - - {doc_type}.document_id == - - {doc_type}.current_index_time < version_cutoff - - Returns the URL-encoded selection query parameter. - """ - # Escape single quotes by doubling them for Vespa selection expressions - escaped_doc_id = doc_id.replace("'", "''") - - filter_str = ( - f"({doc_type}.document_id=='{escaped_doc_id}') and " - f"({doc_type}.{CURRENT_INDEX_TIME} < {version_cutoff})" - ) - - return urlencode({"selection": filter_str}) diff --git a/backend/onyx/document_index/vespa_constants.py b/backend/onyx/document_index/vespa_constants.py index 3bf5ddb9e..aff4e8556 100644 --- a/backend/onyx/document_index/vespa_constants.py +++ b/backend/onyx/document_index/vespa_constants.py @@ -52,7 +52,6 @@ BATCH_SIZE = 128 # Specific to Vespa TENANT_ID = "tenant_id" DOCUMENT_ID = "document_id" -CURRENT_INDEX_TIME = "current_index_time" CHUNK_ID = "chunk_id" BLURB = "blurb" CONTENT = "content" diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index bf15a71a6..f9ed3eb7b 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -1,7 +1,5 @@ import traceback from collections.abc import Callable -from datetime import datetime -from datetime import timezone from functools import partial from http import HTTPStatus from typing import Protocol @@ -402,8 +400,6 @@ def index_doc_batch( else DEFAULT_BOOST ), tenant_id=tenant_id, - # Use a timezone-aware datetime, here we set to current UTC time - current_index_time=datetime.now(tz=timezone.utc), ) for chunk in chunks_with_embeddings ] diff --git a/backend/onyx/indexing/models.py b/backend/onyx/indexing/models.py index b09d5570a..e9a155d17 100644 --- a/backend/onyx/indexing/models.py +++ b/backend/onyx/indexing/models.py @@ -1,4 +1,3 @@ -import datetime from typing import TYPE_CHECKING from pydantic import BaseModel @@ -74,14 +73,12 @@ class DocMetadataAwareIndexChunk(IndexChunk): of. This is used for filtering / personas. boost: influences the ranking of this chunk at query time. Positive -> ranked higher, negative -> ranked lower. - current_index_time: the timestamp of when this chunk is being indexed. """ tenant_id: str | None = None access: "DocumentAccess" document_sets: set[str] boost: int - current_index_time: datetime.datetime @classmethod def from_index_chunk( @@ -91,7 +88,6 @@ class DocMetadataAwareIndexChunk(IndexChunk): document_sets: set[str], boost: int, tenant_id: str | None, - current_index_time: datetime.datetime, ) -> "DocMetadataAwareIndexChunk": index_chunk_data = index_chunk.model_dump() return cls( @@ -100,7 +96,6 @@ class DocMetadataAwareIndexChunk(IndexChunk): document_sets=document_sets, boost=boost, tenant_id=tenant_id, - current_index_time=current_index_time, ) diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py index ed818ce69..b629b6ac3 100644 --- a/backend/onyx/seeding/load_docs.py +++ b/backend/onyx/seeding/load_docs.py @@ -86,7 +86,6 @@ def _create_indexable_chunks( access=default_public_access, document_sets=set(), boost=DEFAULT_BOOST, - current_index_time=datetime.datetime.now(datetime.timezone.utc), ) chunks.append(chunk) @@ -218,7 +217,7 @@ def seed_initial_documents( # as we just sent over the Vespa schema and there is a slight delay index_with_retries = retry_builder(tries=15)(document_index.index) - index_with_retries(chunks=chunks) + index_with_retries(chunks=chunks, fresh_index=cohere_enabled) # Mock a run for the UI even though it did not actually call out to anything mock_successful_index_attempt( diff --git a/backend/scripts/query_time_check/seed_dummy_docs.py b/backend/scripts/query_time_check/seed_dummy_docs.py index 79d506c1d..e7a780569 100644 --- a/backend/scripts/query_time_check/seed_dummy_docs.py +++ b/backend/scripts/query_time_check/seed_dummy_docs.py @@ -10,7 +10,6 @@ Then run test_query_times.py to test query times. """ import random from datetime import datetime -from datetime import timezone from onyx.access.models import DocumentAccess from onyx.configs.constants import DocumentSource @@ -97,7 +96,6 @@ def generate_dummy_chunk( document_sets={document_set for document_set in document_set_names}, boost=random.randint(-1, 1), tenant_id=POSTGRES_DEFAULT_SCHEMA, - current_index_time=datetime.now(tz=timezone.utc), ) diff --git a/web/src/app/chat/documentSidebar/ChatFilters.tsx b/web/src/app/chat/documentSidebar/ChatFilters.tsx index 98fb9b36c..f91529f91 100644 --- a/web/src/app/chat/documentSidebar/ChatFilters.tsx +++ b/web/src/app/chat/documentSidebar/ChatFilters.tsx @@ -81,6 +81,7 @@ export const ChatFilters = forwardRef( const dedupedDocuments = removeDuplicateDocs(currentDocuments || []); const tokenLimitReached = selectedDocumentTokens > maxTokens - 75; + console.log("SELECTED MESSAGE is", selectedMessage); const hasSelectedDocuments = selectedDocumentIds.length > 0;