diff --git a/backend/ee/onyx/background/celery/tasks/beat_schedule.py b/backend/ee/onyx/background/celery/tasks/beat_schedule.py index 2dbda2398..06d311bd6 100644 --- a/backend/ee/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/ee/onyx/background/celery/tasks/beat_schedule.py @@ -1,30 +1,70 @@ from datetime import timedelta from typing import Any +from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT from onyx.background.celery.tasks.beat_schedule import ( cloud_tasks_to_schedule as base_cloud_tasks_to_schedule, ) from onyx.background.celery.tasks.beat_schedule import ( tasks_to_schedule as base_tasks_to_schedule, ) +from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX +from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryTask +from shared_configs.configs import MULTI_TENANT -ee_tasks_to_schedule = [ +ee_cloud_tasks_to_schedule = [ { - "name": "autogenerate-usage-report", - "task": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, - "schedule": timedelta(days=30), # TODO: change this to config flag + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_autogenerate-usage-report", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(days=30), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, + }, }, { - "name": "check-ttl-management", - "task": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK, + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-ttl-management", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, "schedule": timedelta(hours=1), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK, + }, }, ] +if not MULTI_TENANT: + ee_tasks_to_schedule = [ + { + "name": "autogenerate-usage-report", + "task": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, + "schedule": timedelta(days=30), # TODO: change this to config flag + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-ttl-management", + "task": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK, + "schedule": timedelta(hours=1), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + ] + def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: - return base_cloud_tasks_to_schedule + return ee_cloud_tasks_to_schedule + base_cloud_tasks_to_schedule def get_tasks_to_schedule() -> list[dict[str, Any]]: diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 02b24147d..4e866b3db 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -81,7 +81,7 @@ class DynamicTenantScheduler(PersistentScheduler): cloud_task = { "task": task["task"], "schedule": task["schedule"], - "kwargs": {}, + "kwargs": task.get("kwargs", {}), } if options := task.get("options"): logger.debug(f"Adding options to task {task_name}: {options}") diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 7d0aa9e9c..d2c01e159 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -17,134 +17,234 @@ from shared_configs.configs import MULTI_TENANT BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # tasks that only run in the cloud -# the name attribute must start with ONYX_CELERY_CLOUD_PREFIX = "cloud" to be filtered +# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered # by the DynamicTenantScheduler cloud_tasks_to_schedule = [ - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing", - "task": OnyxCeleryTask.CLOUD_CHECK_FOR_INDEXING, - "schedule": timedelta(seconds=15), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, + # cloud specific tasks { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic", "task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC, "schedule": timedelta(hours=1), "options": { + "queue": OnyxCeleryQueues.MONITORING, "priority": OnyxCeleryPriority.HIGH, "expires": BEAT_EXPIRES_DEFAULT, - "queue": OnyxCeleryQueues.MONITORING, }, }, -] - -# tasks that run in either self-hosted on cloud -tasks_to_schedule = [ + # remaining tasks are cloud generators for per tenant tasks { - "name": "check-for-vespa-sync", - "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-connector-deletion", - "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-prune", - "task": OnyxCeleryTask.CHECK_FOR_PRUNING, + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, "schedule": timedelta(seconds=15), "options": { - "priority": OnyxCeleryPriority.MEDIUM, + "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_INDEXING, + }, }, { - "name": "kombu-message-cleanup", - "task": OnyxCeleryTask.KOMBU_MESSAGE_CLEANUP_TASK, - "schedule": timedelta(seconds=3600), - "options": { - "priority": OnyxCeleryPriority.LOWEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "monitor-vespa-sync", - "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, - "schedule": timedelta(seconds=5), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "monitor-background-processes", - "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - "schedule": timedelta(minutes=5), - "options": { - "priority": OnyxCeleryPriority.LOW, - "expires": BEAT_EXPIRES_DEFAULT, - "queue": OnyxCeleryQueues.MONITORING, - }, - }, - { - "name": "check-for-doc-permissions-sync", - "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, - "schedule": timedelta(seconds=30), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-external-group-sync", - "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-connector-deletion", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, "schedule": timedelta(seconds=20), "options": { - "priority": OnyxCeleryPriority.MEDIUM, + "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, + }, + }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-vespa-sync", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + }, + }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-prune", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(seconds=15), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_PRUNING, + }, + }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-vespa-sync", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(seconds=5), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.MONITOR_VESPA_SYNC, + }, + }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-doc-permissions-sync", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(seconds=30), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, + }, + }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-external-group-sync", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, + }, + }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-background-processes", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(minutes=5), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + "kwargs": { + "task_name": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, + "queue": OnyxCeleryQueues.MONITORING, + "priority": OnyxCeleryPriority.LOW, + }, }, ] -if not MULTI_TENANT: - tasks_to_schedule.append( +if LLM_MODEL_UPDATE_API_URL: + cloud_tasks_to_schedule.append( { - "name": "check-for-indexing", - "task": OnyxCeleryTask.CHECK_FOR_INDEXING, - "schedule": timedelta(seconds=15), + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-llm-model-update", + "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + "schedule": timedelta(hours=1), # Check every hour "options": { - "priority": OnyxCeleryPriority.MEDIUM, + "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, }, + "kwargs": { + "task_name": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, + "priority": OnyxCeleryPriority.LOW, + }, } ) -# Only add the LLM model update task if the API URL is configured -if LLM_MODEL_UPDATE_API_URL: - tasks_to_schedule.append( - { - "name": "check-for-llm-model-update", - "task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, - "schedule": timedelta(hours=1), # Check every hour - "options": { - "priority": OnyxCeleryPriority.LOW, - "expires": BEAT_EXPIRES_DEFAULT, +# tasks that run in either self-hosted on cloud +tasks_to_schedule: list[dict] = [] + +if not MULTI_TENANT: + tasks_to_schedule.extend( + [ + { + "name": "check-for-indexing", + "task": OnyxCeleryTask.CHECK_FOR_INDEXING, + "schedule": timedelta(seconds=15), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - } + { + "name": "check-for-connector-deletion", + "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-vespa-sync", + "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-pruning", + "task": OnyxCeleryTask.CHECK_FOR_PRUNING, + "schedule": timedelta(hours=1), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "monitor-vespa-sync", + "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, + "schedule": timedelta(seconds=5), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-doc-permissions-sync", + "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, + "schedule": timedelta(seconds=30), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "check-for-external-group-sync", + "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, + { + "name": "monitor-background-processes", + "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, + "schedule": timedelta(minutes=5), + "options": { + "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, + "queue": OnyxCeleryQueues.MONITORING, + }, + }, + ] ) + # Only add the LLM model update task if the API URL is configured + if LLM_MODEL_UPDATE_API_URL: + tasks_to_schedule.append( + { + "name": "check-for-llm-model-update", + "task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, + "schedule": timedelta(hours=1), # Check every hour + "options": { + "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, + }, + } + ) + def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: return cloud_tasks_to_schedule diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index e3987e490..309e03818 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -15,7 +15,6 @@ from redis import Redis from redis.lock import Lock as RedisLock from onyx.background.celery.apps.app_base import task_logger -from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT from onyx.background.celery.tasks.indexing.utils import _should_index from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids from onyx.background.celery.tasks.indexing.utils import IndexingCallback @@ -26,15 +25,12 @@ from onyx.background.indexing.run_indexing import run_indexing_entrypoint from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT -from onyx.configs.constants import ONYX_CLOUD_TENANT_ID -from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisSignals from onyx.db.connector import mark_ccpair_with_indexing_trigger from onyx.db.connector_credential_pair import fetch_connector_credential_pairs from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id -from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import get_session_with_tenant from onyx.db.enums import IndexingMode from onyx.db.index_attempt import get_index_attempt @@ -687,61 +683,3 @@ def connector_indexing_proxy_task( f"search_settings={search_settings_id}" ) return - - -@shared_task( - name=OnyxCeleryTask.CLOUD_CHECK_FOR_INDEXING, - trail=False, - bind=True, -) -def cloud_check_for_indexing(self: Task) -> bool | None: - """a lightweight task used to kick off individual check tasks for each tenant.""" - time_start = time.monotonic() - - redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID) - - lock_beat: RedisLock = redis_client.lock( - OnyxRedisLocks.CLOUD_CHECK_INDEXING_BEAT_LOCK, - timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, - ) - - # these tasks should never overlap - if not lock_beat.acquire(blocking=False): - return None - - last_lock_time = time.monotonic() - - try: - tenant_ids = get_all_tenant_ids() - for tenant_id in tenant_ids: - current_time = time.monotonic() - if current_time - last_lock_time >= (CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4): - lock_beat.reacquire() - last_lock_time = current_time - - self.app.send_task( - OnyxCeleryTask.CHECK_FOR_INDEXING, - kwargs=dict( - tenant_id=tenant_id, - ), - priority=OnyxCeleryPriority.HIGH, - expires=BEAT_EXPIRES_DEFAULT, - ) - except SoftTimeLimitExceeded: - task_logger.info( - "Soft time limit exceeded, task is being terminated gracefully." - ) - except Exception: - task_logger.exception("Unexpected exception during cloud indexing check") - finally: - if lock_beat.owned(): - lock_beat.release() - else: - task_logger.error("cloud_check_for_indexing - Lock not owned on completion") - redis_lock_dump(lock_beat, redis_client) - - time_elapsed = time.monotonic() - time_start - task_logger.info( - f"cloud_check_for_indexing finished: num_tenants={len(tenant_ids)} elapsed={time_elapsed:.2f}" - ) - return True diff --git a/backend/onyx/background/celery/tasks/shared/tasks.py b/backend/onyx/background/celery/tasks/shared/tasks.py index b078c282b..0dba3e6dc 100644 --- a/backend/onyx/background/celery/tasks/shared/tasks.py +++ b/backend/onyx/background/celery/tasks/shared/tasks.py @@ -1,15 +1,22 @@ +import time from http import HTTPStatus import httpx from celery import shared_task from celery import Task from celery.exceptions import SoftTimeLimitExceeded +from redis.lock import Lock as RedisLock from tenacity import RetryError from onyx.access.access import get_access_for_document from onyx.background.celery.apps.app_base import task_logger +from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex +from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT +from onyx.configs.constants import ONYX_CLOUD_TENANT_ID +from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisLocks from onyx.db.document import delete_document_by_connector_credential_pair__no_commit from onyx.db.document import delete_documents_complete__no_commit from onyx.db.document import fetch_chunk_count_for_document @@ -18,10 +25,13 @@ from onyx.db.document import get_document_connector_count from onyx.db.document import mark_document_as_modified from onyx.db.document import mark_document_as_synced from onyx.db.document_set import fetch_document_sets_for_document +from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import get_session_with_tenant from onyx.document_index.document_index_utils import get_both_index_names from onyx.document_index.factory import get_default_document_index from onyx.document_index.interfaces import VespaDocumentFields +from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import redis_lock_dump from onyx.server.documents.models import ConnectorCredentialPairIdentifier DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3 @@ -199,3 +209,73 @@ def document_by_cc_pair_cleanup_task( return False return True + + +@shared_task( + name=OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, + trail=False, + bind=True, +) +def cloud_beat_task_generator( + self: Task, + task_name: str, + queue: str = OnyxCeleryTask.DEFAULT, + priority: int = OnyxCeleryPriority.MEDIUM, + expires: int = BEAT_EXPIRES_DEFAULT, +) -> bool | None: + """a lightweight task used to kick off individual beat tasks per tenant.""" + time_start = time.monotonic() + + redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID) + + lock_beat: RedisLock = redis_client.lock( + f"{OnyxRedisLocks.CLOUD_BEAT_TASK_GENERATOR_LOCK}:{task_name}", + timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, + ) + + # these tasks should never overlap + if not lock_beat.acquire(blocking=False): + return None + + last_lock_time = time.monotonic() + + try: + tenant_ids = get_all_tenant_ids() + for tenant_id in tenant_ids: + current_time = time.monotonic() + if current_time - last_lock_time >= (CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4): + lock_beat.reacquire() + last_lock_time = current_time + + self.app.send_task( + task_name, + kwargs=dict( + tenant_id=tenant_id, + ), + queue=queue, + priority=priority, + expires=expires, + ) + except SoftTimeLimitExceeded: + task_logger.info( + "Soft time limit exceeded, task is being terminated gracefully." + ) + except Exception: + task_logger.exception("Unexpected exception during cloud_beat_task_generator") + finally: + if not lock_beat.owned(): + task_logger.error( + "cloud_beat_task_generator - Lock not owned on completion" + ) + redis_lock_dump(lock_beat, redis_client) + else: + lock_beat.release() + + time_elapsed = time.monotonic() - time_start + task_logger.info( + f"cloud_beat_task_generator finished: " + f"task={task_name} " + f"num_tenants={len(tenant_ids)} " + f"elapsed={time_elapsed:.2f}" + ) + return True diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 34156324c..09a90a9ee 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -294,7 +294,7 @@ class OnyxRedisLocks: SLACK_BOT_HEARTBEAT_PREFIX = "da_heartbeat:slack_bot" ANONYMOUS_USER_ENABLED = "anonymous_user_enabled" - CLOUD_CHECK_INDEXING_BEAT_LOCK = "da_lock:cloud_check_indexing_beat" + CLOUD_BEAT_TASK_GENERATOR_LOCK = "da_lock:cloud_beat_task_generator" CLOUD_CHECK_ALEMBIC_BEAT_LOCK = "da_lock:cloud_check_alembic" @@ -318,6 +318,11 @@ ONYX_CLOUD_TENANT_ID = "cloud" class OnyxCeleryTask: + DEFAULT = "celery" + + CLOUD_BEAT_TASK_GENERATOR = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_generate_beat_tasks" + CLOUD_CHECK_ALEMBIC = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_alembic" + CHECK_FOR_CONNECTOR_DELETION = "check_for_connector_deletion_task" CHECK_FOR_VESPA_SYNC_TASK = "check_for_vespa_sync_task" CHECK_FOR_INDEXING = "check_for_indexing" @@ -325,8 +330,10 @@ class OnyxCeleryTask: CHECK_FOR_DOC_PERMISSIONS_SYNC = "check_for_doc_permissions_sync" CHECK_FOR_EXTERNAL_GROUP_SYNC = "check_for_external_group_sync" CHECK_FOR_LLM_MODEL_UPDATE = "check_for_llm_model_update" + MONITOR_VESPA_SYNC = "monitor_vespa_sync" MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes" + KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task" CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = ( "connector_permission_sync_generator_task" @@ -344,9 +351,6 @@ class OnyxCeleryTask: CHECK_TTL_MANAGEMENT_TASK = "check_ttl_management_task" AUTOGENERATE_USAGE_REPORT_TASK = "autogenerate_usage_report_task" - CLOUD_CHECK_FOR_INDEXING = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_for_indexing" - CLOUD_CHECK_ALEMBIC = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_alembic" - REDIS_SOCKET_KEEPALIVE_OPTIONS = {} REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15