diff --git a/backend/alembic/versions/2955778aa44c_add_chunk_count_to_document.py b/backend/alembic/versions/2955778aa44c_add_chunk_count_to_document.py new file mode 100644 index 00000000000..755ef334fd4 --- /dev/null +++ b/backend/alembic/versions/2955778aa44c_add_chunk_count_to_document.py @@ -0,0 +1,24 @@ +"""add chunk count to document + +Revision ID: 2955778aa44c +Revises: c0aab6edb6dd +Create Date: 2025-01-04 11:39:43.268612 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "2955778aa44c" +down_revision = "c0aab6edb6dd" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column("document", sa.Column("chunk_count", sa.Integer(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("document", "chunk_count") diff --git a/backend/onyx/connectors/models.py b/backend/onyx/connectors/models.py index 474b8a186db..ee66d4b50a9 100644 --- a/backend/onyx/connectors/models.py +++ b/backend/onyx/connectors/models.py @@ -101,8 +101,11 @@ class DocumentBase(BaseModel): source: DocumentSource | None = None semantic_identifier: str # displayed in the UI as the main identifier for the doc metadata: dict[str, str | list[str]] + # UTC time doc_updated_at: datetime | None = None + chunk_count: int | None = None + # Owner, creator, etc. primary_owners: list[BasicExpertInfo] | None = None # Assignee, space owner, etc. diff --git a/backend/onyx/db/document.py b/backend/onyx/db/document.py index 9a7f89aa63e..4ebcc2c7d0d 100644 --- a/backend/onyx/db/document.py +++ b/backend/onyx/db/document.py @@ -416,6 +416,18 @@ def update_docs_last_modified__no_commit( doc.last_modified = now +def update_docs_chunk_count__no_commit( + document_ids: list[str], + doc_id_to_chunk_count: dict[str, int], + db_session: Session, +) -> None: + documents_to_update = ( + db_session.query(DbDocument).filter(DbDocument.id.in_(document_ids)).all() + ) + for doc in documents_to_update: + doc.chunk_count = doc_id_to_chunk_count[doc.id] + + def mark_document_as_modified( document_id: str, db_session: Session, @@ -612,3 +624,25 @@ def get_document( stmt = select(DbDocument).where(DbDocument.id == document_id) doc: DbDocument | None = db_session.execute(stmt).scalar_one_or_none() return doc + + +def fetch_chunk_counts_for_documents( + document_ids: list[str], + db_session: Session, +) -> list[tuple[str, int | None]]: + """ + Return a list of (document_id, chunk_count) tuples. + Note: chunk_count might be None if not set in DB, + so we declare it as Optional[int]. + """ + stmt = select(DbDocument.id, DbDocument.chunk_count).where( + DbDocument.id.in_(document_ids) + ) + + # results is a list of 'Row' objects, each containing two columns + results = db_session.execute(stmt).all() + + # If DbDocument.id is guaranteed to be a string, you can just do row.id; + # otherwise cast to str if you need to be sure it's a string: + return [(str(row[0]), row[1]) for row in results] + # or row.id, row.chunk_count if they are named attributes in your ORM model diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 47170f93b22..93d327dad01 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -494,6 +494,10 @@ class Document(Base): DateTime(timezone=True), nullable=True ) + # Number of chunks in the document (in Vespa) + # Only null for documents indexed prior to this change + chunk_count: Mapped[int | None] = mapped_column(Integer, nullable=True) + # last time any vespa relevant row metadata or the doc changed. 
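For orientation, here is a minimal sketch (not part of the diff) of how the two new db helpers are meant to be used together during re-indexing: read the previously stored chunk counts, then persist the counts produced by the current pass. The wrapper function and the commit placement are assumptions for illustration only.

from sqlalchemy.orm import Session

from onyx.db.document import fetch_chunk_counts_for_documents
from onyx.db.document import update_docs_chunk_count__no_commit


def record_chunk_counts(
    doc_id_to_new_chunk_cnt: dict[str, int], db_session: Session
) -> dict[str, int | None]:
    doc_ids = list(doc_id_to_new_chunk_cnt.keys())
    # Previously stored counts; None for documents indexed before this migration
    previous_counts = dict(
        fetch_chunk_counts_for_documents(document_ids=doc_ids, db_session=db_session)
    )
    # Persist the counts produced by the current indexing pass
    update_docs_chunk_count__no_commit(
        document_ids=doc_ids,
        doc_id_to_chunk_count=doc_id_to_new_chunk_cnt,
        db_session=db_session,
    )
    db_session.commit()
    return previous_counts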
# does not include last_synced last_modified: Mapped[datetime.datetime | None] = mapped_column( diff --git a/backend/onyx/document_index/document_index_utils.py b/backend/onyx/document_index/document_index_utils.py index 2589ecd035d..8976b556eb2 100644 --- a/backend/onyx/document_index/document_index_utils.py +++ b/backend/onyx/document_index/document_index_utils.py @@ -1,12 +1,13 @@ import math import uuid +from uuid import UUID from sqlalchemy.orm import Session -from onyx.context.search.models import InferenceChunk from onyx.db.search_settings import get_current_search_settings from onyx.db.search_settings import get_secondary_search_settings -from onyx.indexing.models import IndexChunk +from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo +from onyx.indexing.models import DocMetadataAwareIndexChunk DEFAULT_BATCH_SIZE = 30 @@ -36,25 +37,118 @@ def translate_boost_count_to_multiplier(boost: int) -> float: return 2 / (1 + math.exp(-1 * boost / 3)) -def get_uuid_from_chunk( - chunk: IndexChunk | InferenceChunk, mini_chunk_ind: int = 0 -) -> uuid.UUID: - doc_str = ( - chunk.document_id - if isinstance(chunk, InferenceChunk) - else chunk.source_document.id - ) +def assemble_document_chunk_info( + enriched_document_info_list: list[EnrichedDocumentIndexingInfo], + tenant_id: str | None, + large_chunks_enabled: bool, +) -> list[UUID]: + doc_chunk_ids = [] + + for enriched_document_info in enriched_document_info_list: + for chunk_index in range( + enriched_document_info.chunk_start_index, + enriched_document_info.chunk_end_index, + ): + if not enriched_document_info.old_version: + doc_chunk_ids.append( + get_uuid_from_chunk_info( + document_id=enriched_document_info.doc_id, + chunk_id=chunk_index, + tenant_id=tenant_id, + ) + ) + else: + doc_chunk_ids.append( + get_uuid_from_chunk_info_old( + document_id=enriched_document_info.doc_id, + chunk_id=chunk_index, + ) + ) + + if large_chunks_enabled and chunk_index % 4 == 0: + large_chunk_id = int(chunk_index / 4) + large_chunk_reference_ids = [ + large_chunk_id + i + for i in range(4) + if large_chunk_id + i < enriched_document_info.chunk_end_index + ] + if enriched_document_info.old_version: + doc_chunk_ids.append( + get_uuid_from_chunk_info_old( + document_id=enriched_document_info.doc_id, + chunk_id=large_chunk_id, + large_chunk_reference_ids=large_chunk_reference_ids, + ) + ) + else: + doc_chunk_ids.append( + get_uuid_from_chunk_info( + document_id=enriched_document_info.doc_id, + chunk_id=large_chunk_id, + tenant_id=tenant_id, + large_chunk_id=large_chunk_id, + ) + ) + + return doc_chunk_ids + + +def get_uuid_from_chunk_info( + *, + document_id: str, + chunk_id: int, + tenant_id: str | None, + large_chunk_id: int | None = None, +) -> UUID: + doc_str = document_id + # Web parsing URL duplicate catching if doc_str and doc_str[-1] == "/": doc_str = doc_str[:-1] - unique_identifier_string = "_".join( - [doc_str, str(chunk.chunk_id), str(mini_chunk_ind)] + + chunk_index = ( + "large_" + str(large_chunk_id) if large_chunk_id is not None else str(chunk_id) ) - if chunk.large_chunk_reference_ids: + unique_identifier_string = "_".join([doc_str, chunk_index]) + if tenant_id: + unique_identifier_string += "_" + tenant_id + + return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string) + + +def get_uuid_from_chunk_info_old( + *, document_id: str, chunk_id: int, large_chunk_reference_ids: list[int] = [] +) -> UUID: + doc_str = document_id + + # Web parsing URL duplicate catching + if doc_str and doc_str[-1] == "/": + doc_str = 
doc_str[:-1] + unique_identifier_string = "_".join([doc_str, str(chunk_id), "0"]) + if large_chunk_reference_ids: unique_identifier_string += "_large" + "_".join( [ str(referenced_chunk_id) - for referenced_chunk_id in chunk.large_chunk_reference_ids + for referenced_chunk_id in large_chunk_reference_ids ] ) return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string) + + +def get_uuid_from_chunk(chunk: DocMetadataAwareIndexChunk) -> uuid.UUID: + return get_uuid_from_chunk_info( + document_id=chunk.source_document.id, + chunk_id=chunk.chunk_id, + tenant_id=chunk.tenant_id, + large_chunk_id=chunk.large_chunk_id, + ) + + +def get_uuid_from_chunk_old( + chunk: DocMetadataAwareIndexChunk, large_chunk_reference_ids: list[int] = [] +) -> UUID: + return get_uuid_from_chunk_info_old( + document_id=chunk.source_document.id, + chunk_id=chunk.chunk_id, + large_chunk_reference_ids=large_chunk_reference_ids, + ) diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py index 1f6386b09ea..c97c87fd07e 100644 --- a/backend/onyx/document_index/interfaces.py +++ b/backend/onyx/document_index/interfaces.py @@ -35,6 +35,38 @@ class VespaChunkRequest: return None +@dataclass +class IndexBatchParams: + """ + Information necessary for efficiently indexing a batch of documents + """ + + doc_id_to_previous_chunk_cnt: dict[str, int | None] + doc_id_to_new_chunk_cnt: dict[str, int] + tenant_id: str | None + large_chunks_enabled: bool + + +@dataclass +class MinimalDocumentIndexingInfo: + """ + Minimal information necessary for indexing a document + """ + + doc_id: str + chunk_start_index: int + + +@dataclass +class EnrichedDocumentIndexingInfo(MinimalDocumentIndexingInfo): + """ + Enriched information necessary for indexing a document, including version and chunk range. + """ + + old_version: bool + chunk_end_index: int + + @dataclass class DocumentMetadata: """ @@ -148,7 +180,7 @@ class Indexable(abc.ABC): def index( self, chunks: list[DocMetadataAwareIndexChunk], - fresh_index: bool = False, + index_batch_params: IndexBatchParams, ) -> set[DocumentInsertionRecord]: """ Takes a list of document chunks and indexes them in the document index @@ -166,14 +198,11 @@ class Indexable(abc.ABC): only needs to index chunks into the PRIMARY index. Do not update the secondary index here, it is done automatically outside of this code. - NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously - indexed for the given index/tenant. This can be used to optimize the indexing process for - new or empty indices. - Parameters: - chunks: Document chunks with all of the information needed for indexing to the document index. - - fresh_index: Boolean indicating whether this is a fresh index with no existing documents. + - tenant_id: The tenant id of the user whose chunks are being indexed + - large_chunks_enabled: Whether large chunks are enabled Returns: List of document ids which map to unique documents and are used for deduping chunks @@ -185,7 +214,7 @@ class Indexable(abc.ABC): class Deletable(abc.ABC): """ - Class must implement the ability to delete document by their unique document ids. + Class must implement the ability to delete document by a given unique document id. 
""" @abc.abstractmethod @@ -198,16 +227,6 @@ class Deletable(abc.ABC): """ raise NotImplementedError - @abc.abstractmethod - def delete(self, doc_ids: list[str]) -> None: - """ - Given a list of document ids, hard delete them from the document index - - Parameters: - - doc_ids: list of document ids as specified by the connector - """ - raise NotImplementedError - class Updatable(abc.ABC): """ diff --git a/backend/onyx/document_index/vespa/deletion.py b/backend/onyx/document_index/vespa/deletion.py index 6237e712c4c..8c6f0cf6c75 100644 --- a/backend/onyx/document_index/vespa/deletion.py +++ b/backend/onyx/document_index/vespa/deletion.py @@ -1,11 +1,9 @@ import concurrent.futures +from uuid import UUID import httpx from retry import retry -from onyx.document_index.vespa.chunk_retrieval import ( - get_all_vespa_ids_for_document_id, -) from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT from onyx.document_index.vespa_constants import NUM_THREADS from onyx.utils.logger import setup_logger @@ -22,29 +20,21 @@ def _retryable_http_delete(http_client: httpx.Client, url: str) -> None: res.raise_for_status() -@retry(tries=3, delay=1, backoff=2) -def _delete_vespa_doc_chunks( - document_id: str, index_name: str, http_client: httpx.Client +def _delete_vespa_chunk( + doc_chunk_id: UUID, index_name: str, http_client: httpx.Client ) -> None: - doc_chunk_ids = get_all_vespa_ids_for_document_id( - document_id=document_id, - index_name=index_name, - get_large_chunks=True, - ) - - for chunk_id in doc_chunk_ids: - try: - _retryable_http_delete( - http_client, - f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{chunk_id}", - ) - except httpx.HTTPStatusError as e: - logger.error(f"Failed to delete chunk, details: {e.response.text}") - raise + try: + _retryable_http_delete( + http_client, + f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}", + ) + except httpx.HTTPStatusError as e: + logger.error(f"Failed to delete chunk, details: {e.response.text}") + raise -def delete_vespa_docs( - document_ids: list[str], +def delete_vespa_chunks( + doc_chunk_ids: list[UUID], index_name: str, http_client: httpx.Client, executor: concurrent.futures.ThreadPoolExecutor | None = None, @@ -56,13 +46,13 @@ def delete_vespa_docs( executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) try: - doc_deletion_future = { + chunk_deletion_future = { executor.submit( - _delete_vespa_doc_chunks, doc_id, index_name, http_client - ): doc_id - for doc_id in document_ids + _delete_vespa_chunk, doc_chunk_id, index_name, http_client + ): doc_chunk_id + for doc_chunk_id in doc_chunk_ids } - for future in concurrent.futures.as_completed(doc_deletion_future): + for future in concurrent.futures.as_completed(chunk_deletion_future): # Will raise exception if the deletion raised an exception future.result() diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index 1b7478f8cd3..eccd79252a3 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -25,8 +25,12 @@ from onyx.configs.chat_configs import VESPA_SEARCHER_THREADS from onyx.configs.constants import KV_REINDEX_KEY from onyx.context.search.models import IndexFilters from onyx.context.search.models import InferenceChunkUncleaned +from onyx.document_index.document_index_utils import assemble_document_chunk_info from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.interfaces import DocumentInsertionRecord +from 
onyx.document_index.interfaces import EnrichedDocumentIndexingInfo +from onyx.document_index.interfaces import IndexBatchParams +from onyx.document_index.interfaces import MinimalDocumentIndexingInfo from onyx.document_index.interfaces import UpdateRequest from onyx.document_index.interfaces import VespaChunkRequest from onyx.document_index.interfaces import VespaDocumentFields @@ -38,12 +42,10 @@ from onyx.document_index.vespa.chunk_retrieval import ( parallel_visit_api_retrieval, ) from onyx.document_index.vespa.chunk_retrieval import query_vespa -from onyx.document_index.vespa.deletion import delete_vespa_docs +from onyx.document_index.vespa.deletion import delete_vespa_chunks from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks +from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy -from onyx.document_index.vespa.indexing_utils import ( - get_existing_documents_from_chunks, -) from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client from onyx.document_index.vespa.shared_utils.utils import ( replace_invalid_doc_id_characters, @@ -307,12 +309,18 @@ class VespaIndex(DocumentIndex): def index( self, chunks: list[DocMetadataAwareIndexChunk], - fresh_index: bool = False, + index_batch_params: IndexBatchParams, ) -> set[DocumentInsertionRecord]: """Receive a list of chunks from a batch of documents and index the chunks into Vespa along with updating the associated permissions. Assumes that a document will not be split into multiple chunk batches calling this function multiple times, otherwise only the last set of chunks will be kept""" + + doc_id_to_previous_chunk_cnt = index_batch_params.doc_id_to_previous_chunk_cnt + doc_id_to_new_chunk_cnt = index_batch_params.doc_id_to_new_chunk_cnt + tenant_id = index_batch_params.tenant_id + large_chunks_enabled = index_batch_params.large_chunks_enabled + # IMPORTANT: This must be done one index at a time, do not use secondary index here cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks] @@ -324,30 +332,55 @@ class VespaIndex(DocumentIndex): concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, get_vespa_http_client() as http_client, ): - if not fresh_index: - # Check for existing documents, existing documents need to have all of their chunks deleted - # prior to indexing as the document size (num chunks) may have shrunk - first_chunks = [ - chunk for chunk in cleaned_chunks if chunk.chunk_id == 0 - ] - for chunk_batch in batch_generator(first_chunks, BATCH_SIZE): - existing_docs.update( - get_existing_documents_from_chunks( - chunks=chunk_batch, - index_name=self.index_name, - http_client=http_client, - executor=executor, - ) - ) + # We require the start and end index for each document in order to + # know precisely which chunks to delete. This information exists for + # documents that have `chunk_count` in the database, but not for + # `old_version` documents. 
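The comment above is the heart of the change, so a worked example may help. The helper below is a hypothetical condensation of what the following code does via EnrichedDocumentIndexingInfo and check_for_final_chunk_existence: if a document previously produced `prev` chunks and now produces `new`, the stale chunk indices are exactly [new, prev); when `prev` is unknown (a pre-chunk_count document), Vespa is probed forward from `new` until a chunk is missing.

from typing import Callable


def stale_chunk_index_range(
    previous_chunk_cnt: int | None,
    new_chunk_cnt: int,
    probe_final_chunk: Callable[[int], int],
) -> range:
    # Documents without a stored chunk_count use the old chunk-ID scheme and
    # must be probed to find where the previously indexed chunks end
    if previous_chunk_cnt is None:
        previous_chunk_cnt = probe_final_chunk(new_chunk_cnt)
    return range(new_chunk_cnt, previous_chunk_cnt)


# e.g. a document that shrank from 10 chunks to 7 leaves chunks 7, 8 and 9 behind
assert list(stale_chunk_index_range(10, 7, lambda start: start)) == [7, 8, 9]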
- for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE): - delete_vespa_docs( - document_ids=doc_id_batch, + enriched_doc_infos: list[EnrichedDocumentIndexingInfo] = [] + for document_id, _ in doc_id_to_previous_chunk_cnt.items(): + last_indexed_chunk = doc_id_to_previous_chunk_cnt.get(document_id, None) + # If the document has no `chunk_count` in the database, we know that it + # has the old chunk ID system and we must check for the final chunk index + is_old_version = False + if last_indexed_chunk is None: + is_old_version = True + minimal_doc_info = MinimalDocumentIndexingInfo( + doc_id=document_id, + chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0), + ) + last_indexed_chunk = check_for_final_chunk_existence( + minimal_doc_info=minimal_doc_info, + start_index=doc_id_to_new_chunk_cnt[document_id], index_name=self.index_name, http_client=http_client, - executor=executor, ) + enriched_doc_info = EnrichedDocumentIndexingInfo( + doc_id=document_id, + chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0), + chunk_end_index=last_indexed_chunk, + old_version=is_old_version, + ) + enriched_doc_infos.append(enriched_doc_info) + + # Now, for each doc, we know exactly where to start and end our deletion + # So let's generate the chunk IDs for each chunk to delete + chunks_to_delete = assemble_document_chunk_info( + enriched_document_info_list=enriched_doc_infos, + tenant_id=tenant_id, + large_chunks_enabled=large_chunks_enabled, + ) + + # Delete old Vespa documents + for doc_chunk_ids_batch in batch_generator(chunks_to_delete, BATCH_SIZE): + delete_vespa_chunks( + doc_chunk_ids=doc_chunk_ids_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) + for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE): batch_index_vespa_chunks( chunks=chunk_batch, @@ -588,24 +621,6 @@ class VespaIndex(DocumentIndex): return total_chunks_updated - def delete(self, doc_ids: list[str]) -> None: - logger.info(f"Deleting {len(doc_ids)} documents from Vespa") - - doc_ids = [replace_invalid_doc_id_characters(doc_id) for doc_id in doc_ids] - - # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for - # indexing / updates / deletes since we have to make a large volume of requests. 
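The deletion path added above works because chunk UUIDs are now derived purely from (document_id, chunk index, tenant_id), so the stale IDs can be computed locally instead of being listed from Vespa first. A rough sketch with illustrative values (single tenant, no large chunks):

from onyx.document_index.document_index_utils import get_uuid_from_chunk_info

# Document shrank from 10 chunks to 7, so chunks 7-9 must be removed from Vespa
stale_chunk_uuids = [
    get_uuid_from_chunk_info(
        document_id="https://example.com/some-doc",
        chunk_id=chunk_index,
        tenant_id=None,
    )
    for chunk_index in range(7, 10)
]
# delete_vespa_chunks then issues one HTTP DELETE per UUID against
# DOCUMENT_ID_ENDPOINT/{uuid}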
- with get_vespa_http_client() as http_client: - index_names = [self.index_name] - if self.secondary_index_name: - index_names.append(self.secondary_index_name) - - for index_name in index_names: - delete_vespa_docs( - document_ids=doc_ids, index_name=index_name, http_client=http_client - ) - return - def delete_single(self, doc_id: str) -> int: """Possibly faster overall than the delete method due to using a single delete call with a selection query.""" diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py index 224f938dcee..6781cae1d93 100644 --- a/backend/onyx/document_index/vespa/indexing_utils.py +++ b/backend/onyx/document_index/vespa/indexing_utils.py @@ -1,5 +1,6 @@ import concurrent.futures import json +import uuid from datetime import datetime from datetime import timezone from http import HTTPStatus @@ -11,6 +12,8 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import ( get_experts_stores_representations, ) from onyx.document_index.document_index_utils import get_uuid_from_chunk +from onyx.document_index.document_index_utils import get_uuid_from_chunk_info_old +from onyx.document_index.interfaces import MinimalDocumentIndexingInfo from onyx.document_index.vespa.shared_utils.utils import remove_invalid_unicode_chars from onyx.document_index.vespa.shared_utils.utils import ( replace_invalid_doc_id_characters, @@ -48,14 +51,9 @@ logger = setup_logger() @retry(tries=3, delay=1, backoff=2) -def _does_document_exist( - doc_chunk_id: str, - index_name: str, - http_client: httpx.Client, +def _does_doc_chunk_exist( + doc_chunk_id: uuid.UUID, index_name: str, http_client: httpx.Client ) -> bool: - """Returns whether the document already exists and the users/group whitelists - Specifically in this case, document refers to a vespa document which is equivalent to a Onyx - chunk. 
This checks for whether the chunk exists already in the index""" doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}" doc_fetch_response = http_client.get(doc_url) if doc_fetch_response.status_code == 404: @@ -98,8 +96,8 @@ def get_existing_documents_from_chunks( try: chunk_existence_future = { executor.submit( - _does_document_exist, - str(get_uuid_from_chunk(chunk)), + _does_doc_chunk_exist, + get_uuid_from_chunk(chunk), index_name, http_client, ): chunk @@ -248,3 +246,22 @@ def clean_chunk_id_copy( } ) return clean_chunk + + +def check_for_final_chunk_existence( + minimal_doc_info: MinimalDocumentIndexingInfo, + start_index: int, + index_name: str, + http_client: httpx.Client, +) -> int: + index = start_index + while True: + doc_chunk_id = get_uuid_from_chunk_info_old( + document_id=minimal_doc_info.doc_id, + chunk_id=index, + large_chunk_reference_ids=[], + ) + if not _does_doc_chunk_exist(doc_chunk_id, index_name, http_client): + return index + + index += 1 diff --git a/backend/onyx/document_index/vespa_constants.py b/backend/onyx/document_index/vespa_constants.py index aff4e85566d..a259aede5ca 100644 --- a/backend/onyx/document_index/vespa_constants.py +++ b/backend/onyx/document_index/vespa_constants.py @@ -35,6 +35,8 @@ DOCUMENT_ID_ENDPOINT = ( f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid" ) +# the default document id endpoint is http://localhost:8080/document/v1/default/danswer_chunk/docid + SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/" NUM_THREADS = ( diff --git a/backend/onyx/indexing/chunker.py b/backend/onyx/indexing/chunker.py index 5756481e4e7..f95fe86e317 100644 --- a/backend/onyx/indexing/chunker.py +++ b/backend/onyx/indexing/chunker.py @@ -73,25 +73,25 @@ def _get_metadata_suffix_for_document_index( return metadata_semantic, metadata_keyword -def _combine_chunks(chunks: list[DocAwareChunk], index: int) -> DocAwareChunk: +def _combine_chunks(chunks: list[DocAwareChunk], large_chunk_id: int) -> DocAwareChunk: merged_chunk = DocAwareChunk( source_document=chunks[0].source_document, - chunk_id=index, + chunk_id=chunks[0].chunk_id, blurb=chunks[0].blurb, content=chunks[0].content, source_links=chunks[0].source_links or {}, - section_continuation=(index > 0), + section_continuation=(chunks[0].chunk_id > 0), title_prefix=chunks[0].title_prefix, metadata_suffix_semantic=chunks[0].metadata_suffix_semantic, metadata_suffix_keyword=chunks[0].metadata_suffix_keyword, - large_chunk_reference_ids=[chunks[0].chunk_id], + large_chunk_reference_ids=[chunk.chunk_id for chunk in chunks], mini_chunk_texts=None, + large_chunk_id=large_chunk_id, ) offset = 0 for i in range(1, len(chunks)): merged_chunk.content += SECTION_SEPARATOR + chunks[i].content - merged_chunk.large_chunk_reference_ids.append(chunks[i].chunk_id) offset += len(SECTION_SEPARATOR) + len(chunks[i - 1].content) for link_offset, link_text in (chunks[i].source_links or {}).items(): @@ -103,11 +103,12 @@ def _combine_chunks(chunks: list[DocAwareChunk], index: int) -> DocAwareChunk: def generate_large_chunks(chunks: list[DocAwareChunk]) -> list[DocAwareChunk]: - large_chunks = [ - _combine_chunks(chunks[i : i + LARGE_CHUNK_RATIO], idx) - for idx, i in enumerate(range(0, len(chunks), LARGE_CHUNK_RATIO)) - if len(chunks[i : i + LARGE_CHUNK_RATIO]) > 1 - ] + large_chunks = [] + for idx, i in enumerate(range(0, len(chunks), LARGE_CHUNK_RATIO)): + chunk_group = chunks[i : i + LARGE_CHUNK_RATIO] + if len(chunk_group) > 1: + large_chunk = _combine_chunks(chunk_group, idx) + 
large_chunks.append(large_chunk) return large_chunks @@ -219,6 +220,7 @@ class Chunker: metadata_suffix_semantic=metadata_suffix_semantic, metadata_suffix_keyword=metadata_suffix_keyword, mini_chunk_texts=self._get_mini_chunk_texts(text), + large_chunk_id=None, ) for section_idx, section in enumerate(document.sections): diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index f9ed3eb7b8e..1a2e73b2ab1 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -20,8 +20,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import ( ) from onyx.connectors.models import Document from onyx.connectors.models import IndexAttemptMetadata +from onyx.db.document import fetch_chunk_counts_for_documents from onyx.db.document import get_documents_by_ids from onyx.db.document import prepare_to_modify_documents +from onyx.db.document import update_docs_chunk_count__no_commit from onyx.db.document import update_docs_last_modified__no_commit from onyx.db.document import update_docs_updated_at__no_commit from onyx.db.document import upsert_document_by_connector_credential_pair @@ -34,6 +36,7 @@ from onyx.db.tag import create_or_add_document_tag from onyx.db.tag import create_or_add_document_tag_list from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.interfaces import DocumentMetadata +from onyx.document_index.interfaces import IndexBatchParams from onyx.indexing.chunker import Chunker from onyx.indexing.embedder import IndexingEmbedder from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface @@ -370,16 +373,35 @@ def index_doc_batch( # NOTE: don't need to acquire till here, since this is when the actual race condition # with Vespa can occur. with prepare_to_modify_documents(db_session=db_session, document_ids=updatable_ids): - document_id_to_access_info = get_access_for_documents( + doc_id_to_access_info = get_access_for_documents( document_ids=updatable_ids, db_session=db_session ) - document_id_to_document_set = { + doc_id_to_document_set = { document_id: document_sets for document_id, document_sets in fetch_document_sets_for_documents( document_ids=updatable_ids, db_session=db_session ) } + doc_id_to_previous_chunk_cnt: dict[str, int | None] = { + document_id: chunk_count + for document_id, chunk_count in fetch_chunk_counts_for_documents( + document_ids=updatable_ids, + db_session=db_session, + ) + } + + doc_id_to_new_chunk_cnt: dict[str, int] = { + document_id: len( + [ + chunk + for chunk in chunks_with_embeddings + if chunk.source_document.id == document_id + ] + ) + for document_id in updatable_ids + } + # we're concerned about race conditions where multiple simultaneous indexings might result # in one set of metadata overwriting another one in vespa. 
# we still write data here for the immediate and most likely correct sync, but @@ -388,11 +410,9 @@ def index_doc_batch( access_aware_chunks = [ DocMetadataAwareIndexChunk.from_index_chunk( index_chunk=chunk, - access=document_id_to_access_info.get( - chunk.source_document.id, no_access - ), + access=doc_id_to_access_info.get(chunk.source_document.id, no_access), document_sets=set( - document_id_to_document_set.get(chunk.source_document.id, []) + doc_id_to_document_set.get(chunk.source_document.id, []) ), boost=( ctx.id_to_db_doc_map[chunk.source_document.id].boost @@ -410,7 +430,15 @@ def index_doc_batch( # A document will not be spread across different batches, so all the # documents with chunks in this set, are fully represented by the chunks # in this set - insertion_records = document_index.index(chunks=access_aware_chunks) + insertion_records = document_index.index( + chunks=access_aware_chunks, + index_batch_params=IndexBatchParams( + doc_id_to_previous_chunk_cnt=doc_id_to_previous_chunk_cnt, + doc_id_to_new_chunk_cnt=doc_id_to_new_chunk_cnt, + tenant_id=tenant_id, + large_chunks_enabled=chunker.enable_large_chunks, + ), + ) successful_doc_ids = [record.document_id for record in insertion_records] successful_docs = [ @@ -435,6 +463,12 @@ def index_doc_batch( document_ids=last_modified_ids, db_session=db_session ) + update_docs_chunk_count__no_commit( + document_ids=updatable_ids, + doc_id_to_chunk_count=doc_id_to_new_chunk_cnt, + db_session=db_session, + ) + db_session.commit() result = ( @@ -445,6 +479,28 @@ def index_doc_batch( return result +def check_enable_large_chunks_and_multipass( + embedder: IndexingEmbedder, db_session: Session +) -> tuple[bool, bool]: + search_settings = get_current_search_settings(db_session) + multipass = ( + search_settings.multipass_indexing + if search_settings + else ENABLE_MULTIPASS_INDEXING + ) + + enable_large_chunks = ( + multipass + and + # Only local models that supports larger context are from Nomic + (embedder.model_name.startswith("nomic-ai")) + and + # Cohere does not support larger context they recommend not going above 512 tokens + embedder.provider_type != EmbeddingProvider.COHERE + ) + return multipass, enable_large_chunks + + def build_indexing_pipeline( *, embedder: IndexingEmbedder, @@ -457,24 +513,8 @@ def build_indexing_pipeline( callback: IndexingHeartbeatInterface | None = None, ) -> IndexingPipelineProtocol: """Builds a pipeline which takes in a list (batch) of docs and indexes them.""" - search_settings = get_current_search_settings(db_session) - multipass = ( - search_settings.multipass_indexing - if search_settings - else ENABLE_MULTIPASS_INDEXING - ) - - enable_large_chunks = ( - multipass - and - # Only local models that supports larger context are from Nomic - ( - embedder.provider_type is not None - or embedder.model_name.startswith("nomic-ai") - ) - and - # Cohere does not support larger context they recommend not going above 512 tokens - embedder.provider_type != EmbeddingProvider.COHERE + multipass, enable_large_chunks = check_enable_large_chunks_and_multipass( + embedder, db_session ) chunker = chunker or Chunker( diff --git a/backend/onyx/indexing/models.py b/backend/onyx/indexing/models.py index e9a155d172b..e64cb9ae6b7 100644 --- a/backend/onyx/indexing/models.py +++ b/backend/onyx/indexing/models.py @@ -47,6 +47,8 @@ class DocAwareChunk(BaseChunk): mini_chunk_texts: list[str] | None + large_chunk_id: int | None + large_chunk_reference_ids: list[int] = Field(default_factory=list) def to_short_descriptor(self) -> 
str: diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py index b629b6ac3bc..44d0750e02f 100644 --- a/backend/onyx/seeding/load_docs.py +++ b/backend/onyx/seeding/load_docs.py @@ -25,6 +25,7 @@ from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.index_attempt import mock_successful_index_attempt from onyx.db.search_settings import get_current_search_settings from onyx.document_index.factory import get_default_document_index +from onyx.document_index.interfaces import IndexBatchParams from onyx.indexing.indexing_pipeline import index_doc_batch_prepare from onyx.indexing.models import ChunkEmbedding from onyx.indexing.models import DocMetadataAwareIndexChunk @@ -86,6 +87,7 @@ def _create_indexable_chunks( access=default_public_access, document_sets=set(), boost=DEFAULT_BOOST, + large_chunk_id=None, ) chunks.append(chunk) @@ -217,7 +219,15 @@ def seed_initial_documents( # as we just sent over the Vespa schema and there is a slight delay index_with_retries = retry_builder(tries=15)(document_index.index) - index_with_retries(chunks=chunks, fresh_index=cohere_enabled) + index_with_retries( + chunks=chunks, + index_batch_params=IndexBatchParams( + doc_id_to_previous_chunk_cnt={}, + doc_id_to_new_chunk_cnt={}, + large_chunks_enabled=False, + tenant_id=tenant_id, + ), + ) # Mock a run for the UI even though it did not actually call out to anything mock_successful_index_attempt( diff --git a/backend/scripts/force_delete_connector_by_id.py b/backend/scripts/force_delete_connector_by_id.py index 3a791c6fda7..bc994238870 100755 --- a/backend/scripts/force_delete_connector_by_id.py +++ b/backend/scripts/force_delete_connector_by_id.py @@ -72,7 +72,9 @@ def _unsafe_deletion( break document_ids = [document.id for document in documents] - document_index.delete(doc_ids=document_ids) + for doc_id in document_ids: + document_index.delete_single(doc_id) + delete_documents_complete__no_commit( db_session=db_session, document_ids=document_ids, diff --git a/backend/scripts/query_time_check/seed_dummy_docs.py b/backend/scripts/query_time_check/seed_dummy_docs.py index e7a7805690f..ce71b1d28b6 100644 --- a/backend/scripts/query_time_check/seed_dummy_docs.py +++ b/backend/scripts/query_time_check/seed_dummy_docs.py @@ -17,6 +17,7 @@ from onyx.connectors.models import Document from onyx.db.engine import get_session_context_manager from onyx.db.search_settings import get_current_search_settings from onyx.document_index.vespa.index import VespaIndex +from onyx.indexing.indexing_pipeline import IndexBatchParams from onyx.indexing.models import ChunkEmbedding from onyx.indexing.models import DocMetadataAwareIndexChunk from onyx.indexing.models import IndexChunk @@ -24,7 +25,6 @@ from onyx.utils.timing import log_function_time from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.model_server_models import Embedding - TOTAL_DOC_SETS = 8 TOTAL_ACL_ENTRIES_PER_CATEGORY = 80 @@ -68,6 +68,8 @@ def generate_dummy_chunk( mini_chunk_embeddings=[], ), title_embedding=generate_random_embedding(embedding_dim), + large_chunk_id=None, + large_chunk_reference_ids=[], ) document_set_names = [] @@ -103,7 +105,15 @@ def generate_dummy_chunk( def do_insertion( vespa_index: VespaIndex, all_chunks: list[DocMetadataAwareIndexChunk] ) -> None: - insertion_records = vespa_index.index(all_chunks) + insertion_records = vespa_index.index( + chunks=all_chunks, + index_batch_params=IndexBatchParams( + doc_id_to_previous_chunk_cnt={}, + doc_id_to_new_chunk_cnt={}, + 
tenant_id=POSTGRES_DEFAULT_SCHEMA, + large_chunks_enabled=False, + ), + ) print(f"Indexed {len(insertion_records)} documents.") print( f"New documents: {sum(1 for record in insertion_records if not record.already_existed)}" diff --git a/backend/tests/integration/common_utils/vespa.py b/backend/tests/integration/common_utils/vespa.py index 933c4d5e056..46698fbe34c 100644 --- a/backend/tests/integration/common_utils/vespa.py +++ b/backend/tests/integration/common_utils/vespa.py @@ -1,6 +1,6 @@ import requests -from onyx.document_index.vespa.index import DOCUMENT_ID_ENDPOINT +from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT class vespa_fixture: diff --git a/backend/tests/unit/onyx/indexing/test_embedder.py b/backend/tests/unit/onyx/indexing/test_embedder.py index 78400ecd866..0c7d6b43f5a 100644 --- a/backend/tests/unit/onyx/indexing/test_embedder.py +++ b/backend/tests/unit/onyx/indexing/test_embedder.py @@ -61,6 +61,7 @@ def test_default_indexing_embedder_embed_chunks(mock_embedding_model: Mock) -> N metadata_suffix_keyword="", mini_chunk_texts=None, large_chunk_reference_ids=[], + large_chunk_id=None, ) ]
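For reference, the shape of a call into the new index() API, mirroring the seeding and script call sites updated above; vespa_index and access_aware_chunks are assumed to exist, and the counts are illustrative.

from onyx.document_index.interfaces import IndexBatchParams

insertion_records = vespa_index.index(
    chunks=access_aware_chunks,
    index_batch_params=IndexBatchParams(
        # None marks a pre-migration document whose final chunk index
        # must be discovered by probing Vespa
        doc_id_to_previous_chunk_cnt={"doc-1": 10},
        doc_id_to_new_chunk_cnt={"doc-1": 7},
        tenant_id=None,
        large_chunks_enabled=False,
    ),
)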