diff --git a/backend/alembic/versions/c5b692fa265c_add_index_attempt_errors_table.py b/backend/alembic/versions/c5b692fa265c_add_index_attempt_errors_table.py
new file mode 100644
index 000000000..c49fea219
--- /dev/null
+++ b/backend/alembic/versions/c5b692fa265c_add_index_attempt_errors_table.py
@@ -0,0 +1,57 @@
+"""Add index_attempt_errors table
+
+Revision ID: c5b692fa265c
+Revises: 4a951134c801
+Create Date: 2024-08-08 14:06:39.581972
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "c5b692fa265c"
+down_revision = "4a951134c801"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "index_attempt_errors",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("index_attempt_id", sa.Integer(), nullable=True),
+        sa.Column("batch", sa.Integer(), nullable=True),
+        sa.Column(
+            "doc_summaries",
+            postgresql.JSONB(astext_type=sa.Text()),
+            nullable=False,
+        ),
+        sa.Column("error_msg", sa.Text(), nullable=True),
+        sa.Column("traceback", sa.Text(), nullable=True),
+        sa.Column(
+            "time_created",
+            sa.DateTime(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.ForeignKeyConstraint(
+            ["index_attempt_id"],
+            ["index_attempt.id"],
+        ),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.create_index(
+        "index_attempt_id",
+        "index_attempt_errors",
+        ["time_created"],
+        unique=False,
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index("index_attempt_id", table_name="index_attempt_errors")
+    op.drop_table("index_attempt_errors")
+    # ### end Alembic commands ###
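For orientation, this is the row shape the new table ends up holding. The sample below is invented for illustration; the `doc_summaries` entries follow `DocumentErrorSummary.to_dict()` from later in this diff. Note also that the index created on `time_created` is (somewhat confusingly) named `index_attempt_id`, mirroring the model definition further down.

```python
# Invented sample row for index_attempt_errors, matching the columns above.
example_row = {
    "id": 1,
    "index_attempt_id": 42,
    "batch": 3,  # 1-indexed batch number within the attempt
    "doc_summaries": [  # JSONB: one summary dict per document in the failed batch
        {
            "id": "confluence__12345",
            "semantic_id": "Onboarding Guide",
            "section_link": "https://example.atlassian.net/wiki/pages/12345",
        }
    ],
    "error_msg": "HTTPError: 429 Too Many Requests",
    "traceback": "Traceback (most recent call last): ...",
    "time_created": "2024-08-08T14:06:39+00:00",
}
```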
logger.info("Memory tracer stopped.") - mark_attempt_succeeded(index_attempt, db_session) + if ( + index_attempt_md.num_exceptions > 0 + and index_attempt_md.num_exceptions >= batch_num + ): + mark_attempt_failed( + index_attempt, + db_session, + failure_reason="All batches exceptioned.", + ) + if is_primary: + update_connector_credential_pair( + db_session=db_session, + connector_id=index_attempt.connector_credential_pair.connector.id, + credential_id=index_attempt.connector_credential_pair.credential.id, + ) + raise Exception( + f"Connector failed - All batches exceptioned: batches={batch_num}" + ) + + elapsed_time = time.time() - start_time + + if index_attempt_md.num_exceptions == 0: + mark_attempt_succeeded(index_attempt, db_session) + logger.info( + f"Connector succeeded: " + f"docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s" + ) + else: + mark_attempt_partially_succeeded(index_attempt, db_session) + logger.info( + f"Connector completed with some errors: " + f"exceptions={index_attempt_md.num_exceptions} " + f"batches={batch_num} " + f"docs={document_count} " + f"chunks={chunk_count} " + f"elapsed={elapsed_time:.2f}s" + ) + if is_primary: update_connector_credential_pair( db_session=db_session, @@ -332,11 +377,6 @@ def _run_indexing( run_dt=run_end_dt, ) - elapsed_time = time.time() - start_time - logger.info( - f"Connector succeeded: docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s" - ) - def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexAttempt: # make sure that the index attempt can't change in between checking the diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 9b3c70745..1949f7905 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -120,16 +120,6 @@ def _should_create_new_indexing( return time_since_index.total_seconds() >= connector.refresh_freq -def _is_indexing_job_marked_as_finished(index_attempt: IndexAttempt | None) -> bool: - if index_attempt is None: - return False - - return ( - index_attempt.status == IndexingStatus.FAILED - or index_attempt.status == IndexingStatus.SUCCESS - ) - - def _mark_run_failed( db_session: Session, index_attempt: IndexAttempt, failure_reason: str ) -> None: @@ -215,10 +205,12 @@ def cleanup_indexing_jobs( ) # do nothing for ongoing jobs that haven't been stopped - if not job.done() and not _is_indexing_job_marked_as_finished( - index_attempt - ): - continue + if not job.done(): + if not index_attempt: + continue + + if not index_attempt.is_finished(): + continue if job.status == "error": logger.error(job.exception()) diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index fdc7c652c..d89c39ca8 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -295,6 +295,10 @@ INDEXING_SIZE_WARNING_THRESHOLD = int( # 0 disables this behavior and is the default. INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0)) +# During an indexing attempt, specifies the number of batches which are allowed to +# exception without aborting the attempt. 
diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index 9b3c70745..1949f7905 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -120,16 +120,6 @@ def _should_create_new_indexing(
     return time_since_index.total_seconds() >= connector.refresh_freq
 
 
-def _is_indexing_job_marked_as_finished(index_attempt: IndexAttempt | None) -> bool:
-    if index_attempt is None:
-        return False
-
-    return (
-        index_attempt.status == IndexingStatus.FAILED
-        or index_attempt.status == IndexingStatus.SUCCESS
-    )
-
-
 def _mark_run_failed(
     db_session: Session, index_attempt: IndexAttempt, failure_reason: str
 ) -> None:
@@ -215,10 +205,12 @@ def cleanup_indexing_jobs(
             )
 
             # do nothing for ongoing jobs that haven't been stopped
-            if not job.done() and not _is_indexing_job_marked_as_finished(
-                index_attempt
-            ):
-                continue
+            if not job.done():
+                if not index_attempt:
+                    continue
+
+                if not index_attempt.is_finished():
+                    continue
 
             if job.status == "error":
                 logger.error(job.exception())
diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py
index fdc7c652c..d89c39ca8 100644
--- a/backend/danswer/configs/app_configs.py
+++ b/backend/danswer/configs/app_configs.py
@@ -295,6 +295,10 @@ INDEXING_SIZE_WARNING_THRESHOLD = int(
 # 0 disables this behavior and is the default.
 INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0))
 
+# During an indexing attempt, the maximum number of batches that may raise exceptions
+# without aborting the attempt. 0 (the default) fails fast on the first exception.
+INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0))
+
 
 #####
 # Miscellaneous
 #####
diff --git a/backend/danswer/connectors/models.py b/backend/danswer/connectors/models.py
index 1f4a1d2ae..192aa1b20 100644
--- a/backend/danswer/connectors/models.py
+++ b/backend/danswer/connectors/models.py
@@ -166,6 +166,36 @@ class Document(DocumentBase):
     )
 
 
+class DocumentErrorSummary(BaseModel):
+    id: str
+    semantic_id: str
+    section_link: str | None
+
+    @classmethod
+    def from_document(cls, doc: Document) -> "DocumentErrorSummary":
+        section_link = doc.sections[0].link if len(doc.sections) > 0 else None
+        return cls(
+            id=doc.id, semantic_id=doc.semantic_identifier, section_link=section_link
+        )
+
+    @classmethod
+    def from_dict(cls, data: dict) -> "DocumentErrorSummary":
+        return cls(
+            id=str(data.get("id")),
+            semantic_id=str(data.get("semantic_id")),
+            section_link=str(data["section_link"]) if data.get("section_link") is not None else None,
+        )
+
+    def to_dict(self) -> dict[str, str | None]:
+        return {
+            "id": self.id,
+            "semantic_id": self.semantic_id,
+            "section_link": self.section_link,
+        }
+
+
 class IndexAttemptMetadata(BaseModel):
+    batch_num: int | None = None
+    num_exceptions: int = 0
     connector_id: int
     credential_id: int
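`DocumentErrorSummary` deliberately captures only what the error table needs to render a row. A minimal round-trip sketch of the `to_dict`/`from_dict` pair above (assumes the danswer package from this diff is importable):

```python
# Round-trip sketch for the serialization pair above.
from danswer.connectors.models import DocumentErrorSummary

summary = DocumentErrorSummary(
    id="web__https://example.com/page",
    semantic_id="Example Page",
    section_link="https://example.com/page#section-1",
)

as_dict = summary.to_dict()  # what gets stored in the JSONB column
restored = DocumentErrorSummary.from_dict(as_dict)
assert restored == summary
```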
""" diff --git a/backend/danswer/db/enums.py b/backend/danswer/db/enums.py index 55f0b9bcb..ecb14825e 100644 --- a/backend/danswer/db/enums.py +++ b/backend/danswer/db/enums.py @@ -6,6 +6,15 @@ class IndexingStatus(str, PyEnum): IN_PROGRESS = "in_progress" SUCCESS = "success" FAILED = "failed" + COMPLETED_WITH_ERRORS = "completed_with_errors" + + def is_terminal(self) -> bool: + terminal_states = { + IndexingStatus.SUCCESS, + IndexingStatus.COMPLETED_WITH_ERRORS, + IndexingStatus.FAILED, + } + return self in terminal_states # these may differ in the future, which is why we're okay with this duplication diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index 2763f397b..a87f8f45f 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -11,8 +11,11 @@ from sqlalchemy import update from sqlalchemy.orm import joinedload from sqlalchemy.orm import Session +from danswer.connectors.models import Document +from danswer.connectors.models import DocumentErrorSummary from danswer.db.models import EmbeddingModel from danswer.db.models import IndexAttempt +from danswer.db.models import IndexAttemptError from danswer.db.models import IndexingStatus from danswer.db.models import IndexModelStatus from danswer.server.documents.models import ConnectorCredentialPair @@ -118,6 +121,15 @@ def mark_attempt_succeeded( db_session.commit() +def mark_attempt_partially_succeeded( + index_attempt: IndexAttempt, + db_session: Session, +) -> None: + index_attempt.status = IndexingStatus.COMPLETED_WITH_ERRORS + db_session.add(index_attempt) + db_session.commit() + + def mark_attempt_failed( index_attempt: IndexAttempt, db_session: Session, @@ -401,3 +413,41 @@ def count_unique_cc_pairs_with_successful_index_attempts( ) return unique_pairs_count + + +def create_index_attempt_error( + index_attempt_id: int | None, + batch: int | None, + docs: list[Document], + exception_msg: str, + exception_traceback: str, + db_session: Session, +) -> int: + doc_summaries = [] + for doc in docs: + doc_summary = DocumentErrorSummary.from_document(doc) + doc_summaries.append(doc_summary.to_dict()) + + new_error = IndexAttemptError( + index_attempt_id=index_attempt_id, + batch=batch, + doc_summaries=doc_summaries, + error_msg=exception_msg, + traceback=exception_traceback, + ) + db_session.add(new_error) + db_session.commit() + + return new_error.id + + +def get_index_attempt_errors( + index_attempt_id: int, + db_session: Session, +) -> list[IndexAttemptError]: + stmt = select(IndexAttemptError).where( + IndexAttemptError.index_attempt_id == index_attempt_id + ) + + errors = db_session.scalars(stmt) + return list(errors.all()) diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index b53679e8e..cd8f1721c 100644 --- a/backend/danswer/db/models.py +++ b/backend/danswer/db/models.py @@ -660,6 +660,8 @@ class IndexAttempt(Base): "EmbeddingModel", back_populates="index_attempts" ) + error_rows = relationship("IndexAttemptError", back_populates="index_attempt") + __table_args__ = ( Index( "ix_index_attempt_latest_for_connector_credential_pair", @@ -677,6 +679,53 @@ class IndexAttempt(Base): f"time_updated={self.time_updated!r}, " ) + def is_finished(self) -> bool: + return self.status.is_terminal() + + +class IndexAttemptError(Base): + """ + Represents an error that was encountered during an IndexAttempt. 
+ """ + + __tablename__ = "index_attempt_errors" + + id: Mapped[int] = mapped_column(primary_key=True) + + index_attempt_id: Mapped[int] = mapped_column( + ForeignKey("index_attempt.id"), + nullable=True, + ) + + # The index of the batch where the error occurred (if looping thru batches) + # Just informational. + batch: Mapped[int | None] = mapped_column(Integer, default=None) + doc_summaries: Mapped[list[Any]] = mapped_column(postgresql.JSONB()) + error_msg: Mapped[str | None] = mapped_column(Text, default=None) + traceback: Mapped[str | None] = mapped_column(Text, default=None) + time_created: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), + server_default=func.now(), + ) + + # This is the reverse side of the relationship + index_attempt = relationship("IndexAttempt", back_populates="error_rows") + + __table_args__ = ( + Index( + "index_attempt_id", + "time_created", + ), + ) + + def __repr__(self) -> str: + return ( + f"" + f"time_created={self.time_created!r}, " + ) + class DocumentByConnectorCredentialPair(Base): """Represents an indexing of a document by a specific connector / credential pair""" diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py index 0c81f2a15..60faafe4d 100644 --- a/backend/danswer/indexing/indexing_pipeline.py +++ b/backend/danswer/indexing/indexing_pipeline.py @@ -1,10 +1,13 @@ +import traceback from functools import partial from typing import Protocol +from pydantic import BaseModel from sqlalchemy.orm import Session from danswer.access.access import get_access_for_documents from danswer.configs.app_configs import ENABLE_MULTIPASS_INDEXING +from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT from danswer.configs.constants import DEFAULT_BOOST from danswer.connectors.cross_connector_utils.miscellaneous_utils import ( get_experts_stores_representations, @@ -16,6 +19,7 @@ from danswer.db.document import prepare_to_modify_documents from danswer.db.document import update_docs_updated_at from danswer.db.document import upsert_documents_complete from danswer.db.document_set import fetch_document_sets_for_documents +from danswer.db.index_attempt import create_index_attempt_error from danswer.db.models import Document as DBDocument from danswer.db.tag import create_or_add_document_tag from danswer.db.tag import create_or_add_document_tag_list @@ -33,6 +37,14 @@ from shared_configs.enums import EmbeddingProvider logger = setup_logger() +class DocumentBatchPrepareContext(BaseModel): + updatable_docs: list[Document] + id_to_db_doc_map: dict[str, DBDocument] + + class Config: + arbitrary_types_allowed = True + + class IndexingPipelineProtocol(Protocol): def __call__( self, @@ -113,20 +125,72 @@ def get_doc_ids_to_update( return updatable_docs -@log_function_time() -def index_doc_batch( +def index_doc_batch_with_handler( *, chunker: Chunker, embedder: IndexingEmbedder, document_index: DocumentIndex, document_batch: list[Document], index_attempt_metadata: IndexAttemptMetadata, + attempt_id: int | None, db_session: Session, ignore_time_skip: bool = False, ) -> tuple[int, int]: - """Takes different pieces of the indexing pipeline and applies it to a batch of documents - Note that the documents should already be batched at this point so that it does not inflate the - memory requirements""" + r = (0, 0) + try: + r = index_doc_batch( + chunker=chunker, + embedder=embedder, + document_index=document_index, + document_batch=document_batch, + index_attempt_metadata=index_attempt_metadata, + 
diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py
index 0c81f2a15..60faafe4d 100644
--- a/backend/danswer/indexing/indexing_pipeline.py
+++ b/backend/danswer/indexing/indexing_pipeline.py
@@ -1,10 +1,13 @@
+import traceback
 from functools import partial
 from typing import Protocol
 
+from pydantic import BaseModel
 from sqlalchemy.orm import Session
 
 from danswer.access.access import get_access_for_documents
 from danswer.configs.app_configs import ENABLE_MULTIPASS_INDEXING
+from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT
 from danswer.configs.constants import DEFAULT_BOOST
 from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
     get_experts_stores_representations,
@@ -16,6 +19,7 @@ from danswer.db.document import prepare_to_modify_documents
 from danswer.db.document import update_docs_updated_at
 from danswer.db.document import upsert_documents_complete
 from danswer.db.document_set import fetch_document_sets_for_documents
+from danswer.db.index_attempt import create_index_attempt_error
 from danswer.db.models import Document as DBDocument
 from danswer.db.tag import create_or_add_document_tag
 from danswer.db.tag import create_or_add_document_tag_list
@@ -33,6 +37,14 @@ from shared_configs.enums import EmbeddingProvider
 logger = setup_logger()
 
 
+class DocumentBatchPrepareContext(BaseModel):
+    updatable_docs: list[Document]
+    id_to_db_doc_map: dict[str, DBDocument]
+
+    class Config:
+        arbitrary_types_allowed = True
+
+
 class IndexingPipelineProtocol(Protocol):
     def __call__(
         self,
@@ -113,20 +125,72 @@ def get_doc_ids_to_update(
     return updatable_docs
 
 
-@log_function_time()
-def index_doc_batch(
+def index_doc_batch_with_handler(
     *,
     chunker: Chunker,
     embedder: IndexingEmbedder,
     document_index: DocumentIndex,
     document_batch: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
+    attempt_id: int | None,
     db_session: Session,
     ignore_time_skip: bool = False,
 ) -> tuple[int, int]:
-    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
-    Note that the documents should already be batched at this point so that it does not inflate the
-    memory requirements"""
+    r = (0, 0)
+    try:
+        r = index_doc_batch(
+            chunker=chunker,
+            embedder=embedder,
+            document_index=document_index,
+            document_batch=document_batch,
+            index_attempt_metadata=index_attempt_metadata,
+            db_session=db_session,
+            ignore_time_skip=ignore_time_skip,
+        )
+    except Exception as e:
+        if INDEXING_EXCEPTION_LIMIT == 0:
+            raise
+
+        trace = traceback.format_exc()
+        create_index_attempt_error(
+            attempt_id,
+            batch=index_attempt_metadata.batch_num,
+            docs=document_batch,
+            exception_msg=str(e),
+            exception_traceback=trace,
+            db_session=db_session,
+        )
+        logger.exception(
+            f"Indexing batch {index_attempt_metadata.batch_num} failed. msg='{e}' trace='{trace}'"
+        )
+
+        index_attempt_metadata.num_exceptions += 1
+        if index_attempt_metadata.num_exceptions == INDEXING_EXCEPTION_LIMIT:
+            logger.info(
+                f"Maximum number of exceptions for this index attempt "
+                f"({INDEXING_EXCEPTION_LIMIT}) has been reached. "
+                f"The next exception will abort the indexing attempt."
+            )
+        elif index_attempt_metadata.num_exceptions > INDEXING_EXCEPTION_LIMIT:
+            logger.warning(
+                f"Maximum number of exceptions for this index attempt "
+                f"({INDEXING_EXCEPTION_LIMIT}) has been exceeded. Raising RuntimeError."
+            )
+            raise RuntimeError(
+                f"Maximum exception limit of {INDEXING_EXCEPTION_LIMIT} exceeded."
+            )
+        else:
+            pass
+
+    return r
+
+
+def index_doc_batch_prepare(
+    document_batch: list[Document],
+    index_attempt_metadata: IndexAttemptMetadata,
+    db_session: Session,
+    ignore_time_skip: bool = False,
+) -> DocumentBatchPrepareContext | None:
     documents = []
     for document in document_batch:
         empty_contents = not any(section.text.strip() for section in document.sections)
@@ -154,11 +218,10 @@
         documents.append(document)
 
     document_ids = [document.id for document in documents]
-    db_docs = get_documents_by_ids(
+    db_docs: list[DBDocument] = get_documents_by_ids(
         document_ids=document_ids,
         db_session=db_session,
     )
-    id_to_db_doc_map = {doc.id: doc for doc in db_docs}
 
     # Skip indexing docs that don't have a newer updated at
     # Shortcuts the time-consuming flow on connector index retries
@@ -170,9 +233,7 @@
 
     # No docs to update either because the batch is empty or every doc was already indexed
     if not updatable_docs:
-        return 0, 0
-
-    updatable_ids = [doc.id for doc in updatable_docs]
+        return None
 
     # Create records in the source of truth about these documents,
     # does not include doc_updated_at which is also used to indicate a successful update
@@ -182,9 +243,39 @@
         db_session=db_session,
     )
 
+    id_to_db_doc_map = {doc.id: doc for doc in db_docs}
+    return DocumentBatchPrepareContext(
+        updatable_docs=updatable_docs, id_to_db_doc_map=id_to_db_doc_map
+    )
+
+
+@log_function_time()
+def index_doc_batch(
+    *,
+    chunker: Chunker,
+    embedder: IndexingEmbedder,
+    document_index: DocumentIndex,
+    document_batch: list[Document],
+    index_attempt_metadata: IndexAttemptMetadata,
+    db_session: Session,
+    ignore_time_skip: bool = False,
+) -> tuple[int, int]:
+    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
+    Note that the documents should already be batched at this point so that it does not inflate the
+    memory requirements"""
+
+    ctx = index_doc_batch_prepare(
+        document_batch=document_batch,
+        index_attempt_metadata=index_attempt_metadata,
+        ignore_time_skip=ignore_time_skip,
+        db_session=db_session,
+    )
+    if not ctx:
+        return 0, 0
+
     logger.debug("Starting chunking")
     chunks: list[DocAwareChunk] = []
-    for document in updatable_docs:
+    for document in ctx.updatable_docs:
         chunks.extend(chunker.chunk(document=document))
 
     logger.debug("Starting embedding")
@@ -196,6 +287,8 @@
         else []
     )
 
+    updatable_ids = [doc.id for doc in ctx.updatable_docs]
+
     # Acquires a lock on the documents so that no other process can modify them
     # NOTE: don't need to acquire till here, since this is when the actual race condition
     # with Vespa can occur.
@@ -220,8 +313,8 @@
                     document_id_to_document_set.get(chunk.source_document.id, [])
                 ),
                 boost=(
-                    id_to_db_doc_map[chunk.source_document.id].boost
-                    if chunk.source_document.id in id_to_db_doc_map
+                    ctx.id_to_db_doc_map[chunk.source_document.id].boost
+                    if chunk.source_document.id in ctx.id_to_db_doc_map
                     else DEFAULT_BOOST
                 ),
             )
@@ -238,7 +331,7 @@
 
         successful_doc_ids = [record.document_id for record in insertion_records]
         successful_docs = [
-            doc for doc in updatable_docs if doc.id in successful_doc_ids
+            doc for doc in ctx.updatable_docs if doc.id in successful_doc_ids
        ]
 
         # Update the time of latest version of the doc successfully indexed
@@ -264,6 +357,7 @@ def build_indexing_pipeline(
     db_session: Session,
     chunker: Chunker | None = None,
     ignore_time_skip: bool = False,
+    attempt_id: int | None = None,
 ) -> IndexingPipelineProtocol:
     """Builds a pipeline which takes in a list (batch) of docs and indexes them."""
     search_settings = get_search_settings()
@@ -293,10 +387,11 @@
     )
 
     return partial(
-        index_doc_batch,
+        index_doc_batch_with_handler,
         chunker=chunker,
         embedder=embedder,
         document_index=document_index,
         ignore_time_skip=ignore_time_skip,
+        attempt_id=attempt_id,
         db_session=db_session,
     )
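The limit semantics above deserve a plain statement: `INDEXING_EXCEPTION_LIMIT=0` (the default) keeps the old fail-fast behavior, and a positive N tolerates up to N failed batches before failure N+1 aborts the attempt with a RuntimeError. A compact restatement of the check:

```python
def should_abort(num_exceptions: int, limit: int) -> bool:
    """Illustrative restatement of the limit check in index_doc_batch_with_handler.

    limit == 0 means the feature is off: the original exception propagates
    immediately. Otherwise the first `limit` failing batches are recorded and
    swallowed; one more failing batch aborts the whole attempt.
    """
    if limit == 0:
        return True  # re-raise immediately (fail-fast, the default)
    return num_exceptions > limit


# e.g. INDEXING_EXCEPTION_LIMIT=5 tolerates five failed batches;
# the sixth failure raises RuntimeError and fails the attempt.
assert not should_abort(num_exceptions=5, limit=5)
assert should_abort(num_exceptions=6, limit=5)
```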
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index aa5597c08..76789e474 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -73,6 +73,7 @@ from danswer.server.documents.cc_pair import router as cc_pair_router
 from danswer.server.documents.connector import router as connector_router
 from danswer.server.documents.credential import router as credential_router
 from danswer.server.documents.document import router as document_router
+from danswer.server.documents.indexing import router as indexing_router
 from danswer.server.features.document_set.api import router as document_set_router
 from danswer.server.features.folder.api import router as folder_router
 from danswer.server.features.input_prompt.api import (
@@ -393,6 +394,7 @@ def get_application() -> FastAPI:
     include_router_with_global_prefix_prepended(
         application, token_rate_limit_settings_router
     )
+    include_router_with_global_prefix_prepended(application, indexing_router)
 
     if AUTH_TYPE == AuthType.DISABLED:
         # Server logs this during auth setup verification step
diff --git a/backend/danswer/server/documents/indexing.py b/backend/danswer/server/documents/indexing.py
new file mode 100644
index 000000000..4d5081c3f
--- /dev/null
+++ b/backend/danswer/server/documents/indexing.py
@@ -0,0 +1,23 @@
+from fastapi import APIRouter
+from fastapi import Depends
+from sqlalchemy.orm import Session
+
+from danswer.auth.users import current_admin_user
+from danswer.db.engine import get_session
+from danswer.db.index_attempt import (
+    get_index_attempt_errors,
+)
+from danswer.db.models import User
+from danswer.server.documents.models import IndexAttemptError
+
+router = APIRouter(prefix="/manage")
+
+
+@router.get("/admin/indexing-errors/{index_attempt_id}")
+def get_indexing_errors(
+    index_attempt_id: int,
+    _: User | None = Depends(current_admin_user),
+    db_session: Session = Depends(get_session),
+) -> list[IndexAttemptError]:
+    indexing_errors = get_index_attempt_errors(index_attempt_id, db_session)
+    return [IndexAttemptError.from_db_model(e) for e in indexing_errors]
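For quick verification, the new route can be exercised directly. A sketch, assuming a locally running API server and a valid admin session; host, port, and cookie value are placeholders (the web UI reaches the same route through its /api proxy, per buildIndexingErrorsUrl below):

```python
# Placeholder host/port and cookie; requires an admin session.
import requests

ATTEMPT_ID = 42  # hypothetical index attempt id

resp = requests.get(
    f"http://localhost:8080/manage/admin/indexing-errors/{ATTEMPT_ID}",
    cookies={"fastapiusersauth": "<admin-session-cookie>"},
)
resp.raise_for_status()
for err in resp.json():
    print(err["batch_number"], err["error_msg"])
```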
diff --git a/backend/danswer/server/documents/models.py b/backend/danswer/server/documents/models.py
index 135de446d..ed23b79d6 100644
--- a/backend/danswer/server/documents/models.py
+++ b/backend/danswer/server/documents/models.py
@@ -6,12 +6,14 @@ from pydantic import BaseModel
 
 from danswer.configs.app_configs import MASK_CREDENTIAL_PREFIX
 from danswer.configs.constants import DocumentSource
+from danswer.connectors.models import DocumentErrorSummary
 from danswer.connectors.models import InputType
 from danswer.db.enums import ConnectorCredentialPairStatus
 from danswer.db.models import Connector
 from danswer.db.models import ConnectorCredentialPair
 from danswer.db.models import Credential
 from danswer.db.models import IndexAttempt
+from danswer.db.models import IndexAttemptError as DbIndexAttemptError
 from danswer.db.models import IndexingStatus
 from danswer.db.models import TaskStatus
 from danswer.server.utils import mask_credential_dict
@@ -122,6 +124,7 @@ class IndexAttemptSnapshot(BaseModel):
     total_docs_indexed: int  # includes docs that are updated
     docs_removed_from_index: int
     error_msg: str | None
+    error_count: int
     full_exception_trace: str | None
     time_started: str | None
     time_updated: str
@@ -137,6 +140,7 @@ class IndexAttemptSnapshot(BaseModel):
             total_docs_indexed=index_attempt.total_docs_indexed or 0,
             docs_removed_from_index=index_attempt.docs_removed_from_index or 0,
             error_msg=index_attempt.error_msg,
+            error_count=len(index_attempt.error_rows),
             full_exception_trace=index_attempt.full_exception_trace,
             time_started=(
                 index_attempt.time_started.isoformat()
@@ -147,6 +151,31 @@ class IndexAttemptSnapshot(BaseModel):
         )
 
 
+class IndexAttemptError(BaseModel):
+    id: int
+    index_attempt_id: int | None
+    batch_number: int | None
+    doc_summaries: list[DocumentErrorSummary]
+    error_msg: str | None
+    traceback: str | None
+    time_created: str
+
+    @classmethod
+    def from_db_model(cls, error: DbIndexAttemptError) -> "IndexAttemptError":
+        doc_summaries = [
+            DocumentErrorSummary.from_dict(summary) for summary in error.doc_summaries
+        ]
+        return IndexAttemptError(
+            id=error.id,
+            index_attempt_id=error.index_attempt_id,
+            batch_number=error.batch,
+            doc_summaries=doc_summaries,
+            error_msg=error.error_msg,
+            traceback=error.traceback,
+            time_created=error.time_created.isoformat(),
+        )
+
+
 class CCPairFullInfo(BaseModel):
     id: int
     name: str
diff --git a/backend/danswer/utils/timing.py b/backend/danswer/utils/timing.py
index 2aa150955..8e240278d 100644
--- a/backend/danswer/utils/timing.py
+++ b/backend/danswer/utils/timing.py
@@ -29,7 +29,8 @@ def log_function_time(
             start_time = time.time()
             user = kwargs.get("user")
             result = func(*args, **kwargs)
-            elapsed_time_str = str(time.time() - start_time)
+            elapsed_time = time.time() - start_time
+            elapsed_time_str = f"{elapsed_time:.3f}"
             log_name = func_name or func.__name__
             args_str = f" args={args} kwargs={kwargs}" if include_args else ""
             final_log = f"{log_name}{args_str} took {elapsed_time_str} seconds"
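One thing worth knowing about `error_count=len(index_attempt.error_rows)`: it lazily loads every IndexAttemptError row just to count them. If that ever matters for large attempts, a COUNT(*) query is a lighter alternative; a sketch, not part of this diff:

```python
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from danswer.db.models import IndexAttemptError


def error_count(db_session: Session, attempt_id: int) -> int:
    # COUNT(*) in SQL rather than materializing each IndexAttemptError row
    stmt = (
        select(func.count())
        .select_from(IndexAttemptError)
        .where(IndexAttemptError.index_attempt_id == attempt_id)
    )
    return db_session.scalar(stmt) or 0
```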
"@/components/Modal"; -import { CheckmarkIcon, CopyIcon } from "@/components/icons/icons"; +import { CheckmarkIcon, CopyIcon, SearchIcon } from "@/components/icons/icons"; +import Link from "next/link"; import ExceptionTraceModal from "@/components/modals/ExceptionTraceModal"; const NUM_IN_PAGE = 8; @@ -50,7 +51,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) { Status New Doc Cnt Total Doc Cnt - Error Msg + Error Message @@ -93,9 +94,31 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) { {indexAttempt.total_docs_indexed}
- - {indexAttempt.error_msg || "-"} - + {indexAttempt.error_count > 0 && ( + + + +  View Errors + + + )} + + {indexAttempt.status === "success" && ( + + {"-"} + + )} + + {indexAttempt.status === "failed" && + indexAttempt.error_msg && ( + + {indexAttempt.error_msg} + + )} + {indexAttempt.full_exception_trace && (
{ diff --git a/web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx b/web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx new file mode 100644 index 000000000..6ee8efef5 --- /dev/null +++ b/web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx @@ -0,0 +1,189 @@ +"use client"; + +import { Modal } from "@/components/Modal"; +import { PageSelector } from "@/components/PageSelector"; +import { CheckmarkIcon, CopyIcon } from "@/components/icons/icons"; +import { localizeAndPrettify } from "@/lib/time"; +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeaderCell, + TableRow, + Text, +} from "@tremor/react"; +import { useState } from "react"; +import { IndexAttemptError } from "./types"; + +const NUM_IN_PAGE = 8; + +export function CustomModal({ + isVisible, + onClose, + title, + content, + showCopyButton = false, +}: { + isVisible: boolean; + onClose: () => void; + title: string; + content: string; + showCopyButton?: boolean; +}) { + const [copyClicked, setCopyClicked] = useState(false); + + if (!isVisible) return null; + + return ( + +
diff --git a/web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx b/web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx
new file mode 100644
index 000000000..6ee8efef5
--- /dev/null
+++ b/web/src/app/admin/indexing/[id]/IndexAttemptErrorsTable.tsx
@@ -0,0 +1,189 @@
+"use client";
+
+import { Modal } from "@/components/Modal";
+import { PageSelector } from "@/components/PageSelector";
+import { CheckmarkIcon, CopyIcon } from "@/components/icons/icons";
+import { localizeAndPrettify } from "@/lib/time";
+import {
+  Table,
+  TableBody,
+  TableCell,
+  TableHead,
+  TableHeaderCell,
+  TableRow,
+  Text,
+} from "@tremor/react";
+import { useState } from "react";
+import { IndexAttemptError } from "./types";
+
+const NUM_IN_PAGE = 8;
+
+export function CustomModal({
+  isVisible,
+  onClose,
+  title,
+  content,
+  showCopyButton = false,
+}: {
+  isVisible: boolean;
+  onClose: () => void;
+  title: string;
+  content: string;
+  showCopyButton?: boolean;
+}) {
+  const [copyClicked, setCopyClicked] = useState(false);
+
+  if (!isVisible) return null;
+
+  return (
+    <Modal onOutsideClick={onClose} title={title}>
+      <div className="flex flex-col">
+        {showCopyButton && (
+          <div className="mb-2">
+            {!copyClicked ? (
+              <div
+                onClick={() => {
+                  navigator.clipboard.writeText(content);
+                  setCopyClicked(true);
+                  setTimeout(() => setCopyClicked(false), 2000);
+                }}
+                className="flex w-fit cursor-pointer hover:bg-hover-light p-2 border-border border rounded"
+              >
+                Copy full content
+                <CopyIcon className="my-auto ml-2" />
+              </div>
+            ) : (
+              <div className="flex w-fit p-2">
+                Copied to clipboard
+                <CheckmarkIcon className="my-auto ml-2" />
+              </div>
+            )}
+          </div>
+        )}
+        <div className="whitespace-pre-wrap">{content}</div>
+      </div>
+    </Modal>
+  );
+}
+
+export function IndexAttemptErrorsTable({
+  indexAttemptErrors,
+}: {
+  indexAttemptErrors: IndexAttemptError[];
+}) {
+  const [page, setPage] = useState(1);
+  const [modalData, setModalData] = useState<{
+    id: number | null;
+    title: string;
+    content: string;
+  } | null>(null);
+  const closeModal = () => setModalData(null);
+
+  return (
+    <>
+      {modalData && (
+        <CustomModal
+          isVisible={!!modalData}
+          onClose={closeModal}
+          title={modalData.title}
+          content={modalData.content}
+          showCopyButton
+        />
+      )}
+
+      <Table>
+        <TableHead>
+          <TableRow>
+            <TableHeaderCell>Timestamp</TableHeaderCell>
+            <TableHeaderCell>Batch Number</TableHeaderCell>
+            <TableHeaderCell>Document Summaries</TableHeaderCell>
+            <TableHeaderCell>Error Message</TableHeaderCell>
+          </TableRow>
+        </TableHead>
+        <TableBody>
+          {indexAttemptErrors
+            .slice(NUM_IN_PAGE * (page - 1), NUM_IN_PAGE * page)
+            .map((indexAttemptError) => {
+              return (
+                <TableRow key={indexAttemptError.id}>
+                  <TableCell>
+                    {indexAttemptError.time_created
+                      ? localizeAndPrettify(indexAttemptError.time_created)
+                      : "-"}
+                  </TableCell>
+                  <TableCell>{indexAttemptError.batch_number}</TableCell>
+                  <TableCell>
+                    {indexAttemptError.doc_summaries && (
+                      <div
+                        onClick={() =>
+                          setModalData({
+                            id: indexAttemptError.id,
+                            title: "Document Summaries",
+                            content: JSON.stringify(
+                              indexAttemptError.doc_summaries,
+                              null,
+                              2
+                            ),
+                          })
+                        }
+                        className="mt-2 text-link cursor-pointer select-none"
+                      >
+                        View Document Summaries
+                      </div>
+                    )}
+                  </TableCell>
+                  <TableCell>
+                    <Text className="flex flex-wrap whitespace-normal">
+                      {indexAttemptError.error_msg || "-"}
+                    </Text>
+                    {indexAttemptError.traceback && (
+                      <div
+                        onClick={() =>
+                          setModalData({
+                            id: indexAttemptError.id,
+                            title: "Exception Traceback",
+                            content: indexAttemptError.traceback!,
+                          })
+                        }
+                        className="mt-2 text-link cursor-pointer select-none"
+                      >
+                        View Full Trace
+                      </div>
+                    )}
+                  </TableCell>
+                </TableRow>
+              );
+            })}
+        </TableBody>
+      </Table>
+
+      {indexAttemptErrors.length > NUM_IN_PAGE && (
+        <div className="mt-3 flex">
+          <div className="mx-auto">
+            <PageSelector
+              totalPages={Math.ceil(indexAttemptErrors.length / NUM_IN_PAGE)}
+              currentPage={page}
+              onPageChange={(newPage) => {
+                setPage(newPage);
+                window.scrollTo({
+                  top: 0,
+                  left: 0,
+                  behavior: "smooth",
+                });
+              }}
+            />
+          </div>
+        </div>
+      )}
+    </>
+  );
+}
diff --git a/web/src/app/admin/indexing/[id]/lib.ts b/web/src/app/admin/indexing/[id]/lib.ts
new file mode 100644
index 000000000..f81f95d8c
--- /dev/null
+++ b/web/src/app/admin/indexing/[id]/lib.ts
@@ -0,0 +1,3 @@
+export function buildIndexingErrorsUrl(id: string | number) {
+  return `/api/manage/admin/indexing-errors/${id}`;
+}
diff --git a/web/src/app/admin/indexing/[id]/page.tsx b/web/src/app/admin/indexing/[id]/page.tsx
new file mode 100644
index 000000000..51fe69454
--- /dev/null
+++ b/web/src/app/admin/indexing/[id]/page.tsx
@@ -0,0 +1,58 @@
+"use client";
+
+import { BackButton } from "@/components/BackButton";
+import { ErrorCallout } from "@/components/ErrorCallout";
+import { ThreeDotsLoader } from "@/components/Loading";
+import { errorHandlingFetcher } from "@/lib/fetcher";
+import { ValidSources } from "@/lib/types";
+import { Title } from "@tremor/react";
+import useSWR from "swr";
+import { IndexAttemptErrorsTable } from "./IndexAttemptErrorsTable";
+import { buildIndexingErrorsUrl } from "./lib";
+import { IndexAttemptError } from "./types";
+
+function Main({ id }: { id: number }) {
+  const {
+    data: indexAttemptErrors,
+    isLoading,
+    error,
+  } = useSWR<IndexAttemptError[]>(
+    buildIndexingErrorsUrl(id),
+    errorHandlingFetcher
+  );
+
+  if (isLoading) {
+    return <ThreeDotsLoader />;
+  }
+
+  if (error || !indexAttemptErrors) {
+    return (
+      <ErrorCallout
+        errorTitle="Failed to fetch errors for this indexing attempt"
+        errorMsg={error?.info?.detail}
+      />
+    );
+  }
+
+  return (
+    <>
+      <BackButton />
+
+      <Title className="mb-2">
+        Indexing Errors for Attempt {id}
+      </Title>
+
+      <IndexAttemptErrorsTable indexAttemptErrors={indexAttemptErrors} />
+    </>
+  );
+}
+
+export default function Page({ params }: { params: { id: string } }) {
+  const id = parseInt(params.id);
+
+  return (
+    <div className="mx-auto container">
+      <Main id={id} />
+    </div>
+  );
+}
diff --git a/web/src/app/admin/indexing/[id]/types.ts b/web/src/app/admin/indexing/[id]/types.ts
new file mode 100644
index 000000000..66480805f
--- /dev/null
+++ b/web/src/app/admin/indexing/[id]/types.ts
@@ -0,0 +1,15 @@
+export interface IndexAttemptError {
+  id: number;
+  index_attempt_id: number | null;
+  batch_number: number | null;
+  doc_summaries: DocumentErrorSummary[];
+  error_msg: string | null;
+  traceback: string | null;
+  time_created: string;
+}
+
+export interface DocumentErrorSummary {
+  id: string;
+  semantic_id: string;
+  section_link: string | null;
+}
diff --git a/web/src/components/Status.tsx b/web/src/components/Status.tsx
index 99c556aaf..a4ab2090d 100644
--- a/web/src/components/Status.tsx
+++ b/web/src/components/Status.tsx
@@ -42,6 +42,26 @@ export function IndexAttemptStatus({
     } else {
       badge = icon;
     }
+  } else if (status === "completed_with_errors") {
+    const icon = (
+      <Badge size={size} color="yellow" icon={FiAlertTriangle}>
+        Completed with errors
+      </Badge>
+    );
+    badge = (
+      <HoverPopup
+        mainContent={<div className="cursor-pointer">{icon}</div>}
+        popupContent={
+          <div className="w-64 p-2">
+            The indexing attempt completed, but some errors were encountered
+            during the run.
+            <br />
+            <br />
+            Click View Errors for more details.
+          </div>
+        }
+      />
+    );
   } else if (status === "success") {
     badge = (
diff --git a/web/src/lib/types.ts b/web/src/lib/types.ts
index df39c4e00..16d3f3bd8 100644
--- a/web/src/lib/types.ts
+++ b/web/src/lib/types.ts
@@ -38,6 +38,7 @@ export interface MinimalUserSnapshot {
 export type ValidInputTypes = "load_state" | "poll" | "event";
 export type ValidStatuses =
   | "success"
+  | "completed_with_errors"
   | "failed"
   | "in_progress"
   | "not_started";
@@ -59,6 +60,7 @@ export interface IndexAttemptSnapshot {
   docs_removed_from_index: number;
   total_docs_indexed: number;
   error_msg: string | null;
+  error_count: number;
   full_exception_trace: string | null;
   time_started: string | null;
   time_updated: string;
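Finally, the status-lifecycle change is easy to pin down in a test. A minimal sketch assuming pytest, the danswer package from this diff, and a NOT_STARTED enum member (implied by the web client's "not_started" status above):

```python
# Test sketch: the new status participates in "finished" checks everywhere
# that previously special-cased SUCCESS / FAILED.
from danswer.db.enums import IndexingStatus


def test_terminal_states() -> None:
    assert IndexingStatus.SUCCESS.is_terminal()
    assert IndexingStatus.FAILED.is_terminal()
    assert IndexingStatus.COMPLETED_WITH_ERRORS.is_terminal()
    assert not IndexingStatus.IN_PROGRESS.is_terminal()
    assert not IndexingStatus.NOT_STARTED.is_terminal()
```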