From 5e32f9d9229f0f585cdc4a9039a189f719bf3dc6 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 10 Feb 2025 11:20:38 -0800 Subject: [PATCH 1/5] refactoring and update multiplier in real time --- .../background/celery/tasks/beat_schedule.py | 67 +++++----- backend/onyx/background/celery/apps/beat.py | 120 ++++++++++-------- .../background/celery/tasks/beat_schedule.py | 44 +++++-- backend/onyx/configs/constants.py | 3 + 4 files changed, 142 insertions(+), 92 deletions(-) diff --git a/backend/ee/onyx/background/celery/tasks/beat_schedule.py b/backend/ee/onyx/background/celery/tasks/beat_schedule.py index ab7c483af..fa2bc0330 100644 --- a/backend/ee/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/ee/onyx/background/celery/tasks/beat_schedule.py @@ -3,42 +3,44 @@ from typing import Any from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT from onyx.background.celery.tasks.beat_schedule import ( - cloud_tasks_to_schedule as base_cloud_tasks_to_schedule, + beat_system_tasks as base_beat_system_tasks, ) from onyx.background.celery.tasks.beat_schedule import ( - tasks_to_schedule as base_tasks_to_schedule, + beat_task_templates as base_beat_task_templates, +) +from onyx.background.celery.tasks.beat_schedule import generate_cloud_tasks +from onyx.background.celery.tasks.beat_schedule import ( + get_tasks_to_schedule as base_get_tasks_to_schedule, ) -from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryTask from shared_configs.configs import MULTI_TENANT -ee_cloud_tasks_to_schedule = [ - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_autogenerate-usage-report", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(days=30), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, +ee_beat_system_tasks: list[dict] = [] + +ee_beat_task_templates: list[dict] = [] +ee_beat_task_templates.extend( + [ + { + "name": "autogenerate-usage-report", + "task": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, + "schedule": timedelta(days=30), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - "kwargs": { - "task_name": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, + { + "name": "check-ttl-management", + "task": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK, + "schedule": timedelta(hours=1), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-ttl-management", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(hours=1), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK, - }, - }, -] + ] +) ee_tasks_to_schedule: list[dict] = [] @@ -65,9 +67,14 @@ if not MULTI_TENANT: ] -def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: - return ee_cloud_tasks_to_schedule + base_cloud_tasks_to_schedule +def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]: + beat_system_tasks = ee_beat_system_tasks + base_beat_system_tasks + beat_task_templates = ee_beat_task_templates + base_beat_task_templates + cloud_tasks = generate_cloud_tasks( + beat_system_tasks, beat_task_templates, beat_multiplier + ) + return cloud_tasks def get_tasks_to_schedule() -> list[dict[str, Any]]: - return ee_tasks_to_schedule + 
base_tasks_to_schedule + return ee_tasks_to_schedule + base_get_tasks_to_schedule() diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 16a533239..004d15b20 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -1,41 +1,55 @@ from datetime import timedelta from typing import Any +from typing import cast from celery import Celery from celery import signals from celery.beat import PersistentScheduler # type: ignore from celery.signals import beat_init +from celery.utils.log import get_task_logger import onyx.background.celery.apps.app_base as app_base +from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT +from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME +from onyx.configs.constants import ONYX_CLOUD_TENANT_ID from onyx.configs.constants import POSTGRES_CELERY_BEAT_APP_NAME from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import SqlEngine -from onyx.utils.logger import setup_logger +from onyx.redis.redis_pool import get_redis_replica_client from onyx.utils.variable_functionality import fetch_versioned_implementation from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST from shared_configs.configs import MULTI_TENANT -logger = setup_logger(__name__) +task_logger = get_task_logger(__name__) celery_app = Celery(__name__) celery_app.config_from_object("onyx.background.celery.configs.beat") class DynamicTenantScheduler(PersistentScheduler): + """This scheduler is useful because we can dynamically adjust task generation rates + through it.""" + + RELOAD_INTERVAL = 120 + def __init__(self, *args: Any, **kwargs: Any) -> None: - logger.info("Initializing DynamicTenantScheduler") + task_logger.info("Initializing DynamicTenantScheduler") super().__init__(*args, **kwargs) - self._reload_interval = timedelta(minutes=2) + + self.last_beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT + + self._reload_interval = timedelta( + seconds=DynamicTenantScheduler.RELOAD_INTERVAL + ) self._last_reload = self.app.now() - self._reload_interval + # Let the parent class handle store initialization self.setup_schedule() self._try_updating_schedule() - logger.info(f"Set reload interval to {self._reload_interval}") + task_logger.info(f"Setting reload interval to {self._reload_interval}") def setup_schedule(self) -> None: - logger.info("Setting up initial schedule") super().setup_schedule() - logger.info("Initial schedule setup complete") def tick(self) -> float: retval = super().tick() @@ -44,24 +58,22 @@ class DynamicTenantScheduler(PersistentScheduler): self._last_reload is None or (now - self._last_reload) > self._reload_interval ): - logger.info("Reload interval reached, initiating task update") + task_logger.info("Reload interval reached, initiating task update") try: self._try_updating_schedule() - except (AttributeError, KeyError) as e: - logger.exception(f"Failed to process task configuration: {str(e)}") - except Exception as e: - logger.exception(f"Unexpected error updating tasks: {str(e)}") + except (AttributeError, KeyError): + task_logger.exception("Failed to process task configuration") + except Exception: + task_logger.exception("Unexpected error updating tasks") self._last_reload = now - logger.info("Task update completed, reset reload timer") + return retval def _generate_schedule( - self, tenant_ids: list[str] | list[None] + self, tenant_ids: list[str] | list[None], beat_multiplier: int ) -> dict[str, dict[str, Any]]: """Given a 
list of tenant id's, generates a new beat schedule for celery.""" - logger.info("Fetching tasks to schedule") - new_schedule: dict[str, dict[str, Any]] = {} if MULTI_TENANT: @@ -71,9 +83,9 @@ class DynamicTenantScheduler(PersistentScheduler): "get_cloud_tasks_to_schedule", ) - cloud_tasks_to_schedule: list[ - dict[str, Any] - ] = get_cloud_tasks_to_schedule() + cloud_tasks_to_schedule: list[dict[str, Any]] = get_cloud_tasks_to_schedule( + beat_multiplier + ) for task in cloud_tasks_to_schedule: task_name = task["name"] cloud_task = { @@ -82,7 +94,7 @@ class DynamicTenantScheduler(PersistentScheduler): "kwargs": task.get("kwargs", {}), } if options := task.get("options"): - logger.debug(f"Adding options to task {task_name}: {options}") + task_logger.debug(f"Adding options to task {task_name}: {options}") cloud_task["options"] = options new_schedule[task_name] = cloud_task @@ -95,7 +107,7 @@ class DynamicTenantScheduler(PersistentScheduler): for tenant_id in tenant_ids: if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST: - logger.info( + task_logger.debug( f"Skipping tenant {tenant_id} as it is in the ignored syncing list" ) continue @@ -104,14 +116,14 @@ class DynamicTenantScheduler(PersistentScheduler): task_name = task["name"] tenant_task_name = f"{task['name']}-{tenant_id}" - logger.debug(f"Creating task configuration for {tenant_task_name}") + task_logger.debug(f"Creating task configuration for {tenant_task_name}") tenant_task = { "task": task["task"], "schedule": task["schedule"], "kwargs": {"tenant_id": tenant_id}, } if options := task.get("options"): - logger.debug( + task_logger.debug( f"Adding options to task {tenant_task_name}: {options}" ) tenant_task["options"] = options @@ -121,44 +133,46 @@ class DynamicTenantScheduler(PersistentScheduler): def _try_updating_schedule(self) -> None: """Only updates the actual beat schedule on the celery app when it changes""" + do_update = False - logger.info("_try_updating_schedule starting") + r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID) + + task_logger.debug("_try_updating_schedule starting") tenant_ids = get_all_tenant_ids() - logger.info(f"Found {len(tenant_ids)} IDs") + task_logger.debug(f"Found {len(tenant_ids)} IDs") # get current schedule and extract current tenants current_schedule = self.schedule.items() - # there are no more per tenant beat tasks, so comment this out - # NOTE: we may not actualy need this scheduler any more and should - # test reverting to a regular beat schedule implementation + # get potential new state + beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT + beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier") + if beat_multiplier_raw is not None: + beat_multiplier = cast(int, beat_multiplier_raw) - # current_tenants = set() - # for task_name, _ in current_schedule: - # task_name = cast(str, task_name) - # if task_name.startswith(ONYX_CLOUD_CELERY_TASK_PREFIX): - # continue + new_schedule = self._generate_schedule(tenant_ids, beat_multiplier) - # if "_" in task_name: - # # example: "check-for-condition-tenant_12345678-abcd-efgh-ijkl-12345678" - # # -> "12345678-abcd-efgh-ijkl-12345678" - # current_tenants.add(task_name.split("_")[-1]) - # logger.info(f"Found {len(current_tenants)} existing items in schedule") + # if the schedule or beat multiplier has changed, update + while True: + if beat_multiplier != self.last_beat_multiplier: + do_update = True + break - # for tenant_id in tenant_ids: - # if tenant_id not in current_tenants: - # 
logger.info(f"Processing new tenant: {tenant_id}") + if not DynamicTenantScheduler._compare_schedules( + current_schedule, new_schedule + ): + do_update = True + break - new_schedule = self._generate_schedule(tenant_ids) + break - if DynamicTenantScheduler._compare_schedules(current_schedule, new_schedule): - logger.info( - "_try_updating_schedule: Current schedule is up to date, no changes needed" - ) + if not do_update: + # exit early if nothing changed return - logger.info( + # schedule needs updating + task_logger.info( "Schedule update required", extra={ "new_tasks": len(new_schedule), @@ -185,11 +199,17 @@ class DynamicTenantScheduler(PersistentScheduler): # Ensure changes are persisted self.sync() - logger.info("_try_updating_schedule: Schedule updated successfully") + self.last_beat_multiplier = beat_multiplier + + task_logger.info( + f"_try_updating_schedule - Schedule updated: " + f"tasks={len(new_schedule)} " + f"beat_multiplier={beat_multiplier}" + ) @staticmethod def _compare_schedules(schedule1: dict, schedule2: dict) -> bool: - """Compare schedules to determine if an update is needed. + """Compare schedules by task name only to determine if an update is needed. True if equivalent, False if not.""" current_tasks = set(name for name, _ in schedule1) new_tasks = set(schedule2.keys()) @@ -201,7 +221,7 @@ class DynamicTenantScheduler(PersistentScheduler): @beat_init.connect def on_beat_init(sender: Any, **kwargs: Any) -> None: - logger.info("beat_init signal received.") + task_logger.info("beat_init signal received.") # Celery beat shouldn't touch the db at all. But just setting a low minimum here. SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index e87ea9d49..40db8500c 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -1,3 +1,4 @@ +import copy from datetime import timedelta from typing import Any @@ -18,7 +19,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8 +CLOUD_BEAT_MULTIPLIER_DEFAULT = 8 # tasks that run in either self-hosted on cloud beat_task_templates: list[dict] = [] @@ -121,7 +122,7 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]: # constant options for cloud beat task generators task_schedule: timedelta = task["schedule"] - cloud_task["schedule"] = task_schedule * CLOUD_BEAT_SCHEDULE_MULTIPLIER + cloud_task["schedule"] = task_schedule cloud_task["options"] = {} cloud_task["options"]["priority"] = OnyxCeleryPriority.HIGHEST cloud_task["options"]["expires"] = BEAT_EXPIRES_DEFAULT @@ -141,9 +142,9 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]: # tasks that only run in the cloud -# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered -# by the DynamicTenantScheduler -cloud_tasks_to_schedule: list[dict] = [ +# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be seen +# by the DynamicTenantScheduler as system wide task and not a per tenant task +beat_system_tasks: list[dict] = [ # cloud specific tasks { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic", @@ -157,18 +158,37 @@ cloud_tasks_to_schedule: list[dict] = [ }, ] -# generate our cloud and self-hosted beat tasks from the templates -for 
beat_task_template in beat_task_templates: - cloud_task = make_cloud_generator_task(beat_task_template) - cloud_tasks_to_schedule.append(cloud_task) - tasks_to_schedule: list[dict] = [] if not MULTI_TENANT: tasks_to_schedule = beat_task_templates -def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: - return cloud_tasks_to_schedule +def generate_cloud_tasks( + beat_tasks: list[dict], beat_templates: list[dict], beat_multiplier: float +) -> list[dict[str, Any]]: + """ + beat_tasks: system wide tasks that can be sent as is + beat_templates: task templates that will be transformed into per tenant tasks via + the cloud_beat_task_generator + beat_multiplier: a multiplier that can be applied on top of the task schedule + to speed up or slow down the task generation rate. useful in production. + """ + cloud_tasks: list[dict] = copy.deepcopy(beat_tasks) + + # generate our cloud and self-hosted beat tasks from the templates + for beat_template in beat_templates: + cloud_task = make_cloud_generator_task(beat_template) + cloud_tasks.append(cloud_task) + + # factor in the cloud multiplier + for cloud_task in cloud_tasks: + cloud_task["schedule"] = cloud_task["schedule"] * beat_multiplier + + return cloud_tasks + + +def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]: + return generate_cloud_tasks(beat_system_tasks, beat_task_templates, beat_multiplier) def get_tasks_to_schedule() -> list[dict[str, Any]]: diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index a85c70c5c..322195683 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -346,6 +346,9 @@ ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" # the tenant id we use for system level redis operations ONYX_CLOUD_TENANT_ID = "cloud" +# the redis namespace for runtime variables +ONYX_CLOUD_REDIS_RUNTIME = "runtime" + class OnyxCeleryTask: DEFAULT = "celery" From 5a4d007cf95b06dee9e3029dded2d4c6f3d349a4 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 10 Feb 2025 15:03:59 -0800 Subject: [PATCH 2/5] comments --- backend/onyx/background/celery/tasks/beat_schedule.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 40db8500c..34a0d26db 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -172,10 +172,15 @@ def generate_cloud_tasks( the cloud_beat_task_generator beat_multiplier: a multiplier that can be applied on top of the task schedule to speed up or slow down the task generation rate. useful in production. + + Returns a list of cloud tasks, which consists of incoming tasks + tasks generated + from incoming templates. 
""" + + # start with the incoming beat tasks cloud_tasks: list[dict] = copy.deepcopy(beat_tasks) - # generate our cloud and self-hosted beat tasks from the templates + # generate our cloud tasks from the templates for beat_template in beat_templates: cloud_task = make_cloud_generator_task(beat_template) cloud_tasks.append(cloud_task) From 9740ed32b5a906b2840b7be8605bcd7843046d19 Mon Sep 17 00:00:00 2001 From: Richard Kuo Date: Mon, 10 Feb 2025 20:48:55 -0800 Subject: [PATCH 3/5] fix reading redis values as floats --- .../background/celery/tasks/beat_schedule.py | 2 +- backend/onyx/background/celery/apps/beat.py | 25 ++++++++++++++----- .../background/celery/tasks/beat_schedule.py | 7 ++++-- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/backend/ee/onyx/background/celery/tasks/beat_schedule.py b/backend/ee/onyx/background/celery/tasks/beat_schedule.py index fa2bc0330..00283e179 100644 --- a/backend/ee/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/ee/onyx/background/celery/tasks/beat_schedule.py @@ -67,7 +67,7 @@ if not MULTI_TENANT: ] -def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]: +def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]: beat_system_tasks = ee_beat_system_tasks + base_beat_system_tasks beat_task_templates = ee_beat_task_templates + base_beat_task_templates cloud_tasks = generate_cloud_tasks( diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 004d15b20..45fe040fc 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -58,7 +58,7 @@ class DynamicTenantScheduler(PersistentScheduler): self._last_reload is None or (now - self._last_reload) > self._reload_interval ): - task_logger.info("Reload interval reached, initiating task update") + task_logger.debug("Reload interval reached, initiating task update") try: self._try_updating_schedule() except (AttributeError, KeyError): @@ -71,7 +71,7 @@ class DynamicTenantScheduler(PersistentScheduler): return retval def _generate_schedule( - self, tenant_ids: list[str] | list[None], beat_multiplier: int + self, tenant_ids: list[str] | list[None], beat_multiplier: float ) -> dict[str, dict[str, Any]]: """Given a list of tenant id's, generates a new beat schedule for celery.""" new_schedule: dict[str, dict[str, Any]] = {} @@ -149,7 +149,13 @@ class DynamicTenantScheduler(PersistentScheduler): beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier") if beat_multiplier_raw is not None: - beat_multiplier = cast(int, beat_multiplier_raw) + try: + beat_multiplier_bytes = cast(bytes, beat_multiplier_raw) + beat_multiplier = float(beat_multiplier_bytes.decode()) + except ValueError: + task_logger.error( + f"Invalid beat_multiplier value: {beat_multiplier_raw}" + ) new_schedule = self._generate_schedule(tenant_ids, beat_multiplier) @@ -169,10 +175,15 @@ class DynamicTenantScheduler(PersistentScheduler): if not do_update: # exit early if nothing changed + task_logger.info( + f"_try_updating_schedule - Schedule unchanged: " + f"tasks={len(new_schedule)} " + f"beat_multiplier={beat_multiplier}" + ) return # schedule needs updating - task_logger.info( + task_logger.debug( "Schedule update required", extra={ "new_tasks": len(new_schedule), @@ -199,14 +210,16 @@ class DynamicTenantScheduler(PersistentScheduler): # Ensure changes are persisted self.sync() - self.last_beat_multiplier = 
beat_multiplier - task_logger.info( f"_try_updating_schedule - Schedule updated: " + f"prev_num_tasks={len(current_schedule)} " + f"prev_beat_multiplier={self.last_beat_multiplier} " f"tasks={len(new_schedule)} " f"beat_multiplier={beat_multiplier}" ) + self.last_beat_multiplier = beat_multiplier + @staticmethod def _compare_schedules(schedule1: dict, schedule2: dict) -> bool: """Compare schedules by task name only to determine if an update is needed. diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 34a0d26db..ea483f469 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -19,7 +19,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_MULTIPLIER_DEFAULT = 8 +CLOUD_BEAT_MULTIPLIER_DEFAULT = 8.0 # tasks that run in either self-hosted on cloud beat_task_templates: list[dict] = [] @@ -177,6 +177,9 @@ def generate_cloud_tasks( from incoming templates. """ + if beat_multiplier <= 0: + raise ValueError("beat_multiplier must be positive!") + # start with the incoming beat tasks cloud_tasks: list[dict] = copy.deepcopy(beat_tasks) @@ -192,7 +195,7 @@ def generate_cloud_tasks( return cloud_tasks -def get_cloud_tasks_to_schedule(beat_multiplier: int) -> list[dict[str, Any]]: +def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]: return generate_cloud_tasks(beat_system_tasks, beat_task_templates, beat_multiplier) From 16d6d708f6d1ca51fcbe4c3405c8bdc4a12d96fc Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Tue, 11 Feb 2025 09:15:39 -0800 Subject: [PATCH 4/5] update logging --- backend/onyx/background/celery/apps/beat.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 45fe040fc..ab2d522d1 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -30,10 +30,9 @@ class DynamicTenantScheduler(PersistentScheduler): """This scheduler is useful because we can dynamically adjust task generation rates through it.""" - RELOAD_INTERVAL = 120 + RELOAD_INTERVAL = 60 def __init__(self, *args: Any, **kwargs: Any) -> None: - task_logger.info("Initializing DynamicTenantScheduler") super().__init__(*args, **kwargs) self.last_beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT @@ -46,7 +45,9 @@ class DynamicTenantScheduler(PersistentScheduler): # Let the parent class handle store initialization self.setup_schedule() self._try_updating_schedule() - task_logger.info(f"Setting reload interval to {self._reload_interval}") + task_logger.info( + f"DynamicTenantScheduler initialized: reload_interval={self._reload_interval}" + ) def setup_schedule(self) -> None: super().setup_schedule() From 5bf520d8b868e4505d1d294e6e53bb12018539bb Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Tue, 11 Feb 2025 14:04:49 -0800 Subject: [PATCH 5/5] comments --- backend/onyx/background/celery/apps/beat.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index ab2d522d1..8633f4169 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -78,7 +78,8 @@ class 
DynamicTenantScheduler(PersistentScheduler): new_schedule: dict[str, dict[str, Any]] = {} if MULTI_TENANT: - # cloud tasks only need the single task beat across all tenants + # cloud tasks are system wide and thus only need to be on the beat schedule + # once for all tenants get_cloud_tasks_to_schedule = fetch_versioned_implementation( "onyx.background.celery.tasks.beat_schedule", "get_cloud_tasks_to_schedule", @@ -100,6 +101,9 @@ class DynamicTenantScheduler(PersistentScheduler): new_schedule[task_name] = cloud_task # regular task beats are multiplied across all tenants + # note that currently this just schedules for a single tenant in self hosted + # and doesn't do anything in the cloud because it's much more scalable + # to schedule a single cloud beat task to dispatch per tenant tasks. get_tasks_to_schedule = fetch_versioned_implementation( "onyx.background.celery.tasks.beat_schedule", "get_tasks_to_schedule" )
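
For reference, a minimal sketch of how the runtime multiplier introduced in this series is intended to behave. The helpers read_beat_multiplier and scale_schedules below are hypothetical stand-ins (not part of the patch); they only mirror the decode-as-float fallback added to beat.py and the schedule scaling done in generate_cloud_tasks, and the 20-second template schedule is illustrative.

    import copy
    from datetime import timedelta

    import redis

    CLOUD_BEAT_MULTIPLIER_DEFAULT = 8.0  # mirrors the constant in beat_schedule.py

    def read_beat_multiplier(r: redis.Redis) -> float:
        # bytes -> float, falling back to the default on a missing or malformed value
        raw = r.get("runtime:beat_multiplier")
        if raw is None:
            return CLOUD_BEAT_MULTIPLIER_DEFAULT
        try:
            return float(raw.decode())
        except ValueError:
            return CLOUD_BEAT_MULTIPLIER_DEFAULT

    def scale_schedules(cloud_tasks: list[dict], beat_multiplier: float) -> list[dict]:
        # apply the multiplier on top of each task's base schedule, as generate_cloud_tasks does
        if beat_multiplier <= 0:
            raise ValueError("beat_multiplier must be positive!")
        scaled = copy.deepcopy(cloud_tasks)
        for task in scaled:
            task["schedule"] = task["schedule"] * beat_multiplier
        return scaled

    # example: a template scheduled every 20s with the default multiplier dispatches every 160s
    tasks = [{"name": "cloud_check-alembic", "schedule": timedelta(seconds=20)}]
    print(scale_schedules(tasks, CLOUD_BEAT_MULTIPLIER_DEFAULT)[0]["schedule"])  # 0:02:40

Operationally, raising the stored value slows dispatch and lowering it speeds dispatch up, and the scheduler picks the change up on its next reload tick (RELOAD_INTERVAL, 60 seconds after patch 4). The key referenced in the code is "runtime:beat_multiplier", but get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID) may apply its own prefixing, so the exact key to set from redis-cli should be verified against that client before relying on it.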