diff --git a/backend/ee/onyx/background/celery/tasks/beat_schedule.py b/backend/ee/onyx/background/celery/tasks/beat_schedule.py index fa2bc03306..00283e1795 100644 --- a/backend/ee/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/ee/onyx/background/celery/tasks/beat_schedule.py @@ -67,7 +67,7 @@ if not MULTI_TENANT: ] -def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]: +def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]: beat_system_tasks = ee_beat_system_tasks + base_beat_system_tasks beat_task_templates = ee_beat_task_templates + base_beat_task_templates cloud_tasks = generate_cloud_tasks( diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 004d15b202..45fe040fc6 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -58,7 +58,7 @@ class DynamicTenantScheduler(PersistentScheduler): self._last_reload is None or (now - self._last_reload) > self._reload_interval ): - task_logger.info("Reload interval reached, initiating task update") + task_logger.debug("Reload interval reached, initiating task update") try: self._try_updating_schedule() except (AttributeError, KeyError): @@ -71,7 +71,7 @@ class DynamicTenantScheduler(PersistentScheduler): return retval def _generate_schedule( - self, tenant_ids: list[str] | list[None], beat_multiplier: int + self, tenant_ids: list[str] | list[None], beat_multiplier: float ) -> dict[str, dict[str, Any]]: """Given a list of tenant id's, generates a new beat schedule for celery.""" new_schedule: dict[str, dict[str, Any]] = {} @@ -149,7 +149,13 @@ class DynamicTenantScheduler(PersistentScheduler): beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier") if beat_multiplier_raw is not None: - beat_multiplier = cast(int, beat_multiplier_raw) + try: + beat_multiplier_bytes = cast(bytes, beat_multiplier_raw) + beat_multiplier = float(beat_multiplier_bytes.decode()) + except ValueError: + task_logger.error( + f"Invalid beat_multiplier value: {beat_multiplier_raw}" + ) new_schedule = self._generate_schedule(tenant_ids, beat_multiplier) @@ -169,10 +175,15 @@ class DynamicTenantScheduler(PersistentScheduler): if not do_update: # exit early if nothing changed + task_logger.info( + f"_try_updating_schedule - Schedule unchanged: " + f"tasks={len(new_schedule)} " + f"beat_multiplier={beat_multiplier}" + ) return # schedule needs updating - task_logger.info( + task_logger.debug( "Schedule update required", extra={ "new_tasks": len(new_schedule), @@ -199,14 +210,16 @@ class DynamicTenantScheduler(PersistentScheduler): # Ensure changes are persisted self.sync() - self.last_beat_multiplier = beat_multiplier - task_logger.info( f"_try_updating_schedule - Schedule updated: " + f"prev_num_tasks={len(current_schedule)} " + f"prev_beat_multiplier={self.last_beat_multiplier} " f"tasks={len(new_schedule)} " f"beat_multiplier={beat_multiplier}" ) + self.last_beat_multiplier = beat_multiplier + @staticmethod def _compare_schedules(schedule1: dict, schedule2: dict) -> bool: """Compare schedules by task name only to determine if an update is needed. diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 34a0d26db7..ea483f469c 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -19,7 +19,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_MULTIPLIER_DEFAULT = 8 +CLOUD_BEAT_MULTIPLIER_DEFAULT = 8.0 # tasks that run in either self-hosted on cloud beat_task_templates: list[dict] = [] @@ -177,6 +177,9 @@ def generate_cloud_tasks( from incoming templates. """ + if beat_multiplier <= 0: + raise ValueError("beat_multiplier must be positive!") + # start with the incoming beat tasks cloud_tasks: list[dict] = copy.deepcopy(beat_tasks) @@ -192,7 +195,7 @@ def generate_cloud_tasks( return cloud_tasks -def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]: +def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]: return generate_cloud_tasks(beat_system_tasks, beat_task_templates, beat_multiplier)