fix reading redis values as floats

This commit is contained in:
Richard Kuo 2025-02-10 20:48:55 -08:00
parent 5a4d007cf9
commit 9740ed32b5
3 changed files with 25 additions and 9 deletions

View File

@ -67,7 +67,7 @@ if not MULTI_TENANT:
]
def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]:
def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]:
beat_system_tasks = ee_beat_system_tasks + base_beat_system_tasks
beat_task_templates = ee_beat_task_templates + base_beat_task_templates
cloud_tasks = generate_cloud_tasks(

View File

@ -58,7 +58,7 @@ class DynamicTenantScheduler(PersistentScheduler):
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
task_logger.info("Reload interval reached, initiating task update")
task_logger.debug("Reload interval reached, initiating task update")
try:
self._try_updating_schedule()
except (AttributeError, KeyError):
@ -71,7 +71,7 @@ class DynamicTenantScheduler(PersistentScheduler):
return retval
def _generate_schedule(
self, tenant_ids: list[str] | list[None], beat_multiplier: int
self, tenant_ids: list[str] | list[None], beat_multiplier: float
) -> dict[str, dict[str, Any]]:
"""Given a list of tenant id's, generates a new beat schedule for celery."""
new_schedule: dict[str, dict[str, Any]] = {}
@ -149,7 +149,13 @@ class DynamicTenantScheduler(PersistentScheduler):
beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier")
if beat_multiplier_raw is not None:
beat_multiplier = cast(int, beat_multiplier_raw)
try:
beat_multiplier_bytes = cast(bytes, beat_multiplier_raw)
beat_multiplier = float(beat_multiplier_bytes.decode())
except ValueError:
task_logger.error(
f"Invalid beat_multiplier value: {beat_multiplier_raw}"
)
new_schedule = self._generate_schedule(tenant_ids, beat_multiplier)
@ -169,10 +175,15 @@ class DynamicTenantScheduler(PersistentScheduler):
if not do_update:
# exit early if nothing changed
task_logger.info(
f"_try_updating_schedule - Schedule unchanged: "
f"tasks={len(new_schedule)} "
f"beat_multiplier={beat_multiplier}"
)
return
# schedule needs updating
task_logger.info(
task_logger.debug(
"Schedule update required",
extra={
"new_tasks": len(new_schedule),
@ -199,14 +210,16 @@ class DynamicTenantScheduler(PersistentScheduler):
# Ensure changes are persisted
self.sync()
self.last_beat_multiplier = beat_multiplier
task_logger.info(
f"_try_updating_schedule - Schedule updated: "
f"prev_num_tasks={len(current_schedule)} "
f"prev_beat_multiplier={self.last_beat_multiplier} "
f"tasks={len(new_schedule)} "
f"beat_multiplier={beat_multiplier}"
)
self.last_beat_multiplier = beat_multiplier
@staticmethod
def _compare_schedules(schedule1: dict, schedule2: dict) -> bool:
"""Compare schedules by task name only to determine if an update is needed.

View File

@ -19,7 +19,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds)
# hack to slow down task dispatch in the cloud until
# we have a better implementation (backpressure, etc)
CLOUD_BEAT_MULTIPLIER_DEFAULT = 8
CLOUD_BEAT_MULTIPLIER_DEFAULT = 8.0
# tasks that run in either self-hosted on cloud
beat_task_templates: list[dict] = []
@ -177,6 +177,9 @@ def generate_cloud_tasks(
from incoming templates.
"""
if beat_multiplier <= 0:
raise ValueError("beat_multiplier must be positive!")
# start with the incoming beat tasks
cloud_tasks: list[dict] = copy.deepcopy(beat_tasks)
@ -192,7 +195,7 @@ def generate_cloud_tasks(
return cloud_tasks
def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]:
def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]:
return generate_cloud_tasks(beat_system_tasks, beat_task_templates, beat_multiplier)