From 3eb67baf5b1c2f9f4b183b9bc28a226900e1d438 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Wed, 23 Oct 2024 13:25:52 -0700 Subject: [PATCH] Bugfix/indexing UI (#2879) * fresh indexing feature branch * cherry pick test * Revert "cherry pick test" This reverts commit 2a624220687affdda3de347e30f2011136f64bda. * set multitenant so that vespa fields match when indexing * cleanup pass * mypy * pass through env var to control celery indexing concurrency * comments on task kickoff and some logging improvements * disentangle configuration for different workers and beats. * use get_session_with_tenant * comment out all of update.py * rename to RedisConnectorIndexingFenceData * first check num_indexing_workers * refactor RedisConnectorIndexingFenceData * comment out on_worker_process_init * missed a file * scope db sessions to short lengths * update launch.json template * fix types * keep index button disabled until indexing is truly finished * change priority order of tooltips * should be using the logger from app_base * if we run out of retries, just mark the doc as modified so it gets synced later * tighten up the logging ... 
we know these are ID's * add logging --- .../danswer/background/celery/apps/primary.py | 5 +--- .../danswer/background/celery/celery_redis.py | 15 +++++----- .../background/celery/tasks/shared/tasks.py | 28 +++++++++++++++---- backend/danswer/db/document.py | 14 ++++++++++ backend/danswer/server/documents/cc_pair.py | 14 ++++++++-- backend/danswer/server/documents/models.py | 3 ++ backend/danswer/utils/logger.py | 5 ++-- .../connector/[ccPairId]/ReIndexButton.tsx | 10 +++++-- .../app/admin/connector/[ccPairId]/page.tsx | 2 ++ .../app/admin/connector/[ccPairId]/types.ts | 1 + 10 files changed, 73 insertions(+), 24 deletions(-) diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py index c99607f4b..58e464f37 100644 --- a/backend/danswer/background/celery/apps/primary.py +++ b/backend/danswer/background/celery/apps/primary.py @@ -11,9 +11,9 @@ from celery.signals import celeryd_init from celery.signals import worker_init from celery.signals import worker_ready from celery.signals import worker_shutdown -from celery.utils.log import get_task_logger import danswer.background.celery.apps.app_base as app_base +from danswer.background.celery.apps.app_base import task_logger from danswer.background.celery.celery_redis import RedisConnectorCredentialPair from danswer.background.celery.celery_redis import RedisConnectorDeletion from danswer.background.celery.celery_redis import RedisConnectorIndexing @@ -31,9 +31,6 @@ from danswer.utils.logger import setup_logger logger = setup_logger() -# use this within celery tasks to get celery task specific logging -task_logger = get_task_logger(__name__) - celery_app = Celery(__name__) celery_app.config_from_object("danswer.background.celery.configs.primary") diff --git a/backend/danswer/background/celery/celery_redis.py b/backend/danswer/background/celery/celery_redis.py index f1a5697e2..1ea5e3b17 100644 --- a/backend/danswer/background/celery/celery_redis.py +++ 
b/backend/danswer/background/celery/celery_redis.py @@ -465,14 +465,8 @@ class RedisConnectorPruning(RedisObjectHelper): return len(async_results) - def is_pruning(self, db_session: Session, redis_client: Redis) -> bool: + def is_pruning(self, redis_client: Redis) -> bool: """A single example of a helper method being refactored into the redis helper""" - cc_pair = get_connector_credential_pair_from_id( - cc_pair_id=int(self._id), db_session=db_session - ) - if not cc_pair: - raise ValueError(f"cc_pair_id {self._id} does not exist.") - if redis_client.exists(self.fence_key): return True @@ -538,6 +532,13 @@ class RedisConnectorIndexing(RedisObjectHelper): ) -> int | None: return None + def is_indexing(self, redis_client: Redis) -> bool: + """A single example of a helper method being refactored into the redis helper""" + if redis_client.exists(self.fence_key): + return True + + return False + def celery_get_queue_length(queue: str, r: Redis) -> int: """This is a redis specific way to get the length of a celery queue. 
diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py index 474a749e7..7ce43454a 100644 --- a/backend/danswer/background/celery/tasks/shared/tasks.py +++ b/backend/danswer/background/celery/tasks/shared/tasks.py @@ -11,6 +11,7 @@ from danswer.db.document import delete_document_by_connector_credential_pair__no from danswer.db.document import delete_documents_complete__no_commit from danswer.db.document import get_document from danswer.db.document import get_document_connector_count +from danswer.db.document import mark_document_as_modified from danswer.db.document import mark_document_as_synced from danswer.db.document_set import fetch_document_sets_for_document from danswer.db.engine import get_session_with_tenant @@ -19,6 +20,8 @@ from danswer.document_index.factory import get_default_document_index from danswer.document_index.interfaces import VespaDocumentFields from danswer.server.documents.models import ConnectorCredentialPairIdentifier +DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3 + class RedisConnectorIndexingFenceData(BaseModel): index_attempt_id: int | None @@ -32,7 +35,7 @@ class RedisConnectorIndexingFenceData(BaseModel): bind=True, soft_time_limit=45, time_limit=60, - max_retries=3, + max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES, ) def document_by_cc_pair_cleanup_task( self: Task, @@ -56,7 +59,7 @@ def document_by_cc_pair_cleanup_task( connector / credential pair from the access list (6) delete all relevant entries from postgres """ - task_logger.info(f"document_id={document_id}") + task_logger.info(f"tenant_id={tenant_id} document_id={document_id}") try: with get_session_with_tenant(tenant_id) as db_session: @@ -122,6 +125,8 @@ def document_by_cc_pair_cleanup_task( else: pass + db_session.commit() + task_logger.info( f"tenant_id={tenant_id} " f"document_id={document_id} " @@ -129,16 +134,27 @@ def document_by_cc_pair_cleanup_task( f"refcount={count} " 
f"chunks={chunks_affected}" ) - db_session.commit() except SoftTimeLimitExceeded: task_logger.info( f"SoftTimeLimitExceeded exception. tenant_id={tenant_id} doc_id={document_id}" ) + return False except Exception as e: task_logger.exception("Unexpected exception") - # Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 - countdown = 2 ** (self.request.retries + 4) - self.retry(exc=e, countdown=countdown) + if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES: + # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 + countdown = 2 ** (self.request.retries + 4) + self.retry(exc=e, countdown=countdown) + else: + # This is the last attempt! mark the document as dirty in the db so that it + # eventually gets fixed out of band via stale document reconciliation + task_logger.info( + f"Max retries reached. Marking doc as dirty for reconciliation: " + f"tenant_id={tenant_id} document_id={document_id}" + ) + with get_session_with_tenant(tenant_id) as db_session: + mark_document_as_modified(document_id, db_session) + return False return True diff --git a/backend/danswer/db/document.py b/backend/danswer/db/document.py index 8aee28aef..2e142a2c0 100644 --- a/backend/danswer/db/document.py +++ b/backend/danswer/db/document.py @@ -375,6 +375,20 @@ def update_docs_last_modified__no_commit( doc.last_modified = now +def mark_document_as_modified( + document_id: str, + db_session: Session, +) -> None: + stmt = select(DbDocument).where(DbDocument.id == document_id) + doc = db_session.scalar(stmt) + if doc is None: + raise ValueError(f"No document with ID: {document_id}") + + # update last_modified + doc.last_modified = datetime.now(timezone.utc) + db_session.commit() + + def mark_document_as_synced(document_id: str, db_session: Session) -> None: stmt = select(DbDocument).where(DbDocument.id == document_id) doc = db_session.scalar(stmt) diff --git a/backend/danswer/server/documents/cc_pair.py b/backend/danswer/server/documents/cc_pair.py index db35807ad..92a94a638 
100644 --- a/backend/danswer/server/documents/cc_pair.py +++ b/backend/danswer/server/documents/cc_pair.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import Session from danswer.auth.users import current_curator_or_admin_user from danswer.auth.users import current_user +from danswer.background.celery.celery_redis import RedisConnectorIndexing from danswer.background.celery.celery_redis import RedisConnectorPruning from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot from danswer.background.celery.tasks.pruning.tasks import ( @@ -34,6 +35,7 @@ from danswer.db.index_attempt import count_index_attempts_for_connector from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id from danswer.db.models import User +from danswer.db.search_settings import get_current_search_settings from danswer.db.tasks import check_task_is_live_and_not_timed_out from danswer.db.tasks import get_latest_task from danswer.redis.redis_pool import get_redis_client @@ -93,6 +95,8 @@ def get_cc_pair_full_info( user: User | None = Depends(current_curator_or_admin_user), db_session: Session = Depends(get_session), ) -> CCPairFullInfo: + r = get_redis_client() + cc_pair = get_connector_credential_pair_from_id( cc_pair_id, db_session, user, get_editable=False ) @@ -122,11 +126,16 @@ def get_cc_pair_full_info( latest_attempt = get_latest_index_attempt_for_cc_pair_id( db_session=db_session, - connector_credential_pair_id=cc_pair.id, + connector_credential_pair_id=cc_pair_id, secondary_index=False, only_finished=False, ) + search_settings = get_current_search_settings(db_session) + rci = RedisConnectorIndexing( + cc_pair_id=cc_pair_id, search_settings_id=search_settings.id + ) + return CCPairFullInfo.from_models( cc_pair_model=cc_pair, number_of_index_attempts=count_index_attempts_for_connector( @@ -141,6 +150,7 @@ def get_cc_pair_full_info( ), num_docs_indexed=documents_indexed, 
is_editable_for_current_user=is_editable_for_current_user, + indexing=rci.is_indexing(r), ) @@ -250,7 +260,7 @@ def prune_cc_pair( r = get_redis_client() rcp = RedisConnectorPruning(cc_pair_id) - if rcp.is_pruning(db_session, r): + if rcp.is_pruning(r): raise HTTPException( status_code=HTTPStatus.CONFLICT, detail="Pruning task already in progress.", diff --git a/backend/danswer/server/documents/models.py b/backend/danswer/server/documents/models.py index 780d8a3f2..fcbc0a76a 100644 --- a/backend/danswer/server/documents/models.py +++ b/backend/danswer/server/documents/models.py @@ -222,6 +222,7 @@ class CCPairFullInfo(BaseModel): access_type: AccessType is_editable_for_current_user: bool deletion_failure_message: str | None + indexing: bool @classmethod def from_models( @@ -232,6 +233,7 @@ class CCPairFullInfo(BaseModel): last_index_attempt: IndexAttempt | None, num_docs_indexed: int, # not ideal, but this must be computed separately is_editable_for_current_user: bool, + indexing: bool, ) -> "CCPairFullInfo": # figure out if we need to artificially deflate the number of docs indexed. 
# This is required since the total number of docs indexed by a CC Pair is @@ -265,6 +267,7 @@ class CCPairFullInfo(BaseModel): access_type=cc_pair_model.access_type, is_editable_for_current_user=is_editable_for_current_user, deletion_failure_message=cc_pair_model.deletion_failure_message, + indexing=indexing, ) diff --git a/backend/danswer/utils/logger.py b/backend/danswer/utils/logger.py index 065d6282c..2aadbe379 100644 --- a/backend/danswer/utils/logger.py +++ b/backend/danswer/utils/logger.py @@ -61,10 +61,10 @@ class DanswerLoggingAdapter(logging.LoggerAdapter): cc_pair_id = IndexAttemptSingleton.get_connector_credential_pair_id() if attempt_id is not None: - msg = f"[Attempt ID: {attempt_id}] {msg}" + msg = f"[Attempt: {attempt_id}] {msg}" if cc_pair_id is not None: - msg = f"[CC Pair ID: {cc_pair_id}] {msg}" + msg = f"[CC Pair: {cc_pair_id}] {msg}" # For Slack Bot, logs the channel relevant to the request channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None @@ -185,6 +185,7 @@ def setup_logger( def print_loggers() -> None: + """Print information about all loggers. Use to debug logging issues.""" root_logger = logging.getLogger() loggers: list[logging.Logger | logging.PlaceHolder] = [root_logger] loggers.extend(logging.Logger.manager.loggerDict.values()) diff --git a/web/src/app/admin/connector/[ccPairId]/ReIndexButton.tsx b/web/src/app/admin/connector/[ccPairId]/ReIndexButton.tsx index dced8811a..75bb2eca9 100644 --- a/web/src/app/admin/connector/[ccPairId]/ReIndexButton.tsx +++ b/web/src/app/admin/connector/[ccPairId]/ReIndexButton.tsx @@ -94,12 +94,14 @@ export function ReIndexButton({ connectorId, credentialId, isDisabled, + isIndexing, isDeleting, }: { ccPairId: number; connectorId: number; credentialId: number; isDisabled: boolean; + isIndexing: boolean; isDeleting: boolean; }) { const { popup, setPopup } = usePopup(); @@ -128,9 +130,11 @@ export function ReIndexButton({ tooltip={ isDeleting ? 
"Cannot index while connector is deleting" - : isDisabled - ? "Connector must be re-enabled before indexing" - : undefined + : isIndexing + ? "Indexing is already in progress" + : isDisabled + ? "Connector must be re-enabled before indexing" + : undefined } > Index diff --git a/web/src/app/admin/connector/[ccPairId]/page.tsx b/web/src/app/admin/connector/[ccPairId]/page.tsx index 9cdf7c83e..e2576cc9b 100644 --- a/web/src/app/admin/connector/[ccPairId]/page.tsx +++ b/web/src/app/admin/connector/[ccPairId]/page.tsx @@ -188,8 +188,10 @@ function Main({ ccPairId }: { ccPairId: number }) { connectorId={ccPair.connector.id} credentialId={ccPair.credential.id} isDisabled={ + ccPair.indexing || ccPair.status === ConnectorCredentialPairStatus.PAUSED } + isIndexing={ccPair.indexing} isDeleting={isDeleting} /> )} diff --git a/web/src/app/admin/connector/[ccPairId]/types.ts b/web/src/app/admin/connector/[ccPairId]/types.ts index 55bbe9557..5e9cec428 100644 --- a/web/src/app/admin/connector/[ccPairId]/types.ts +++ b/web/src/app/admin/connector/[ccPairId]/types.ts @@ -25,6 +25,7 @@ export interface CCPairFullInfo { is_public: boolean; is_editable_for_current_user: boolean; deletion_failure_message: string | null; + indexing: boolean; } export interface PaginatedIndexAttempts {