Feature/indexing errors (#2148)
* backend changes to handle partial completion of index attempts
* typo fix
* Display partial success in UI
* make log timing more readable by limiting printed precision to milliseconds
* forgot alembic
* initial cut at "completed with errors" indexing
* remove and reorganize unused imports
* show view errors while indexing is in progress
* code review fixes
parent 739058aacc
commit 492797c9f3
Alembic migration (new file, 57 lines):
@@ -0,0 +1,57 @@
"""Add index_attempt_errors table

Revision ID: c5b692fa265c
Revises: 4a951134c801
Create Date: 2024-08-08 14:06:39.581972

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "c5b692fa265c"
down_revision = "4a951134c801"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "index_attempt_errors",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("index_attempt_id", sa.Integer(), nullable=True),
        sa.Column("batch", sa.Integer(), nullable=True),
        sa.Column(
            "doc_summaries",
            postgresql.JSONB(astext_type=sa.Text()),
            nullable=False,
        ),
        sa.Column("error_msg", sa.Text(), nullable=True),
        sa.Column("traceback", sa.Text(), nullable=True),
        sa.Column(
            "time_created",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["index_attempt_id"],
            ["index_attempt.id"],
        ),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(
        "index_attempt_id",
        "index_attempt_errors",
        ["time_created"],
        unique=False,
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index("index_attempt_id", table_name="index_attempt_errors")
    op.drop_table("index_attempt_errors")
    # ### end Alembic commands ###
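To try the revision locally, a minimal sketch of applying and rolling it back programmatically (this assumes a standard Alembic setup with an alembic.ini in the backend directory; running `alembic upgrade head` from the shell is equivalent):

from alembic import command
from alembic.config import Config

# Assumes alembic.ini is reachable from the working directory; adjust as needed.
cfg = Config("alembic.ini")

# Apply everything up to and including this revision.
command.upgrade(cfg, "c5b692fa265c")

# Roll back to the prior revision, dropping index_attempt_errors again.
command.downgrade(cfg, "4a951134c801")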
@@ -24,6 +24,7 @@ from danswer.db.enums import ConnectorCredentialPairStatus
 from danswer.db.index_attempt import get_index_attempt
 from danswer.db.index_attempt import mark_attempt_failed
 from danswer.db.index_attempt import mark_attempt_in_progress
+from danswer.db.index_attempt import mark_attempt_partially_succeeded
 from danswer.db.index_attempt import mark_attempt_succeeded
 from danswer.db.index_attempt import update_docs_indexed
 from danswer.db.models import IndexAttempt
@@ -131,6 +132,7 @@ def _run_indexing(
     )

     indexing_pipeline = build_indexing_pipeline(
+        attempt_id=index_attempt.id,
         embedder=embedding_model,
         document_index=document_index,
         ignore_time_skip=index_attempt.from_beginning
@@ -163,6 +165,12 @@ def _run_indexing(
     tracer.start()
     tracer.snap()

+    index_attempt_md = IndexAttemptMetadata(
+        connector_id=db_connector.id,
+        credential_id=db_credential.id,
+    )
+
     batch_num = 0
     net_doc_change = 0
     document_count = 0
     chunk_count = 0
@@ -228,13 +236,13 @@ def _run_indexing(

             logger.debug(f"Indexing batch of documents: {batch_description}")

+            index_attempt_md.batch_num = batch_num + 1  # use 1-index for this
             new_docs, total_batch_chunks = indexing_pipeline(
                 document_batch=doc_batch,
-                index_attempt_metadata=IndexAttemptMetadata(
-                    connector_id=db_connector.id,
-                    credential_id=db_credential.id,
-                ),
+                index_attempt_metadata=index_attempt_md,
             )

             batch_num += 1
             net_doc_change += new_docs
             chunk_count += total_batch_chunks
             document_count += len(doc_batch)
@@ -323,7 +331,44 @@ def _run_indexing(
             tracer.stop()
         logger.info("Memory tracer stopped.")

-    mark_attempt_succeeded(index_attempt, db_session)
+    if (
+        index_attempt_md.num_exceptions > 0
+        and index_attempt_md.num_exceptions >= batch_num
+    ):
+        mark_attempt_failed(
+            index_attempt,
+            db_session,
+            failure_reason="All batches exceptioned.",
+        )
+        if is_primary:
+            update_connector_credential_pair(
+                db_session=db_session,
+                connector_id=index_attempt.connector_credential_pair.connector.id,
+                credential_id=index_attempt.connector_credential_pair.credential.id,
+            )
+        raise Exception(
+            f"Connector failed - All batches exceptioned: batches={batch_num}"
+        )
+
+    elapsed_time = time.time() - start_time
+
+    if index_attempt_md.num_exceptions == 0:
+        mark_attempt_succeeded(index_attempt, db_session)
+        logger.info(
+            f"Connector succeeded: "
+            f"docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s"
+        )
+    else:
+        mark_attempt_partially_succeeded(index_attempt, db_session)
+        logger.info(
+            f"Connector completed with some errors: "
+            f"exceptions={index_attempt_md.num_exceptions} "
+            f"batches={batch_num} "
+            f"docs={document_count} "
+            f"chunks={chunk_count} "
+            f"elapsed={elapsed_time:.2f}s"
+        )
+
     if is_primary:
         update_connector_credential_pair(
             db_session=db_session,
@@ -332,11 +377,6 @@ def _run_indexing(
             run_dt=run_end_dt,
         )

-    elapsed_time = time.time() - start_time
-    logger.info(
-        f"Connector succeeded: docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s"
-    )
-

 def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexAttempt:
     # make sure that the index attempt can't change in between checking the
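The end-of-run branching above reduces to a three-way decision on the exception count; a standalone sketch of the same rule for clarity (illustrative names, not part of the diff):

# Mirrors the policy in _run_indexing: every batch failing marks the attempt
# FAILED, a clean run marks SUCCESS, anything in between is the new
# COMPLETED_WITH_ERRORS state.
def resolve_attempt_status(num_exceptions: int, batch_num: int) -> str:
    if num_exceptions > 0 and num_exceptions >= batch_num:
        return "failed"
    if num_exceptions == 0:
        return "success"
    return "completed_with_errors"

assert resolve_attempt_status(0, 10) == "success"
assert resolve_attempt_status(3, 10) == "completed_with_errors"
assert resolve_attempt_status(10, 10) == "failed"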
@@ -120,16 +120,6 @@ def _should_create_new_indexing(
     return time_since_index.total_seconds() >= connector.refresh_freq


-def _is_indexing_job_marked_as_finished(index_attempt: IndexAttempt | None) -> bool:
-    if index_attempt is None:
-        return False
-
-    return (
-        index_attempt.status == IndexingStatus.FAILED
-        or index_attempt.status == IndexingStatus.SUCCESS
-    )
-
-
 def _mark_run_failed(
     db_session: Session, index_attempt: IndexAttempt, failure_reason: str
 ) -> None:
@@ -215,10 +205,12 @@ def cleanup_indexing_jobs(
             )

         # do nothing for ongoing jobs that haven't been stopped
-        if not job.done() and not _is_indexing_job_marked_as_finished(
-            index_attempt
-        ):
-            continue
+        if not job.done():
+            if not index_attempt:
+                continue
+
+            if not index_attempt.is_finished():
+                continue

         if job.status == "error":
             logger.error(job.exception())
@@ -295,6 +295,10 @@ INDEXING_SIZE_WARNING_THRESHOLD = int(
 # 0 disables this behavior and is the default.
 INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0))

+# During an indexing attempt, specifies the number of batches which are allowed to
+# exception without aborting the attempt.
+INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0))
+
 #####
 # Miscellaneous
 #####
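The knob is read once at import time from the process environment, so it has to be set before the backend starts. A hypothetical deployment snippet:

import os

# Hypothetical: allow up to 5 batches to fail with an exception per indexing
# attempt. The default of 0 keeps the old fail-fast behavior, where the first
# exception aborts the attempt. Must be set before app_configs is imported.
os.environ["INDEXING_EXCEPTION_LIMIT"] = "5"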
@@ -166,6 +166,36 @@ class Document(DocumentBase):
     )


+class DocumentErrorSummary(BaseModel):
+    id: str
+    semantic_id: str
+    section_link: str | None
+
+    @classmethod
+    def from_document(cls, doc: Document) -> "DocumentErrorSummary":
+        section_link = doc.sections[0].link if len(doc.sections) > 0 else None
+        return cls(
+            id=doc.id, semantic_id=doc.semantic_identifier, section_link=section_link
+        )
+
+    @classmethod
+    def from_dict(cls, data: dict) -> "DocumentErrorSummary":
+        return cls(
+            id=str(data.get("id")),
+            semantic_id=str(data.get("semantic_id")),
+            section_link=str(data.get("section_link")),
+        )
+
+    def to_dict(self) -> dict[str, str | None]:
+        return {
+            "id": self.id,
+            "semantic_id": self.semantic_id,
+            "section_link": self.section_link,
+        }
+
+
 class IndexAttemptMetadata(BaseModel):
+    batch_num: int | None = None
+    num_exceptions: int = 0
     connector_id: int
     credential_id: int
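A brief sketch (hypothetical values) of how a summary round-trips through the JSONB doc_summaries column: to_dict() on write, from_dict() when the API layer rebuilds it:

summary = DocumentErrorSummary(
    id="doc-123",
    semantic_id="Quarterly report",
    section_link="https://example.com/report#intro",
)
payload = summary.to_dict()  # stored in index_attempt_errors.doc_summaries
restored = DocumentErrorSummary.from_dict(payload)
assert restored.id == summary.id and restored.section_link == summary.section_link

One wart worth noting: from_dict wraps every field in str(...), so a stored section_link of None comes back as the string "None" rather than None.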
@@ -317,7 +317,7 @@ def prepare_to_modify_documents(
     called ahead of any modification to Vespa. Locks should be released by the
     caller as soon as updates are complete by finishing the transaction.

-    NOTE: only one commit is allowed within the context manager returned by this funtion.
+    NOTE: only one commit is allowed within the context manager returned by this function.
     Multiple commits will result in a sqlalchemy.exc.InvalidRequestError.
     NOTE: this function will commit any existing transaction.
     """
@@ -6,6 +6,15 @@ class IndexingStatus(str, PyEnum):
     IN_PROGRESS = "in_progress"
     SUCCESS = "success"
     FAILED = "failed"
+    COMPLETED_WITH_ERRORS = "completed_with_errors"
+
+    def is_terminal(self) -> bool:
+        terminal_states = {
+            IndexingStatus.SUCCESS,
+            IndexingStatus.COMPLETED_WITH_ERRORS,
+            IndexingStatus.FAILED,
+        }
+        return self in terminal_states


 # these may differ in the future, which is why we're okay with this duplication
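Usage is straightforward; the cleanup_indexing_jobs change earlier in this diff relies on this helper via IndexAttempt.is_finished() instead of comparing statuses inline:

assert IndexingStatus.COMPLETED_WITH_ERRORS.is_terminal()
assert IndexingStatus.FAILED.is_terminal()
assert not IndexingStatus.IN_PROGRESS.is_terminal()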
@@ -11,8 +11,11 @@ from sqlalchemy import update
 from sqlalchemy.orm import joinedload
 from sqlalchemy.orm import Session

+from danswer.connectors.models import Document
+from danswer.connectors.models import DocumentErrorSummary
 from danswer.db.models import EmbeddingModel
 from danswer.db.models import IndexAttempt
+from danswer.db.models import IndexAttemptError
 from danswer.db.models import IndexingStatus
 from danswer.db.models import IndexModelStatus
 from danswer.server.documents.models import ConnectorCredentialPair
@@ -118,6 +121,15 @@ def mark_attempt_succeeded(
     db_session.commit()


+def mark_attempt_partially_succeeded(
+    index_attempt: IndexAttempt,
+    db_session: Session,
+) -> None:
+    index_attempt.status = IndexingStatus.COMPLETED_WITH_ERRORS
+    db_session.add(index_attempt)
+    db_session.commit()
+
+
 def mark_attempt_failed(
     index_attempt: IndexAttempt,
     db_session: Session,
@@ -401,3 +413,41 @@ def count_unique_cc_pairs_with_successful_index_attempts(
     )

     return unique_pairs_count
+
+
+def create_index_attempt_error(
+    index_attempt_id: int | None,
+    batch: int | None,
+    docs: list[Document],
+    exception_msg: str,
+    exception_traceback: str,
+    db_session: Session,
+) -> int:
+    doc_summaries = []
+    for doc in docs:
+        doc_summary = DocumentErrorSummary.from_document(doc)
+        doc_summaries.append(doc_summary.to_dict())
+
+    new_error = IndexAttemptError(
+        index_attempt_id=index_attempt_id,
+        batch=batch,
+        doc_summaries=doc_summaries,
+        error_msg=exception_msg,
+        traceback=exception_traceback,
+    )
+    db_session.add(new_error)
+    db_session.commit()
+
+    return new_error.id
+
+
+def get_index_attempt_errors(
+    index_attempt_id: int,
+    db_session: Session,
+) -> list[IndexAttemptError]:
+    stmt = select(IndexAttemptError).where(
+        IndexAttemptError.index_attempt_id == index_attempt_id
+    )
+
+    errors = db_session.scalars(stmt)
+    return list(errors.all())
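A hedged usage sketch of the two new helpers; the session, the attempt, and the failed batch below are hypothetical stand-ins:

# Record one failed batch, then read back all errors for the attempt.
error_id = create_index_attempt_error(
    attempt.id,                   # hypothetical IndexAttempt row
    batch=3,
    docs=failed_doc_batch,        # the list[Document] that was being indexed
    exception_msg="timed out contacting the source",
    exception_traceback="Traceback (most recent call last): ...",
    db_session=db_session,        # an open SQLAlchemy session
)
for err in get_index_attempt_errors(attempt.id, db_session):
    print(err.batch, err.error_msg, len(err.doc_summaries))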
@@ -660,6 +660,8 @@ class IndexAttempt(Base):
         "EmbeddingModel", back_populates="index_attempts"
     )

+    error_rows = relationship("IndexAttemptError", back_populates="index_attempt")
+
     __table_args__ = (
         Index(
             "ix_index_attempt_latest_for_connector_credential_pair",
@@ -677,6 +679,53 @@ class IndexAttempt(Base):
             f"time_updated={self.time_updated!r}, "
         )

+    def is_finished(self) -> bool:
+        return self.status.is_terminal()
+
+
+class IndexAttemptError(Base):
+    """
+    Represents an error that was encountered during an IndexAttempt.
+    """
+
+    __tablename__ = "index_attempt_errors"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+
+    index_attempt_id: Mapped[int] = mapped_column(
+        ForeignKey("index_attempt.id"),
+        nullable=True,
+    )
+
+    # The index of the batch where the error occurred (if looping through batches).
+    # Just informational.
+    batch: Mapped[int | None] = mapped_column(Integer, default=None)
+    doc_summaries: Mapped[list[Any]] = mapped_column(postgresql.JSONB())
+    error_msg: Mapped[str | None] = mapped_column(Text, default=None)
+    traceback: Mapped[str | None] = mapped_column(Text, default=None)
+    time_created: Mapped[datetime.datetime] = mapped_column(
+        DateTime(timezone=True),
+        server_default=func.now(),
+    )
+
+    # This is the reverse side of the relationship.
+    index_attempt = relationship("IndexAttempt", back_populates="error_rows")
+
+    __table_args__ = (
+        Index(
+            "index_attempt_id",
+            "time_created",
+        ),
+    )
+
+    def __repr__(self) -> str:
+        return (
+            f"<IndexAttemptError(id={self.id!r}, "
+            f"index_attempt_id={self.index_attempt_id!r}, "
+            f"error_msg={self.error_msg!r}, "
+            f"time_created={self.time_created!r})>"
+        )
+
+
 class DocumentByConnectorCredentialPair(Base):
     """Represents an indexing of a document by a specific connector / credential pair"""
@@ -1,10 +1,13 @@
+import traceback
 from functools import partial
 from typing import Protocol

+from pydantic import BaseModel
 from sqlalchemy.orm import Session

 from danswer.access.access import get_access_for_documents
 from danswer.configs.app_configs import ENABLE_MULTIPASS_INDEXING
+from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT
 from danswer.configs.constants import DEFAULT_BOOST
 from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
     get_experts_stores_representations,
@@ -16,6 +19,7 @@ from danswer.db.document import prepare_to_modify_documents
 from danswer.db.document import update_docs_updated_at
 from danswer.db.document import upsert_documents_complete
 from danswer.db.document_set import fetch_document_sets_for_documents
+from danswer.db.index_attempt import create_index_attempt_error
 from danswer.db.models import Document as DBDocument
 from danswer.db.tag import create_or_add_document_tag
 from danswer.db.tag import create_or_add_document_tag_list
@@ -33,6 +37,14 @@ from shared_configs.enums import EmbeddingProvider
 logger = setup_logger()


+class DocumentBatchPrepareContext(BaseModel):
+    updatable_docs: list[Document]
+    id_to_db_doc_map: dict[str, DBDocument]
+
+    class Config:
+        arbitrary_types_allowed = True
+
+
 class IndexingPipelineProtocol(Protocol):
     def __call__(
         self,
@@ -113,20 +125,72 @@ def get_doc_ids_to_update(
     return updatable_docs


-@log_function_time()
-def index_doc_batch(
+def index_doc_batch_with_handler(
     *,
     chunker: Chunker,
     embedder: IndexingEmbedder,
     document_index: DocumentIndex,
     document_batch: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
+    attempt_id: int | None,
     db_session: Session,
     ignore_time_skip: bool = False,
 ) -> tuple[int, int]:
-    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
-    Note that the documents should already be batched at this point so that it does not inflate the
-    memory requirements"""
+    r = (0, 0)
+    try:
+        r = index_doc_batch(
+            chunker=chunker,
+            embedder=embedder,
+            document_index=document_index,
+            document_batch=document_batch,
+            index_attempt_metadata=index_attempt_metadata,
+            db_session=db_session,
+            ignore_time_skip=ignore_time_skip,
+        )
+    except Exception as e:
+        if INDEXING_EXCEPTION_LIMIT == 0:
+            raise
+
+        trace = traceback.format_exc()
+        create_index_attempt_error(
+            attempt_id,
+            batch=index_attempt_metadata.batch_num,
+            docs=document_batch,
+            exception_msg=str(e),
+            exception_traceback=trace,
+            db_session=db_session,
+        )
+        logger.exception(
+            f"Indexing batch {index_attempt_metadata.batch_num} failed. msg='{e}' trace='{trace}'"
+        )
+
+        index_attempt_metadata.num_exceptions += 1
+        if index_attempt_metadata.num_exceptions == INDEXING_EXCEPTION_LIMIT:
+            logger.info(
+                f"Maximum number of exceptions for this index attempt "
+                f"({INDEXING_EXCEPTION_LIMIT}) has been reached. "
+                f"The next exception will abort the indexing attempt."
+            )
+        elif index_attempt_metadata.num_exceptions > INDEXING_EXCEPTION_LIMIT:
+            logger.warning(
+                f"Maximum number of exceptions for this index attempt "
+                f"({INDEXING_EXCEPTION_LIMIT}) has been exceeded. Raising RuntimeError."
+            )
+            raise RuntimeError(
+                f"Maximum exception limit of {INDEXING_EXCEPTION_LIMIT} exceeded."
+            )
+        else:
+            pass
+
+    return r
+
+
+def index_doc_batch_prepare(
+    document_batch: list[Document],
+    index_attempt_metadata: IndexAttemptMetadata,
+    db_session: Session,
+    ignore_time_skip: bool = False,
+) -> DocumentBatchPrepareContext | None:
     documents = []
     for document in document_batch:
         empty_contents = not any(section.text.strip() for section in document.sections)
@@ -154,11 +218,10 @@ def index_doc_batch(
         documents.append(document)

     document_ids = [document.id for document in documents]
-    db_docs = get_documents_by_ids(
+    db_docs: list[DBDocument] = get_documents_by_ids(
         document_ids=document_ids,
         db_session=db_session,
     )
-    id_to_db_doc_map = {doc.id: doc for doc in db_docs}

     # Skip indexing docs that don't have a newer updated at
     # Shortcuts the time-consuming flow on connector index retries
@@ -170,9 +233,7 @@ def index_doc_batch(

     # No docs to update either because the batch is empty or every doc was already indexed
     if not updatable_docs:
-        return 0, 0
-
-    updatable_ids = [doc.id for doc in updatable_docs]
+        return None

     # Create records in the source of truth about these documents,
     # does not include doc_updated_at which is also used to indicate a successful update
@@ -182,9 +243,39 @@ def index_doc_batch(
         db_session=db_session,
     )

+    id_to_db_doc_map = {doc.id: doc for doc in db_docs}
+    return DocumentBatchPrepareContext(
+        updatable_docs=updatable_docs, id_to_db_doc_map=id_to_db_doc_map
+    )
+
+
+@log_function_time()
+def index_doc_batch(
+    *,
+    chunker: Chunker,
+    embedder: IndexingEmbedder,
+    document_index: DocumentIndex,
+    document_batch: list[Document],
+    index_attempt_metadata: IndexAttemptMetadata,
+    db_session: Session,
+    ignore_time_skip: bool = False,
+) -> tuple[int, int]:
+    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
+    Note that the documents should already be batched at this point so that it does not inflate the
+    memory requirements"""
+
+    ctx = index_doc_batch_prepare(
+        document_batch=document_batch,
+        index_attempt_metadata=index_attempt_metadata,
+        ignore_time_skip=ignore_time_skip,
+        db_session=db_session,
+    )
+    if not ctx:
+        return 0, 0
+
     logger.debug("Starting chunking")
     chunks: list[DocAwareChunk] = []
-    for document in updatable_docs:
+    for document in ctx.updatable_docs:
         chunks.extend(chunker.chunk(document=document))

     logger.debug("Starting embedding")
@@ -196,6 +287,8 @@ def index_doc_batch(
         else []
     )

+    updatable_ids = [doc.id for doc in ctx.updatable_docs]
+
     # Acquires a lock on the documents so that no other process can modify them
     # NOTE: don't need to acquire till here, since this is when the actual race condition
     # with Vespa can occur.
@@ -220,8 +313,8 @@ def index_doc_batch(
                 document_id_to_document_set.get(chunk.source_document.id, [])
             ),
             boost=(
-                id_to_db_doc_map[chunk.source_document.id].boost
-                if chunk.source_document.id in id_to_db_doc_map
+                ctx.id_to_db_doc_map[chunk.source_document.id].boost
+                if chunk.source_document.id in ctx.id_to_db_doc_map
                 else DEFAULT_BOOST
             ),
         )
@@ -238,7 +331,7 @@ def index_doc_batch(

     successful_doc_ids = [record.document_id for record in insertion_records]
     successful_docs = [
-        doc for doc in updatable_docs if doc.id in successful_doc_ids
+        doc for doc in ctx.updatable_docs if doc.id in successful_doc_ids
     ]

     # Update the time of latest version of the doc successfully indexed
@@ -264,6 +357,7 @@ def build_indexing_pipeline(
     db_session: Session,
     chunker: Chunker | None = None,
     ignore_time_skip: bool = False,
+    attempt_id: int | None = None,
 ) -> IndexingPipelineProtocol:
     """Builds a pipeline which takes in a list (batch) of docs and indexes them."""
     search_settings = get_search_settings()
@@ -293,10 +387,11 @@ def build_indexing_pipeline(
     )

     return partial(
-        index_doc_batch,
+        index_doc_batch_with_handler,
         chunker=chunker,
         embedder=embedder,
         document_index=document_index,
         ignore_time_skip=ignore_time_skip,
+        attempt_id=attempt_id,
         db_session=db_session,
     )
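The handler's tolerance rule reads as a small budget: with INDEXING_EXCEPTION_LIMIT = N, up to N batches may fail (each failure recorded via create_index_attempt_error), and the (N+1)-th failure raises RuntimeError and aborts the attempt; with the default of 0 the very first exception is re-raised. A standalone sketch of just that rule (illustrative, not the module itself):

LIMIT = 3  # stand-in for INDEXING_EXCEPTION_LIMIT

def register_batch_failure(num_exceptions: int) -> int:
    """Apply the diff's policy to a running exception count."""
    if LIMIT == 0:
        raise RuntimeError("tolerance disabled: first exception aborts")
    num_exceptions += 1
    if num_exceptions > LIMIT:
        raise RuntimeError(f"Maximum exception limit of {LIMIT} exceeded.")
    return num_exceptions  # attempt continues with the next batch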
@@ -73,6 +73,7 @@ from danswer.server.documents.cc_pair import router as cc_pair_router
 from danswer.server.documents.connector import router as connector_router
 from danswer.server.documents.credential import router as credential_router
 from danswer.server.documents.document import router as document_router
+from danswer.server.documents.indexing import router as indexing_router
 from danswer.server.features.document_set.api import router as document_set_router
 from danswer.server.features.folder.api import router as folder_router
 from danswer.server.features.input_prompt.api import (
@@ -393,6 +394,7 @@ def get_application() -> FastAPI:
     include_router_with_global_prefix_prepended(
         application, token_rate_limit_settings_router
     )
+    include_router_with_global_prefix_prepended(application, indexing_router)

     if AUTH_TYPE == AuthType.DISABLED:
         # Server logs this during auth setup verification step
backend/danswer/server/documents/indexing.py (new file, 23 lines):
@@ -0,0 +1,23 @@
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session

from danswer.auth.users import current_admin_user
from danswer.db.engine import get_session
from danswer.db.index_attempt import (
    get_index_attempt_errors,
)
from danswer.db.models import User
from danswer.server.documents.models import IndexAttemptError

router = APIRouter(prefix="/manage")


@router.get("/admin/indexing-errors/{index_attempt_id}")
def get_indexing_errors(
    index_attempt_id: int,
    _: User | None = Depends(current_admin_user),
    db_session: Session = Depends(get_session),
) -> list[IndexAttemptError]:
    indexing_errors = get_index_attempt_errors(index_attempt_id, db_session)
    return [IndexAttemptError.from_db_model(e) for e in indexing_errors]
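A hedged sketch of exercising the new endpoint; the base URL and the admin auth cookie are assumptions about a local deployment, while the path follows the router prefix above:

import requests

resp = requests.get(
    "http://localhost:8080/manage/admin/indexing-errors/42",       # assumed host/port
    cookies={"fastapiusersauth": "<admin-session-cookie>"},        # assumed auth scheme
)
resp.raise_for_status()
for err in resp.json():
    print(err["batch_number"], err["error_msg"])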
@@ -6,12 +6,14 @@ from pydantic import BaseModel

 from danswer.configs.app_configs import MASK_CREDENTIAL_PREFIX
 from danswer.configs.constants import DocumentSource
+from danswer.connectors.models import DocumentErrorSummary
 from danswer.connectors.models import InputType
 from danswer.db.enums import ConnectorCredentialPairStatus
 from danswer.db.models import Connector
 from danswer.db.models import ConnectorCredentialPair
 from danswer.db.models import Credential
 from danswer.db.models import IndexAttempt
+from danswer.db.models import IndexAttemptError as DbIndexAttemptError
 from danswer.db.models import IndexingStatus
 from danswer.db.models import TaskStatus
 from danswer.server.utils import mask_credential_dict
@@ -122,6 +124,7 @@ class IndexAttemptSnapshot(BaseModel):
     total_docs_indexed: int  # includes docs that are updated
     docs_removed_from_index: int
     error_msg: str | None
+    error_count: int
     full_exception_trace: str | None
     time_started: str | None
     time_updated: str
@@ -137,6 +140,7 @@ class IndexAttemptSnapshot(BaseModel):
             total_docs_indexed=index_attempt.total_docs_indexed or 0,
             docs_removed_from_index=index_attempt.docs_removed_from_index or 0,
             error_msg=index_attempt.error_msg,
+            error_count=len(index_attempt.error_rows),
             full_exception_trace=index_attempt.full_exception_trace,
             time_started=(
                 index_attempt.time_started.isoformat()
@@ -147,6 +151,31 @@ class IndexAttemptSnapshot(BaseModel):
     )


+class IndexAttemptError(BaseModel):
+    id: int
+    index_attempt_id: int | None
+    batch_number: int | None
+    doc_summaries: list[DocumentErrorSummary]
+    error_msg: str | None
+    traceback: str | None
+    time_created: str
+
+    @classmethod
+    def from_db_model(cls, error: DbIndexAttemptError) -> "IndexAttemptError":
+        doc_summaries = [
+            DocumentErrorSummary.from_dict(summary) for summary in error.doc_summaries
+        ]
+        return IndexAttemptError(
+            id=error.id,
+            index_attempt_id=error.index_attempt_id,
+            batch_number=error.batch,
+            doc_summaries=doc_summaries,
+            error_msg=error.error_msg,
+            traceback=error.traceback,
+            time_created=error.time_created.isoformat(),
+        )
+
+
 class CCPairFullInfo(BaseModel):
     id: int
     name: str
@@ -29,7 +29,8 @@ def log_function_time(
             start_time = time.time()
             user = kwargs.get("user")
             result = func(*args, **kwargs)
-            elapsed_time_str = str(time.time() - start_time)
+            elapsed_time = time.time() - start_time
+            elapsed_time_str = f"{elapsed_time:.3f}"
             log_name = func_name or func.__name__
             args_str = f" args={args} kwargs={kwargs}" if include_args else ""
             final_log = f"{log_name}{args_str} took {elapsed_time_str} seconds"
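This is the commit-message item about log timing: instead of printing the raw float (sub-microsecond noise and all), the elapsed time is now formatted to three decimal places. For example:

elapsed = 1.23456789
assert str(elapsed) == "1.23456789"   # old log output
assert f"{elapsed:.3f}" == "1.235"    # new: millisecond precision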
@@ -18,7 +18,8 @@ import { PageSelector } from "@/components/PageSelector";
 import { localizeAndPrettify } from "@/lib/time";
 import { getDocsProcessedPerMinute } from "@/lib/indexAttempt";
 import { Modal } from "@/components/Modal";
-import { CheckmarkIcon, CopyIcon } from "@/components/icons/icons";
+import { CheckmarkIcon, CopyIcon, SearchIcon } from "@/components/icons/icons";
+import Link from "next/link";
 import ExceptionTraceModal from "@/components/modals/ExceptionTraceModal";

 const NUM_IN_PAGE = 8;
@@ -50,7 +51,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
             <TableHeaderCell>Status</TableHeaderCell>
             <TableHeaderCell>New Doc Cnt</TableHeaderCell>
             <TableHeaderCell>Total Doc Cnt</TableHeaderCell>
-            <TableHeaderCell>Error Msg</TableHeaderCell>
+            <TableHeaderCell>Error Message</TableHeaderCell>
           </TableRow>
         </TableHead>
         <TableBody>
@@ -93,9 +94,31 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
               <TableCell>{indexAttempt.total_docs_indexed}</TableCell>
               <TableCell>
                 <div>
-                  <Text className="flex flex-wrap whitespace-normal">
-                    {indexAttempt.error_msg || "-"}
-                  </Text>
+                  {indexAttempt.error_count > 0 && (
+                    <Link
+                      className="cursor-pointer my-auto"
+                      href={`/admin/indexing/${indexAttempt.id}`}
+                    >
+                      <Text className="flex flex-wrap text-link whitespace-normal">
+                        <SearchIcon />
+                        View Errors
+                      </Text>
+                    </Link>
+                  )}
+
+                  {indexAttempt.status === "success" && (
+                    <Text className="flex flex-wrap whitespace-normal">
+                      {"-"}
+                    </Text>
+                  )}
+
+                  {indexAttempt.status === "failed" &&
+                    indexAttempt.error_msg && (
+                      <Text className="flex flex-wrap whitespace-normal">
+                        {indexAttempt.error_msg}
+                      </Text>
+                    )}
+
                   {indexAttempt.full_exception_trace && (
                     <div
                       onClick={() => {
web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx (new file, 189 lines):
@@ -0,0 +1,189 @@
"use client";

import { Modal } from "@/components/Modal";
import { PageSelector } from "@/components/PageSelector";
import { CheckmarkIcon, CopyIcon } from "@/components/icons/icons";
import { localizeAndPrettify } from "@/lib/time";
import {
  Table,
  TableBody,
  TableCell,
  TableHead,
  TableHeaderCell,
  TableRow,
  Text,
} from "@tremor/react";
import { useState } from "react";
import { IndexAttemptError } from "./types";

const NUM_IN_PAGE = 8;

export function CustomModal({
  isVisible,
  onClose,
  title,
  content,
  showCopyButton = false,
}: {
  isVisible: boolean;
  onClose: () => void;
  title: string;
  content: string;
  showCopyButton?: boolean;
}) {
  const [copyClicked, setCopyClicked] = useState(false);

  if (!isVisible) return null;

  return (
    <Modal
      width="w-4/6"
      className="h-5/6 overflow-y-hidden flex flex-col"
      title={title}
      onOutsideClick={onClose}
    >
      <div className="overflow-y-auto mb-6">
        {showCopyButton && (
          <div className="mb-6">
            {!copyClicked ? (
              <div
                onClick={() => {
                  navigator.clipboard.writeText(content);
                  setCopyClicked(true);
                  setTimeout(() => setCopyClicked(false), 2000);
                }}
                className="flex w-fit cursor-pointer hover:bg-hover-light p-2 border-border border rounded"
              >
                Copy full content
                <CopyIcon className="ml-2 my-auto" />
              </div>
            ) : (
              <div className="flex w-fit hover:bg-hover-light p-2 border-border border rounded cursor-default">
                Copied to clipboard
                <CheckmarkIcon
                  className="my-auto ml-2 flex flex-shrink-0 text-success"
                  size={16}
                />
              </div>
            )}
          </div>
        )}
        <div className="whitespace-pre-wrap">{content}</div>
      </div>
    </Modal>
  );
}

export function IndexAttemptErrorsTable({
  indexAttemptErrors,
}: {
  indexAttemptErrors: IndexAttemptError[];
}) {
  const [page, setPage] = useState(1);
  const [modalData, setModalData] = useState<{
    id: number | null;
    title: string;
    content: string;
  } | null>(null);
  const closeModal = () => setModalData(null);

  return (
    <>
      {modalData && (
        <CustomModal
          isVisible={!!modalData}
          onClose={closeModal}
          title={modalData.title}
          content={modalData.content}
          showCopyButton
        />
      )}

      <Table>
        <TableHead>
          <TableRow>
            <TableHeaderCell>Timestamp</TableHeaderCell>
            <TableHeaderCell>Batch Number</TableHeaderCell>
            <TableHeaderCell>Document Summaries</TableHeaderCell>
            <TableHeaderCell>Error Message</TableHeaderCell>
          </TableRow>
        </TableHead>
        <TableBody>
          {indexAttemptErrors
            .slice(NUM_IN_PAGE * (page - 1), NUM_IN_PAGE * page)
            .map((indexAttemptError) => {
              return (
                <TableRow key={indexAttemptError.id}>
                  <TableCell>
                    {indexAttemptError.time_created
                      ? localizeAndPrettify(indexAttemptError.time_created)
                      : "-"}
                  </TableCell>
                  <TableCell>{indexAttemptError.batch_number}</TableCell>
                  <TableCell>
                    {indexAttemptError.doc_summaries && (
                      <div
                        onClick={() =>
                          setModalData({
                            id: indexAttemptError.id,
                            title: "Document Summaries",
                            content: JSON.stringify(
                              indexAttemptError.doc_summaries,
                              null,
                              2
                            ),
                          })
                        }
                        className="mt-2 text-link cursor-pointer select-none"
                      >
                        View Document Summaries
                      </div>
                    )}
                  </TableCell>
                  <TableCell>
                    <div>
                      <Text className="flex flex-wrap whitespace-normal">
                        {indexAttemptError.error_msg || "-"}
                      </Text>
                      {indexAttemptError.traceback && (
                        <div
                          onClick={() =>
                            setModalData({
                              id: indexAttemptError.id,
                              title: "Exception Traceback",
                              content: indexAttemptError.traceback!,
                            })
                          }
                          className="mt-2 text-link cursor-pointer select-none"
                        >
                          View Full Trace
                        </div>
                      )}
                    </div>
                  </TableCell>
                </TableRow>
              );
            })}
        </TableBody>
      </Table>
      {indexAttemptErrors.length > NUM_IN_PAGE && (
        <div className="mt-3 flex">
          <div className="mx-auto">
            <PageSelector
              totalPages={Math.ceil(indexAttemptErrors.length / NUM_IN_PAGE)}
              currentPage={page}
              onPageChange={(newPage) => {
                setPage(newPage);
                window.scrollTo({
                  top: 0,
                  left: 0,
                  behavior: "smooth",
                });
              }}
            />
          </div>
        </div>
      )}
    </>
  );
}
web/src/app/admin/indexing/[id]/lib.ts (new file, 3 lines):
@@ -0,0 +1,3 @@
export function buildIndexingErrorsUrl(id: string | number) {
  return `/api/manage/admin/indexing-errors/${id}`;
}
web/src/app/admin/indexing/[id]/page.tsx (new file, 58 lines):
@@ -0,0 +1,58 @@
"use client";

import { BackButton } from "@/components/BackButton";
import { ErrorCallout } from "@/components/ErrorCallout";
import { ThreeDotsLoader } from "@/components/Loading";
import { errorHandlingFetcher } from "@/lib/fetcher";
import { ValidSources } from "@/lib/types";
import { Title } from "@tremor/react";
import useSWR from "swr";
import { IndexAttemptErrorsTable } from "./IndexAttemptErrorsTable";
import { buildIndexingErrorsUrl } from "./lib";
import { IndexAttemptError } from "./types";

function Main({ id }: { id: number }) {
  const {
    data: indexAttemptErrors,
    isLoading,
    error,
  } = useSWR<IndexAttemptError[]>(
    buildIndexingErrorsUrl(id),
    errorHandlingFetcher
  );

  if (isLoading) {
    return <ThreeDotsLoader />;
  }

  if (error || !indexAttemptErrors) {
    return (
      <ErrorCallout
        errorTitle={`Failed to fetch errors for attempt ID ${id}`}
        errorMsg={error?.info?.detail || error.toString()}
      />
    );
  }

  return (
    <>
      <BackButton />
      <div className="mt-6">
        <div className="flex">
          <Title>Indexing Errors for Attempt {id}</Title>
        </div>
        <IndexAttemptErrorsTable indexAttemptErrors={indexAttemptErrors} />
      </div>
    </>
  );
}

export default function Page({ params }: { params: { id: string } }) {
  const id = parseInt(params.id);

  return (
    <div className="mx-auto container">
      <Main id={id} />
    </div>
  );
}
web/src/app/admin/indexing/[id]/types.ts (new file, 15 lines):
@@ -0,0 +1,15 @@
export interface IndexAttemptError {
  id: number;
  index_attempt_id: number;
  batch_number: number;
  doc_summaries: DocumentErrorSummary[];
  error_msg: string;
  traceback: string;
  time_created: string;
}

export interface DocumentErrorSummary {
  id: string;
  semantic_id: string;
  section_link: string;
}
@@ -42,6 +42,26 @@ export function IndexAttemptStatus({
     } else {
       badge = icon;
     }
+  } else if (status === "completed_with_errors") {
+    const icon = (
+      <Badge size={size} color="yellow" icon={FiAlertTriangle}>
+        Completed with errors
+      </Badge>
+    );
+    badge = (
+      <HoverPopup
+        mainContent={<div className="cursor-pointer">{icon}</div>}
+        popupContent={
+          <div className="w-64 p-2 break-words overflow-hidden whitespace-normal">
+            The indexing attempt completed, but some errors were encountered
+            during the run.
+            <br />
+            <br />
+            Click View Errors for more details.
+          </div>
+        }
+      />
+    );
   } else if (status === "success") {
     badge = (
       <Badge size={size} color="green" icon={FiCheckCircle}>
@@ -38,6 +38,7 @@ export interface MinimalUserSnapshot {
 export type ValidInputTypes = "load_state" | "poll" | "event";
 export type ValidStatuses =
   | "success"
+  | "completed_with_errors"
   | "failed"
   | "in_progress"
   | "not_started";
@@ -59,6 +60,7 @@ export interface IndexAttemptSnapshot {
   docs_removed_from_index: number;
   total_docs_indexed: number;
   error_msg: string | null;
+  error_count: number;
   full_exception_trace: string | null;
   time_started: string | null;
   time_updated: string;