From 05cb7bfd51efcb6d9609293bb78f4d9d4bcdf173 Mon Sep 17 00:00:00 2001 From: pablonyx Date: Thu, 6 Mar 2025 16:39:39 -0800 Subject: [PATCH] nit --- .../tasks/periodic/tenant_provisioning.py | 178 ------------------ .../celery/tasks/tenant_provisioning/tasks.py | 2 - 2 files changed, 180 deletions(-) delete mode 100644 backend/onyx/background/celery/tasks/periodic/tenant_provisioning.py diff --git a/backend/onyx/background/celery/tasks/periodic/tenant_provisioning.py b/backend/onyx/background/celery/tasks/periodic/tenant_provisioning.py deleted file mode 100644 index 8e9b0c4df..000000000 --- a/backend/onyx/background/celery/tasks/periodic/tenant_provisioning.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Periodic tasks for tenant pre-provisioning. -""" -import asyncio -import datetime -import uuid - -from celery import shared_task -from celery import Task -from redis.lock import Lock as RedisLock -from sqlalchemy.orm import Session - -from onyx.background.celery.apps.app_base import task_logger -from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS -from onyx.configs.constants import OnyxCeleryPriority -from onyx.configs.constants import OnyxCeleryQueues -from onyx.configs.constants import OnyxCeleryTask -from onyx.configs.constants import OnyxRedisLocks -from onyx.db.engine import get_sqlalchemy_engine -from onyx.db.models import AvailableTenant -from onyx.redis.redis_pool import get_redis_client -from shared_configs.configs import MULTI_TENANT -from shared_configs.configs import TENANT_ID_PREFIX - -# Default number of pre-provisioned tenants to maintain -DEFAULT_TARGET_AVAILABLE_TENANTS = 5 - -# Soft time limit for tenant pre-provisioning tasks (in seconds) -_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes -# Hard time limit for tenant pre-provisioning tasks (in seconds) -_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes - - -@shared_task( - name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS, - ignore_result=True, - soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, - time_limit=_TENANT_PROVISIONING_TIME_LIMIT, - queue=OnyxCeleryQueues.PRIMARY, - bind=True, -) -def check_available_tenants(self: Task) -> None: - """ - Check if we have enough pre-provisioned tenants available. - If not, trigger the pre-provisioning of new tenants. - """ - task_logger.warning("STARTING CHECK_AVAILABLE_TENANTS") - if not MULTI_TENANT: - task_logger.warning( - "Multi-tenancy is not enabled, skipping tenant pre-provisioning" - ) - return - - r = get_redis_client() - lock_check: RedisLock = r.lock( - OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK, - timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, - ) - - # These tasks should never overlap - if not lock_check.acquire(blocking=False): - task_logger.warning( - "Skipping check_available_tenants task because it is already running" - ) - return - - try: - # Get the current count of available tenants - with Session(get_sqlalchemy_engine()) as db_session: - available_tenants_count = db_session.query(AvailableTenant).count() - - # Get the target number of available tenants - target_available_tenants = getattr( - TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS - ) - - # Calculate how many new tenants we need to provision - tenants_to_provision = max( - 0, target_available_tenants - available_tenants_count - ) - - task_logger.warning( - f"Available tenants: {available_tenants_count}, " - f"Target: {target_available_tenants}, " - f"To provision: {tenants_to_provision}" - ) - - # Trigger pre-provisioning tasks for each tenant needed - for _ in range(tenants_to_provision): - pre_provision_tenant.apply_async( - priority=OnyxCeleryPriority.LOW, - ) - - except Exception as e: - task_logger.exception(f"Error in check_available_tenants task: {e}") - - finally: - lock_check.release() - - -@shared_task( - name=OnyxCeleryTask.PRE_PROVISION_TENANT, - ignore_result=True, - soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, - time_limit=_TENANT_PROVISIONING_TIME_LIMIT, - queue=OnyxCeleryQueues.PRIMARY, - bind=True, -) -def pre_provision_tenant(self: Task) -> None: - """ - Pre-provision a new tenant and store it in the NewAvailableTenant table. - This function fully sets up the tenant with all necessary configurations, - so it's ready to be assigned to a user immediately. - """ - task_logger.warning("STARTING PRE_PROVISION_TENANT") - if not MULTI_TENANT: - task_logger.warning( - "Multi-tenancy is not enabled, skipping tenant pre-provisioning" - ) - return - r = get_redis_client() - lock_provision: RedisLock = r.lock( - OnyxRedisLocks.PRE_PROVISION_TENANT_LOCK, - timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, - ) - - # Allow multiple pre-provisioning tasks to run, but ensure they don't overlap - if not lock_provision.acquire(blocking=False): - task_logger.warning( - "Skipping pre_provision_tenant task because it is already running" - ) - return - - try: - # Generate a new tenant ID - tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4()) - task_logger.warning(f"Starting pre-provisioning for tenant {tenant_id}") - - # Import here to avoid circular imports - from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists - from ee.onyx.server.tenants.schema_management import get_current_alembic_version - from ee.onyx.server.tenants.provisioning import setup_tenant - - # Create the schema for the new tenant - schema_created = create_schema_if_not_exists(tenant_id) - if schema_created: - task_logger.warning(f"Created schema for tenant '{tenant_id}'") - else: - task_logger.warning(f"Schema already exists for tenant '{tenant_id}'") - - # Set up the tenant with all necessary configurations - task_logger.warning(f"Setting up tenant configuration for '{tenant_id}'") - asyncio.run(setup_tenant(tenant_id)) - task_logger.warning(f"Tenant configuration completed for '{tenant_id}'") - - # Get the current Alembic version - alembic_version = get_current_alembic_version(tenant_id) - task_logger.warning( - f"Tenant '{tenant_id}' using Alembic version: {alembic_version}" - ) - - # Store the pre-provisioned tenant in the database - task_logger.warning(f"Storing pre-provisioned tenant '{tenant_id}' in database") - with Session(get_sqlalchemy_engine()) as db_session: - new_tenant = AvailableTenant( - tenant_id=tenant_id, - alembic_version=alembic_version, - date_created=datetime.datetime.now(), - ) - db_session.add(new_tenant) - db_session.commit() - - task_logger.warning(f"Successfully pre-provisioned tenant {tenant_id}") - - except Exception as e: - task_logger.exception(f"Error in pre_provision_tenant task: {e}") - finally: - lock_provision.release() diff --git a/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py b/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py index d2bd134bf..62c10cf6f 100644 --- a/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py +++ b/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py @@ -139,8 +139,6 @@ def pre_provision_tenant(self: Task) -> None: tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4()) task_logger.info(f"Starting pre-provisioning for tenant {tenant_id}") - # Import here to avoid circular imports - # Create the schema for the new tenant schema_created = create_schema_if_not_exists(tenant_id) if schema_created: