From 0318507911a0d33cc4561b0f9384d0bfb466b587 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aliaksandr=5F=D0=A1?= <30312859+AlexandrSher@users.noreply.github.com>
Date: Fri, 22 Dec 2023 10:13:24 +0100
Subject: [PATCH] Improve indexing settings and logging (#821)

---------

Co-authored-by: Aliaksandr Chernak
Co-authored-by: Yuhong Sun
---
 backend/danswer/background/update.py   | 8 +++++---
 backend/danswer/configs/app_configs.py | 4 +++-
 backend/danswer/indexing/embedder.py   | 7 ++++++-
 3 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index d3f4e6c5f..15d642cac 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -13,6 +13,7 @@ from danswer.background.indexing.dask_utils import ResourceLogger
 from danswer.background.indexing.job_client import SimpleJob
 from danswer.background.indexing.job_client import SimpleJobClient
 from danswer.background.indexing.run_indexing import run_indexing_entrypoint
+from danswer.configs.app_configs import CLEANUP_INDEXING_JOBS_TIMEOUT
 from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
 from danswer.configs.app_configs import LOG_LEVEL
 from danswer.configs.app_configs import MODEL_SERVER_HOST
@@ -155,7 +156,8 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None:
 
 
 def cleanup_indexing_jobs(
-    existing_jobs: dict[int, Future | SimpleJob]
+    existing_jobs: dict[int, Future | SimpleJob],
+    timeout_hours: int = CLEANUP_INDEXING_JOBS_TIMEOUT,
 ) -> dict[int, Future | SimpleJob]:
     existing_jobs_copy = existing_jobs.copy()
 
@@ -203,13 +205,13 @@ def cleanup_indexing_jobs(
         )
         for index_attempt in in_progress_indexing_attempts:
             if index_attempt.id in existing_jobs:
-                # check to see if the job has been updated in last hour, if not
+                # check to see if the job has been updated in the last `timeout_hours` hours, if not
                 # assume it to frozen in some bad state and just mark it as failed. Note: this relies
                 # on the fact that the `time_updated` field is constantly updated every
                 # batch of documents indexed
                 current_db_time = get_db_current_time(db_session=db_session)
                 time_since_update = current_db_time - index_attempt.time_updated
-                if time_since_update.total_seconds() > 60 * 60:
+                if time_since_update.total_seconds() > 60 * 60 * timeout_hours:
                     existing_jobs[index_attempt.id].cancel()
                     _mark_run_failed(
                         db_session=db_session,
diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py
index 2f18142ff..0d5396336 100644
--- a/backend/danswer/configs/app_configs.py
+++ b/backend/danswer/configs/app_configs.py
@@ -97,7 +97,7 @@ VESPA_DEPLOYMENT_ZIP = (
     os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
 )
 # Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
-INDEX_BATCH_SIZE = 16
+INDEX_BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE") or 16)
 
 # Below are intended to match the env variables names used by the official postgres docker image
 # https://hub.docker.com/_/postgres
@@ -173,6 +173,8 @@ ENABLE_MINI_CHUNK = os.environ.get("ENABLE_MINI_CHUNK", "").lower() == "true"
 # Slightly larger since the sentence aware split is a max cutoff so most minichunks will be under MINI_CHUNK_SIZE
 # tokens. But we need it to be at least as big as 1/4th chunk size to avoid having a tiny mini-chunk at the end
 MINI_CHUNK_SIZE = 150
+# Hours to wait since an indexing job's last update before assuming it is frozen and marking it failed
+CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT") or 1)
 
 
 #####
diff --git a/backend/danswer/indexing/embedder.py b/backend/danswer/indexing/embedder.py
index 3504c9be7..3f04a911e 100644
--- a/backend/danswer/indexing/embedder.py
+++ b/backend/danswer/indexing/embedder.py
@@ -9,8 +9,11 @@ from danswer.indexing.models import DocAwareChunk
 from danswer.indexing.models import IndexChunk
 from danswer.search.models import Embedder
 from danswer.search.search_nlp_models import EmbeddingModel
+from danswer.utils.logger import setup_logger
 from danswer.utils.timing import log_function_time
 
+logger = setup_logger()
+
 
 @log_function_time()
 def embed_chunks(
@@ -42,7 +45,9 @@ def embed_chunks(
     ]
 
     embeddings: list[list[float]] = []
-    for text_batch in text_batches:
+    len_text_batches = len(text_batches)
+    for idx, text_batch in enumerate(text_batches, start=1):
+        logger.debug(f"Embedding text batch {idx} of {len_text_batches}")
         # Normalize embeddings is only configured via model_configs.py, be sure to use right value for the set loss
         embeddings.extend(embedding_model.encode(text_batch))
 
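A minimal standalone sketch of the two behaviors introduced above (not part of the patch): reading the new environment variables with the same default/override pattern as app_configs.py, and applying the hour-based staleness check from cleanup_indexing_jobs. It assumes only the Python standard library; the helper name is_job_stale and the example values are illustrative and do not exist in the Danswer codebase.

import os
from datetime import datetime, timedelta, timezone

# Default/override pattern matching app_configs.py: read the env var if set,
# fall back to a default, and cast to int so the value is usable downstream.
INDEX_BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE") or 16)
CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT") or 1)


def is_job_stale(
    last_update: datetime,
    now: datetime,
    timeout_hours: int = CLEANUP_INDEXING_JOBS_TIMEOUT,
) -> bool:
    # Same arithmetic as the check in cleanup_indexing_jobs: a job whose last
    # update is older than timeout_hours is assumed frozen.
    return (now - last_update).total_seconds() > 60 * 60 * timeout_hours


# With a 2-hour timeout, a job last updated 3 hours ago is considered stale,
# while one updated 30 minutes ago is not.
now = datetime.now(timezone.utc)
print(is_job_stale(now - timedelta(hours=3), now, timeout_hours=2))    # True
print(is_job_stale(now - timedelta(minutes=30), now, timeout_hours=2)) # False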