diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py
index fa5199563..21f4422e8 100644
--- a/backend/danswer/indexing/indexing_pipeline.py
+++ b/backend/danswer/indexing/indexing_pipeline.py
@@ -17,6 +17,7 @@ from danswer.db.document import update_docs_updated_at
 from danswer.db.document import upsert_documents_complete
 from danswer.db.document_set import fetch_document_sets_for_documents
 from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.models import Document as DBDocument
 from danswer.db.tag import create_or_add_document_tag
 from danswer.db.tag import create_or_add_document_tag_list
 from danswer.document_index.interfaces import DocumentIndex
@@ -88,6 +89,28 @@ def upsert_documents_in_db(
     )
 
 
+def get_doc_ids_to_update(
+    documents: list[Document], db_docs: list[DBDocument]
+) -> list[Document]:
+    """Figures out which documents actually need to be updated. If a document is already present
+    and the `updated_at` hasn't changed, we shouldn't need to do anything with it."""
+    id_update_time_map = {
+        doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at
+    }
+
+    updatable_docs: list[Document] = []
+    for doc in documents:
+        if (
+            doc.id in id_update_time_map
+            and doc.doc_updated_at
+            and doc.doc_updated_at <= id_update_time_map[doc.id]
+        ):
+            continue
+        updatable_docs.append(doc)
+
+    return updatable_docs
+
+
 @log_function_time()
 def index_doc_batch(
     *,
@@ -103,40 +126,25 @@ def index_doc_batch(
     memory requirements"""
     with Session(get_sqlalchemy_engine()) as db_session:
         document_ids = [document.id for document in documents]
-
-        # Skip indexing docs that don't have a newer updated at
-        # Shortcuts the time-consuming flow on connector index retries
         db_docs = get_documents_by_ids(
             document_ids=document_ids,
             db_session=db_session,
         )
         id_to_db_doc_map = {doc.id: doc for doc in db_docs}
 
-        id_update_time_map = {
-            doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at
-        }
-
-        updatable_docs: list[Document] = []
-        if ignore_time_skip:
-            updatable_docs = documents
-        else:
-            for doc in documents:
-                if (
-                    doc.id in id_update_time_map
-                    and doc.doc_updated_at
-                    and doc.doc_updated_at <= id_update_time_map[doc.id]
-                ):
-                    continue
-                updatable_docs.append(doc)
+        # Skip indexing docs that don't have a newer updated at
+        # Shortcuts the time-consuming flow on connector index retries
+        updatable_docs = (
+            get_doc_ids_to_update(documents=documents, db_docs=db_docs)
+            if not ignore_time_skip
+            else documents
+        )
         updatable_ids = [doc.id for doc in updatable_docs]
 
-        # Acquires a lock on the documents so that no other process can modify them
-        prepare_to_modify_documents(db_session=db_session, document_ids=updatable_ids)
-
         # Create records in the source of truth about these documents,
         # does not include doc_updated_at which is also used to indicate a successful update
         upsert_documents_in_db(
-            documents=updatable_docs,
+            documents=documents,
            index_attempt_metadata=index_attempt_metadata,
            db_session=db_session,
        )
@@ -151,6 +159,11 @@ def index_doc_batch(
         logger.debug("Starting embedding")
         chunks_with_embeddings = embedder.embed_chunks(chunks=chunks)
 
+        # Acquires a lock on the documents so that no other process can modify them
+        # NOTE: don't need to acquire till here, since this is when the actual race condition
+        # with Vespa can occur.
+        prepare_to_modify_documents(db_session=db_session, document_ids=updatable_ids)
+
         # Attach the latest status from Postgres (source of truth for access) to each
         # chunk. This access status will be attached to each chunk in the document index
         # TODO: attach document sets to the chunk based on the status of Postgres as well
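Illustration (not part of the patch): a minimal sketch of how the new get_doc_ids_to_update helper filters a batch. It uses SimpleNamespace stand-ins for the Document and DBDocument models, since only the id and doc_updated_at fields matter to this helper; the real models carry more fields, and the import assumes the danswer backend package is installed with this change applied.

# Hypothetical usage sketch for get_doc_ids_to_update; stand-in objects only expose
# the two attributes the helper actually reads (`id` and `doc_updated_at`).
from datetime import datetime, timezone
from types import SimpleNamespace

from danswer.indexing.indexing_pipeline import get_doc_ids_to_update

older = datetime(2024, 1, 1, tzinfo=timezone.utc)
newer = datetime(2024, 2, 1, tzinfo=timezone.utc)

# What the connector just fetched in this batch
documents = [
    SimpleNamespace(id="doc-1", doc_updated_at=older),  # unchanged since last index
    SimpleNamespace(id="doc-2", doc_updated_at=newer),  # updated at the source
    SimpleNamespace(id="doc-3", doc_updated_at=newer),  # never indexed before
]

# What is already recorded in Postgres
db_docs = [
    SimpleNamespace(id="doc-1", doc_updated_at=older),
    SimpleNamespace(id="doc-2", doc_updated_at=older),
]

updatable = get_doc_ids_to_update(documents=documents, db_docs=db_docs)
assert [d.id for d in updatable] == ["doc-2", "doc-3"]  # doc-1 is skipped

This is the short-circuit the moved comment refers to: on connector retries, documents whose doc_updated_at has not advanced are dropped before chunking and embedding, while docs with no recorded update time or no DB row are always treated as updatable.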