mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-29 17:20:44 +02:00
Bugfix/beat templates (#3754)
* WIP * migrate most beat tasks to fan out strategy * fix kwargs * migrate EE tasks * lock on the task_name level * typo fix * transform beat tasks for cloud * cloud multiplier is only for cloud tasks * bumpity --------- Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
@ -133,3 +133,4 @@ Looking to contribute? Please check out the [Contribution Guide](CONTRIBUTING.md
|
|||||||
## ⭐Star History
|
## ⭐Star History
|
||||||
|
|
||||||
[](https://star-history.com/#onyx-dot-app/onyx&Date)
|
[](https://star-history.com/#onyx-dot-app/onyx&Date)
|
||||||
|
|
||||||
|
@ -20,236 +20,151 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds)
|
|||||||
# we have a better implementation (backpressure, etc)
|
# we have a better implementation (backpressure, etc)
|
||||||
CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4
|
CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4
|
||||||
|
|
||||||
|
# tasks that run in either self-hosted on cloud
|
||||||
|
beat_task_templates: list[dict] = []
|
||||||
|
|
||||||
|
beat_task_templates.extend(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "check-for-indexing",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_INDEXING,
|
||||||
|
"schedule": timedelta(seconds=15),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "check-for-connector-deletion",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
|
||||||
|
"schedule": timedelta(seconds=20),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "check-for-vespa-sync",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
|
||||||
|
"schedule": timedelta(seconds=20),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "check-for-pruning",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_PRUNING,
|
||||||
|
"schedule": timedelta(hours=1),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "monitor-vespa-sync",
|
||||||
|
"task": OnyxCeleryTask.MONITOR_VESPA_SYNC,
|
||||||
|
"schedule": timedelta(seconds=5),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "check-for-doc-permissions-sync",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC,
|
||||||
|
"schedule": timedelta(seconds=30),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "check-for-external-group-sync",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC,
|
||||||
|
"schedule": timedelta(seconds=20),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.MEDIUM,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "monitor-background-processes",
|
||||||
|
"task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
|
||||||
|
"schedule": timedelta(minutes=5),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.LOW,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
"queue": OnyxCeleryQueues.MONITORING,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only add the LLM model update task if the API URL is configured
|
||||||
|
if LLM_MODEL_UPDATE_API_URL:
|
||||||
|
beat_task_templates.append(
|
||||||
|
{
|
||||||
|
"name": "check-for-llm-model-update",
|
||||||
|
"task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE,
|
||||||
|
"schedule": timedelta(hours=1), # Check every hour
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.LOW,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
cloud_task: dict[str, Any] = {}
|
||||||
|
|
||||||
|
# constant options for cloud beat task generators
|
||||||
|
task_schedule: timedelta = task["schedule"]
|
||||||
|
cloud_task["schedule"] = task_schedule * CLOUD_BEAT_SCHEDULE_MULTIPLIER
|
||||||
|
cloud_task["options"] = {}
|
||||||
|
cloud_task["options"]["priority"] = OnyxCeleryPriority.HIGHEST
|
||||||
|
cloud_task["options"]["expires"] = BEAT_EXPIRES_DEFAULT
|
||||||
|
|
||||||
|
# settings dependent on the original task
|
||||||
|
cloud_task["name"] = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_{task['name']}"
|
||||||
|
cloud_task["task"] = OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR
|
||||||
|
cloud_task["kwargs"] = {}
|
||||||
|
cloud_task["kwargs"]["task_name"] = task["task"]
|
||||||
|
|
||||||
|
optional_fields = ["queue", "priority", "expires"]
|
||||||
|
for field in optional_fields:
|
||||||
|
if field in task["options"]:
|
||||||
|
cloud_task["kwargs"][field] = task["options"][field]
|
||||||
|
|
||||||
|
return cloud_task
|
||||||
|
|
||||||
|
|
||||||
# tasks that only run in the cloud
|
# tasks that only run in the cloud
|
||||||
# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered
|
# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered
|
||||||
# by the DynamicTenantScheduler
|
# by the DynamicTenantScheduler
|
||||||
cloud_tasks_to_schedule = [
|
cloud_tasks_to_schedule: list[dict] = [
|
||||||
# cloud specific tasks
|
# cloud specific tasks
|
||||||
{
|
{
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic",
|
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic",
|
||||||
"task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC,
|
"task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC,
|
||||||
"schedule": timedelta(hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
"schedule": timedelta(hours=1),
|
||||||
"options": {
|
"options": {
|
||||||
"queue": OnyxCeleryQueues.MONITORING,
|
"queue": OnyxCeleryQueues.MONITORING,
|
||||||
"priority": OnyxCeleryPriority.HIGH,
|
"priority": OnyxCeleryPriority.HIGH,
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
# remaining tasks are cloud generators for per tenant tasks
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_INDEXING,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-connector-deletion",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-vespa-sync",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-prune",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_PRUNING,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-vespa-sync",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.MONITOR_VESPA_SYNC,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-doc-permissions-sync",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=30 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-external-group-sync",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-background-processes",
|
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(minutes=5 * CLOUD_BEAT_SCHEDULE_MULTIPLIER),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
|
|
||||||
"queue": OnyxCeleryQueues.MONITORING,
|
|
||||||
"priority": OnyxCeleryPriority.LOW,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
]
|
]
|
||||||
|
|
||||||
if LLM_MODEL_UPDATE_API_URL:
|
# generate our cloud and self-hosted beat tasks from the templates
|
||||||
cloud_tasks_to_schedule.append(
|
for beat_task_template in beat_task_templates:
|
||||||
{
|
cloud_task = make_cloud_generator_task(beat_task_template)
|
||||||
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-llm-model-update",
|
cloud_tasks_to_schedule.append(cloud_task)
|
||||||
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
|
|
||||||
"schedule": timedelta(
|
|
||||||
hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER
|
|
||||||
), # Check every hour
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.HIGHEST,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
"kwargs": {
|
|
||||||
"task_name": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE,
|
|
||||||
"priority": OnyxCeleryPriority.LOW,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# tasks that run in either self-hosted on cloud
|
|
||||||
tasks_to_schedule: list[dict] = []
|
tasks_to_schedule: list[dict] = []
|
||||||
|
|
||||||
if not MULTI_TENANT:
|
if not MULTI_TENANT:
|
||||||
tasks_to_schedule.extend(
|
tasks_to_schedule = beat_task_templates
|
||||||
[
|
|
||||||
{
|
|
||||||
"name": "check-for-indexing",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_INDEXING,
|
|
||||||
"schedule": timedelta(seconds=15),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "check-for-connector-deletion",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
|
|
||||||
"schedule": timedelta(seconds=20),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "check-for-vespa-sync",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
|
|
||||||
"schedule": timedelta(seconds=20),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "check-for-pruning",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_PRUNING,
|
|
||||||
"schedule": timedelta(hours=1),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "monitor-vespa-sync",
|
|
||||||
"task": OnyxCeleryTask.MONITOR_VESPA_SYNC,
|
|
||||||
"schedule": timedelta(seconds=5),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "check-for-doc-permissions-sync",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC,
|
|
||||||
"schedule": timedelta(seconds=30),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "check-for-external-group-sync",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC,
|
|
||||||
"schedule": timedelta(seconds=20),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.MEDIUM,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "monitor-background-processes",
|
|
||||||
"task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
|
|
||||||
"schedule": timedelta(minutes=15),
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.LOW,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
"queue": OnyxCeleryQueues.MONITORING,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Only add the LLM model update task if the API URL is configured
|
|
||||||
if LLM_MODEL_UPDATE_API_URL:
|
|
||||||
tasks_to_schedule.append(
|
|
||||||
{
|
|
||||||
"name": "check-for-llm-model-update",
|
|
||||||
"task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE,
|
|
||||||
"schedule": timedelta(hours=1), # Check every hour
|
|
||||||
"options": {
|
|
||||||
"priority": OnyxCeleryPriority.LOW,
|
|
||||||
"expires": BEAT_EXPIRES_DEFAULT,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]:
|
def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]:
|
||||||
|
Reference in New Issue
Block a user