Bugfix/indexing UI (#2879)

* fresh indexing feature branch

* cherry pick test

* Revert "cherry pick test"

This reverts commit 2a624220687affdda3de347e30f2011136f64bda.

* set multitenant so that vespa fields match when indexing

* cleanup pass

* mypy

* pass through env var to control celery indexing concurrency

* comments on task kickoff and some logging improvements

* disentangle configuration for different workers and beats.

* use get_session_with_tenant

* comment out all of update.py

* rename to RedisConnectorIndexingFenceData

* first check num_indexing_workers

* refactor RedisConnectorIndexingFenceData

* comment out on_worker_process_init

* missed a file

* scope db sessions to short lengths

* update launch.json template

* fix types

* keep index button disabled until indexing is truly finished

* change priority order of tooltips

* should be using the logger from app_base

* if we run out of retries, just mark the doc as modified so it gets synced later

* tighten up the logging ... we know these are IDs

* add logging

rkuo-danswer authored 2024-10-23 13:25:52 -07:00, committed by GitHub
parent 8b72264535
commit 3eb67baf5b
10 changed files with 73 additions and 24 deletions

View File

@@ -11,9 +11,9 @@ from celery.signals import celeryd_init
 from celery.signals import worker_init
 from celery.signals import worker_ready
 from celery.signals import worker_shutdown
-from celery.utils.log import get_task_logger
+import danswer.background.celery.apps.app_base as app_base
+from danswer.background.celery.apps.app_base import task_logger
 from danswer.background.celery.celery_redis import RedisConnectorCredentialPair
 from danswer.background.celery.celery_redis import RedisConnectorDeletion
 from danswer.background.celery.celery_redis import RedisConnectorIndexing
@@ -31,9 +31,6 @@ from danswer.utils.logger import setup_logger
 logger = setup_logger()
 
-# use this within celery tasks to get celery task specific logging
-task_logger = get_task_logger(__name__)
-
 celery_app = Celery(__name__)
 celery_app.config_from_object("danswer.background.celery.configs.primary")

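For context, Celery's config_from_object takes a dotted module path, which is what lets each worker and beat load its own settings module instead of sharing one. A minimal sketch of what such a config module might contain; the values and the environment variable name are assumptions for illustration, not copied from the repo:

# danswer/background/celery/configs/primary.py (hypothetical contents)
import os

broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"

# concurrency passed through from the environment, per the commit message;
# the actual variable name here is an assumption
worker_concurrency = int(os.environ.get("CELERY_WORKER_CONCURRENCY", "4"))
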
View File

@@ -465,14 +465,8 @@ class RedisConnectorPruning(RedisObjectHelper):
         return len(async_results)
 
-    def is_pruning(self, db_session: Session, redis_client: Redis) -> bool:
+    def is_pruning(self, redis_client: Redis) -> bool:
         """A single example of a helper method being refactored into the redis helper"""
-        cc_pair = get_connector_credential_pair_from_id(
-            cc_pair_id=int(self._id), db_session=db_session
-        )
-        if not cc_pair:
-            raise ValueError(f"cc_pair_id {self._id} does not exist.")
-
         if redis_client.exists(self.fence_key):
             return True
@@ -538,6 +532,13 @@ class RedisConnectorIndexing(RedisObjectHelper):
     ) -> int | None:
         return None
 
+    def is_indexing(self, redis_client: Redis) -> bool:
+        """A single example of a helper method being refactored into the redis helper"""
+        if redis_client.exists(self.fence_key):
+            return True
+
+        return False
+
 
 def celery_get_queue_length(queue: str, r: Redis) -> int:
     """This is a redis specific way to get the length of a celery queue.

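The fence pattern behind is_pruning and is_indexing is worth spelling out: the worker sets a per-connector Redis key while a run is in flight and deletes it afterwards, so liveness checks need no DB session at all. A minimal sketch under those assumptions; the key name and payload are illustrative:

import redis

r = redis.Redis()

# illustrative key; the real keys are derived from cc_pair / search settings ids
FENCE_KEY = "connectorindexing_fence_1_1"

# worker side: raise the fence when the run starts, drop it when the run ends
r.set(FENCE_KEY, "in_progress")
try:
    # while the fence is up, any process can see the run is live
    currently_indexing = bool(r.exists(FENCE_KEY))  # True
finally:
    r.delete(FENCE_KEY)

# celery_get_queue_length works because, with the Redis broker, a Celery
# queue is backed by a Redis list; "celery" is the default queue name
queued = r.llen("celery")
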
View File

@@ -11,6 +11,7 @@ from danswer.db.document import delete_document_by_connector_credential_pair__no
 from danswer.db.document import delete_documents_complete__no_commit
 from danswer.db.document import get_document
 from danswer.db.document import get_document_connector_count
+from danswer.db.document import mark_document_as_modified
 from danswer.db.document import mark_document_as_synced
 from danswer.db.document_set import fetch_document_sets_for_document
 from danswer.db.engine import get_session_with_tenant
@@ -19,6 +20,8 @@ from danswer.document_index.factory import get_default_document_index
 from danswer.document_index.interfaces import VespaDocumentFields
 from danswer.server.documents.models import ConnectorCredentialPairIdentifier
 
+DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3
+
 
 class RedisConnectorIndexingFenceData(BaseModel):
     index_attempt_id: int | None
@@ -32,7 +35,7 @@ class RedisConnectorIndexingFenceData(BaseModel):
     bind=True,
     soft_time_limit=45,
     time_limit=60,
-    max_retries=3,
+    max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES,
 )
 def document_by_cc_pair_cleanup_task(
     self: Task,
@@ -56,7 +59,7 @@ def document_by_cc_pair_cleanup_task(
     connector / credential pair from the access list
     (6) delete all relevant entries from postgres
     """
-    task_logger.info(f"document_id={document_id}")
+    task_logger.info(f"tenant_id={tenant_id} document_id={document_id}")
 
     try:
         with get_session_with_tenant(tenant_id) as db_session:
@@ -122,6 +125,8 @@ def document_by_cc_pair_cleanup_task(
             else:
                 pass
 
+            db_session.commit()
+
             task_logger.info(
                 f"tenant_id={tenant_id} "
                 f"document_id={document_id} "
@@ -129,16 +134,27 @@ def document_by_cc_pair_cleanup_task(
                 f"refcount={count} "
                 f"chunks={chunks_affected}"
             )
-            db_session.commit()
 
     except SoftTimeLimitExceeded:
         task_logger.info(
             f"SoftTimeLimitExceeded exception. tenant_id={tenant_id} doc_id={document_id}"
         )
         return False
     except Exception as e:
         task_logger.exception("Unexpected exception")
+
+        if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
+            # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64
+            countdown = 2 ** (self.request.retries + 4)
+            self.retry(exc=e, countdown=countdown)
+        else:
+            # This is the last attempt! Mark the document as dirty in the db so that it
+            # eventually gets fixed out of band via stale document reconciliation.
+            task_logger.info(
+                f"Max retries reached. Marking doc as dirty for reconciliation: "
+                f"tenant_id={tenant_id} document_id={document_id}"
+            )
+            with get_session_with_tenant(tenant_id) as db_session:
+                mark_document_as_modified(document_id, db_session)
+        return False
 
     return True

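Extracted from the diff above, the retry shape is: exponential backoff while attempts remain, then a one-time fallback on the final attempt. A standalone sketch; the task and helper names are invented for illustration:

from celery import Celery, Task

celery_app = Celery(__name__)
MAX_RETRIES = 3

def cleanup(document_id: str) -> None:  # hypothetical work function
    ...

def mark_dirty(document_id: str) -> None:  # hypothetical fallback
    ...

@celery_app.task(bind=True, max_retries=MAX_RETRIES)
def cleanup_task(self: Task, document_id: str) -> bool:
    try:
        cleanup(document_id)
    except Exception as e:
        if self.request.retries < MAX_RETRIES:
            # 16s, 32s, 64s between attempts; retry() raises, ending this run
            self.retry(exc=e, countdown=2 ** (self.request.retries + 4))
        # final attempt failed: flag the row for out-of-band reconciliation
        mark_dirty(document_id)
        return False
    return True
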
View File

@@ -375,6 +375,20 @@ def update_docs_last_modified__no_commit(
     doc.last_modified = now
 
 
+def mark_document_as_modified(
+    document_id: str,
+    db_session: Session,
+) -> None:
+    stmt = select(DbDocument).where(DbDocument.id == document_id)
+    doc = db_session.scalar(stmt)
+    if doc is None:
+        raise ValueError(f"No document with ID: {document_id}")
+
+    # update last_modified
+    doc.last_modified = datetime.now(timezone.utc)
+    db_session.commit()
+
+
 def mark_document_as_synced(document_id: str, db_session: Session) -> None:
     stmt = select(DbDocument).where(DbDocument.id == document_id)
     doc = db_session.scalar(stmt)

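Note that mark_document_as_modified only bumps last_modified; the actual repair happens later, out of band. Assuming the reconciliation pass compares last_modified against a last_synced column (implied but not shown in this diff), the stale-document query could look like this sketch, reusing DbDocument and an open db_session from the surrounding file:

from sqlalchemy import or_, select

# documents whose content changed after their last successful sync
stmt = select(DbDocument).where(
    or_(
        DbDocument.last_synced.is_(None),
        DbDocument.last_synced < DbDocument.last_modified,
    )
)
stale_docs = db_session.scalars(stmt).all()
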
View File

@@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
 from danswer.auth.users import current_curator_or_admin_user
 from danswer.auth.users import current_user
+from danswer.background.celery.celery_redis import RedisConnectorIndexing
 from danswer.background.celery.celery_redis import RedisConnectorPruning
 from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot
 from danswer.background.celery.tasks.pruning.tasks import (
@@ -34,6 +35,7 @@ from danswer.db.index_attempt import count_index_attempts_for_connector
 from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
 from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id
 from danswer.db.models import User
+from danswer.db.search_settings import get_current_search_settings
 from danswer.db.tasks import check_task_is_live_and_not_timed_out
 from danswer.db.tasks import get_latest_task
 from danswer.redis.redis_pool import get_redis_client
@@ -93,6 +95,8 @@ def get_cc_pair_full_info(
     user: User | None = Depends(current_curator_or_admin_user),
     db_session: Session = Depends(get_session),
 ) -> CCPairFullInfo:
+    r = get_redis_client()
+
     cc_pair = get_connector_credential_pair_from_id(
         cc_pair_id, db_session, user, get_editable=False
     )
@@ -122,11 +126,16 @@ def get_cc_pair_full_info(
     latest_attempt = get_latest_index_attempt_for_cc_pair_id(
         db_session=db_session,
-        connector_credential_pair_id=cc_pair.id,
+        connector_credential_pair_id=cc_pair_id,
         secondary_index=False,
         only_finished=False,
     )
 
+    search_settings = get_current_search_settings(db_session)
+
+    rci = RedisConnectorIndexing(
+        cc_pair_id=cc_pair_id, search_settings_id=search_settings.id
+    )
+
     return CCPairFullInfo.from_models(
         cc_pair_model=cc_pair,
         number_of_index_attempts=count_index_attempts_for_connector(
@@ -141,6 +150,7 @@ def get_cc_pair_full_info(
         ),
         num_docs_indexed=documents_indexed,
         is_editable_for_current_user=is_editable_for_current_user,
+        indexing=rci.is_indexing(r),
     )
@@ -250,7 +260,7 @@ def prune_cc_pair(
     r = get_redis_client()
 
     rcp = RedisConnectorPruning(cc_pair_id)
-    if rcp.is_pruning(db_session, r):
+    if rcp.is_pruning(r):
         raise HTTPException(
             status_code=HTTPStatus.CONFLICT,
             detail="Pruning task already in progress.",

View File

@@ -222,6 +222,7 @@ class CCPairFullInfo(BaseModel):
     access_type: AccessType
     is_editable_for_current_user: bool
     deletion_failure_message: str | None
+    indexing: bool
 
     @classmethod
     def from_models(
@@ -232,6 +233,7 @@ class CCPairFullInfo(BaseModel):
         last_index_attempt: IndexAttempt | None,
         num_docs_indexed: int,  # not ideal, but this must be computed separately
         is_editable_for_current_user: bool,
+        indexing: bool,
     ) -> "CCPairFullInfo":
         # figure out if we need to artificially deflate the number of docs indexed.
         # This is required since the total number of docs indexed by a CC Pair is
@@ -265,6 +267,7 @@ class CCPairFullInfo(BaseModel):
             access_type=cc_pair_model.access_type,
             is_editable_for_current_user=is_editable_for_current_user,
             deletion_failure_message=cc_pair_model.deletion_failure_message,
+            indexing=indexing,
         )

View File

@@ -61,10 +61,10 @@ class DanswerLoggingAdapter(logging.LoggerAdapter):
         cc_pair_id = IndexAttemptSingleton.get_connector_credential_pair_id()
 
         if attempt_id is not None:
-            msg = f"[Attempt ID: {attempt_id}] {msg}"
+            msg = f"[Attempt: {attempt_id}] {msg}"
 
         if cc_pair_id is not None:
-            msg = f"[CC Pair ID: {cc_pair_id}] {msg}"
+            msg = f"[CC Pair: {cc_pair_id}] {msg}"
 
         # For Slack Bot, logs the channel relevant to the request
         channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None
@@ -185,6 +185,7 @@ def setup_logger(
 def print_loggers() -> None:
     """Print information about all loggers. Use to debug logging issues."""
     root_logger = logging.getLogger()
+
     loggers: list[logging.Logger | logging.PlaceHolder] = [root_logger]
     loggers.extend(logging.Logger.manager.loggerDict.values())

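The prefixes above come from logging.LoggerAdapter: its process() hook rewrites every message before it is emitted. A self-contained example of the same technique, with an invented adapter and field name:

import logging

logging.basicConfig(level=logging.INFO)

class PrefixAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        # prepend a context tag when one was supplied via `extra`
        attempt_id = self.extra.get("attempt_id") if self.extra else None
        if attempt_id is not None:
            msg = f"[Attempt: {attempt_id}] {msg}"
        return msg, kwargs

log = PrefixAdapter(logging.getLogger(__name__), {"attempt_id": 7})
log.info("indexing started")  # -> "[Attempt: 7] indexing started"
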
View File

@@ -94,12 +94,14 @@ export function ReIndexButton({
   connectorId,
   credentialId,
   isDisabled,
+  isIndexing,
   isDeleting,
 }: {
   ccPairId: number;
   connectorId: number;
   credentialId: number;
   isDisabled: boolean;
+  isIndexing: boolean;
   isDeleting: boolean;
 }) {
   const { popup, setPopup } = usePopup();
@@ -128,9 +130,11 @@ export function ReIndexButton({
       tooltip={
         isDeleting
           ? "Cannot index while connector is deleting"
-          : isDisabled
-            ? "Connector must be re-enabled before indexing"
-            : undefined
+          : isIndexing
+            ? "Indexing is already in progress"
+            : isDisabled
+              ? "Connector must be re-enabled before indexing"
+              : undefined
       }
     >
       Index

View File

@@ -188,8 +188,10 @@ function Main({ ccPairId }: { ccPairId: number }) {
             connectorId={ccPair.connector.id}
             credentialId={ccPair.credential.id}
             isDisabled={
+              ccPair.indexing ||
               ccPair.status === ConnectorCredentialPairStatus.PAUSED
             }
+            isIndexing={ccPair.indexing}
             isDeleting={isDeleting}
           />
         )}

View File

@@ -25,6 +25,7 @@ export interface CCPairFullInfo {
   is_public: boolean;
   is_editable_for_current_user: boolean;
   deletion_failure_message: string | null;
+  indexing: boolean;
 }
 
 export interface PaginatedIndexAttempts {