From 37e9ccf864400d11dd7e3faf674b54016a3c4880 Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Mon, 16 Oct 2023 20:18:19 -0700 Subject: [PATCH] Make docs indexed cnt more accurate (#579) --- ...dd_index_for_getting_documents_just_by_.py | 35 +++++++++++++++++++ backend/danswer/db/document.py | 32 +++++++++++++++++ backend/danswer/server/manage.py | 28 +++++++++++---- 3 files changed, 88 insertions(+), 7 deletions(-) create mode 100644 backend/alembic/versions/7f99be1cb9f5_add_index_for_getting_documents_just_by_.py diff --git a/backend/alembic/versions/7f99be1cb9f5_add_index_for_getting_documents_just_by_.py b/backend/alembic/versions/7f99be1cb9f5_add_index_for_getting_documents_just_by_.py new file mode 100644 index 000000000..b8ac75418 --- /dev/null +++ b/backend/alembic/versions/7f99be1cb9f5_add_index_for_getting_documents_just_by_.py @@ -0,0 +1,35 @@ +"""Add index for getting documents just by connector id / credential id + +Revision ID: 7f99be1cb9f5 +Revises: 78dbe7e38469 +Create Date: 2023-10-15 22:48:15.487762 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. 
+revision = "7f99be1cb9f5"
+down_revision = "78dbe7e38469"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:  # add a non-unique (connector_id, credential_id) index
+    op.create_index(
+        op.f(
+            "ix_document_by_connector_credential_pair_pkey__connector_id__credential_id"
+        ),
+        "document_by_connector_credential_pair",
+        ["connector_id", "credential_id"],
+        unique=False,
+    )
+
+
+def downgrade() -> None:  # drop the index created by upgrade()
+    op.drop_index(
+        op.f(
+            "ix_document_by_connector_credential_pair_pkey__connector_id__credential_id"
+        ),
+        table_name="document_by_connector_credential_pair",
+    )
diff --git a/backend/danswer/db/document.py b/backend/danswer/db/document.py
index c81a4bd88..baa69f7a8 100644
--- a/backend/danswer/db/document.py
+++ b/backend/danswer/db/document.py
@@ -5,6 +5,7 @@ from uuid import UUID
 from sqlalchemy import and_
 from sqlalchemy import delete
 from sqlalchemy import func
+from sqlalchemy import or_
 from sqlalchemy import select
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.orm import Session
@@ -53,6 +54,54 @@ def get_document_connector_cnts(
     return db_session.execute(stmt).all()  # type: ignore
 
 
+def get_document_cnts_for_cc_pairs(
+    db_session: Session, cc_pair_identifiers: list[ConnectorCredentialPairIdentifier]
+) -> Sequence[tuple[int, int, int]]:
+    """Count the documents stored for each given connector / credential pair.
+
+    Args:
+        db_session: Session the grouped count query is executed on.
+        cc_pair_identifiers: Connector / credential pairs to count documents for.
+
+    Returns:
+        `(connector_id, credential_id, count)` rows from a grouped count over
+        `DocumentByConnectorCredentialPair`. Pairs with no matching rows yield
+        no row at all, so callers should treat a missing pair as a count of 0.
+    """
+    # With no pairs, `or_()` would receive no clauses: SQLAlchemy deprecates
+    # that form and renders it as a blank clause, which would no longer limit
+    # the query to the requested pairs. No pairs requested means no rows.
+    if not cc_pair_identifiers:
+        return []
+
+    stmt = (
+        select(
+            DocumentByConnectorCredentialPair.connector_id,
+            DocumentByConnectorCredentialPair.credential_id,
+            func.count(),
+        )
+        .where(
+            or_(
+                *[
+                    and_(
+                        DocumentByConnectorCredentialPair.connector_id
+                        == cc_pair_identifier.connector_id,
+                        DocumentByConnectorCredentialPair.credential_id
+                        == cc_pair_identifier.credential_id,
+                    )
+                    for cc_pair_identifier in cc_pair_identifiers
+                ]
+            )
+        )
+        .group_by(
+            DocumentByConnectorCredentialPair.connector_id,
+            DocumentByConnectorCredentialPair.credential_id,
+        )
+    )
+
+    return db_session.execute(stmt).all()  # type: ignore
+
+
 def get_acccess_info_for_documents(
     db_session: Session,
     document_ids: list[str],
diff --git a/backend/danswer/server/manage.py
b/backend/danswer/server/manage.py index 03d072341..35aa9a709 100644 --- a/backend/danswer/server/manage.py +++ b/backend/danswer/server/manage.py @@ -50,6 +50,7 @@ from danswer.db.credentials import create_credential from danswer.db.credentials import delete_google_drive_service_account_credentials from danswer.db.credentials import fetch_credential_by_id from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed +from danswer.db.document import get_document_cnts_for_cc_pairs from danswer.db.engine import get_session from danswer.db.feedback import fetch_docs_ranked_by_boost from danswer.db.feedback import update_document_boost @@ -294,20 +295,31 @@ def get_connector_indexing_status( # TODO: make this one query cc_pairs = get_connector_credential_pairs(db_session) + cc_pair_identifiers = [ + ConnectorCredentialPairIdentifier( + connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id + ) + for cc_pair in cc_pairs + ] + latest_index_attempts = get_latest_index_attempts( db_session=db_session, - connector_credential_pair_identifiers=[ - ConnectorCredentialPairIdentifier( - connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id - ) - for cc_pair in cc_pairs - ], + connector_credential_pair_identifiers=cc_pair_identifiers, ) cc_pair_to_latest_index_attempt = { (index_attempt.connector_id, index_attempt.credential_id): index_attempt for index_attempt in latest_index_attempts } + document_count_info = get_document_cnts_for_cc_pairs( + db_session=db_session, + cc_pair_identifiers=cc_pair_identifiers, + ) + cc_pair_to_document_cnt = { + (connector_id, credential_id): cnt + for connector_id, credential_id, cnt in document_count_info + } + for cc_pair in cc_pairs: connector = cc_pair.connector credential = cc_pair.credential @@ -324,7 +336,9 @@ def get_connector_indexing_status( owner=credential.user.email if credential.user else "", last_status=cc_pair.last_attempt_status, last_success=cc_pair.last_successful_index_time, - 
docs_indexed=cc_pair.total_docs_indexed, + docs_indexed=cc_pair_to_document_cnt.get( + (connector.id, credential.id), 0 + ), error_msg=latest_index_attempt.error_msg if latest_index_attempt else None,