Add ingestion metrics (#256)

Author: Yuhong Sun
Date: 2023-07-29 16:37:22 -07:00
Committed by: GitHub
Parent: eec4e21bad
Commit: 87fe6f7575
5 changed files with 72 additions and 25 deletions

View File

@@ -0,0 +1,32 @@
"""Remove Document IDs
Revision ID: d7111c1238cd
Revises: 465f78d9b7f9
Create Date: 2023-07-29 15:06:25.126169
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "d7111c1238cd"
down_revision = "465f78d9b7f9"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.drop_column("index_attempt", "document_ids")
def downgrade() -> None:
op.add_column(
"index_attempt",
sa.Column(
"document_ids",
postgresql.ARRAY(sa.VARCHAR()),
autoincrement=False,
nullable=True,
),
)
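
For reference, a minimal sketch of applying or reverting this revision through Alembic's Python command API; the alembic.ini path and the use of the API rather than the usual alembic CLI are assumptions for illustration only.

from alembic import command
from alembic.config import Config

# path to the project's alembic.ini is an assumption here
cfg = Config("alembic.ini")

# apply this revision (drops index_attempt.document_ids)
command.upgrade(cfg, "d7111c1238cd")

# roll back to the previous revision (re-adds the nullable VARCHAR[] column)
command.downgrade(cfg, "465f78d9b7f9")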

View File

@@ -115,6 +115,16 @@ def run_indexing_jobs(db_session: Session) -> None:
f"with config: '{attempt.connector.connector_specific_config}', and " f"with config: '{attempt.connector.connector_specific_config}', and "
f"with credentials: '{attempt.credential_id}'" f"with credentials: '{attempt.credential_id}'"
) )
run_time = time.time()
run_time_str = datetime.utcfromtimestamp(run_time).strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"Connector Starting UTC Time: {run_time_str}")
# "official" timestamp for this run
# used for setting time bounds when fetching updates from apps and
# is stored in the DB as the last successful run time if this run succeeds
run_dt = datetime.fromtimestamp(run_time, tz=timezone.utc)
mark_attempt_in_progress(attempt, db_session) mark_attempt_in_progress(attempt, db_session)
db_connector = attempt.connector db_connector = attempt.connector
@@ -148,11 +158,6 @@ def run_indexing_jobs(db_session: Session) -> None:
         net_doc_change = 0
         try:
-            # "official" timestamp for this run
-            # used for setting time bounds when fetching updates from apps + is
-            # stored in the DB as the last successful run time if this run succeeds
-            run_time = time.time()
-            run_dt = datetime.fromtimestamp(run_time, tz=timezone.utc)
-
             if task == InputType.LOAD_STATE:
                 assert isinstance(runnable_connector, LoadConnector)
                 doc_batch_generator = runnable_connector.load_from_state()
@@ -175,17 +180,20 @@ def run_indexing_jobs(db_session: Session) -> None:
                 # Event types cannot be handled by a background type, leave these untouched
                 continue
 
-            document_ids: list[str] = []
+            document_count = 0
+            chunk_count = 0
             for doc_batch in doc_batch_generator:
                 index_user_id = (
                     None if db_credential.public_doc else db_credential.user_id
                 )
-                net_doc_change += indexing_pipeline(
+                new_docs, total_batch_chunks = indexing_pipeline(
                     documents=doc_batch, user_id=index_user_id
                 )
-                document_ids.extend([doc.id for doc in doc_batch])
+                net_doc_change += new_docs
+                chunk_count += total_batch_chunks
+                document_count += len(doc_batch)
 
-            mark_attempt_succeeded(attempt, document_ids, db_session)
+            mark_attempt_succeeded(attempt, db_session)
             update_connector_credential_pair(
                 connector_id=db_connector.id,
                 credential_id=db_credential.id,
@@ -195,10 +203,18 @@ def run_indexing_jobs(db_session: Session) -> None:
                 db_session=db_session,
             )
 
-            logger.info(f"Indexed {len(document_ids)} documents")
+            logger.info(
+                f"Indexed or updated {document_count} total documents for a total of {chunk_count} chunks"
+            )
+            logger.info(
+                f"Connector successfully finished, elapsed time: {time.time() - run_time} seconds"
+            )
         except Exception as e:
             logger.exception(f"Indexing job with id {attempt.id} failed due to {e}")
+            logger.info(
+                f"Failed connector elapsed time: {time.time() - run_time} seconds"
+            )
             mark_attempt_failed(attempt, db_session, failure_reason=str(e))
             update_connector_credential_pair(
                 connector_id=db_connector.id,
@@ -214,7 +230,8 @@ def update_loop(delay: int = 10) -> None:
     engine = get_sqlalchemy_engine()
     while True:
         start = time.time()
-        logger.info(f"Running update, current time: {time.ctime(start)}")
+        start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
+        logger.info(f"Running update, current UTC time: {start_time_utc}")
         try:
             with Session(engine, expire_on_commit=False) as db_session:
                 create_indexing_jobs(db_session)
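
A condensed sketch of the metrics bookkeeping introduced above, with the connector plumbing stubbed out. Here indexing_pipeline stands for any callable returning (net_new_docs, chunk_count) as in this commit; the function name and the trivial lambda in the usage line are illustrative only.

import time
from datetime import datetime, timezone


def run_one_attempt(doc_batch_generator, indexing_pipeline, user_id=None):
    """Stripped-down version of the indexing loop, showing only the new metrics."""
    run_time = time.time()  # "official" timestamp for this run
    run_dt = datetime.fromtimestamp(run_time, tz=timezone.utc)  # stored on success

    net_doc_change = 0  # net new documents across the indices
    document_count = 0  # documents indexed or updated
    chunk_count = 0  # chunks produced across all batches

    for doc_batch in doc_batch_generator:
        new_docs, total_batch_chunks = indexing_pipeline(
            documents=doc_batch, user_id=user_id
        )
        net_doc_change += new_docs
        chunk_count += total_batch_chunks
        document_count += len(doc_batch)

    elapsed = time.time() - run_time
    print(
        f"Indexed or updated {document_count} total documents "
        f"for a total of {chunk_count} chunks in {elapsed:.1f} seconds"
    )
    return run_dt, net_doc_change


# example usage: two batches, pretending each document yields two chunks
batches = [["doc-a", "doc-b"], ["doc-c"]]
run_one_attempt(batches, lambda documents, user_id: (len(documents), 2 * len(documents)))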

View File

@@ -18,7 +18,9 @@ logger = setup_logger()
 class IndexingPipelineProtocol(Protocol):
-    def __call__(self, documents: list[Document], user_id: UUID | None) -> int:
+    def __call__(
+        self, documents: list[Document], user_id: UUID | None
+    ) -> tuple[int, int]:
         ...
@@ -30,7 +32,10 @@ def _indexing_pipeline(
     keyword_index: KeywordIndex,
     documents: list[Document],
     user_id: UUID | None,
-) -> int:
+) -> tuple[int, int]:
+    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
+    Note that the documents should already be batched at this point so that it does not inflate the
+    memory requirements"""
     # TODO: make entire indexing pipeline async to not block the entire process
     # when running on async endpoints
     chunks = list(chain(*[chunker.chunk(document) for document in documents]))
@@ -42,7 +47,7 @@ def _indexing_pipeline(
         logger.warning("Document count change from keyword/vector indices don't align")
     net_new_docs = max(net_doc_count_keyword, net_doc_count_vector)
     logger.info(f"Indexed {net_new_docs} new documents")
-    return net_new_docs
+    return net_new_docs, len(chunks)
 
 
 def build_indexing_pipeline(
@@ -52,7 +57,7 @@ def build_indexing_pipeline(
     vector_index: VectorIndex | None = None,
     keyword_index: KeywordIndex | None = None,
 ) -> IndexingPipelineProtocol:
-    """Builds a pipline which takes in a list of docs and indexes them.
+    """Builds a pipline which takes in a list (batch) of docs and indexes them.
 
     Default uses _ chunker, _ embedder, and qdrant for the datastore"""
     if chunker is None:
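
Call sites now unpack a pair instead of a single count. A minimal sketch of a callable satisfying the updated IndexingPipelineProtocol; the toy Document class and the two-chunks-per-document assumption are made up for illustration.

from dataclasses import dataclass
from uuid import UUID


@dataclass
class Document:
    # stand-in for danswer's Document model; only the id matters here
    id: str
    text: str


def toy_pipeline(documents: list[Document], user_id: UUID | None) -> tuple[int, int]:
    # pretend every document is brand new and splits into two chunks
    net_new_docs = len(documents)
    chunk_count = 2 * len(documents)
    return net_new_docs, chunk_count


# mirrors the updated call site in the background job
new_docs, total_batch_chunks = toy_pipeline(
    documents=[Document("a", "hello"), Document("b", "world")], user_id=None
)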

View File

@@ -56,11 +56,9 @@ def mark_attempt_in_progress(
 def mark_attempt_succeeded(
     index_attempt: IndexAttempt,
-    docs_indexed: list[str],
     db_session: Session,
 ) -> None:
     index_attempt.status = IndexingStatus.SUCCESS
-    index_attempt.document_ids = docs_indexed
     db_session.add(index_attempt)
     db_session.commit()
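
With the docs_indexed argument gone, the success path only flips the status; no document ID list is stored on the attempt anymore. A hedged sketch of how the two helpers might be driven from the background job; wrap_attempt is illustrative and the import path is assumed from the upstream module layout.

from sqlalchemy.orm import Session

# module path is an assumption; these are the helpers edited above
from danswer.db.index_attempt import mark_attempt_failed, mark_attempt_succeeded


def wrap_attempt(attempt, db_session: Session, do_indexing) -> None:
    """Record success or failure around the actual indexing work."""
    try:
        do_indexing()
        # only the status flip remains on success
        mark_attempt_succeeded(attempt, db_session)
    except Exception as e:
        mark_attempt_failed(attempt, db_session, failure_reason=str(e))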

View File

@@ -4,6 +4,9 @@ from typing import Any
 from typing import List
 from uuid import UUID
 
+from danswer.auth.schemas import UserRole
+from danswer.configs.constants import DocumentSource
+from danswer.connectors.models import InputType
 from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
 from fastapi_users.db import SQLAlchemyBaseUserTableUUID
 from fastapi_users_db_sqlalchemy.access_token import SQLAlchemyBaseAccessTokenTableUUID
@@ -21,10 +24,6 @@ from sqlalchemy.orm import Mapped
 from sqlalchemy.orm import mapped_column
 from sqlalchemy.orm import relationship
 
-from danswer.auth.schemas import UserRole
-from danswer.configs.constants import DocumentSource
-from danswer.connectors.models import InputType
-
 
 class IndexingStatus(str, PyEnum):
     NOT_STARTED = "not_started"
@@ -161,9 +160,6 @@ class IndexAttempt(Base):
         nullable=True,
     )
     status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
-    document_ids: Mapped[list[str] | None] = mapped_column(
-        postgresql.ARRAY(String()), default=None
-    )  # only filled if status = "complete"
     error_msg: Mapped[str | None] = mapped_column(
         String(), default=None
     )  # only filled if status = "failed"
@@ -189,7 +185,6 @@ class IndexAttempt(Base):
f"<IndexAttempt(id={self.id!r}, " f"<IndexAttempt(id={self.id!r}, "
f"connector_id={self.connector_id!r}, " f"connector_id={self.connector_id!r}, "
f"status={self.status!r}, " f"status={self.status!r}, "
f"document_ids={self.document_ids!r}, "
f"error_msg={self.error_msg!r})>" f"error_msg={self.error_msg!r})>"
f"time_created={self.time_created!r}, " f"time_created={self.time_created!r}, "
f"time_updated={self.time_updated!r}, " f"time_updated={self.time_updated!r}, "