From 4fe99d05fd15574a2fa7fa629188b733fc1c8f21 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Wed, 29 Jan 2025 15:24:44 -0800 Subject: [PATCH] add timings for syncing (#3798) * add timings for syncing * add more logging * more debugging * refactor multipass/db check out of VespaIndex * circular imports? * more debugging * add logs * various improvements * additional logs to narrow down issue * use global httpx pool for the main vespa flows in celery. Use in more places eventually. * cleanup debug logging, etc * remove debug logging * this should use the secondary index * mypy * missed some logging * review fixes * refactor get_default_document_index to use search settings * more missed logging * fix circular refs --------- Co-authored-by: Richard Kuo (Danswer) Co-authored-by: pablodanswer --- .../onyx/background/celery/apps/app_base.py | 3 + backend/onyx/background/celery/apps/light.py | 17 ++- .../onyx/background/celery/celery_utils.py | 26 ++++ .../background/celery/tasks/indexing/tasks.py | 19 ++- .../celery/tasks/monitoring/tasks.py | 6 +- .../background/celery/tasks/shared/tasks.py | 9 +- .../background/celery/tasks/vespa/tasks.py | 9 +- .../onyx/background/indexing/run_indexing.py | 6 +- backend/onyx/chat/process_message.py | 4 +- backend/onyx/context/search/pipeline.py | 5 +- backend/onyx/db/models.py | 28 ++++ backend/onyx/db/search_settings.py | 44 ++++-- .../document_index/document_index_utils.py | 45 ++++++- backend/onyx/document_index/factory.py | 23 +++- .../document_index/vespa/chunk_retrieval.py | 31 ++--- backend/onyx/document_index/vespa/index.py | 126 ++++++++++-------- .../document_index/vespa/indexing_utils.py | 85 ++++++------ backend/onyx/httpx/httpx_pool.py | 57 ++++++++ backend/onyx/indexing/indexing_pipeline.py | 11 +- backend/onyx/indexing/models.py | 4 +- .../starter_message_creation.py | 9 +- backend/onyx/seeding/load_docs.py | 15 ++- backend/onyx/server/documents/cc_pair.py | 4 +- backend/onyx/server/documents/document.py | 10 +- 
backend/onyx/server/manage/search_settings.py | 8 +- backend/onyx/server/onyx_api/ingestion.py | 17 +-- .../server/query_and_chat/query_backend.py | 5 +- backend/onyx/setup.py | 22 ++- .../scripts/force_delete_connector_by_id.py | 7 +- backend/scripts/orphan_doc_cleanup_script.py | 10 +- .../query_time_check/seed_dummy_docs.py | 9 +- .../query_time_check/test_query_times.py | 9 +- .../tests/integration/common_utils/reset.py | 17 ++- .../tests/unit/onyx/indexing/test_vespa.py | 4 +- 34 files changed, 489 insertions(+), 215 deletions(-) create mode 100644 backend/onyx/httpx/httpx_pool.py diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index cc7440e00..c0661d2f1 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -24,6 +24,7 @@ from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxRedisLocks from onyx.db.engine import get_sqlalchemy_engine from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout +from onyx.httpx.httpx_pool import HttpxPool from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair from onyx.redis.redis_connector_delete import RedisConnectorDelete @@ -316,6 +317,8 @@ def on_worker_ready(sender: Any, **kwargs: Any) -> None: def on_worker_shutdown(sender: Any, **kwargs: Any) -> None: + HttpxPool.close_all() + if not celery_is_worker_primary(sender): return diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index 5802da511..638d1afc7 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -10,6 +10,10 @@ from celery.signals import worker_ready from celery.signals import worker_shutdown import onyx.background.celery.apps.app_base as app_base +from 
onyx.background.celery.celery_utils import httpx_init_vespa_pool +from onyx.configs.app_configs import MANAGED_VESPA +from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH +from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH from onyx.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME from onyx.db.engine import SqlEngine from onyx.utils.logger import setup_logger @@ -54,12 +58,23 @@ def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: @worker_init.connect def on_worker_init(sender: Worker, **kwargs: Any) -> None: + EXTRA_CONCURRENCY = 8 # small extra fudge factor for connection limits + logger.info("worker_init signal received.") logger.info(f"Concurrency: {sender.concurrency}") # type: ignore SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) - SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY) # type: ignore + + if MANAGED_VESPA: + httpx_init_vespa_pool( + sender.concurrency + EXTRA_CONCURRENCY, # type: ignore + ssl_cert=VESPA_CLOUD_CERT_PATH, + ssl_key=VESPA_CLOUD_KEY_PATH, + ) + else: + httpx_init_vespa_pool(sender.concurrency + EXTRA_CONCURRENCY) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/celery_utils.py b/backend/onyx/background/celery/celery_utils.py index 394dff352..60b3dfacc 100644 --- a/backend/onyx/background/celery/celery_utils.py +++ b/backend/onyx/background/celery/celery_utils.py @@ -1,10 +1,13 @@ from datetime import datetime from datetime import timezone from typing import Any +from typing import cast +import httpx from sqlalchemy.orm import Session from onyx.configs.app_configs import MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE +from onyx.configs.app_configs import VESPA_REQUEST_TIMEOUT from onyx.connectors.cross_connector_utils.rate_limit_wrapper import ( rate_limit_builder, ) @@ 
-17,6 +20,7 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.enums import TaskStatus from onyx.db.models import TaskQueueState +from onyx.httpx.httpx_pool import HttpxPool from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface from onyx.redis.redis_connector import RedisConnector from onyx.server.documents.models import DeletionAttemptSnapshot @@ -154,3 +158,25 @@ def celery_is_worker_primary(worker: Any) -> bool: return True return False + + +def httpx_init_vespa_pool( + max_keepalive_connections: int, + timeout: int = VESPA_REQUEST_TIMEOUT, + ssl_cert: str | None = None, + ssl_key: str | None = None, +) -> None: + httpx_cert = None + httpx_verify = False + if ssl_cert and ssl_key: + httpx_cert = cast(tuple[str, str], (ssl_cert, ssl_key)) + httpx_verify = True + + HttpxPool.init_client( + name="vespa", + cert=httpx_cert, + verify=httpx_verify, + timeout=timeout, + http2=False, + limits=httpx.Limits(max_keepalive_connections=max_keepalive_connections), + ) diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 94ace6776..2e3c0e3ae 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -15,6 +15,7 @@ from redis import Redis from redis.lock import Lock as RedisLock from onyx.background.celery.apps.app_base import task_logger +from onyx.background.celery.celery_utils import httpx_init_vespa_pool from onyx.background.celery.tasks.indexing.utils import _should_index from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids from onyx.background.celery.tasks.indexing.utils import IndexingCallback @@ -22,6 +23,9 @@ from onyx.background.celery.tasks.indexing.utils import try_creating_indexing_ta from onyx.background.celery.tasks.indexing.utils import validate_indexing_fences from 
onyx.background.indexing.job_client import SimpleJobClient from onyx.background.indexing.run_indexing import run_indexing_entrypoint +from onyx.configs.app_configs import MANAGED_VESPA +from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH +from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT @@ -37,8 +41,7 @@ from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import get_last_attempt_for_cc_pair from onyx.db.index_attempt import mark_attempt_canceled from onyx.db.index_attempt import mark_attempt_failed -from onyx.db.models import SearchSettings -from onyx.db.search_settings import get_active_search_settings +from onyx.db.search_settings import get_active_search_settings_list from onyx.db.search_settings import get_current_search_settings from onyx.db.swap_index import check_index_swap from onyx.natural_language_processing.search_nlp_models import EmbeddingModel @@ -121,9 +124,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: redis_connector = RedisConnector(tenant_id, cc_pair_id) with get_session_with_tenant(tenant_id) as db_session: - search_settings_list: list[SearchSettings] = get_active_search_settings( - db_session - ) + search_settings_list = get_active_search_settings_list(db_session) for search_settings_instance in search_settings_list: redis_connector_index = redis_connector.new_index( search_settings_instance.id @@ -303,6 +304,14 @@ def connector_indexing_task( attempt_found = False n_final_progress: int | None = None + # 20 is the documented default for httpx max_keepalive_connections + if MANAGED_VESPA: + httpx_init_vespa_pool( + 20, ssl_cert=VESPA_CLOUD_CERT_PATH, ssl_key=VESPA_CLOUD_KEY_PATH + ) + else: + httpx_init_vespa_pool(20) + redis_connector = RedisConnector(tenant_id, 
cc_pair_id) redis_connector_index = redis_connector.new_index(search_settings_id) diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index 80c29d3e2..d1bc14d73 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -34,7 +34,7 @@ from onyx.db.models import DocumentSet from onyx.db.models import IndexAttempt from onyx.db.models import SyncRecord from onyx.db.models import UserGroup -from onyx.db.search_settings import get_active_search_settings +from onyx.db.search_settings import get_active_search_settings_list from onyx.redis.redis_pool import get_redis_client from onyx.redis.redis_pool import redis_lock_dump from onyx.utils.telemetry import optional_telemetry @@ -315,13 +315,13 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me # Get all connector credential pairs cc_pairs = db_session.scalars(select(ConnectorCredentialPair)).all() # Might be more than one search setting, or just one - active_search_settings = get_active_search_settings(db_session) + active_search_settings_list = get_active_search_settings_list(db_session) metrics = [] # If you want to process each cc_pair against each search setting: for cc_pair in cc_pairs: - for search_settings in active_search_settings: + for search_settings in active_search_settings_list: recent_attempts = ( db_session.query(IndexAttempt) .filter( diff --git a/backend/onyx/background/celery/tasks/shared/tasks.py b/backend/onyx/background/celery/tasks/shared/tasks.py index 214987fa8..48a21bc97 100644 --- a/backend/onyx/background/celery/tasks/shared/tasks.py +++ b/backend/onyx/background/celery/tasks/shared/tasks.py @@ -27,9 +27,10 @@ from onyx.db.document import mark_document_as_synced from onyx.db.document_set import fetch_document_sets_for_document from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import 
get_session_with_tenant -from onyx.document_index.document_index_utils import get_both_index_names +from onyx.db.search_settings import get_active_search_settings from onyx.document_index.factory import get_default_document_index from onyx.document_index.interfaces import VespaDocumentFields +from onyx.httpx.httpx_pool import HttpxPool from onyx.redis.redis_pool import get_redis_client from onyx.redis.redis_pool import redis_lock_dump from onyx.server.documents.models import ConnectorCredentialPairIdentifier @@ -79,9 +80,11 @@ def document_by_cc_pair_cleanup_task( action = "skip" chunks_affected = 0 - curr_ind_name, sec_ind_name = get_both_index_names(db_session) + active_search_settings = get_active_search_settings(db_session) doc_index = get_default_document_index( - primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + active_search_settings.primary, + active_search_settings.secondary, + httpx_client=HttpxPool.get("vespa"), ) retry_index = RetryDocumentIndex(doc_index) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index fa82d21fc..66e920128 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -61,12 +61,13 @@ from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import mark_attempt_failed from onyx.db.models import DocumentSet from onyx.db.models import UserGroup +from onyx.db.search_settings import get_active_search_settings from onyx.db.sync_record import cleanup_sync_records from onyx.db.sync_record import insert_sync_record from onyx.db.sync_record import update_sync_record_status -from onyx.document_index.document_index_utils import get_both_index_names from onyx.document_index.factory import get_default_document_index from onyx.document_index.interfaces import VespaDocumentFields +from onyx.httpx.httpx_pool import HttpxPool from onyx.redis.redis_connector import 
RedisConnector from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair from onyx.redis.redis_connector_delete import RedisConnectorDelete @@ -1096,9 +1097,11 @@ def vespa_metadata_sync_task( try: with get_session_with_tenant(tenant_id) as db_session: - curr_ind_name, sec_ind_name = get_both_index_names(db_session) + active_search_settings = get_active_search_settings(db_session) doc_index = get_default_document_index( - primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + search_settings=active_search_settings.primary, + secondary_search_settings=active_search_settings.secondary, + httpx_client=HttpxPool.get("vespa"), ) retry_index = RetryDocumentIndex(doc_index) diff --git a/backend/onyx/background/indexing/run_indexing.py b/backend/onyx/background/indexing/run_indexing.py index 11bb26b18..94f8cb01b 100644 --- a/backend/onyx/background/indexing/run_indexing.py +++ b/backend/onyx/background/indexing/run_indexing.py @@ -35,6 +35,7 @@ from onyx.db.models import IndexAttempt from onyx.db.models import IndexingStatus from onyx.db.models import IndexModelStatus from onyx.document_index.factory import get_default_document_index +from onyx.httpx.httpx_pool import HttpxPool from onyx.indexing.embedder import DefaultIndexingEmbedder from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface from onyx.indexing.indexing_pipeline import build_indexing_pipeline @@ -219,9 +220,10 @@ def _run_indexing( callback=callback, ) - # Indexing is only done into one index at a time document_index = get_default_document_index( - primary_index_name=ctx.index_name, secondary_index_name=None + index_attempt_start.search_settings, + None, + httpx_client=HttpxPool.get("vespa"), ) indexing_pipeline = build_indexing_pipeline( diff --git a/backend/onyx/chat/process_message.py b/backend/onyx/chat/process_message.py index 2fb9e9a8b..9478cfdf8 100644 --- a/backend/onyx/chat/process_message.py +++ b/backend/onyx/chat/process_message.py @@ 
-426,9 +426,7 @@ def stream_chat_message_objects( ) search_settings = get_current_search_settings(db_session) - document_index = get_default_document_index( - primary_index_name=search_settings.index_name, secondary_index_name=None - ) + document_index = get_default_document_index(search_settings, None) # Every chat Session begins with an empty root message root_message = get_or_create_root_message( diff --git a/backend/onyx/context/search/pipeline.py b/backend/onyx/context/search/pipeline.py index c6f8d8cbe..844949881 100644 --- a/backend/onyx/context/search/pipeline.py +++ b/backend/onyx/context/search/pipeline.py @@ -67,10 +67,7 @@ class SearchPipeline: self.rerank_metrics_callback = rerank_metrics_callback self.search_settings = get_current_search_settings(db_session) - self.document_index = get_default_document_index( - primary_index_name=self.search_settings.index_name, - secondary_index_name=None, - ) + self.document_index = get_default_document_index(self.search_settings, None) self.prompt_config: PromptConfig | None = prompt_config # Preprocessing steps generate this diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index de3412dfa..6727a6e8d 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -747,6 +747,34 @@ class SearchSettings(Base): def api_key(self) -> str | None: return self.cloud_provider.api_key if self.cloud_provider is not None else None + @property + def large_chunks_enabled(self) -> bool: + """ + Given multipass usage and an embedder, decides whether large chunks are allowed + based on model/provider constraints. 
+ """ + # Only local models that support a larger context are from Nomic + # Cohere does not support larger contexts (they recommend not going above ~512 tokens) + return SearchSettings.can_use_large_chunks( + self.multipass_indexing, self.model_name, self.provider_type + ) + + @staticmethod + def can_use_large_chunks( + multipass: bool, model_name: str, provider_type: EmbeddingProvider | None + ) -> bool: + """ + Given multipass usage and an embedder, decides whether large chunks are allowed + based on model/provider constraints. + """ + # Only local models that support a larger context are from Nomic + # Cohere does not support larger contexts (they recommend not going above ~512 tokens) + return ( + multipass + and model_name.startswith("nomic-ai") + and provider_type != EmbeddingProvider.COHERE + ) + class IndexAttempt(Base): """ diff --git a/backend/onyx/db/search_settings.py b/backend/onyx/db/search_settings.py index d7324de35..f0497ecbc 100644 --- a/backend/onyx/db/search_settings.py +++ b/backend/onyx/db/search_settings.py @@ -29,9 +29,21 @@ from onyx.utils.logger import setup_logger from shared_configs.configs import PRESERVED_SEARCH_FIELDS from shared_configs.enums import EmbeddingProvider + logger = setup_logger() +class ActiveSearchSettings: + primary: SearchSettings + secondary: SearchSettings | None + + def __init__( + self, primary: SearchSettings, secondary: SearchSettings | None + ) -> None: + self.primary = primary + self.secondary = secondary + + def create_search_settings( search_settings: SavedSearchSettings, db_session: Session, @@ -143,21 +155,27 @@ def get_secondary_search_settings(db_session: Session) -> SearchSettings | None: return latest_settings -def get_active_search_settings(db_session: Session) -> list[SearchSettings]: - """Returns active search settings. The first entry will always be the current search - settings. 
If there are new search settings that are being migrated to, those will be - the second entry.""" +def get_active_search_settings(db_session: Session) -> ActiveSearchSettings: + """Returns active search settings. Secondary search settings may be None.""" + + # Get the primary and secondary search settings + primary_search_settings = get_current_search_settings(db_session) + secondary_search_settings = get_secondary_search_settings(db_session) + return ActiveSearchSettings( + primary=primary_search_settings, secondary=secondary_search_settings + ) + + +def get_active_search_settings_list(db_session: Session) -> list[SearchSettings]: + """Returns active search settings as a list. Primary settings are the first element, + and if secondary search settings exist, they will be the second element.""" + search_settings_list: list[SearchSettings] = [] - # Get the primary search settings - primary_search_settings = get_current_search_settings(db_session) - search_settings_list.append(primary_search_settings) - - # Check for secondary search settings - secondary_search_settings = get_secondary_search_settings(db_session) - if secondary_search_settings is not None: - # If secondary settings exist, add them to the list - search_settings_list.append(secondary_search_settings) + active_search_settings = get_active_search_settings(db_session) + search_settings_list.append(active_search_settings.primary) + if active_search_settings.secondary: + search_settings_list.append(active_search_settings.secondary) return search_settings_list diff --git a/backend/onyx/document_index/document_index_utils.py b/backend/onyx/document_index/document_index_utils.py index 831ffb6d4..ae79c73d8 100644 --- a/backend/onyx/document_index/document_index_utils.py +++ b/backend/onyx/document_index/document_index_utils.py @@ -4,24 +4,63 @@ from uuid import UUID from sqlalchemy.orm import Session +from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING +from onyx.db.models import SearchSettings from 
onyx.db.search_settings import get_current_search_settings from onyx.db.search_settings import get_secondary_search_settings from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo from onyx.indexing.models import DocMetadataAwareIndexChunk +from onyx.indexing.models import MultipassConfig from shared_configs.configs import MULTI_TENANT DEFAULT_BATCH_SIZE = 30 DEFAULT_INDEX_NAME = "danswer_chunk" -def get_both_index_names(db_session: Session) -> tuple[str, str | None]: +def should_use_multipass(search_settings: SearchSettings | None) -> bool: + """ + Determines whether multipass should be used based on the search settings + or the default config if settings are unavailable. + """ + if search_settings is not None: + return search_settings.multipass_indexing + return ENABLE_MULTIPASS_INDEXING + + +def get_multipass_config(search_settings: SearchSettings) -> MultipassConfig: + """ + Determines whether to enable multipass and large chunks by examining + the current search settings and the embedder configuration. 
+    """ +    if not search_settings: +        return MultipassConfig(multipass_indexing=False, enable_large_chunks=False) + +    multipass = should_use_multipass(search_settings) +    enable_large_chunks = SearchSettings.can_use_large_chunks( +        multipass, search_settings.model_name, search_settings.provider_type +    ) +    return MultipassConfig( +        multipass_indexing=multipass, enable_large_chunks=enable_large_chunks +    ) + + +def get_both_index_properties( +    db_session: Session, +) -> tuple[str, str | None, bool, bool | None]: search_settings = get_current_search_settings(db_session) +    config_1 = get_multipass_config(search_settings) search_settings_new = get_secondary_search_settings(db_session) if not search_settings_new: -        return search_settings.index_name, None +        return search_settings.index_name, None, config_1.enable_large_chunks, None -    return search_settings.index_name, search_settings_new.index_name +    config_2 = get_multipass_config(search_settings_new) +    return ( +        search_settings.index_name, +        search_settings_new.index_name, +        config_1.enable_large_chunks, +        config_2.enable_large_chunks, +    ) def translate_boost_count_to_multiplier(boost: int) -> float: diff --git a/backend/onyx/document_index/factory.py b/backend/onyx/document_index/factory.py index 1949daf75..6068b7618 100644 --- a/backend/onyx/document_index/factory.py +++ b/backend/onyx/document_index/factory.py @@ -1,5 +1,7 @@ +import httpx from sqlalchemy.orm import Session +from onyx.db.models import SearchSettings from onyx.db.search_settings import get_current_search_settings from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.vespa.index import VespaIndex @@ -7,17 +9,28 @@ from shared_configs.configs import MULTI_TENANT def get_default_document_index( -    primary_index_name: str, -    secondary_index_name: str | None, +    search_settings: SearchSettings, +    secondary_search_settings: SearchSettings | None, +    httpx_client: httpx.Client | None = None, ) -> DocumentIndex: """Primary index is the index 
that is used for querying/updating etc. Secondary index is for when both the currently used index and the upcoming index both need to be updated, updates are applied to both indices""" + + secondary_index_name: str | None = None + secondary_large_chunks_enabled: bool | None = None + if secondary_search_settings: + secondary_index_name = secondary_search_settings.index_name + secondary_large_chunks_enabled = secondary_search_settings.large_chunks_enabled + # Currently only supporting Vespa return VespaIndex( - index_name=primary_index_name, + index_name=search_settings.index_name, secondary_index_name=secondary_index_name, + large_chunks_enabled=search_settings.large_chunks_enabled, + secondary_large_chunks_enabled=secondary_large_chunks_enabled, multitenant=MULTI_TENANT, + httpx_client=httpx_client, ) @@ -27,6 +40,6 @@ def get_current_primary_default_document_index(db_session: Session) -> DocumentI """ search_settings = get_current_search_settings(db_session) return get_default_document_index( - primary_index_name=search_settings.index_name, - secondary_index_name=None, + search_settings, + None, ) diff --git a/backend/onyx/document_index/vespa/chunk_retrieval.py b/backend/onyx/document_index/vespa/chunk_retrieval.py index c5b34950e..37225b452 100644 --- a/backend/onyx/document_index/vespa/chunk_retrieval.py +++ b/backend/onyx/document_index/vespa/chunk_retrieval.py @@ -231,21 +231,22 @@ def _get_chunks_via_visit_api( return document_chunks -@retry(tries=10, delay=1, backoff=2) -def get_all_vespa_ids_for_document_id( - document_id: str, - index_name: str, - filters: IndexFilters | None = None, - get_large_chunks: bool = False, -) -> list[str]: - document_chunks = _get_chunks_via_visit_api( - chunk_request=VespaChunkRequest(document_id=document_id), - index_name=index_name, - filters=filters or IndexFilters(access_control_list=None), - field_names=[DOCUMENT_ID], - get_large_chunks=get_large_chunks, - ) - return [chunk["id"].split("::", 1)[-1] for chunk in 
document_chunks] +# TODO(rkuo): candidate for removal if not being used +# @retry(tries=10, delay=1, backoff=2) +# def get_all_vespa_ids_for_document_id( +# document_id: str, +# index_name: str, +# filters: IndexFilters | None = None, +# get_large_chunks: bool = False, +# ) -> list[str]: +# document_chunks = _get_chunks_via_visit_api( +# chunk_request=VespaChunkRequest(document_id=document_id), +# index_name=index_name, +# filters=filters or IndexFilters(access_control_list=None), +# field_names=[DOCUMENT_ID], +# get_large_chunks=get_large_chunks, +# ) +# return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks] def parallel_visit_api_retrieval( diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index f5c4fc113..6dec87229 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -25,7 +25,6 @@ from onyx.configs.chat_configs import VESPA_SEARCHER_THREADS from onyx.configs.constants import KV_REINDEX_KEY from onyx.context.search.models import IndexFilters from onyx.context.search.models import InferenceChunkUncleaned -from onyx.db.engine import get_session_with_tenant from onyx.document_index.document_index_utils import get_document_chunk_ids from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.interfaces import DocumentInsertionRecord @@ -41,12 +40,12 @@ from onyx.document_index.vespa.chunk_retrieval import ( ) from onyx.document_index.vespa.chunk_retrieval import query_vespa from onyx.document_index.vespa.deletion import delete_vespa_chunks +from onyx.document_index.vespa.indexing_utils import BaseHTTPXClientContext from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy -from onyx.document_index.vespa.indexing_utils import ( - get_multipass_config, 
-) +from onyx.document_index.vespa.indexing_utils import GlobalHTTPXClientContext +from onyx.document_index.vespa.indexing_utils import TemporaryHTTPXClientContext from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client from onyx.document_index.vespa.shared_utils.utils import ( replace_invalid_doc_id_characters, @@ -132,12 +131,34 @@ class VespaIndex(DocumentIndex): self, index_name: str, secondary_index_name: str | None, + large_chunks_enabled: bool, + secondary_large_chunks_enabled: bool | None, multitenant: bool = False, + httpx_client: httpx.Client | None = None, ) -> None: self.index_name = index_name self.secondary_index_name = secondary_index_name + + self.large_chunks_enabled = large_chunks_enabled + self.secondary_large_chunks_enabled = secondary_large_chunks_enabled + self.multitenant = multitenant - self.http_client = get_vespa_http_client() + + self.httpx_client_context: BaseHTTPXClientContext + + if httpx_client: + self.httpx_client_context = GlobalHTTPXClientContext(httpx_client) + else: + self.httpx_client_context = TemporaryHTTPXClientContext( + get_vespa_http_client + ) + + self.index_to_large_chunks_enabled: dict[str, bool] = {} + self.index_to_large_chunks_enabled[index_name] = large_chunks_enabled + if secondary_index_name and secondary_large_chunks_enabled is not None: + self.index_to_large_chunks_enabled[ + secondary_index_name + ] = secondary_large_chunks_enabled def ensure_indices_exist( self, @@ -331,7 +352,7 @@ class VespaIndex(DocumentIndex): # indexing / updates / deletes since we have to make a large volume of requests. with ( concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, - get_vespa_http_client() as http_client, + self.httpx_client_context as http_client, ): # We require the start and end index for each document in order to # know precisely which chunks to delete. 
This information exists for @@ -390,9 +411,11 @@ class VespaIndex(DocumentIndex): for doc_id in all_doc_ids } - @staticmethod + @classmethod def _apply_updates_batched( + cls, updates: list[_VespaUpdateRequest], + httpx_client: httpx.Client, batch_size: int = BATCH_SIZE, ) -> None: """Runs a batch of updates in parallel via the ThreadPoolExecutor.""" @@ -414,7 +437,7 @@ class VespaIndex(DocumentIndex): with ( concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, - get_vespa_http_client() as http_client, + httpx_client as http_client, ): for update_batch in batch_generator(updates, batch_size): future_to_document_id = { @@ -455,7 +478,7 @@ class VespaIndex(DocumentIndex): index_names.append(self.secondary_index_name) chunk_id_start_time = time.monotonic() - with get_vespa_http_client() as http_client: + with self.httpx_client_context as http_client: for update_request in update_requests: for doc_info in update_request.minimal_document_indexing_info: for index_name in index_names: @@ -511,7 +534,8 @@ class VespaIndex(DocumentIndex): ) ) - self._apply_updates_batched(processed_updates_requests) + with self.httpx_client_context as httpx_client: + self._apply_updates_batched(processed_updates_requests, httpx_client) logger.debug( "Finished updating Vespa documents in %.2f seconds", time.monotonic() - update_start, @@ -523,6 +547,7 @@ class VespaIndex(DocumentIndex): index_name: str, fields: VespaDocumentFields, doc_id: str, + http_client: httpx.Client, ) -> None: """ Update a single "chunk" (document) in Vespa using its chunk ID. 
@@ -554,18 +579,17 @@ class VespaIndex(DocumentIndex): vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true" - with get_vespa_http_client(http2=False) as http_client: - try: - resp = http_client.put( - vespa_url, - headers={"Content-Type": "application/json"}, - json=update_dict, - ) - resp.raise_for_status() - except httpx.HTTPStatusError as e: - error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}" - logger.error(error_message) - raise + try: + resp = http_client.put( + vespa_url, + headers={"Content-Type": "application/json"}, + json=update_dict, + ) + resp.raise_for_status() + except httpx.HTTPStatusError as e: + error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}" + logger.error(error_message) + raise def update_single( self, @@ -579,24 +603,16 @@ class VespaIndex(DocumentIndex): function will complete with no errors or exceptions. Handle other exceptions if you wish to implement retry behavior """ - doc_chunk_count = 0 - index_names = [self.index_name] - if self.secondary_index_name: - index_names.append(self.secondary_index_name) - - with get_vespa_http_client(http2=False) as http_client: - for index_name in index_names: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: - multipass_config = get_multipass_config( - db_session=db_session, - primary_index=index_name == self.index_name, - ) - large_chunks_enabled = multipass_config.enable_large_chunks + with self.httpx_client_context as httpx_client: + for ( + index_name, + large_chunks_enabled, + ) in self.index_to_large_chunks_enabled.items(): enriched_doc_infos = VespaIndex.enrich_basic_chunk_info( index_name=index_name, - http_client=http_client, + http_client=httpx_client, document_id=doc_id, previous_chunk_count=chunk_count, new_chunk_count=0, @@ -612,10 +628,7 @@ class VespaIndex(DocumentIndex): for doc_chunk_id in doc_chunk_ids: 
self.update_single_chunk( - doc_chunk_id=doc_chunk_id, - index_name=index_name, - fields=fields, - doc_id=doc_id, + doc_chunk_id, index_name, fields, doc_id, httpx_client ) return doc_chunk_count @@ -637,19 +650,13 @@ class VespaIndex(DocumentIndex): if self.secondary_index_name: index_names.append(self.secondary_index_name) - with get_vespa_http_client( - http2=False - ) as http_client, concurrent.futures.ThreadPoolExecutor( + with self.httpx_client_context as http_client, concurrent.futures.ThreadPoolExecutor( max_workers=NUM_THREADS ) as executor: - for index_name in index_names: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: - multipass_config = get_multipass_config( - db_session=db_session, - primary_index=index_name == self.index_name, - ) - large_chunks_enabled = multipass_config.enable_large_chunks - + for ( + index_name, + large_chunks_enabled, + ) in self.index_to_large_chunks_enabled.items(): enriched_doc_infos = VespaIndex.enrich_basic_chunk_info( index_name=index_name, http_client=http_client, @@ -818,6 +825,9 @@ class VespaIndex(DocumentIndex): """ Deletes all entries in the specified index with the given tenant_id. + Currently unused, but we anticipate this being useful. The entire flow does not + use the httpx connection pool of an instance. + Parameters: tenant_id (str): The tenant ID whose documents are to be deleted. index_name (str): The name of the index from which to delete documents. @@ -850,6 +860,8 @@ class VespaIndex(DocumentIndex): """ Retrieves all document IDs with the specified tenant_id, handling pagination. + Internal helper function for delete_entries_by_tenant_id. + Parameters: tenant_id (str): The tenant ID to search for. index_name (str): The name of the index to search in. 
@@ -882,8 +894,8 @@ class VespaIndex(DocumentIndex): f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}" ) - with get_vespa_http_client(no_timeout=True) as http_client: - response = http_client.get(url, params=query_params) + with get_vespa_http_client() as http_client: + response = http_client.get(url, params=query_params, timeout=None) response.raise_for_status() search_result = response.json() @@ -913,6 +925,11 @@ class VespaIndex(DocumentIndex): """ Deletes documents in batches using multiple threads. + Internal helper function for delete_entries_by_tenant_id. + + This is a class method and does not use the httpx pool of the instance. + This is OK because we don't use this method often. + Parameters: delete_requests (List[_VespaDeleteRequest]): The list of delete requests. batch_size (int): The number of documents to delete in each batch. @@ -925,13 +942,14 @@ class VespaIndex(DocumentIndex): response = http_client.delete( delete_request.url, headers={"Content-Type": "application/json"}, + timeout=None, ) response.raise_for_status() logger.debug(f"Starting batch deletion for {len(delete_requests)} documents") with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: - with get_vespa_http_client(no_timeout=True) as http_client: + with get_vespa_http_client() as http_client: for batch_start in range(0, len(delete_requests), batch_size): batch = delete_requests[batch_start : batch_start + batch_size] diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py index ed802ada9..f37ebb66e 100644 --- a/backend/onyx/document_index/vespa/indexing_utils.py +++ b/backend/onyx/document_index/vespa/indexing_utils.py @@ -1,21 +1,19 @@ import concurrent.futures import json import uuid +from abc import ABC +from abc import abstractmethod +from collections.abc import Callable from datetime import datetime from datetime import timezone from http import HTTPStatus import httpx 
from retry import retry -from sqlalchemy.orm import Session -from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING from onyx.connectors.cross_connector_utils.miscellaneous_utils import ( get_experts_stores_representations, ) -from onyx.db.models import SearchSettings -from onyx.db.search_settings import get_current_search_settings -from onyx.db.search_settings import get_secondary_search_settings from onyx.document_index.document_index_utils import get_uuid_from_chunk from onyx.document_index.document_index_utils import get_uuid_from_chunk_info_old from onyx.document_index.interfaces import MinimalDocumentIndexingInfo @@ -50,10 +48,9 @@ from onyx.document_index.vespa_constants import TENANT_ID from onyx.document_index.vespa_constants import TITLE from onyx.document_index.vespa_constants import TITLE_EMBEDDING from onyx.indexing.models import DocMetadataAwareIndexChunk -from onyx.indexing.models import EmbeddingProvider -from onyx.indexing.models import MultipassConfig from onyx.utils.logger import setup_logger + logger = setup_logger() @@ -275,46 +272,42 @@ def check_for_final_chunk_existence( index += 1 -def should_use_multipass(search_settings: SearchSettings | None) -> bool: - """ - Determines whether multipass should be used based on the search settings - or the default config if settings are unavailable. - """ - if search_settings is not None: - return search_settings.multipass_indexing - return ENABLE_MULTIPASS_INDEXING +class BaseHTTPXClientContext(ABC): + """Abstract base class for an HTTPX client context manager.""" + + @abstractmethod + def __enter__(self) -> httpx.Client: + pass + + @abstractmethod + def __exit__(self, exc_type, exc_value, traceback): # type: ignore + pass -def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool: - """ - Given multipass usage and an embedder, decides whether large chunks are allowed - based on model/provider constraints. 
- """ - # Only local models that support a larger context are from Nomic - # Cohere does not support larger contexts (they recommend not going above ~512 tokens) - return ( - multipass - and search_settings.model_name.startswith("nomic-ai") - and search_settings.provider_type != EmbeddingProvider.COHERE - ) +class GlobalHTTPXClientContext(BaseHTTPXClientContext): + """Context manager for a global HTTPX client that does not close it.""" + + def __init__(self, client: httpx.Client): + self._client = client + + def __enter__(self) -> httpx.Client: + return self._client # Reuse the global client + + def __exit__(self, exc_type, exc_value, traceback): # type: ignore + pass # Do nothing; don't close the global client -def get_multipass_config( - db_session: Session, primary_index: bool = True -) -> MultipassConfig: - """ - Determines whether to enable multipass and large chunks by examining - the current search settings and the embedder configuration. - """ - search_settings = ( - get_current_search_settings(db_session) - if primary_index - else get_secondary_search_settings(db_session) - ) - multipass = should_use_multipass(search_settings) - if not search_settings: - return MultipassConfig(multipass_indexing=False, enable_large_chunks=False) - enable_large_chunks = can_use_large_chunks(multipass, search_settings) - return MultipassConfig( - multipass_indexing=multipass, enable_large_chunks=enable_large_chunks - ) +class TemporaryHTTPXClientContext(BaseHTTPXClientContext): + """Context manager for a temporary HTTPX client that closes it after use.""" + + def __init__(self, client_factory: Callable[[], httpx.Client]): + self._client_factory = client_factory + self._client: httpx.Client | None = None # Client will be created in __enter__ + + def __enter__(self) -> httpx.Client: + self._client = self._client_factory() # Create a new client + return self._client + + def __exit__(self, exc_type, exc_value, traceback): # type: ignore + if self._client: + self._client.close() 
diff --git a/backend/onyx/httpx/httpx_pool.py b/backend/onyx/httpx/httpx_pool.py new file mode 100644 index 000000000..d6fe881e3 --- /dev/null +++ b/backend/onyx/httpx/httpx_pool.py @@ -0,0 +1,57 @@ +import threading +from typing import Any + +import httpx + + +class HttpxPool: + """Class to manage a global httpx Client instance""" + + _clients: dict[str, httpx.Client] = {} + _lock: threading.Lock = threading.Lock() + + # Default parameters for creation + DEFAULT_KWARGS = { + "http2": True, + "limits": lambda: httpx.Limits(), + } + + def __init__(self) -> None: + pass + + @classmethod + def _init_client(cls, **kwargs: Any) -> httpx.Client: + """Private helper method to create and return an httpx.Client.""" + merged_kwargs = {**cls.DEFAULT_KWARGS, **kwargs} + return httpx.Client(**merged_kwargs) + + @classmethod + def init_client(cls, name: str, **kwargs: Any) -> None: + """Allow the caller to init the client with extra params.""" + with cls._lock: + if name not in cls._clients: + cls._clients[name] = cls._init_client(**kwargs) + + @classmethod + def close_client(cls, name: str) -> None: + """Allow the caller to close the client.""" + with cls._lock: + client = cls._clients.pop(name, None) + if client: + client.close() + + @classmethod + def close_all(cls) -> None: + """Close all registered clients.""" + with cls._lock: + for client in cls._clients.values(): + client.close() + cls._clients.clear() + + @classmethod + def get(cls, name: str) -> httpx.Client: + """Gets the httpx.Client. 
Will init to default settings if not init'd.""" + with cls._lock: + if name not in cls._clients: + cls._clients[name] = cls._init_client() + return cls._clients[name] diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index d12838f64..ea7228a97 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -31,14 +31,15 @@ from onyx.db.document import upsert_documents from onyx.db.document_set import fetch_document_sets_for_documents from onyx.db.index_attempt import create_index_attempt_error from onyx.db.models import Document as DBDocument +from onyx.db.search_settings import get_current_search_settings from onyx.db.tag import create_or_add_document_tag from onyx.db.tag import create_or_add_document_tag_list +from onyx.document_index.document_index_utils import ( + get_multipass_config, +) from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.interfaces import DocumentMetadata from onyx.document_index.interfaces import IndexBatchParams -from onyx.document_index.vespa.indexing_utils import ( - get_multipass_config, -) from onyx.indexing.chunker import Chunker from onyx.indexing.embedder import IndexingEmbedder from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface @@ -357,7 +358,6 @@ def index_doc_batch( is_public=False, ) - logger.debug("Filtering Documents") filtered_documents = filter_fnc(document_batch) ctx = index_doc_batch_prepare( @@ -527,7 +527,8 @@ def build_indexing_pipeline( callback: IndexingHeartbeatInterface | None = None, ) -> IndexingPipelineProtocol: """Builds a pipeline which takes in a list (batch) of docs and indexes them.""" - multipass_config = get_multipass_config(db_session, primary_index=True) + search_settings = get_current_search_settings(db_session) + multipass_config = get_multipass_config(search_settings) chunker = chunker or Chunker( tokenizer=embedder.embedding_model.tokenizer, diff --git 
a/backend/onyx/indexing/models.py b/backend/onyx/indexing/models.py index 44a8419cb..753ec9ad9 100644 --- a/backend/onyx/indexing/models.py +++ b/backend/onyx/indexing/models.py @@ -55,9 +55,7 @@ class DocAwareChunk(BaseChunk): def to_short_descriptor(self) -> str: """Used when logging the identity of a chunk""" - return ( - f"Chunk ID: '{self.chunk_id}'; {self.source_document.to_short_descriptor()}" - ) + return f"{self.source_document.to_short_descriptor()} Chunk ID: {self.chunk_id}" class IndexChunk(DocAwareChunk): diff --git a/backend/onyx/secondary_llm_flows/starter_message_creation.py b/backend/onyx/secondary_llm_flows/starter_message_creation.py index 7fda3255c..18b4f9f3e 100644 --- a/backend/onyx/secondary_llm_flows/starter_message_creation.py +++ b/backend/onyx/secondary_llm_flows/starter_message_creation.py @@ -16,7 +16,7 @@ from onyx.context.search.preprocessing.access_filters import ( from onyx.db.document_set import get_document_sets_by_ids from onyx.db.models import StarterMessageModel as StarterMessage from onyx.db.models import User -from onyx.document_index.document_index_utils import get_both_index_names +from onyx.db.search_settings import get_active_search_settings from onyx.document_index.factory import get_default_document_index from onyx.llm.factory import get_default_llms from onyx.prompts.starter_messages import format_persona_starter_message_prompt @@ -34,8 +34,11 @@ def get_random_chunks_from_doc_sets( """ Retrieves random chunks from the specified document sets. 
""" - curr_ind_name, sec_ind_name = get_both_index_names(db_session) - document_index = get_default_document_index(curr_ind_name, sec_ind_name) + active_search_settings = get_active_search_settings(db_session) + document_index = get_default_document_index( + search_settings=active_search_settings.primary, + secondary_search_settings=active_search_settings.secondary, + ) acl_filters = build_access_filters_for_user(user, db_session) filters = IndexFilters(document_set=doc_sets, access_control_list=acl_filters) diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py index e97389610..5c2a362b2 100644 --- a/backend/onyx/seeding/load_docs.py +++ b/backend/onyx/seeding/load_docs.py @@ -3,6 +3,7 @@ import json import os from typing import cast +from sqlalchemy import update from sqlalchemy.orm import Session from onyx.access.models import default_public_access @@ -23,6 +24,7 @@ from onyx.db.document import check_docs_exist from onyx.db.enums import AccessType from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.index_attempt import mock_successful_index_attempt +from onyx.db.models import Document as DbDocument from onyx.db.search_settings import get_current_search_settings from onyx.document_index.factory import get_default_document_index from onyx.document_index.interfaces import IndexBatchParams @@ -59,6 +61,7 @@ def _create_indexable_chunks( doc_updated_at=None, primary_owners=[], secondary_owners=[], + chunk_count=1, ) if preprocessed_doc["chunk_ind"] == 0: ids_to_documents[document.id] = document @@ -155,9 +158,7 @@ def seed_initial_documents( logger.info("Embedding model has been updated, skipping") return - document_index = get_default_document_index( - primary_index_name=search_settings.index_name, secondary_index_name=None - ) + document_index = get_default_document_index(search_settings, None) # Create a connector so the user can delete it if they want # or reindex it with a new search model if they want @@ -240,4 
+241,12 @@ def seed_initial_documents( db_session=db_session, ) + # Since we bypass the indexing flow, we need to manually update the chunk count + for doc in docs: + db_session.execute( + update(DbDocument) + .where(DbDocument.id == doc.id) + .values(chunk_count=doc.chunk_count) + ) + kv_store.store(KV_DOCUMENTS_SEEDED_KEY, True) diff --git a/backend/onyx/server/documents/cc_pair.py b/backend/onyx/server/documents/cc_pair.py index c3a3540f7..ced98377f 100644 --- a/backend/onyx/server/documents/cc_pair.py +++ b/backend/onyx/server/documents/cc_pair.py @@ -42,7 +42,7 @@ from onyx.db.index_attempt import get_latest_index_attempt_for_cc_pair_id from onyx.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id from onyx.db.models import SearchSettings from onyx.db.models import User -from onyx.db.search_settings import get_active_search_settings +from onyx.db.search_settings import get_active_search_settings_list from onyx.db.search_settings import get_current_search_settings from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_pool import get_redis_client @@ -192,7 +192,7 @@ def update_cc_pair_status( if status_update_request.status == ConnectorCredentialPairStatus.PAUSED: redis_connector.stop.set_fence(True) - search_settings_list: list[SearchSettings] = get_active_search_settings( + search_settings_list: list[SearchSettings] = get_active_search_settings_list( db_session ) diff --git a/backend/onyx/server/documents/document.py b/backend/onyx/server/documents/document.py index c84a8288f..cbe751d61 100644 --- a/backend/onyx/server/documents/document.py +++ b/backend/onyx/server/documents/document.py @@ -32,10 +32,7 @@ def get_document_info( db_session: Session = Depends(get_session), ) -> DocumentInfo: search_settings = get_current_search_settings(db_session) - - document_index = get_default_document_index( - primary_index_name=search_settings.index_name, secondary_index_name=None - ) + document_index = 
get_default_document_index(search_settings, None) user_acl_filters = build_access_filters_for_user(user, db_session) inference_chunks = document_index.id_based_retrieval( @@ -79,10 +76,7 @@ def get_chunk_info( db_session: Session = Depends(get_session), ) -> ChunkInfo: search_settings = get_current_search_settings(db_session) - - document_index = get_default_document_index( - primary_index_name=search_settings.index_name, secondary_index_name=None - ) + document_index = get_default_document_index(search_settings, None) user_acl_filters = build_access_filters_for_user(user, db_session) chunk_request = VespaChunkRequest( diff --git a/backend/onyx/server/manage/search_settings.py b/backend/onyx/server/manage/search_settings.py index ae28bbbf7..22012fb69 100644 --- a/backend/onyx/server/manage/search_settings.py +++ b/backend/onyx/server/manage/search_settings.py @@ -22,6 +22,7 @@ from onyx.db.search_settings import get_embedding_provider_from_provider_type from onyx.db.search_settings import get_secondary_search_settings from onyx.db.search_settings import update_current_search_settings from onyx.db.search_settings import update_search_settings_status +from onyx.document_index.document_index_utils import get_multipass_config from onyx.document_index.factory import get_default_document_index from onyx.file_processing.unstructured import delete_unstructured_api_key from onyx.file_processing.unstructured import get_unstructured_api_key @@ -97,10 +98,9 @@ def set_new_search_settings( ) # Ensure Vespa has the new index immediately - document_index = get_default_document_index( - primary_index_name=search_settings.index_name, - secondary_index_name=new_search_settings.index_name, - ) + get_multipass_config(search_settings) + get_multipass_config(new_search_settings) + document_index = get_default_document_index(search_settings, new_search_settings) document_index.ensure_indices_exist( index_embedding_dim=search_settings.model_dim, diff --git 
a/backend/onyx/server/onyx_api/ingestion.py b/backend/onyx/server/onyx_api/ingestion.py index cb6b7e6ca..0da8a79d0 100644 --- a/backend/onyx/server/onyx_api/ingestion.py +++ b/backend/onyx/server/onyx_api/ingestion.py @@ -14,9 +14,9 @@ from onyx.db.document import get_ingestion_documents from onyx.db.engine import get_current_tenant_id from onyx.db.engine import get_session from onyx.db.models import User +from onyx.db.search_settings import get_active_search_settings from onyx.db.search_settings import get_current_search_settings from onyx.db.search_settings import get_secondary_search_settings -from onyx.document_index.document_index_utils import get_both_index_names from onyx.document_index.factory import get_default_document_index from onyx.indexing.embedder import DefaultIndexingEmbedder from onyx.indexing.indexing_pipeline import build_indexing_pipeline @@ -89,9 +89,10 @@ def upsert_ingestion_doc( ) # Need to index for both the primary and secondary index if possible - curr_ind_name, sec_ind_name = get_both_index_names(db_session) + active_search_settings = get_active_search_settings(db_session) curr_doc_index = get_default_document_index( - primary_index_name=curr_ind_name, secondary_index_name=None + active_search_settings.primary, + None, ) search_settings = get_current_search_settings(db_session) @@ -117,11 +118,7 @@ def upsert_ingestion_doc( ) # If there's a secondary index being built, index the doc but don't use it for return here - if sec_ind_name: - sec_doc_index = get_default_document_index( - primary_index_name=curr_ind_name, secondary_index_name=None - ) - + if active_search_settings.secondary: sec_search_settings = get_secondary_search_settings(db_session) if sec_search_settings is None: @@ -134,6 +131,10 @@ def upsert_ingestion_doc( search_settings=sec_search_settings ) + sec_doc_index = get_default_document_index( + active_search_settings.secondary, None + ) + sec_ind_pipeline = build_indexing_pipeline( embedder=new_index_embedding_model, 
document_index=sec_doc_index, diff --git a/backend/onyx/server/query_and_chat/query_backend.py b/backend/onyx/server/query_and_chat/query_backend.py index 973166620..c148898fa 100644 --- a/backend/onyx/server/query_and_chat/query_backend.py +++ b/backend/onyx/server/query_and_chat/query_backend.py @@ -64,9 +64,8 @@ def admin_search( tenant_id=tenant_id, ) search_settings = get_current_search_settings(db_session) - document_index = get_default_document_index( - primary_index_name=search_settings.index_name, secondary_index_name=None - ) + document_index = get_default_document_index(search_settings, None) + if not isinstance(document_index, VespaIndex): raise HTTPException( status_code=400, diff --git a/backend/onyx/setup.py b/backend/onyx/setup.py index c212d02a9..d7e3253cd 100644 --- a/backend/onyx/setup.py +++ b/backend/onyx/setup.py @@ -25,6 +25,7 @@ from onyx.db.llm import fetch_default_provider from onyx.db.llm import update_default_provider from onyx.db.llm import upsert_llm_provider from onyx.db.persona import delete_old_default_personas +from onyx.db.search_settings import get_active_search_settings from onyx.db.search_settings import get_current_search_settings from onyx.db.search_settings import get_secondary_search_settings from onyx.db.search_settings import update_current_search_settings @@ -70,8 +71,19 @@ def setup_onyx( The Tenant Service calls the tenants/create endpoint which runs this. 
""" check_index_swap(db_session=db_session) - search_settings = get_current_search_settings(db_session) - secondary_search_settings = get_secondary_search_settings(db_session) + + active_search_settings = get_active_search_settings(db_session) + search_settings = active_search_settings.primary + secondary_search_settings = active_search_settings.secondary + + # search_settings = get_current_search_settings(db_session) + # multipass_config_1 = get_multipass_config(search_settings) + + # secondary_large_chunks_enabled: bool | None = None + # secondary_search_settings = get_secondary_search_settings(db_session) + # if secondary_search_settings: + # multipass_config_2 = get_multipass_config(secondary_search_settings) + # secondary_large_chunks_enabled = multipass_config_2.enable_large_chunks # Break bad state for thrashing indexes if secondary_search_settings and DISABLE_INDEX_UPDATE_ON_SWAP: @@ -122,10 +134,8 @@ def setup_onyx( # takes a bit of time to start up logger.notice("Verifying Document Index(s) is/are available.") document_index = get_default_document_index( - primary_index_name=search_settings.index_name, - secondary_index_name=secondary_search_settings.index_name - if secondary_search_settings - else None, + search_settings, + secondary_search_settings, ) success = setup_vespa( diff --git a/backend/scripts/force_delete_connector_by_id.py b/backend/scripts/force_delete_connector_by_id.py index 89038aeed..90a3e3cee 100755 --- a/backend/scripts/force_delete_connector_by_id.py +++ b/backend/scripts/force_delete_connector_by_id.py @@ -7,6 +7,7 @@ from sqlalchemy.orm import Session from onyx.db.document import delete_documents_complete__no_commit from onyx.db.enums import ConnectorCredentialPairStatus +from onyx.db.search_settings import get_active_search_settings # Modify sys.path current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -38,7 +39,6 @@ from onyx.db.connector_credential_pair import ( from onyx.db.engine import get_session_context_manager from 
onyx.document_index.factory import get_default_document_index from onyx.file_store.file_store import get_default_file_store -from onyx.document_index.document_index_utils import get_both_index_names # pylint: enable=E402 # flake8: noqa: E402 @@ -191,9 +191,10 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None: ) try: logger.notice("Deleting information from Vespa and Postgres") - curr_ind_name, sec_ind_name = get_both_index_names(db_session) + active_search_settings = get_active_search_settings(db_session) document_index = get_default_document_index( - primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + active_search_settings.primary, + active_search_settings.secondary, ) files_deleted_count = _unsafe_deletion( diff --git a/backend/scripts/orphan_doc_cleanup_script.py b/backend/scripts/orphan_doc_cleanup_script.py index c138bdc64..499096387 100644 --- a/backend/scripts/orphan_doc_cleanup_script.py +++ b/backend/scripts/orphan_doc_cleanup_script.py @@ -5,6 +5,8 @@ import sys from sqlalchemy import text from sqlalchemy.orm import Session +from onyx.document_index.document_index_utils import get_multipass_config + # makes it so `PYTHONPATH=.` is not required when running this script parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(parent_dir) @@ -54,8 +56,14 @@ def main() -> None: # Setup Vespa index search_settings = get_current_search_settings(db_session) + multipass_config = get_multipass_config(search_settings) index_name = search_settings.index_name - vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None) + vespa_index = VespaIndex( + index_name=index_name, + secondary_index_name=None, + large_chunks_enabled=multipass_config.enable_large_chunks, + secondary_large_chunks_enabled=None, + ) # Delete chunks from Vespa first print("Deleting orphaned document chunks from Vespa") diff --git a/backend/scripts/query_time_check/seed_dummy_docs.py 
b/backend/scripts/query_time_check/seed_dummy_docs.py index ce71b1d28..36690e88b 100644 --- a/backend/scripts/query_time_check/seed_dummy_docs.py +++ b/backend/scripts/query_time_check/seed_dummy_docs.py @@ -16,6 +16,7 @@ from onyx.configs.constants import DocumentSource from onyx.connectors.models import Document from onyx.db.engine import get_session_context_manager from onyx.db.search_settings import get_current_search_settings +from onyx.document_index.document_index_utils import get_multipass_config from onyx.document_index.vespa.index import VespaIndex from onyx.indexing.indexing_pipeline import IndexBatchParams from onyx.indexing.models import ChunkEmbedding @@ -133,10 +134,16 @@ def seed_dummy_docs( ) -> None: with get_session_context_manager() as db_session: search_settings = get_current_search_settings(db_session) + multipass_config = get_multipass_config(search_settings) index_name = search_settings.index_name embedding_dim = search_settings.model_dim - vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None) + vespa_index = VespaIndex( + index_name=index_name, + secondary_index_name=None, + large_chunks_enabled=multipass_config.enable_large_chunks, + secondary_large_chunks_enabled=None, + ) print(index_name) all_chunks = [] diff --git a/backend/scripts/query_time_check/test_query_times.py b/backend/scripts/query_time_check/test_query_times.py index 2975a46e0..334d07f3c 100644 --- a/backend/scripts/query_time_check/test_query_times.py +++ b/backend/scripts/query_time_check/test_query_times.py @@ -9,6 +9,7 @@ from onyx.configs.model_configs import DOC_EMBEDDING_DIM from onyx.context.search.models import IndexFilters from onyx.db.engine import get_session_context_manager from onyx.db.search_settings import get_current_search_settings +from onyx.document_index.document_index_utils import get_multipass_config from onyx.document_index.vespa.index import VespaIndex from scripts.query_time_check.seed_dummy_docs import 
TOTAL_ACL_ENTRIES_PER_CATEGORY from scripts.query_time_check.seed_dummy_docs import TOTAL_DOC_SETS @@ -62,9 +63,15 @@ def test_hybrid_retrieval_times( ) -> None: with get_session_context_manager() as db_session: search_settings = get_current_search_settings(db_session) + multipass_config = get_multipass_config(search_settings) index_name = search_settings.index_name - vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None) + vespa_index = VespaIndex( + index_name=index_name, + secondary_index_name=None, + large_chunks_enabled=multipass_config.enable_large_chunks, + secondary_large_chunks_enabled=None, + ) # Generate random queries queries = [f"Random Query {i}" for i in range(number_of_queries)] diff --git a/backend/tests/integration/common_utils/reset.py b/backend/tests/integration/common_utils/reset.py index 4c6afb103..34db70fe9 100644 --- a/backend/tests/integration/common_utils/reset.py +++ b/backend/tests/integration/common_utils/reset.py @@ -18,6 +18,7 @@ from onyx.db.engine import get_session_with_tenant from onyx.db.engine import SYNC_DB_API from onyx.db.search_settings import get_current_search_settings from onyx.db.swap_index import check_index_swap +from onyx.document_index.document_index_utils import get_multipass_config from onyx.document_index.vespa.index import DOCUMENT_ID_ENDPOINT from onyx.document_index.vespa.index import VespaIndex from onyx.indexing.models import IndexingSetting @@ -173,10 +174,16 @@ def reset_vespa() -> None: check_index_swap(db_session) search_settings = get_current_search_settings(db_session) + multipass_config = get_multipass_config(search_settings) index_name = search_settings.index_name success = setup_vespa( - document_index=VespaIndex(index_name=index_name, secondary_index_name=None), + document_index=VespaIndex( + index_name=index_name, + secondary_index_name=None, + large_chunks_enabled=multipass_config.enable_large_chunks, + secondary_large_chunks_enabled=None, + ), 
index_setting=IndexingSetting.from_db_model(search_settings), secondary_index_setting=None, ) @@ -250,10 +257,16 @@ def reset_vespa_multitenant() -> None: check_index_swap(db_session) search_settings = get_current_search_settings(db_session) + multipass_config = get_multipass_config(search_settings) index_name = search_settings.index_name success = setup_vespa( - document_index=VespaIndex(index_name=index_name, secondary_index_name=None), + document_index=VespaIndex( + index_name=index_name, + secondary_index_name=None, + large_chunks_enabled=multipass_config.enable_large_chunks, + secondary_large_chunks_enabled=None, + ), index_setting=IndexingSetting.from_db_model(search_settings), secondary_index_setting=None, ) diff --git a/backend/tests/unit/onyx/indexing/test_vespa.py b/backend/tests/unit/onyx/indexing/test_vespa.py index e9cc42f40..0fd3942e0 100644 --- a/backend/tests/unit/onyx/indexing/test_vespa.py +++ b/backend/tests/unit/onyx/indexing/test_vespa.py @@ -6,7 +6,7 @@ import pytest from sqlalchemy.orm import Session from onyx.db.engine import get_sqlalchemy_engine -from onyx.document_index.document_index_utils import get_both_index_names +from onyx.document_index.document_index_utils import get_both_index_properties from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT @@ -19,7 +19,7 @@ def test_vespa_update() -> None: doc_id = "test-vespa-update" with Session(get_sqlalchemy_engine()) as db_session: - primary_index_name, _ = get_both_index_names(db_session) + primary_index_name, _, _, _ = get_both_index_properties(db_session) endpoint = ( f"{DOCUMENT_ID_ENDPOINT.format(index_name=primary_index_name)}/{doc_id}" )