diff --git a/backend/alembic/versions/5e84129c8be3_add_docs_indexed_column_to_index_.py b/backend/alembic/versions/5e84129c8be3_add_docs_indexed_column_to_index_.py new file mode 100644 index 000000000..c8ef3f2d9 --- /dev/null +++ b/backend/alembic/versions/5e84129c8be3_add_docs_indexed_column_to_index_.py @@ -0,0 +1,36 @@ +"""Add docs_indexed_column + time_started to index_attempt table + +Revision ID: 5e84129c8be3 +Revises: e6a4bbc13fe4 +Create Date: 2023-08-10 21:43:09.069523 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "5e84129c8be3" +down_revision = "e6a4bbc13fe4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "index_attempt", + sa.Column("num_docs_indexed", sa.Integer()), + ) + op.add_column( + "index_attempt", + sa.Column( + "time_started", + sa.DateTime(timezone=True), + nullable=True, + ), + ) + + +def downgrade() -> None: + op.drop_column("index_attempt", "time_started") + op.drop_column("index_attempt", "num_docs_indexed") diff --git a/backend/alembic/versions/e6a4bbc13fe4_add_index_for_retrieving_latest_index_.py b/backend/alembic/versions/e6a4bbc13fe4_add_index_for_retrieving_latest_index_.py new file mode 100644 index 000000000..2ceef79c0 --- /dev/null +++ b/backend/alembic/versions/e6a4bbc13fe4_add_index_for_retrieving_latest_index_.py @@ -0,0 +1,32 @@ +"""Add index for retrieving latest index_attempt + +Revision ID: e6a4bbc13fe4 +Revises: b082fec533f0 +Create Date: 2023-08-10 12:37:23.335471 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "e6a4bbc13fe4" +down_revision = "b082fec533f0" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_index( + op.f("ix_index_attempt_latest_for_connector_credential_pair"), + "index_attempt", + ["connector_id", "credential_id", "time_created"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index( + op.f("ix_index_attempt_latest_for_connector_credential_pair"), + table_name="index_attempt", + ) diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py index 214b7fce4..d39d6ba3a 100644 --- a/backend/danswer/background/connector_deletion.py +++ b/backend/danswer/background/connector_deletion.py @@ -88,6 +88,7 @@ def _delete_connector_credential_pair( return num_docs_deleted num_docs_deleted = _delete_singly_indexed_docs() + logger.info(f"Deleted {num_docs_deleted} documents from document stores") def _update_multi_indexed_docs() -> None: # if a document is indexed by multiple connector_credential_pairs, we should @@ -132,8 +133,8 @@ def _delete_connector_credential_pair( # categorize into groups of updates to try and batch them more efficiently update_groups: dict[tuple[str, ...], list[str]] = {} for document_id, allowed_users_lst in document_id_to_allowed_users.items(): - # if document_id in document_ids_not_needing_update: - # continue + if document_id in document_ids_not_needing_update: + continue allowed_users_lst.remove(to_be_deleted_user) allowed_users = tuple(sorted(set(allowed_users_lst))) diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 18fb5ba93..9438094d0 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -19,11 +19,12 @@ from danswer.db.engine import get_db_current_time from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import create_index_attempt from danswer.db.index_attempt import get_inprogress_index_attempts -from danswer.db.index_attempt import get_last_successful_attempt +from danswer.db.index_attempt import get_last_attempt from danswer.db.index_attempt import get_not_started_index_attempts from danswer.db.index_attempt import mark_attempt_failed from danswer.db.index_attempt import mark_attempt_in_progress from danswer.db.index_attempt import mark_attempt_succeeded +from danswer.db.index_attempt import update_docs_indexed from danswer.db.models import Connector from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus @@ -45,7 +46,9 @@ def should_create_new_indexing( def create_indexing_jobs(db_session: Session) -> None: - connectors = fetch_connectors(db_session, disabled_status=False) + connectors = fetch_connectors(db_session) + + # clean up in-progress jobs that were never completed for connector in connectors: in_progress_indexing_attempts = get_inprogress_index_attempts( connector.id, db_session @@ -59,7 +62,11 @@ def create_indexing_jobs(db_session: Session) -> None: f"Marking in-progress attempt 'connector: {attempt.connector_id}, " f"credential: {attempt.credential_id}' as failed" ) - mark_attempt_failed(attempt, db_session) + mark_attempt_failed( + attempt, + db_session, + failure_reason="Stopped mid run, likely due to the background process being killed", + ) if attempt.connector_id and attempt.credential_id: update_connector_credential_pair( connector_id=attempt.connector_id, @@ -70,15 +77,16 @@ def create_indexing_jobs(db_session: Session) -> None: db_session=db_session, ) + # potentially kick off new runs + enabled_connectors = [ + connector for connector in connectors if not connector.disabled + ] + for connector in enabled_connectors: for association in connector.credentials: credential = association.credential - last_successful_attempt = get_last_successful_attempt( - connector.id, credential.id, db_session - ) - if not should_create_new_indexing( - connector, last_successful_attempt, db_session - ): + last_attempt = get_last_attempt(connector.id, credential.id, db_session) + if not should_create_new_indexing(connector, last_attempt, db_session): continue create_index_attempt(connector.id, credential.id, db_session) @@ -199,6 +207,17 @@ def run_indexing_jobs(db_session: Session) -> None: net_doc_change += new_docs chunk_count += total_batch_chunks document_count += len(doc_batch) + update_docs_indexed( + db_session=db_session, + index_attempt=attempt, + num_docs_indexed=document_count, + ) + + # check if connector is disabled mid run and stop if so + db_session.refresh(db_connector) + if db_connector.disabled: + # let the `except` block handle this + raise RuntimeError("Connector was disabled mid run") mark_attempt_succeeded(attempt, db_session) update_connector_credential_pair( diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py index ca8638294..80c4884f7 100644 --- a/backend/danswer/db/connector_credential_pair.py +++ b/backend/danswer/db/connector_credential_pair.py @@ -8,6 +8,7 @@ from sqlalchemy.orm import Session from danswer.db.connector import fetch_connector_by_id from danswer.db.credentials import fetch_credential_by_id from danswer.db.models import ConnectorCredentialPair +from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus from danswer.db.models import User from danswer.server.models import StatusResponse diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index ee3cdc95c..7de9ef504 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -1,10 +1,17 @@ +from collections.abc import Sequence + +from sqlalchemy import and_ +from sqlalchemy import ColumnElement from sqlalchemy import delete from sqlalchemy import desc +from sqlalchemy import func +from sqlalchemy import or_ from sqlalchemy import select from sqlalchemy.orm import Session from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus +from danswer.server.models import ConnectorCredentialPairIdentifier from danswer.utils.logger import setup_logger @@ -52,6 +59,7 @@ def mark_attempt_in_progress( db_session: Session, ) -> None: index_attempt.status = IndexingStatus.IN_PROGRESS + index_attempt.time_started = index_attempt.time_started or func.now() db_session.add(index_attempt) db_session.commit() @@ -74,7 +82,15 @@ def mark_attempt_failed( db_session.commit() -def get_last_successful_attempt( +def update_docs_indexed( + db_session: Session, index_attempt: IndexAttempt, num_docs_indexed: int +) -> None: + index_attempt.num_docs_indexed = num_docs_indexed + db_session.add(index_attempt) + db_session.commit() + + +def get_last_attempt( connector_id: int, credential_id: int, db_session: Session, @@ -82,13 +98,39 @@ def get_last_successful_attempt( stmt = select(IndexAttempt) stmt = stmt.where(IndexAttempt.connector_id == connector_id) stmt = stmt.where(IndexAttempt.credential_id == credential_id) - stmt = stmt.where(IndexAttempt.status == IndexingStatus.SUCCESS) # Note, the below is using time_created instead of time_updated stmt = stmt.order_by(desc(IndexAttempt.time_created)) return db_session.execute(stmt).scalars().first() +def get_latest_index_attempts( + connector_credential_pair_identifiers: list[ConnectorCredentialPairIdentifier], + db_session: Session, +) -> Sequence[IndexAttempt]: + ids_stmt = select( + IndexAttempt.id, func.max(IndexAttempt.time_created).label("max_updated_at") + ) + + where_stmts: list[ColumnElement] = [] + for connector_credential_pair_identifier in connector_credential_pair_identifiers: + where_stmts.append( + and_( + IndexAttempt.connector_id + == connector_credential_pair_identifier.connector_id, + IndexAttempt.credential_id + == connector_credential_pair_identifier.credential_id, + ) + ) + if where_stmts: + ids_stmt = ids_stmt.where(or_(*where_stmts)) + ids_stmt = ids_stmt.group_by(IndexAttempt.id) + ids_subqery = ids_stmt.subquery() + + stmt = select(IndexAttempt).join(ids_subqery, ids_subqery.c.id == IndexAttempt.id) + return db_session.execute(stmt).scalars().all() + + def delete_index_attempts( connector_id: int, credential_id: int, diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index d48446c93..e38f17935 100644 --- a/backend/danswer/db/models.py +++ b/backend/danswer/db/models.py @@ -185,6 +185,7 @@ class IndexAttempt(Base): nullable=True, ) status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus)) + num_docs_indexed: Mapped[int] = mapped_column(Integer, default=0) error_msg: Mapped[str | None] = mapped_column( String(), default=None ) # only filled if status = "failed" @@ -192,6 +193,11 @@ class IndexAttempt(Base): DateTime(timezone=True), server_default=func.now(), ) + # when the actual indexing run began + # NOTE: will use the api_server clock rather than DB server clock + time_started: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), default=None + ) time_updated: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), diff --git a/backend/danswer/server/manage.py b/backend/danswer/server/manage.py index 181c327d4..9953780dd 100644 --- a/backend/danswer/server/manage.py +++ b/backend/danswer/server/manage.py @@ -51,8 +51,10 @@ from danswer.db.deletion_attempt import get_deletion_attempts from danswer.db.engine import get_session from danswer.db.engine import get_sqlalchemy_async_engine from danswer.db.index_attempt import create_index_attempt +from danswer.db.index_attempt import get_latest_index_attempts from danswer.db.models import DeletionAttempt from danswer.db.models import DeletionStatus +from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus from danswer.db.models import User from danswer.direct_qa import check_model_api_key_is_valid @@ -74,6 +76,7 @@ from danswer.server.models import DeletionAttemptSnapshot from danswer.server.models import FileUploadResponse from danswer.server.models import GDriveCallback from danswer.server.models import GoogleAppCredentials +from danswer.server.models import IndexAttemptSnapshot from danswer.server.models import ObjectCreationIdResponse from danswer.server.models import RunConnectorRequest from danswer.server.models import StatusResponse @@ -190,6 +193,20 @@ def get_connector_indexing_status( # TODO: make this one query cc_pairs = get_connector_credential_pairs(db_session) + latest_index_attempts = get_latest_index_attempts( + db_session=db_session, + connector_credential_pair_identifiers=[ + ConnectorCredentialPairIdentifier( + connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id + ) + for cc_pair in cc_pairs + ], + ) + cc_pair_to_latest_index_attempt = { + (index_attempt.connector_id, index_attempt.credential_id): index_attempt + for index_attempt in latest_index_attempts + } + deletion_attempts_by_connector: dict[int, list[DeletionAttempt]] = { cc_pair.connector.id: [] for cc_pair in cc_pairs } @@ -205,6 +222,9 @@ def get_connector_indexing_status( for cc_pair in cc_pairs: connector = cc_pair.connector credential = cc_pair.credential + latest_index_attempt = cc_pair_to_latest_index_attempt.get( + (connector.id, credential.id) + ) deletion_attemts = deletion_attempts_by_connector.get(connector.id, []) indexing_statuses.append( ConnectorIndexingStatus( @@ -215,6 +235,14 @@ def get_connector_indexing_status( last_status=cc_pair.last_attempt_status, last_success=cc_pair.last_successful_index_time, docs_indexed=cc_pair.total_docs_indexed, + error_msg=latest_index_attempt.error_msg + if latest_index_attempt + else None, + latest_index_attempt=IndexAttemptSnapshot.from_index_attempt_db_model( + latest_index_attempt + ) + if latest_index_attempt + else None, deletion_attempts=[ DeletionAttemptSnapshot.from_deletion_attempt_db_model( deletion_attempt diff --git a/backend/danswer/server/models.py b/backend/danswer/server/models.py index a2735053a..87e908f34 100644 --- a/backend/danswer/server/models.py +++ b/backend/danswer/server/models.py @@ -17,6 +17,7 @@ from danswer.db.models import Connector from danswer.db.models import Credential from danswer.db.models import DeletionAttempt from danswer.db.models import DeletionStatus +from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus from danswer.search.models import QueryFlow from danswer.search.models import SearchType @@ -124,6 +125,28 @@ class IndexAttemptRequest(BaseModel): connector_specific_config: dict[str, Any] +class IndexAttemptSnapshot(BaseModel): + status: IndexingStatus | None + num_docs_indexed: int + error_msg: str | None + time_started: str | None + time_updated: str + + @classmethod + def from_index_attempt_db_model( + cls, index_attempt: IndexAttempt + ) -> "IndexAttemptSnapshot": + return IndexAttemptSnapshot( + status=index_attempt.status, + num_docs_indexed=index_attempt.num_docs_indexed or 0, + error_msg=index_attempt.error_msg, + time_started=index_attempt.time_started.isoformat() + if index_attempt.time_started + else None, + time_updated=index_attempt.time_updated.isoformat(), + ) + + class DeletionAttemptSnapshot(BaseModel): connector_id: int status: DeletionStatus @@ -215,6 +238,8 @@ class ConnectorIndexingStatus(BaseModel): last_status: IndexingStatus | None last_success: datetime | None docs_indexed: int + error_msg: str | None + latest_index_attempt: IndexAttemptSnapshot | None deletion_attempts: list[DeletionAttemptSnapshot] is_deletable: bool diff --git a/web/src/app/admin/indexing/status/page.tsx b/web/src/app/admin/indexing/status/page.tsx index 3cac880e9..8165f2c51 100644 --- a/web/src/app/admin/indexing/status/page.tsx +++ b/web/src/app/admin/indexing/status/page.tsx @@ -5,12 +5,18 @@ import useSWR from "swr"; import { BasicTable } from "@/components/admin/connectors/BasicTable"; import { LoadingAnimation } from "@/components/Loading"; import { timeAgo } from "@/lib/time"; -import { NotebookIcon, XSquareIcon } from "@/components/icons/icons"; +import { + NotebookIcon, + QuestionIcon, + XSquareIcon, +} from "@/components/icons/icons"; import { fetcher } from "@/lib/fetcher"; import { getSourceMetadata } from "@/components/source"; import { CheckCircle, XCircle } from "@phosphor-icons/react"; import { HealthCheckBanner } from "@/components/health/healthcheck"; import { ConnectorIndexingStatus } from "@/lib/types"; +import { useState } from "react"; +import { getDocsProcessedPerMinute } from "@/lib/indexAttempt"; const getSourceDisplay = ( connectorIndexingStatus: ConnectorIndexingStatus @@ -59,6 +65,33 @@ const getSourceDisplay = ( return sourceMetadata.displayName; }; +const ErrorDisplay = ({ message }: { message: string }) => { + const [isHovered, setIsHovered] = useState(false); + return ( +
{ + setIsHovered(true); + }} + onMouseLeave={() => setIsHovered(false)} + className="relative" + > + {isHovered && ( +
+
+
Error Message:
+ + {message} +
+
+ )} +
+ + Error +
+
+ ); +}; + function Main() { const { data: indexAttemptData, @@ -102,7 +135,9 @@ function Main() { const sourceMetadata = getSourceMetadata( connectorIndexingStatus.connector.source ); - let statusDisplay =
In Progress...
; + let statusDisplay = ( +
Initializing...
+ ); if (connectorIndexingStatus.connector.disabled) { statusDisplay = (
@@ -119,9 +154,35 @@ function Main() { ); } else if (connectorIndexingStatus.last_status === "failed") { statusDisplay = ( -
- - Error + + ); + } else if (connectorIndexingStatus.last_status === "not_started") { + statusDisplay =
Scheduled
; + } else if (connectorIndexingStatus.last_status === "in_progress") { + const docsPerMinute = getDocsProcessedPerMinute( + connectorIndexingStatus.latest_index_attempt + )?.toFixed(2); + statusDisplay = ( +
+ In Progress...{" "} + {connectorIndexingStatus?.latest_index_attempt + ?.num_docs_indexed ? ( +
+
+ Current Run:{" "} + { + connectorIndexingStatus.latest_index_attempt + .num_docs_indexed + }{" "} + docs indexed +
+ {docsPerMinute && ( +
+ Speed: ~{docsPerMinute} docs / min +
+ )} +
+ ) : null}
); } @@ -132,7 +193,7 @@ function Main() { : "-", connector: ( {sourceMetadata.icon({ size: 20 })} diff --git a/web/src/components/admin/connectors/table/ConnectorsTable.tsx b/web/src/components/admin/connectors/table/ConnectorsTable.tsx index 85be293ee..1b59b1bdb 100644 --- a/web/src/components/admin/connectors/table/ConnectorsTable.tsx +++ b/web/src/components/admin/connectors/table/ConnectorsTable.tsx @@ -5,7 +5,6 @@ import { useState } from "react"; import { LinkBreakIcon, LinkIcon, TrashIcon } from "@/components/icons/icons"; import { updateConnector } from "@/lib/connector"; import { AttachCredentialButtonForTable } from "@/components/admin/connectors/buttons/AttachCredentialButtonForTable"; -import { scheduleDeletionJobForConnector } from "@/lib/documentDeletion"; import { DeleteColumn } from "./DeleteColumn"; interface StatusRowProps { diff --git a/web/src/components/icons/icons.tsx b/web/src/components/icons/icons.tsx index aa347efeb..564db4065 100644 --- a/web/src/components/icons/icons.tsx +++ b/web/src/components/icons/icons.tsx @@ -12,6 +12,7 @@ import { Brain, PencilSimple, X, + Question, } from "@phosphor-icons/react"; import { SiBookstack } from "react-icons/si"; import { FaFile, FaGlobe } from "react-icons/fa"; @@ -97,6 +98,13 @@ export const InfoIcon = ({ return ; }; +export const QuestionIcon = ({ + size = 16, + className = defaultTailwindCSS, +}: IconProps) => { + return ; +}; + export const BrainIcon = ({ size = 16, className = defaultTailwindCSS, diff --git a/web/src/lib/indexAttempt.ts b/web/src/lib/indexAttempt.ts new file mode 100644 index 000000000..c5aa6a932 --- /dev/null +++ b/web/src/lib/indexAttempt.ts @@ -0,0 +1,26 @@ +import { IndexAttemptSnapshot } from "./types"; + +export const getDocsProcessedPerMinute = ( + indexAttempt: IndexAttemptSnapshot | null +): number | null => { + if ( + !indexAttempt || + !indexAttempt.time_started || + !indexAttempt.time_updated || + indexAttempt.num_docs_indexed === 0 + ) { + return null; + } + + const timeStarted = new Date(indexAttempt.time_started); + const timeUpdated = new Date(indexAttempt.time_updated); + const timeDiff = timeUpdated.getTime() - timeStarted.getTime(); + const seconds = timeDiff / 1000; + // due to some issues with `time_updated` having delayed updates, + // the docs / min will be really high at first. To avoid this, + // we can wait a little bit to let the updated_at catch up a bit + if (seconds < 10) { + return null; + } + return (indexAttempt.num_docs_indexed / seconds) * 60; +}; diff --git a/web/src/lib/types.ts b/web/src/lib/types.ts index bda9a305a..f4216738f 100644 --- a/web/src/lib/types.ts +++ b/web/src/lib/types.ts @@ -87,6 +87,14 @@ export interface FileConfig { export interface NotionConfig {} +export interface IndexAttemptSnapshot { + status: ValidStatuses | null; + num_docs_indexed: number; + error_msg: string | null; + time_started: string | null; + time_updated: string; +} + export interface ConnectorIndexingStatus< ConnectorConfigType, ConnectorCredentialType @@ -98,6 +106,8 @@ export interface ConnectorIndexingStatus< last_status: ValidStatuses | null; last_success: string | null; docs_indexed: number; + error_msg: string; + latest_index_attempt: IndexAttemptSnapshot | null; deletion_attempts: DeletionAttemptSnapshot[]; is_deletable: boolean; }