mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-08-31 00:10:28 +02:00
add probe signals for beat (#4760)
Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
This commit is contained in:
@@ -2,7 +2,6 @@ import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
@@ -24,6 +23,7 @@ from sqlalchemy.orm import Session
|
||||
from onyx.background.celery.apps.task_formatters import CeleryTaskColoredFormatter
|
||||
from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter
|
||||
from onyx.background.celery.celery_utils import celery_is_worker_primary
|
||||
from onyx.background.celery.celery_utils import make_probe_path
|
||||
from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.db.engine import get_sqlalchemy_engine
|
||||
@@ -83,19 +83,6 @@ class TenantAwareTask(Task):
|
||||
CURRENT_TENANT_ID_CONTEXTVAR.set(None)
|
||||
|
||||
|
||||
def _make_probe_path(probe: str, hostname: str) -> Path:
|
||||
hostname_parts = hostname.split("@")
|
||||
if len(hostname_parts) != 2:
|
||||
raise ValueError(f"hostname could not be split! {hostname=}")
|
||||
|
||||
name = hostname_parts[0]
|
||||
if not name:
|
||||
raise ValueError(f"name cannot be empty! {name=}")
|
||||
|
||||
safe_name = "".join(c for c in name if c.isalnum()).rstrip()
|
||||
return Path(f"/tmp/onyx_k8s_{safe_name}_{probe}.txt")
|
||||
|
||||
|
||||
@task_prerun.connect
|
||||
def on_task_prerun(
|
||||
sender: Any | None = None,
|
||||
@@ -355,12 +342,12 @@ def on_secondary_worker_init(sender: Any, **kwargs: Any) -> None:
|
||||
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
|
||||
task_logger.info("worker_ready signal received.")
|
||||
|
||||
#
|
||||
# file based way to do readiness/liveness probes
|
||||
# https://medium.com/ambient-innovation/health-checks-for-celery-in-kubernetes-cf3274a3e106
|
||||
# https://github.com/celery/celery/issues/4079#issuecomment-1270085680
|
||||
|
||||
hostname: str = cast(str, sender.hostname)
|
||||
path = _make_probe_path("readiness", hostname)
|
||||
path = make_probe_path("readiness", hostname)
|
||||
path.touch()
|
||||
logger.info(f"Readiness signal touched at {path}.")
|
||||
|
||||
@@ -369,7 +356,7 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
|
||||
HttpxPool.close_all()
|
||||
|
||||
hostname: str = cast(str, sender.hostname)
|
||||
path = _make_probe_path("readiness", hostname)
|
||||
path = make_probe_path("readiness", hostname)
|
||||
path.unlink(missing_ok=True)
|
||||
|
||||
if not celery_is_worker_primary(sender):
|
||||
@@ -521,7 +508,7 @@ class LivenessProbe(bootsteps.StartStopStep):
|
||||
super().__init__(worker, **kwargs)
|
||||
self.requests: list[Any] = []
|
||||
self.task_tref = None
|
||||
self.path = _make_probe_path("liveness", worker.hostname)
|
||||
self.path = make_probe_path("liveness", worker.hostname)
|
||||
|
||||
def start(self, worker: Any) -> None:
|
||||
self.task_tref = worker.timer.call_repeatedly(
|
||||
|
@@ -8,6 +8,7 @@ from celery.signals import beat_init
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
import onyx.background.celery.apps.app_base as app_base
|
||||
from onyx.background.celery.celery_utils import make_probe_path
|
||||
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
|
||||
from onyx.configs.constants import POSTGRES_CELERY_BEAT_APP_NAME
|
||||
from onyx.db.engine import get_all_tenant_ids
|
||||
@@ -45,6 +46,8 @@ class DynamicTenantScheduler(PersistentScheduler):
|
||||
f"DynamicTenantScheduler initialized: reload_interval={self._reload_interval}"
|
||||
)
|
||||
|
||||
self._liveness_probe_path = make_probe_path("liveness", "beat@hostname")
|
||||
|
||||
# do not set the initial schedule here because we don't have db access yet.
|
||||
# do it in beat_init after the db engine is initialized
|
||||
|
||||
@@ -62,6 +65,8 @@ class DynamicTenantScheduler(PersistentScheduler):
|
||||
or (now - self._last_reload) > self._reload_interval
|
||||
):
|
||||
task_logger.debug("Reload interval reached, initiating task update")
|
||||
self._liveness_probe_path.touch()
|
||||
|
||||
try:
|
||||
self._try_updating_schedule()
|
||||
except (AttributeError, KeyError):
|
||||
@@ -241,6 +246,9 @@ def on_beat_init(sender: Any, **kwargs: Any) -> None:
|
||||
SqlEngine.init_engine(pool_size=2, max_overflow=0)
|
||||
|
||||
app_base.wait_for_redis(sender, **kwargs)
|
||||
path = make_probe_path("readiness", "beat@hostname")
|
||||
path.touch()
|
||||
task_logger.info(f"Readiness signal touched at {path}.")
|
||||
|
||||
# first time init of the scheduler after db has been init'ed
|
||||
scheduler: DynamicTenantScheduler = sender.scheduler
|
||||
|
@@ -1,5 +1,6 @@
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
@@ -121,3 +122,20 @@ def httpx_init_vespa_pool(
|
||||
http2=False,
|
||||
limits=httpx.Limits(max_keepalive_connections=max_keepalive_connections),
|
||||
)
|
||||
|
||||
|
||||
def make_probe_path(probe: str, hostname: str) -> Path:
|
||||
"""templates the path for a k8s probe file.
|
||||
|
||||
e.g. /tmp/onyx_k8s_indexing_readiness.txt
|
||||
"""
|
||||
hostname_parts = hostname.split("@")
|
||||
if len(hostname_parts) != 2:
|
||||
raise ValueError(f"hostname could not be split! {hostname=}")
|
||||
|
||||
name = hostname_parts[0]
|
||||
if not name:
|
||||
raise ValueError(f"name cannot be empty! {name=}")
|
||||
|
||||
safe_name = "".join(c for c in name if c.isalnum()).rstrip()
|
||||
return Path(f"/tmp/onyx_k8s_{safe_name}_{probe}.txt")
|
||||
|
Reference in New Issue
Block a user