diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index fa990dc39..1837a7ad1 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -9,6 +9,9 @@ from distributed import LocalCluster from sqlalchemy.orm import Session from danswer.configs.app_configs import NUM_INDEXING_WORKERS +from danswer.configs.model_configs import ( + BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED, +) from danswer.connectors.factory import instantiate_connector from danswer.connectors.interfaces import GenerateDocumentsOutput from danswer.connectors.interfaces import LoadConnector @@ -348,6 +351,18 @@ def _run_indexing_entrypoint(index_attempt_id: int) -> None: """Entrypoint for indexing run when using dask distributed. Wraps the actual logic in a `try` block so that we can catch any exceptions and mark the attempt as failed.""" + import torch + import os + + # force torch to use more cores if available. On VMs pytorch only takes + # advantage of a single core by default + cpu_cores_to_use = max( + (os.cpu_count() or 1) - BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED, + torch.get_num_threads(), + ) + logger.info(f"Setting task to use {cpu_cores_to_use} threads") + torch.set_num_threads(cpu_cores_to_use) + try: # set the indexing attempt ID so that all log messages from this process # will have it added as a prefix diff --git a/backend/danswer/configs/model_configs.py b/backend/danswer/configs/model_configs.py index bccaa7dfb..2a8f5b4d9 100644 --- a/backend/danswer/configs/model_configs.py +++ b/backend/danswer/configs/model_configs.py @@ -30,6 +30,14 @@ ASYM_QUERY_PREFIX = os.environ.get("ASYM_QUERY_PREFIX", "") ASYM_PASSAGE_PREFIX = os.environ.get("ASYM_PASSAGE_PREFIX", "") # Purely an optimization, memory limitation consideration BATCH_SIZE_ENCODE_CHUNKS = 8 +# This controls the number of pytorch "threads" to allocate to the embedding +# model. Specifically, this is computed as `num_cpu_cores - BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED`. +# This is useful for limiting the number of CPU cores that the background job consumes to leave some +# compute for other processes (most importantly the api_server and web_server). +BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED = int( + os.environ.get("BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED") or 1 +) + # Cross Encoder Settings SKIP_RERANKING = os.environ.get("SKIP_RERANKING", "").lower() == "true" diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index 02b61aea8..f25550435 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -91,6 +91,7 @@ services: - ASYM_PASSAGE_PREFIX=${ASYM_PASSAGE_PREFIX:-} - SKIP_RERANKING=${SKIP_RERANKING:-} - EDIT_KEYWORD_QUERY=${EDIT_KEYWORD_QUERY:-} + - BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED=${BACKGROUND_JOB_EMBEDDING_MODEL_CPU_CORES_LEFT_UNUSED:-} # Set to debug to get more fine-grained logs - LOG_LEVEL=${LOG_LEVEL:-info} volumes: