Improve indexing status display (#278)

Adds:
- actual error message in UI for indexing failure
- if a connector is disabled, stops indexing immediately (after the current batch of documents) to allow for deletion
- adds num docs indexed for the current run + a speed
This commit is contained in:
Chris Weaver 2023-08-12 14:49:04 -07:00 committed by GitHub
parent bca63e5a76
commit d5bb10b61f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 314 additions and 20 deletions

View File

@ -0,0 +1,36 @@
"""Add docs_indexed_column + time_started to index_attempt table
Revision ID: 5e84129c8be3
Revises: e6a4bbc13fe4
Create Date: 2023-08-10 21:43:09.069523
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "5e84129c8be3"
down_revision = "e6a4bbc13fe4"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add progress-tracking columns to index_attempt.

    - num_docs_indexed: running count of docs processed in the attempt
    - time_started: when the actual indexing run began (nullable, since
      existing/not-yet-started attempts have no start time)
    """
    # Nullable by default so pre-existing rows are left untouched.
    num_docs_column = sa.Column("num_docs_indexed", sa.Integer())
    time_started_column = sa.Column(
        "time_started",
        sa.DateTime(timezone=True),
        nullable=True,
    )
    op.add_column("index_attempt", num_docs_column)
    op.add_column("index_attempt", time_started_column)
def downgrade() -> None:
    """Remove the columns added by this revision (reverse of upgrade)."""
    for column_name in ("time_started", "num_docs_indexed"):
        op.drop_column("index_attempt", column_name)

View File

@ -0,0 +1,32 @@
"""Add index for retrieving latest index_attempt
Revision ID: e6a4bbc13fe4
Revises: b082fec533f0
Create Date: 2023-08-10 12:37:23.335471
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e6a4bbc13fe4"
down_revision = "b082fec533f0"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create a composite index so the "latest attempt per connector/credential
    pair" query can be served without a full table scan."""
    index_name = op.f("ix_index_attempt_latest_for_connector_credential_pair")
    op.create_index(
        index_name,
        "index_attempt",
        ["connector_id", "credential_id", "time_created"],
        unique=False,
    )
def downgrade() -> None:
    """Drop the composite latest-attempt index."""
    index_name = op.f("ix_index_attempt_latest_for_connector_credential_pair")
    op.drop_index(index_name, table_name="index_attempt")

View File

@ -88,6 +88,7 @@ def _delete_connector_credential_pair(
return num_docs_deleted
num_docs_deleted = _delete_singly_indexed_docs()
logger.info(f"Deleted {num_docs_deleted} documents from document stores")
def _update_multi_indexed_docs() -> None:
# if a document is indexed by multiple connector_credential_pairs, we should
@ -132,8 +133,8 @@ def _delete_connector_credential_pair(
# categorize into groups of updates to try and batch them more efficiently
update_groups: dict[tuple[str, ...], list[str]] = {}
for document_id, allowed_users_lst in document_id_to_allowed_users.items():
# if document_id in document_ids_not_needing_update:
# continue
if document_id in document_ids_not_needing_update:
continue
allowed_users_lst.remove(to_be_deleted_user)
allowed_users = tuple(sorted(set(allowed_users_lst)))

View File

@ -19,11 +19,12 @@ from danswer.db.engine import get_db_current_time
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_inprogress_index_attempts
from danswer.db.index_attempt import get_last_successful_attempt
from danswer.db.index_attempt import get_last_attempt
from danswer.db.index_attempt import get_not_started_index_attempts
from danswer.db.index_attempt import mark_attempt_failed
from danswer.db.index_attempt import mark_attempt_in_progress
from danswer.db.index_attempt import mark_attempt_succeeded
from danswer.db.index_attempt import update_docs_indexed
from danswer.db.models import Connector
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
@ -45,7 +46,9 @@ def should_create_new_indexing(
def create_indexing_jobs(db_session: Session) -> None:
connectors = fetch_connectors(db_session, disabled_status=False)
connectors = fetch_connectors(db_session)
# clean up in-progress jobs that were never completed
for connector in connectors:
in_progress_indexing_attempts = get_inprogress_index_attempts(
connector.id, db_session
@ -59,7 +62,11 @@ def create_indexing_jobs(db_session: Session) -> None:
f"Marking in-progress attempt 'connector: {attempt.connector_id}, "
f"credential: {attempt.credential_id}' as failed"
)
mark_attempt_failed(attempt, db_session)
mark_attempt_failed(
attempt,
db_session,
failure_reason="Stopped mid run, likely due to the background process being killed",
)
if attempt.connector_id and attempt.credential_id:
update_connector_credential_pair(
connector_id=attempt.connector_id,
@ -70,15 +77,16 @@ def create_indexing_jobs(db_session: Session) -> None:
db_session=db_session,
)
# potentially kick off new runs
enabled_connectors = [
connector for connector in connectors if not connector.disabled
]
for connector in enabled_connectors:
for association in connector.credentials:
credential = association.credential
last_successful_attempt = get_last_successful_attempt(
connector.id, credential.id, db_session
)
if not should_create_new_indexing(
connector, last_successful_attempt, db_session
):
last_attempt = get_last_attempt(connector.id, credential.id, db_session)
if not should_create_new_indexing(connector, last_attempt, db_session):
continue
create_index_attempt(connector.id, credential.id, db_session)
@ -199,6 +207,17 @@ def run_indexing_jobs(db_session: Session) -> None:
net_doc_change += new_docs
chunk_count += total_batch_chunks
document_count += len(doc_batch)
update_docs_indexed(
db_session=db_session,
index_attempt=attempt,
num_docs_indexed=document_count,
)
# check if connector is disabled mid run and stop if so
db_session.refresh(db_connector)
if db_connector.disabled:
# let the `except` block handle this
raise RuntimeError("Connector was disabled mid run")
mark_attempt_succeeded(attempt, db_session)
update_connector_credential_pair(

View File

@ -8,6 +8,7 @@ from sqlalchemy.orm import Session
from danswer.db.connector import fetch_connector_by_id
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.server.models import StatusResponse

View File

@ -1,10 +1,17 @@
from collections.abc import Sequence
from sqlalchemy import and_
from sqlalchemy import ColumnElement
from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy.orm import Session
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.server.models import ConnectorCredentialPairIdentifier
from danswer.utils.logger import setup_logger
@ -52,6 +59,7 @@ def mark_attempt_in_progress(
db_session: Session,
) -> None:
index_attempt.status = IndexingStatus.IN_PROGRESS
index_attempt.time_started = index_attempt.time_started or func.now()
db_session.add(index_attempt)
db_session.commit()
@ -74,7 +82,15 @@ def mark_attempt_failed(
db_session.commit()
def get_last_successful_attempt(
def update_docs_indexed(
    db_session: Session, index_attempt: IndexAttempt, num_docs_indexed: int
) -> None:
    """Persist the cumulative number of documents indexed so far for an attempt.

    Called after each document batch during a run so the admin UI can show
    live progress. `num_docs_indexed` is the running total, not a delta.
    """
    index_attempt.num_docs_indexed = num_docs_indexed
    db_session.add(index_attempt)
    db_session.commit()
def get_last_attempt(
    connector_id: int,
    credential_id: int,
    db_session: Session,
) -> IndexAttempt | None:
    """Return the most recently *created* index attempt for the given
    connector/credential pair, regardless of its status.

    NOTE: ordered by time_created rather than time_updated, so "last" means
    the attempt that was kicked off most recently.
    """
    stmt = (
        select(IndexAttempt)
        .where(
            IndexAttempt.connector_id == connector_id,
            IndexAttempt.credential_id == credential_id,
        )
        .order_by(desc(IndexAttempt.time_created))
    )
    return db_session.execute(stmt).scalars().first()
def get_latest_index_attempts(
    connector_credential_pair_identifiers: list[ConnectorCredentialPairIdentifier],
    db_session: Session,
) -> Sequence[IndexAttempt]:
    """Fetch the most recently created IndexAttempt for each of the given
    connector/credential pairs.

    Returns at most one attempt per pair (barring exact time_created ties).
    Pairs with no attempts simply produce no row.
    """
    # Subquery: latest time_created per (connector_id, credential_id) pair.
    # BUGFIX: previously this grouped by IndexAttempt.id (the primary key),
    # which made every row its own group, so max(time_created) was a no-op and
    # *all* matching attempts were returned instead of only the latest ones.
    ids_stmt = select(
        IndexAttempt.connector_id,
        IndexAttempt.credential_id,
        func.max(IndexAttempt.time_created).label("max_time_created"),
    )
    where_stmts: list[ColumnElement] = []
    for connector_credential_pair_identifier in connector_credential_pair_identifiers:
        where_stmts.append(
            and_(
                IndexAttempt.connector_id
                == connector_credential_pair_identifier.connector_id,
                IndexAttempt.credential_id
                == connector_credential_pair_identifier.credential_id,
            )
        )
    if where_stmts:
        ids_stmt = ids_stmt.where(or_(*where_stmts))
    ids_stmt = ids_stmt.group_by(
        IndexAttempt.connector_id, IndexAttempt.credential_id
    )
    ids_subquery = ids_stmt.subquery()

    # Join back to the full rows: for each pair, pick the attempt(s) whose
    # time_created equals that pair's maximum.
    stmt = select(IndexAttempt).join(
        ids_subquery,
        and_(
            IndexAttempt.connector_id == ids_subquery.c.connector_id,
            IndexAttempt.credential_id == ids_subquery.c.credential_id,
            IndexAttempt.time_created == ids_subquery.c.max_time_created,
        ),
    )
    return db_session.execute(stmt).scalars().all()
def delete_index_attempts(
connector_id: int,
credential_id: int,

View File

@ -185,6 +185,7 @@ class IndexAttempt(Base):
nullable=True,
)
status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
num_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)
error_msg: Mapped[str | None] = mapped_column(
String(), default=None
) # only filled if status = "failed"
@ -192,6 +193,11 @@ class IndexAttempt(Base):
DateTime(timezone=True),
server_default=func.now(),
)
# when the actual indexing run began
# NOTE: will use the api_server clock rather than DB server clock
time_started: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), default=None
)
time_updated: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),

View File

@ -51,8 +51,10 @@ from danswer.db.deletion_attempt import get_deletion_attempts
from danswer.db.engine import get_session
from danswer.db.engine import get_sqlalchemy_async_engine
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_latest_index_attempts
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.direct_qa import check_model_api_key_is_valid
@ -74,6 +76,7 @@ from danswer.server.models import DeletionAttemptSnapshot
from danswer.server.models import FileUploadResponse
from danswer.server.models import GDriveCallback
from danswer.server.models import GoogleAppCredentials
from danswer.server.models import IndexAttemptSnapshot
from danswer.server.models import ObjectCreationIdResponse
from danswer.server.models import RunConnectorRequest
from danswer.server.models import StatusResponse
@ -190,6 +193,20 @@ def get_connector_indexing_status(
# TODO: make this one query
cc_pairs = get_connector_credential_pairs(db_session)
latest_index_attempts = get_latest_index_attempts(
db_session=db_session,
connector_credential_pair_identifiers=[
ConnectorCredentialPairIdentifier(
connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id
)
for cc_pair in cc_pairs
],
)
cc_pair_to_latest_index_attempt = {
(index_attempt.connector_id, index_attempt.credential_id): index_attempt
for index_attempt in latest_index_attempts
}
deletion_attempts_by_connector: dict[int, list[DeletionAttempt]] = {
cc_pair.connector.id: [] for cc_pair in cc_pairs
}
@ -205,6 +222,9 @@ def get_connector_indexing_status(
for cc_pair in cc_pairs:
connector = cc_pair.connector
credential = cc_pair.credential
latest_index_attempt = cc_pair_to_latest_index_attempt.get(
(connector.id, credential.id)
)
deletion_attemts = deletion_attempts_by_connector.get(connector.id, [])
indexing_statuses.append(
ConnectorIndexingStatus(
@ -215,6 +235,14 @@ def get_connector_indexing_status(
last_status=cc_pair.last_attempt_status,
last_success=cc_pair.last_successful_index_time,
docs_indexed=cc_pair.total_docs_indexed,
error_msg=latest_index_attempt.error_msg
if latest_index_attempt
else None,
latest_index_attempt=IndexAttemptSnapshot.from_index_attempt_db_model(
latest_index_attempt
)
if latest_index_attempt
else None,
deletion_attempts=[
DeletionAttemptSnapshot.from_deletion_attempt_db_model(
deletion_attempt

View File

@ -17,6 +17,7 @@ from danswer.db.models import Connector
from danswer.db.models import Credential
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.search.models import QueryFlow
from danswer.search.models import SearchType
@ -124,6 +125,28 @@ class IndexAttemptRequest(BaseModel):
connector_specific_config: dict[str, Any]
class IndexAttemptSnapshot(BaseModel):
    """API-facing view of a single IndexAttempt DB row."""

    status: IndexingStatus | None
    num_docs_indexed: int
    # only filled if status = "failed"
    error_msg: str | None
    # ISO-8601 strings; time_started is None until the run actually begins
    time_started: str | None
    time_updated: str

    @classmethod
    def from_index_attempt_db_model(
        cls, index_attempt: IndexAttempt
    ) -> "IndexAttemptSnapshot":
        """Build a snapshot from the ORM model, normalizing nullable fields."""
        # Use cls(...) rather than the hard-coded class name so subclasses
        # inherit a correctly-typed constructor.
        return cls(
            status=index_attempt.status,
            # num_docs_indexed is nullable in the DB; surface it as 0
            num_docs_indexed=index_attempt.num_docs_indexed or 0,
            error_msg=index_attempt.error_msg,
            time_started=index_attempt.time_started.isoformat()
            if index_attempt.time_started
            else None,
            time_updated=index_attempt.time_updated.isoformat(),
        )
class DeletionAttemptSnapshot(BaseModel):
connector_id: int
status: DeletionStatus
@ -215,6 +238,8 @@ class ConnectorIndexingStatus(BaseModel):
last_status: IndexingStatus | None
last_success: datetime | None
docs_indexed: int
error_msg: str | None
latest_index_attempt: IndexAttemptSnapshot | None
deletion_attempts: list[DeletionAttemptSnapshot]
is_deletable: bool

View File

@ -5,12 +5,18 @@ import useSWR from "swr";
import { BasicTable } from "@/components/admin/connectors/BasicTable";
import { LoadingAnimation } from "@/components/Loading";
import { timeAgo } from "@/lib/time";
import { NotebookIcon, XSquareIcon } from "@/components/icons/icons";
import {
NotebookIcon,
QuestionIcon,
XSquareIcon,
} from "@/components/icons/icons";
import { fetcher } from "@/lib/fetcher";
import { getSourceMetadata } from "@/components/source";
import { CheckCircle, XCircle } from "@phosphor-icons/react";
import { HealthCheckBanner } from "@/components/health/healthcheck";
import { ConnectorIndexingStatus } from "@/lib/types";
import { useState } from "react";
import { getDocsProcessedPerMinute } from "@/lib/indexAttempt";
const getSourceDisplay = (
connectorIndexingStatus: ConnectorIndexingStatus<any, any>
@ -59,6 +65,33 @@ const getSourceDisplay = (
return sourceMetadata.displayName;
};
// Inline "Error" badge that reveals the full error message in a tooltip
// while hovered.
const ErrorDisplay = ({ message }: { message: string }) => {
  const [showTooltip, setShowTooltip] = useState(false);

  return (
    <div
      className="relative"
      onMouseEnter={() => setShowTooltip(true)}
      onMouseLeave={() => setShowTooltip(false)}
    >
      {showTooltip && (
        <div className="absolute pt-8 top-0 left-0">
          <div className="bg-gray-700 px-3 pb-3 pt-2 rounded shadow-lg text-xs">
            <div className="text-sm text-red-600 mb-1 flex">Error Message:</div>
            {message}
          </div>
        </div>
      )}
      <div className="text-red-600 flex cursor-default">
        <QuestionIcon className="my-auto mr-1" size={18} />
        Error
      </div>
    </div>
  );
};
function Main() {
const {
data: indexAttemptData,
@ -102,7 +135,9 @@ function Main() {
const sourceMetadata = getSourceMetadata(
connectorIndexingStatus.connector.source
);
let statusDisplay = <div className="text-gray-400">In Progress...</div>;
let statusDisplay = (
<div className="text-gray-400">Initializing...</div>
);
if (connectorIndexingStatus.connector.disabled) {
statusDisplay = (
<div className="text-red-600 flex">
@ -119,9 +154,35 @@ function Main() {
);
} else if (connectorIndexingStatus.last_status === "failed") {
statusDisplay = (
<div className="text-red-600 flex">
<XCircle className="my-auto mr-1" size="18" />
Error
<ErrorDisplay message={connectorIndexingStatus.error_msg} />
);
} else if (connectorIndexingStatus.last_status === "not_started") {
statusDisplay = <div className="text-gray-400">Scheduled</div>;
} else if (connectorIndexingStatus.last_status === "in_progress") {
const docsPerMinute = getDocsProcessedPerMinute(
connectorIndexingStatus.latest_index_attempt
)?.toFixed(2);
statusDisplay = (
<div className="text-gray-400">
In Progress...{" "}
{connectorIndexingStatus?.latest_index_attempt
?.num_docs_indexed ? (
<div className="text-xs mt-0.5">
<div>
<i>Current Run:</i>{" "}
{
connectorIndexingStatus.latest_index_attempt
.num_docs_indexed
}{" "}
docs indexed
</div>
{docsPerMinute && (
<div>
<i>Speed:</i> ~{docsPerMinute} docs / min
</div>
)}
</div>
) : null}
</div>
);
}
@ -132,7 +193,7 @@ function Main() {
: "-",
connector: (
<a
className="text-blue-500 flex"
className="text-blue-500 flex w-fit"
href={sourceMetadata.adminPageLink}
>
{sourceMetadata.icon({ size: 20 })}

View File

@ -5,7 +5,6 @@ import { useState } from "react";
import { LinkBreakIcon, LinkIcon, TrashIcon } from "@/components/icons/icons";
import { updateConnector } from "@/lib/connector";
import { AttachCredentialButtonForTable } from "@/components/admin/connectors/buttons/AttachCredentialButtonForTable";
import { scheduleDeletionJobForConnector } from "@/lib/documentDeletion";
import { DeleteColumn } from "./DeleteColumn";
interface StatusRowProps<ConnectorConfigType, ConnectorCredentialType> {

View File

@ -12,6 +12,7 @@ import {
Brain,
PencilSimple,
X,
Question,
} from "@phosphor-icons/react";
import { SiBookstack } from "react-icons/si";
import { FaFile, FaGlobe } from "react-icons/fa";
@ -97,6 +98,13 @@ export const InfoIcon = ({
return <Info size={size} className={className} />;
};
// Question-mark icon wrapper matching the other icon components in this file.
export const QuestionIcon = ({
  size = 16,
  className = defaultTailwindCSS,
}: IconProps) => <Question size={size} className={className} />;
export const BrainIcon = ({
size = 16,
className = defaultTailwindCSS,

View File

@ -0,0 +1,26 @@
import { IndexAttemptSnapshot } from "./types";
// Compute the indexing throughput (docs per minute) of an attempt, or null
// when a meaningful rate cannot be derived yet.
export const getDocsProcessedPerMinute = (
  indexAttempt: IndexAttemptSnapshot | null
): number | null => {
  // Need both timestamps and at least one indexed doc to compute a rate.
  if (!indexAttempt?.time_started || !indexAttempt.time_updated) {
    return null;
  }
  if (indexAttempt.num_docs_indexed === 0) {
    return null;
  }

  const elapsedMs =
    new Date(indexAttempt.time_updated).getTime() -
    new Date(indexAttempt.time_started).getTime();
  const elapsedSeconds = elapsedMs / 1000;

  // due to some issues with `time_updated` having delayed updates,
  // the docs / min will be really high at first. To avoid this,
  // we can wait a little bit to let the updated_at catch up a bit
  if (elapsedSeconds < 10) {
    return null;
  }

  return (indexAttempt.num_docs_indexed / elapsedSeconds) * 60;
};

View File

@ -87,6 +87,14 @@ export interface FileConfig {
export interface NotionConfig {}
// Mirrors the backend `IndexAttemptSnapshot` Pydantic model.
export interface IndexAttemptSnapshot {
  status: ValidStatuses | null;
  num_docs_indexed: number;
  // only set when the attempt failed
  error_msg: string | null;
  // ISO-8601 timestamps (serialized via `isoformat()` on the backend);
  // time_started is null until the indexing run actually begins
  time_started: string | null;
  time_updated: string;
}
export interface ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
@ -98,6 +106,8 @@ export interface ConnectorIndexingStatus<
last_status: ValidStatuses | null;
last_success: string | null;
docs_indexed: number;
error_msg: string;
latest_index_attempt: IndexAttemptSnapshot | null;
deletion_attempts: DeletionAttemptSnapshot[];
is_deletable: boolean;
}