Fix early breakout causing us to not update ConnectorByCredentialPair

Weves 2024-02-29 12:53:46 -08:00 committed by Chris Weaver
parent 2331bf9b36
commit 7b7561533f


@@ -17,6 +17,7 @@ from danswer.db.document import update_docs_updated_at
 from danswer.db.document import upsert_documents_complete
 from danswer.db.document_set import fetch_document_sets_for_documents
 from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.models import Document as DBDocument
 from danswer.db.tag import create_or_add_document_tag
 from danswer.db.tag import create_or_add_document_tag_list
 from danswer.document_index.interfaces import DocumentIndex
@@ -88,6 +89,28 @@ def upsert_documents_in_db(
     )


+def get_doc_ids_to_update(
+    documents: list[Document], db_docs: list[DBDocument]
+) -> list[Document]:
+    """Figures out which documents actually need to be updated. If a document is already present
+    and the `updated_at` hasn't changed, we shouldn't need to do anything with it."""
+    id_update_time_map = {
+        doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at
+    }
+
+    updatable_docs: list[Document] = []
+    for doc in documents:
+        if (
+            doc.id in id_update_time_map
+            and doc.doc_updated_at
+            and doc.doc_updated_at <= id_update_time_map[doc.id]
+        ):
+            continue
+        updatable_docs.append(doc)
+
+    return updatable_docs
+
+
 @log_function_time()
 def index_doc_batch(
     *,
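Read in isolation, the new helper's skip rule is easy to exercise. Below is a minimal runnable sketch with stand-in Doc/DbDoc dataclasses in place of the real Document/DBDocument models; everything here is illustrative, not the project's actual code:

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Doc:  # stand-in for the connector-side Document model
    id: str
    doc_updated_at: datetime | None


@dataclass
class DbDoc:  # stand-in for the DB-side DBDocument model
    id: str
    doc_updated_at: datetime | None


def get_doc_ids_to_update(documents: list[Doc], db_docs: list[DbDoc]) -> list[Doc]:
    # Same rule as the helper above: skip any doc whose stored timestamp is
    # at least as new as the incoming one.
    id_update_time_map = {d.id: d.doc_updated_at for d in db_docs if d.doc_updated_at}
    return [
        doc
        for doc in documents
        if not (
            doc.id in id_update_time_map
            and doc.doc_updated_at
            and doc.doc_updated_at <= id_update_time_map[doc.id]
        )
    ]


old, new = datetime(2024, 1, 1), datetime(2024, 2, 1)
docs = [Doc("a", old), Doc("b", new), Doc("c", None)]
db_docs = [DbDoc("a", old), DbDoc("b", old)]

# "a" is unchanged and gets skipped; "b" has a newer timestamp; "c" carries no
# timestamp, so it always passes through (as does any doc not yet in the DB).
assert [d.id for d in get_doc_ids_to_update(docs, db_docs)] == ["b", "c"]

One consequence worth noting: a document with no doc_updated_at on either side is never skipped, so the shortcut is strictly an optimization and can only under-skip, never over-skip.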
@@ -103,40 +126,25 @@ def index_doc_batch(
     memory requirements"""
     with Session(get_sqlalchemy_engine()) as db_session:
         document_ids = [document.id for document in documents]

-        # Skip indexing docs that don't have a newer updated at
-        # Shortcuts the time-consuming flow on connector index retries
         db_docs = get_documents_by_ids(
             document_ids=document_ids,
             db_session=db_session,
         )
+
         id_to_db_doc_map = {doc.id: doc for doc in db_docs}
-        id_update_time_map = {
-            doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at
-        }

-        updatable_docs: list[Document] = []
-        if ignore_time_skip:
-            updatable_docs = documents
-        else:
-            for doc in documents:
-                if (
-                    doc.id in id_update_time_map
-                    and doc.doc_updated_at
-                    and doc.doc_updated_at <= id_update_time_map[doc.id]
-                ):
-                    continue
-                updatable_docs.append(doc)
+        # Skip indexing docs that don't have a newer updated at
+        # Shortcuts the time-consuming flow on connector index retries
+        updatable_docs = (
+            get_doc_ids_to_update(documents=documents, db_docs=db_docs)
+            if not ignore_time_skip
+            else documents
+        )

         updatable_ids = [doc.id for doc in updatable_docs]

-        # Acquires a lock on the documents so that no other process can modify them
-        prepare_to_modify_documents(db_session=db_session, document_ids=updatable_ids)
-
         # Create records in the source of truth about these documents,
         # does not include doc_updated_at which is also used to indicate a successful update
         upsert_documents_in_db(
-            documents=updatable_docs,
+            documents=documents,
             index_attempt_metadata=index_attempt_metadata,
             db_session=db_session,
         )
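The switch from documents=updatable_docs to documents=documents in the upsert call is the actual fix named in the commit title: on a connector retry where every document breaks out of the update early, the old call upserted an empty list, so the rows tying documents to their connector/credential pair were never refreshed for that run. A toy simulation of the before/after behavior (all names here are hypothetical):

from dataclasses import dataclass, field


@dataclass
class FakeDB:
    # Stand-in for the table linking document ids to a connector/credential pair
    cc_pair_rows: set[str] = field(default_factory=set)

    def upsert_documents(self, doc_ids: list[str]) -> None:
        self.cc_pair_rows.update(doc_ids)


documents = ["a", "b"]          # batch re-delivered on a connector retry
updatable_docs: list[str] = []  # nothing has a newer updated_at, all skipped

before, after = FakeDB(), FakeDB()
before.upsert_documents(updatable_docs)  # old behavior: no-op on retries
after.upsert_documents(documents)        # fixed behavior: mapping always recorded

assert before.cc_pair_rows == set()
assert after.cc_pair_rows == {"a", "b"}

The expensive chunk/embed path still runs only on updatable_docs; it is just the cheap source-of-truth upsert that now covers the full batch.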
@@ -151,6 +159,11 @@ def index_doc_batch(
         logger.debug("Starting embedding")
         chunks_with_embeddings = embedder.embed_chunks(chunks=chunks)

+        # Acquires a lock on the documents so that no other process can modify them
+        # NOTE: don't need to acquire till here, since this is when the actual race condition
+        # with Vespa can occur.
+        prepare_to_modify_documents(db_session=db_session, document_ids=updatable_ids)
+
         # Attach the latest status from Postgres (source of truth for access) to each
         # chunk. This access status will be attached to each chunk in the document index
         # TODO: attach document sets to the chunk based on the status of Postgres as well
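Relocating prepare_to_modify_documents below chunking and embedding narrows the critical section: per the NOTE, the lock only matters once the writes that can race with Vespa begin, so there is no reason to hold it through the slow embedding call. The general shape of that change, sketched with a hypothetical in-process lock standing in for the Postgres-level document locks:

import threading

_document_lock = threading.Lock()  # stand-in for prepare_to_modify_documents


def embed(chunks: list[str]) -> list[str]:
    return [f"embedding::{c}" for c in chunks]  # stand-in for the slow step


def index_batch(chunks: list[str]) -> list[str]:
    embedded = embed(chunks)  # slow work runs unlocked, so other indexing
                              # workers are not blocked in the meantime
    with _document_lock:      # lock held only for the racy write section
        return embedded       # standing in for the Vespa/Postgres writes


print(index_batch(["intro", "body"]))

A shorter lock window also leaves less room for contention or lock timeouts across concurrent indexing attempts.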