From 87fe6f75750b995eb162c343e2835a2af921fec2 Mon Sep 17 00:00:00 2001
From: Yuhong Sun
Date: Sat, 29 Jul 2023 16:37:22 -0700
Subject: [PATCH] Add ingestion metrics (#256)

---
 .../d7111c1238cd_remove_document_ids.py      | 32 +++++++++++++++
 backend/danswer/background/update.py         | 39 +++++++++++++------
 .../danswer/datastores/indexing_pipeline.py  | 13 +++++--
 backend/danswer/db/index_attempt.py          |  2 -
 backend/danswer/db/models.py                 | 11 ++----
 5 files changed, 72 insertions(+), 25 deletions(-)
 create mode 100644 backend/alembic/versions/d7111c1238cd_remove_document_ids.py

diff --git a/backend/alembic/versions/d7111c1238cd_remove_document_ids.py b/backend/alembic/versions/d7111c1238cd_remove_document_ids.py
new file mode 100644
index 000000000000..6bbb5fa2d0c9
--- /dev/null
+++ b/backend/alembic/versions/d7111c1238cd_remove_document_ids.py
@@ -0,0 +1,32 @@
+"""Remove Document IDs
+
+Revision ID: d7111c1238cd
+Revises: 465f78d9b7f9
+Create Date: 2023-07-29 15:06:25.126169
+
+"""
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "d7111c1238cd"
+down_revision = "465f78d9b7f9"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.drop_column("index_attempt", "document_ids")
+
+
+def downgrade() -> None:
+    op.add_column(
+        "index_attempt",
+        sa.Column(
+            "document_ids",
+            postgresql.ARRAY(sa.VARCHAR()),
+            autoincrement=False,
+            nullable=True,
+        ),
+    )
diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index 9a3866b48e07..da0e36dd7703 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -115,6 +115,16 @@ def run_indexing_jobs(db_session: Session) -> None:
             f"with config: '{attempt.connector.connector_specific_config}', and "
             f"with credentials: '{attempt.credential_id}'"
         )
+
+        run_time = time.time()
+        run_time_str = datetime.utcfromtimestamp(run_time).strftime("%Y-%m-%d %H:%M:%S")
+        logger.info(f"Connector Starting UTC Time: {run_time_str}")
+
+        # "official" timestamp for this run
+        # used for setting time bounds when fetching updates from apps and
+        # is stored in the DB as the last successful run time if this run succeeds
+        run_dt = datetime.fromtimestamp(run_time, tz=timezone.utc)
+
         mark_attempt_in_progress(attempt, db_session)
 
         db_connector = attempt.connector
@@ -148,11 +158,6 @@ def run_indexing_jobs(db_session: Session) -> None:
 
         net_doc_change = 0
         try:
-            # "official" timestamp for this run
-            # used for setting time bounds when fetching updates from apps + is
-            # stored in the DB as the last successful run time if this run succeeds
-            run_time = time.time()
-            run_dt = datetime.fromtimestamp(run_time, tz=timezone.utc)
            if task == InputType.LOAD_STATE:
                 assert isinstance(runnable_connector, LoadConnector)
                 doc_batch_generator = runnable_connector.load_from_state()
@@ -175,17 +180,20 @@ def run_indexing_jobs(db_session: Session) -> None:
                 # Event types cannot be handled by a background type, leave these untouched
                 continue
 
-            document_ids: list[str] = []
+            document_count = 0
+            chunk_count = 0
             for doc_batch in doc_batch_generator:
                 index_user_id = (
                     None if db_credential.public_doc else db_credential.user_id
                 )
-                net_doc_change += indexing_pipeline(
+                new_docs, total_batch_chunks = indexing_pipeline(
                     documents=doc_batch, user_id=index_user_id
                 )
-                document_ids.extend([doc.id for doc in doc_batch])
+                net_doc_change += new_docs
+                chunk_count += total_batch_chunks
+                document_count += len(doc_batch)
 
-            mark_attempt_succeeded(attempt, document_ids, db_session)
+            mark_attempt_succeeded(attempt, db_session)
             update_connector_credential_pair(
                 connector_id=db_connector.id,
                 credential_id=db_credential.id,
@@ -195,10 +203,18 @@ def run_indexing_jobs(db_session: Session) -> None:
                 db_session=db_session,
             )
 
-            logger.info(f"Indexed {len(document_ids)} documents")
+            logger.info(
+                f"Indexed or updated {document_count} total documents for a total of {chunk_count} chunks"
+            )
+            logger.info(
+                f"Connector successfully finished, elapsed time: {time.time() - run_time} seconds"
+            )
 
         except Exception as e:
             logger.exception(f"Indexing job with id {attempt.id} failed due to {e}")
+            logger.info(
+                f"Failed connector elapsed time: {time.time() - run_time} seconds"
+            )
             mark_attempt_failed(attempt, db_session, failure_reason=str(e))
             update_connector_credential_pair(
                 connector_id=db_connector.id,
@@ -214,7 +230,8 @@ def update_loop(delay: int = 10) -> None:
     engine = get_sqlalchemy_engine()
     while True:
         start = time.time()
-        logger.info(f"Running update, current time: {time.ctime(start)}")
+        start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
+        logger.info(f"Running update, current UTC time: {start_time_utc}")
         try:
             with Session(engine, expire_on_commit=False) as db_session:
                 create_indexing_jobs(db_session)
diff --git a/backend/danswer/datastores/indexing_pipeline.py b/backend/danswer/datastores/indexing_pipeline.py
index 6f2993bdee71..42275db7be73 100644
--- a/backend/danswer/datastores/indexing_pipeline.py
+++ b/backend/danswer/datastores/indexing_pipeline.py
@@ -18,7 +18,9 @@ logger = setup_logger()
 
 
 class IndexingPipelineProtocol(Protocol):
-    def __call__(self, documents: list[Document], user_id: UUID | None) -> int:
+    def __call__(
+        self, documents: list[Document], user_id: UUID | None
+    ) -> tuple[int, int]:
         ...
 
 
@@ -30,7 +32,10 @@ def _indexing_pipeline(
     keyword_index: KeywordIndex,
     documents: list[Document],
     user_id: UUID | None,
-) -> int:
+) -> tuple[int, int]:
+    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
+    Note that the documents should already be batched at this point so that it does not inflate the
+    memory requirements"""
     # TODO: make entire indexing pipeline async to not block the entire process
     # when running on async endpoints
     chunks = list(chain(*[chunker.chunk(document) for document in documents]))
@@ -42,7 +47,7 @@ def _indexing_pipeline(
         logger.warning("Document count change from keyword/vector indices don't align")
     net_new_docs = max(net_doc_count_keyword, net_doc_count_vector)
     logger.info(f"Indexed {net_new_docs} new documents")
-    return net_new_docs
+    return net_new_docs, len(chunks)
 
 
 def build_indexing_pipeline(
@@ -52,7 +57,7 @@ def build_indexing_pipeline(
     vector_index: VectorIndex | None = None,
     keyword_index: KeywordIndex | None = None,
 ) -> IndexingPipelineProtocol:
-    """Builds a pipline which takes in a list of docs and indexes them.
+    """Builds a pipline which takes in a list (batch) of docs and indexes them.
 
     Default uses _ chunker, _ embedder, and qdrant for the datastore"""
     if chunker is None:
diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py
index 673b2546e22d..4001ab5ee133 100644
--- a/backend/danswer/db/index_attempt.py
+++ b/backend/danswer/db/index_attempt.py
@@ -56,11 +56,9 @@ def mark_attempt_in_progress(
 
 def mark_attempt_succeeded(
     index_attempt: IndexAttempt,
-    docs_indexed: list[str],
     db_session: Session,
 ) -> None:
     index_attempt.status = IndexingStatus.SUCCESS
-    index_attempt.document_ids = docs_indexed
     db_session.add(index_attempt)
     db_session.commit()
 
diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py
index a619cd7a5d52..d708818ec620 100644
--- a/backend/danswer/db/models.py
+++ b/backend/danswer/db/models.py
@@ -4,6 +4,9 @@ from typing import Any
 from typing import List
 from uuid import UUID
 
+from danswer.auth.schemas import UserRole
+from danswer.configs.constants import DocumentSource
+from danswer.connectors.models import InputType
 from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
 from fastapi_users.db import SQLAlchemyBaseUserTableUUID
 from fastapi_users_db_sqlalchemy.access_token import SQLAlchemyBaseAccessTokenTableUUID
@@ -21,10 +24,6 @@ from sqlalchemy.orm import Mapped
 from sqlalchemy.orm import mapped_column
 from sqlalchemy.orm import relationship
 
-from danswer.auth.schemas import UserRole
-from danswer.configs.constants import DocumentSource
-from danswer.connectors.models import InputType
-
 
 class IndexingStatus(str, PyEnum):
     NOT_STARTED = "not_started"
@@ -161,9 +160,6 @@ class IndexAttempt(Base):
         nullable=True,
     )
     status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
-    document_ids: Mapped[list[str] | None] = mapped_column(
-        postgresql.ARRAY(String()), default=None
-    )  # only filled if status = "complete"
     error_msg: Mapped[str | None] = mapped_column(
         String(), default=None
     )  # only filled if status = "failed"
@@ -189,7 +185,6 @@ class IndexAttempt(Base):
             f""
             f"time_created={self.time_created!r}, "
             f"time_updated={self.time_updated!r}, "
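
The core of this change is that the indexing pipeline now returns a (net_new_docs, total_chunks) tuple per batch instead of a single count, and run_indexing_jobs aggregates document and chunk totals plus elapsed time for its metrics logs. Below is a minimal, standalone sketch of that aggregation pattern, not code from the patch itself; fake_pipeline, run_ingestion, and doc_batches are hypothetical stand-ins rather than names from the Danswer codebase.

import logging
import time
from collections.abc import Iterator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ingestion_metrics_sketch")


def fake_pipeline(documents: list[str], user_id: None = None) -> tuple[int, int]:
    # Stand-in for the real indexing pipeline: pretend every document is newly
    # indexed and yields two chunks, mirroring the (net_new_docs, chunk_count)
    # return shape introduced by this patch.
    return len(documents), len(documents) * 2


def run_ingestion(doc_batches: Iterator[list[str]]) -> None:
    run_time = time.time()  # "official" start time for this run

    net_doc_change = 0
    document_count = 0
    chunk_count = 0
    for doc_batch in doc_batches:
        new_docs, total_batch_chunks = fake_pipeline(documents=doc_batch)
        net_doc_change += new_docs         # net new documents across all batches
        chunk_count += total_batch_chunks  # chunks written to the indices
        document_count += len(doc_batch)   # documents indexed or updated

    logger.info(
        f"Indexed or updated {document_count} total documents "
        f"for a total of {chunk_count} chunks ({net_doc_change} net new)"
    )
    logger.info(f"Elapsed time: {time.time() - run_time} seconds")


if __name__ == "__main__":
    run_ingestion(iter([["doc-a", "doc-b"], ["doc-c"]]))

Keeping the aggregation in the caller, as the patch does, lets the pipeline stay a pure per-batch function while the connector loop owns the run-level metrics and timing.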