diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 7435342c4..cc7440e00 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -23,8 +23,7 @@ from onyx.background.celery.celery_utils import celery_is_worker_primary from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxRedisLocks from onyx.db.engine import get_sqlalchemy_engine -from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client -from onyx.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL +from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair from onyx.redis.redis_connector_delete import RedisConnectorDelete @@ -280,51 +279,6 @@ def wait_for_db(sender: Any, **kwargs: Any) -> None: return -def wait_for_vespa(sender: Any, **kwargs: Any) -> None: - """Waits for Vespa to become ready subject to a hardcoded timeout. - Will raise WorkerShutdown to kill the celery worker if the timeout is reached.""" - - WAIT_INTERVAL = 5 - WAIT_LIMIT = 60 - - ready = False - time_start = time.monotonic() - logger.info("Vespa: Readiness probe starting.") - while True: - try: - client = get_vespa_http_client() - response = client.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health") - response.raise_for_status() - - response_dict = response.json() - if response_dict["status"]["code"] == "up": - ready = True - break - except Exception: - pass - - time_elapsed = time.monotonic() - time_start - if time_elapsed > WAIT_LIMIT: - break - - logger.info( - f"Vespa: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}" - ) - - time.sleep(WAIT_INTERVAL) - - if not ready: - msg = ( - f"Vespa: Readiness probe did not succeed within the timeout " - f"({WAIT_LIMIT} seconds). Exiting..." - ) - logger.error(msg) - raise WorkerShutdown(msg) - - logger.info("Vespa: Readiness probe succeeded. Continuing...") - return - - def on_secondary_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("Running as a secondary celery worker.") @@ -510,3 +464,13 @@ def reset_tenant_id( ) -> None: """Signal handler to reset tenant ID in context var after task ends.""" CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA) + + +def wait_for_vespa_or_shutdown(sender: Any, **kwargs: Any) -> None: + """Waits for Vespa to become ready subject to a timeout. + Raises WorkerShutdown if the timeout is reached.""" + + if not wait_for_vespa_with_timeout(): + msg = "Vespa: Readiness probe did not succeed within the timeout. Exiting..." + logger.error(msg) + raise WorkerShutdown(msg) diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index 4854940fd..7535a8c97 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -62,7 +62,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) - app_base.wait_for_vespa(sender, **kwargs) + app_base.wait_for_vespa_or_shutdown(sender, **kwargs) # Less startup checks in multi-tenant case if MULTI_TENANT: diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index e222da5e3..df50bd4ec 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -68,7 +68,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) - app_base.wait_for_vespa(sender, **kwargs) + app_base.wait_for_vespa_or_shutdown(sender, **kwargs) # Less startup checks in multi-tenant case if MULTI_TENANT: diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index abc2cfab1..5802da511 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -63,7 +63,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) - app_base.wait_for_vespa(sender, **kwargs) + app_base.wait_for_vespa_or_shutdown(sender, **kwargs) # Less startup checks in multi-tenant case if MULTI_TENANT: diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index b2ccd0874..435a88e2c 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -86,7 +86,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) - app_base.wait_for_vespa(sender, **kwargs) + app_base.wait_for_vespa_or_shutdown(sender, **kwargs) logger.info("Running as the primary celery worker.") diff --git a/backend/onyx/document_index/vespa/shared_utils/utils.py b/backend/onyx/document_index/vespa/shared_utils/utils.py index da0f5c298..c8a382e72 100644 --- a/backend/onyx/document_index/vespa/shared_utils/utils.py +++ b/backend/onyx/document_index/vespa/shared_utils/utils.py @@ -1,4 +1,5 @@ import re +import time from typing import cast import httpx @@ -7,6 +8,10 @@ from onyx.configs.app_configs import MANAGED_VESPA from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH from onyx.configs.app_configs import VESPA_REQUEST_TIMEOUT +from onyx.document_index.vespa_constants import VESPA_APP_CONTAINER_URL +from onyx.utils.logger import setup_logger + +logger = setup_logger() # NOTE: This does not seem to be used in reality despite the Vespa Docs pointing to this code # See here for reference: https://docs.vespa.ai/en/documents.html @@ -69,3 +74,37 @@ def get_vespa_http_client(no_timeout: bool = False, http2: bool = True) -> httpx timeout=None if no_timeout else VESPA_REQUEST_TIMEOUT, http2=http2, ) + + +def wait_for_vespa_with_timeout(wait_interval: int = 5, wait_limit: int = 60) -> bool: + """Waits for Vespa to become ready subject to a timeout. + Returns True if Vespa is ready, False otherwise.""" + + time_start = time.monotonic() + logger.info("Vespa: Readiness probe starting.") + while True: + try: + client = get_vespa_http_client() + response = client.get(f"{VESPA_APP_CONTAINER_URL}/state/v1/health") + response.raise_for_status() + + response_dict = response.json() + if response_dict["status"]["code"] == "up": + logger.info("Vespa: Readiness probe succeeded. Continuing...") + return True + except Exception: + pass + + time_elapsed = time.monotonic() - time_start + if time_elapsed > wait_limit: + logger.info( + f"Vespa: Readiness probe did not succeed within the timeout " + f"({wait_limit} seconds)." + ) + return False + + logger.info( + f"Vespa: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={wait_limit:.1f}" + ) + + time.sleep(wait_interval) diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py index b895a0f7a..e97389610 100644 --- a/backend/onyx/seeding/load_docs.py +++ b/backend/onyx/seeding/load_docs.py @@ -26,6 +26,7 @@ from onyx.db.index_attempt import mock_successful_index_attempt from onyx.db.search_settings import get_current_search_settings from onyx.document_index.factory import get_default_document_index from onyx.document_index.interfaces import IndexBatchParams +from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout from onyx.indexing.indexing_pipeline import index_doc_batch_prepare from onyx.indexing.models import ChunkEmbedding from onyx.indexing.models import DocMetadataAwareIndexChunk @@ -33,7 +34,6 @@ from onyx.key_value_store.factory import get_kv_store from onyx.key_value_store.interface import KvKeyNotFoundError from onyx.server.documents.models import ConnectorBase from onyx.utils.logger import setup_logger -from onyx.utils.retry_wrapper import retry_builder from onyx.utils.variable_functionality import fetch_versioned_implementation logger = setup_logger() @@ -218,9 +218,11 @@ def seed_initial_documents( # Retries here because the index may take a few seconds to become ready # as we just sent over the Vespa schema and there is a slight delay + if not wait_for_vespa_with_timeout(): + logger.error("Vespa did not become ready within the timeout") + raise ValueError("Vespa failed to become ready within the timeout") - index_with_retries = retry_builder(tries=15)(document_index.index) - index_with_retries( + document_index.index( chunks=chunks, index_batch_params=IndexBatchParams( doc_id_to_previous_chunk_cnt={},