Indexing settings and logging improve (#821)

---------

Co-authored-by: Aliaksandr Chernak <aliaksandr_chernak@epam.com>
Co-authored-by: Yuhong Sun <yuhongsun96@gmail.com>
This commit is contained in:
Aliaksandr_С 2023-12-22 10:13:24 +01:00 committed by GitHub
parent 6650f01dc6
commit 0318507911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 5 deletions

View File

@ -13,6 +13,7 @@ from danswer.background.indexing.dask_utils import ResourceLogger
from danswer.background.indexing.job_client import SimpleJob
from danswer.background.indexing.job_client import SimpleJobClient
from danswer.background.indexing.run_indexing import run_indexing_entrypoint
from danswer.configs.app_configs import CLEANUP_INDEXING_JOBS_TIMEOUT
from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
from danswer.configs.app_configs import LOG_LEVEL
from danswer.configs.app_configs import MODEL_SERVER_HOST
@ -155,7 +156,8 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None:
def cleanup_indexing_jobs(
existing_jobs: dict[int, Future | SimpleJob]
existing_jobs: dict[int, Future | SimpleJob],
timeout_hours: int = CLEANUP_INDEXING_JOBS_TIMEOUT,
) -> dict[int, Future | SimpleJob]:
existing_jobs_copy = existing_jobs.copy()
@ -203,13 +205,13 @@ def cleanup_indexing_jobs(
)
for index_attempt in in_progress_indexing_attempts:
if index_attempt.id in existing_jobs:
# check to see if the job has been updated in last hour, if not
# check to see if the job has been updated in last n hours, if not
# assume it to frozen in some bad state and just mark it as failed. Note: this relies
# on the fact that the `time_updated` field is constantly updated every
# batch of documents indexed
current_db_time = get_db_current_time(db_session=db_session)
time_since_update = current_db_time - index_attempt.time_updated
if time_since_update.total_seconds() > 60 * 60:
if time_since_update.total_seconds() > 60 * 60 * timeout_hours:
existing_jobs[index_attempt.id].cancel()
_mark_run_failed(
db_session=db_session,

View File

@ -97,7 +97,7 @@ VESPA_DEPLOYMENT_ZIP = (
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
)
# Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
INDEX_BATCH_SIZE = 16
INDEX_BATCH_SIZE = os.environ.get("INDEX_BATCH_SIZE", 16)
# Below are intended to match the env variables names used by the official postgres docker image
# https://hub.docker.com/_/postgres
@ -173,6 +173,8 @@ ENABLE_MINI_CHUNK = os.environ.get("ENABLE_MINI_CHUNK", "").lower() == "true"
# Slightly larger since the sentence aware split is a max cutoff so most minichunks will be under MINI_CHUNK_SIZE
# tokens. But we need it to be at least as big as 1/4th chunk size to avoid having a tiny mini-chunk at the end
MINI_CHUNK_SIZE = 150
# Timeout to wait for job's last update before killing it, in hours
CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT", 1))
#####

View File

@ -9,8 +9,11 @@ from danswer.indexing.models import DocAwareChunk
from danswer.indexing.models import IndexChunk
from danswer.search.models import Embedder
from danswer.search.search_nlp_models import EmbeddingModel
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time
logger = setup_logger()
@log_function_time()
def embed_chunks(
@ -42,7 +45,9 @@ def embed_chunks(
]
embeddings: list[list[float]] = []
for text_batch in text_batches:
len_text_batches = len(text_batches)
for idx, text_batch in enumerate(text_batches, start=1):
logger.debug(f"Embedding text batch {idx} of {len_text_batches}")
# Normalize embeddings is only configured via model_configs.py, be sure to use right value for the set loss
embeddings.extend(embedding_model.encode(text_batch))