From a222fae7c8414c1a119739d9f9b79af7a1b240c4 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Fri, 7 Feb 2025 22:57:57 -0800 Subject: [PATCH] Bugfix/beat templates (#3754) * WIP * migrate most beat tasks to fan out strategy * fix kwargs * migrate EE tasks * lock on the task_name level * typo fix * transform beat tasks for cloud * cloud multiplier is only for cloud tasks * bumpity --------- Co-authored-by: Richard Kuo (Danswer) --- README.md | 1 + .../background/celery/tasks/beat_schedule.py | 339 +++++++----------- 2 files changed, 128 insertions(+), 212 deletions(-) diff --git a/README.md b/README.md index 7188faed2..86b31be58 100644 --- a/README.md +++ b/README.md @@ -133,3 +133,4 @@ Looking to contribute? Please check out the [Contribution Guide](CONTRIBUTING.md ## ⭐Star History [![Star History Chart](https://api.star-history.com/svg?repos=onyx-dot-app/onyx&type=Date)](https://star-history.com/#onyx-dot-app/onyx&Date) + diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 6db15cd83..0309611ac 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -20,236 +20,151 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # we have a better implementation (backpressure, etc) CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4 +# tasks that run in either self-hosted on cloud +beat_task_templates: list[dict] = [] + +beat_task_templates.extend( + [ + { + "name": "check-for-indexing", + "task": OnyxCeleryTask.CHECK_FOR_INDEXING, + "schedule": timedelta(seconds=15), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-connector-deletion", + "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-vespa-sync", + "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-pruning", + "task": OnyxCeleryTask.CHECK_FOR_PRUNING, + "schedule": timedelta(hours=1), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "monitor-vespa-sync", + "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, + "schedule": timedelta(seconds=5), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-doc-permissions-sync", + "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, + "schedule": timedelta(seconds=30), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-external-group-sync", + "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "monitor-background-processes", + "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, + "schedule": timedelta(minutes=5), + "options": { + "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, + "queue": OnyxCeleryQueues.MONITORING, + }, + }, + ] +) + +# Only add the LLM model update task if the API URL is configured +if LLM_MODEL_UPDATE_API_URL: + beat_task_templates.append( + { + "name": "check-for-llm-model-update", + "task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, + "schedule": timedelta(hours=1), # Check every hour + "options": { + "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, + }, + } + ) + + +def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]: + cloud_task: dict[str, Any] = {} + + # constant options for cloud beat task generators + task_schedule: timedelta = task["schedule"] + cloud_task["schedule"] = task_schedule * CLOUD_BEAT_SCHEDULE_MULTIPLIER + cloud_task["options"] = {} + cloud_task["options"]["priority"] = OnyxCeleryPriority.HIGHEST + cloud_task["options"]["expires"] = BEAT_EXPIRES_DEFAULT + + # settings dependent on the original task + cloud_task["name"] = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_{task['name']}" + cloud_task["task"] = OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR + cloud_task["kwargs"] = {} + cloud_task["kwargs"]["task_name"] = task["task"] + + optional_fields = ["queue", "priority", "expires"] + for field in optional_fields: + if field in task["options"]: + cloud_task["kwargs"][field] = task["options"][field] + + return cloud_task + + # tasks that only run in the cloud # the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered # by the DynamicTenantScheduler -cloud_tasks_to_schedule = [ +cloud_tasks_to_schedule: list[dict] = [ # cloud specific tasks { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic", "task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC, - "schedule": timedelta(hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), + "schedule": timedelta(hours=1), "options": { "queue": OnyxCeleryQueues.MONITORING, "priority": OnyxCeleryPriority.HIGH, "expires": BEAT_EXPIRES_DEFAULT, }, }, - # remaining tasks are cloud generators for per tenant tasks - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_INDEXING, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-connector-deletion", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-vespa-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-prune", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_PRUNING, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-vespa-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.MONITOR_VESPA_SYNC, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-doc-permissions-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=30 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-external-group-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-background-processes", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(minutes=5 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - "queue": OnyxCeleryQueues.MONITORING, - "priority": OnyxCeleryPriority.LOW, - }, - }, ] -if LLM_MODEL_UPDATE_API_URL: - cloud_tasks_to_schedule.append( - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-llm-model-update", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta( - hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER - ), # Check every hour - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, - "priority": OnyxCeleryPriority.LOW, - }, - } - ) +# generate our cloud and self-hosted beat tasks from the templates +for beat_task_template in beat_task_templates: + cloud_task = make_cloud_generator_task(beat_task_template) + cloud_tasks_to_schedule.append(cloud_task) -# tasks that run in either self-hosted on cloud tasks_to_schedule: list[dict] = [] - if not MULTI_TENANT: - tasks_to_schedule.extend( - [ - { - "name": "check-for-indexing", - "task": OnyxCeleryTask.CHECK_FOR_INDEXING, - "schedule": timedelta(seconds=15), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-connector-deletion", - "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-vespa-sync", - "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-pruning", - "task": OnyxCeleryTask.CHECK_FOR_PRUNING, - "schedule": timedelta(hours=1), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "monitor-vespa-sync", - "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, - "schedule": timedelta(seconds=5), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-doc-permissions-sync", - "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, - "schedule": timedelta(seconds=30), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-external-group-sync", - "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "monitor-background-processes", - "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - "schedule": timedelta(minutes=15), - "options": { - "priority": OnyxCeleryPriority.LOW, - "expires": BEAT_EXPIRES_DEFAULT, - "queue": OnyxCeleryQueues.MONITORING, - }, - }, - ] - ) - - # Only add the LLM model update task if the API URL is configured - if LLM_MODEL_UPDATE_API_URL: - tasks_to_schedule.append( - { - "name": "check-for-llm-model-update", - "task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, - "schedule": timedelta(hours=1), # Check every hour - "options": { - "priority": OnyxCeleryPriority.LOW, - "expires": BEAT_EXPIRES_DEFAULT, - }, - } - ) + tasks_to_schedule = beat_task_templates def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: