Mirror of https://github.com/danswer-ai/danswer.git
wait for db before allowing worker to proceed (reduces error spam on container startup) (#3079)

* wait for db before allowing worker to proceed (reduces error spam on container startup)
* fix session usage
* rework readiness probe logic to be less confusing and word ongoing probes better
* add vespa probe too

Co-authored-by: Richard Kuo <rkuo@rkuo.com>

parent 55919f596c
commit 0c45488ac6
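
The change implements a simple blocking readiness probe: ping each dependency on a fixed interval, give up after a hard timeout, and raise WorkerShutdown so the Celery worker exits cleanly instead of spamming connection errors while Postgres, Redis, or Vespa is still booting. A minimal, generic sketch of that pattern (the check_fn callable is a placeholder; the 5-second interval and 60-second limit mirror the constants used in this commit):

```python
# Minimal sketch of the readiness-probe pattern this commit applies to
# Redis, Postgres, and Vespa. `check_fn` is a placeholder callable.
import time
from typing import Callable

from celery.exceptions import WorkerShutdown  # raising this shuts the worker down

WAIT_INTERVAL = 5  # seconds between attempts (same value as the diff)
WAIT_LIMIT = 60    # hard timeout in seconds (same value as the diff)


def wait_for(name: str, check_fn: Callable[[], bool]) -> None:
    start = time.monotonic()
    while True:
        try:
            if check_fn():
                return  # dependency is ready
        except Exception:
            pass  # treat any error as "not ready yet"

        if time.monotonic() - start > WAIT_LIMIT:
            raise WorkerShutdown(f"{name}: not ready within {WAIT_LIMIT} seconds. Exiting...")

        time.sleep(WAIT_INTERVAL)
```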
@@ -3,6 +3,7 @@ import multiprocessing
 import time
 from typing import Any

+import requests
 import sentry_sdk
 from celery import Task
 from celery.app import trace
@@ -11,11 +12,15 @@ from celery.states import READY_STATES
 from celery.utils.log import get_task_logger
 from celery.worker import strategy  # type: ignore
 from sentry_sdk.integrations.celery import CeleryIntegration
+from sqlalchemy import text
+from sqlalchemy.orm import Session

 from danswer.background.celery.apps.task_formatters import CeleryTaskColoredFormatter
 from danswer.background.celery.apps.task_formatters import CeleryTaskPlainFormatter
 from danswer.background.celery.celery_utils import celery_is_worker_primary
 from danswer.configs.constants import DanswerRedisLocks
+from danswer.db.engine import get_sqlalchemy_engine
+from danswer.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL
 from danswer.redis.redis_connector import RedisConnector
 from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
 from danswer.redis.redis_connector_delete import RedisConnectorDelete
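
The new imports support two of the probes: text and Session (together with get_sqlalchemy_engine) run a trivial query against Postgres, while requests and VESPA_CONFIG_SERVER_URL hit Vespa's health endpoint. A standalone illustration of the database check, using a placeholder DSN instead of Danswer's configured engine:

```python
# Standalone illustration of the DB liveness check; the DSN is a placeholder,
# whereas the commit obtains its engine from get_sqlalchemy_engine().
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/postgres")

with Session(engine) as db_session:
    # Any cheap round trip proves connectivity; the commit uses SELECT NOW().
    now = db_session.execute(text("SELECT NOW()")).scalar()
    print(f"database reachable, server time: {now}")
```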
@@ -139,35 +144,130 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None


 def wait_for_redis(sender: Any, **kwargs: Any) -> None:
+    """Waits for redis to become ready subject to a hardcoded timeout.
+    Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""
+
     r = get_redis_client(tenant_id=None)

     WAIT_INTERVAL = 5
     WAIT_LIMIT = 60

+    ready = False
     time_start = time.monotonic()
-    logger.info("Redis: Readiness check starting.")
+    logger.info("Redis: Readiness probe starting.")
     while True:
         try:
             if r.ping():
+                ready = True
                 break
         except Exception:
             pass

         time_elapsed = time.monotonic() - time_start
-        logger.info(
-            f"Redis: Ping failed. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
-        )
         if time_elapsed > WAIT_LIMIT:
-            msg = (
-                f"Redis: Readiness check did not succeed within the timeout "
-                f"({WAIT_LIMIT} seconds). Exiting..."
-            )
-            logger.error(msg)
-            raise WorkerShutdown(msg)
+            break
+
+        logger.info(
+            f"Redis: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
+        )

         time.sleep(WAIT_INTERVAL)

-    logger.info("Redis: Readiness check succeeded. Continuing...")
+    if not ready:
+        msg = (
+            f"Redis: Readiness probe did not succeed within the timeout "
+            f"({WAIT_LIMIT} seconds). Exiting..."
+        )
+        logger.error(msg)
+        raise WorkerShutdown(msg)
+
+    logger.info("Redis: Readiness probe succeeded. Continuing...")
     return


+def wait_for_db(sender: Any, **kwargs: Any) -> None:
+    """Waits for the db to become ready subject to a hardcoded timeout.
+    Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""
+
+    WAIT_INTERVAL = 5
+    WAIT_LIMIT = 60
+
+    ready = False
+    time_start = time.monotonic()
+    logger.info("Database: Readiness probe starting.")
+    while True:
+        try:
+            with Session(get_sqlalchemy_engine()) as db_session:
+                result = db_session.execute(text("SELECT NOW()")).scalar()
+                if result:
+                    ready = True
+                    break
+        except Exception:
+            pass
+
+        time_elapsed = time.monotonic() - time_start
+        if time_elapsed > WAIT_LIMIT:
+            break
+
+        logger.info(
+            f"Database: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
+        )
+
+        time.sleep(WAIT_INTERVAL)
+
+    if not ready:
+        msg = (
+            f"Database: Readiness probe did not succeed within the timeout "
+            f"({WAIT_LIMIT} seconds). Exiting..."
+        )
+        logger.error(msg)
+        raise WorkerShutdown(msg)
+
+    logger.info("Database: Readiness probe succeeded. Continuing...")
+    return
+
+
+def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
+    """Waits for Vespa to become ready subject to a hardcoded timeout.
+    Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""
+
+    WAIT_INTERVAL = 5
+    WAIT_LIMIT = 60
+
+    ready = False
+    time_start = time.monotonic()
+    logger.info("Vespa: Readiness probe starting.")
+    while True:
+        try:
+            response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
+            response.raise_for_status()
+
+            response_dict = response.json()
+            if response_dict["status"]["code"] == "up":
+                ready = True
+                break
+        except Exception:
+            pass
+
+        time_elapsed = time.monotonic() - time_start
+        if time_elapsed > WAIT_LIMIT:
+            break
+
+        logger.info(
+            f"Vespa: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
+        )
+
+        time.sleep(WAIT_INTERVAL)
+
+    if not ready:
+        msg = (
+            f"Vespa: Readiness probe did not succeed within the timeout "
+            f"({WAIT_LIMIT} seconds). Exiting..."
+        )
+        logger.error(msg)
+        raise WorkerShutdown(msg)
+
+    logger.info("Vespa: Readiness probe succeeded. Continuing...")
+    return
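
The three probes above are deliberately near-identical: poll, log an "ongoing" line each interval, and turn a timeout into WorkerShutdown. If the duplication ever becomes a burden, the shared loop could be pulled into one helper; the sketch below is a hypothetical refactor, not part of this commit.

```python
# Hypothetical refactor (not in the commit): one polling loop shared by the probes.
import logging
import time
from typing import Callable

from celery.exceptions import WorkerShutdown

logger = logging.getLogger(__name__)

WAIT_INTERVAL = 5
WAIT_LIMIT = 60


def wait_for_ready(name: str, probe: Callable[[], bool]) -> None:
    """Poll `probe` every WAIT_INTERVAL seconds; raise WorkerShutdown after WAIT_LIMIT."""
    time_start = time.monotonic()
    logger.info(f"{name}: Readiness probe starting.")
    while True:
        try:
            if probe():
                logger.info(f"{name}: Readiness probe succeeded. Continuing...")
                return
        except Exception:
            pass  # not reachable yet; keep polling

        time_elapsed = time.monotonic() - time_start
        if time_elapsed > WAIT_LIMIT:
            msg = (
                f"{name}: Readiness probe did not succeed within the timeout "
                f"({WAIT_LIMIT} seconds). Exiting..."
            )
            logger.error(msg)
            raise WorkerShutdown(msg)

        logger.info(
            f"{name}: Readiness probe ongoing. "
            f"elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
        )
        time.sleep(WAIT_INTERVAL)
```

With such a helper, wait_for_redis would reduce to wait_for_ready("Redis", lambda: bool(get_redis_client(tenant_id=None).ping())), and the database and Vespa checks would shrink similarly.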
@@ -61,6 +61,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.init_engine(pool_size=4, max_overflow=12)

     app_base.wait_for_redis(sender, **kwargs)
+    app_base.wait_for_db(sender, **kwargs)
+    app_base.wait_for_vespa(sender, **kwargs)
     app_base.on_secondary_worker_init(sender, **kwargs)


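This hunk and the three that follow make the same change in different Celery worker apps: once the SQL engine is initialized, on_worker_init blocks on all three probes before any other startup work. For context, a minimal sketch of how such a handler hangs off Celery's worker_init signal; the app_base import path is an assumption inferred from the calls shown in these hunks.

```python
# Minimal sketch (not a verbatim copy of any worker module) of running the
# probes from Celery's worker_init signal.
from typing import Any

from celery.signals import worker_init

# Assumed import path, inferred from the app_base.* calls in these hunks.
from danswer.background.celery.apps import app_base


@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
    # Block until each external dependency answers, or raise WorkerShutdown.
    app_base.wait_for_redis(sender, **kwargs)
    app_base.wait_for_db(sender, **kwargs)
    app_base.wait_for_vespa(sender, **kwargs)
```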
@@ -61,6 +61,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.init_engine(pool_size=8, max_overflow=0)

     app_base.wait_for_redis(sender, **kwargs)
+    app_base.wait_for_db(sender, **kwargs)
+    app_base.wait_for_vespa(sender, **kwargs)
     app_base.on_secondary_worker_init(sender, **kwargs)


@@ -61,6 +61,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)

     app_base.wait_for_redis(sender, **kwargs)
+    app_base.wait_for_db(sender, **kwargs)
+    app_base.wait_for_vespa(sender, **kwargs)
     app_base.on_secondary_worker_init(sender, **kwargs)


@@ -76,6 +76,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.init_engine(pool_size=8, max_overflow=0)

     app_base.wait_for_redis(sender, **kwargs)
+    app_base.wait_for_db(sender, **kwargs)
+    app_base.wait_for_vespa(sender, **kwargs)

     logger.info("Running as the primary celery worker.")

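
With the probes in place, a worker started while Postgres or Vespa is still coming up waits quietly, logging a "Readiness probe ongoing" line every few seconds, and shuts itself down after roughly 60 seconds instead of flooding the logs with connection errors. The probes can also be called directly, for example from a Python shell; the import path below is an assumption based on the hunks above.

```python
# Manual smoke test (not part of the commit); assumes a configured Danswer
# environment where app_base and its settings are importable.
from danswer.background.celery.apps import app_base

app_base.wait_for_db(sender=None)     # returns once Postgres answers SELECT NOW()
app_base.wait_for_vespa(sender=None)  # raises WorkerShutdown if Vespa never reports "up"
```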