mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-04-07 19:38:19 +02:00
nit
This commit is contained in:
parent
ee89591060
commit
05cb7bfd51
@ -1,178 +0,0 @@
|
||||
"""
|
||||
Periodic tasks for tenant pre-provisioning.
|
||||
"""
|
||||
import asyncio
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from celery import shared_task
|
||||
from celery import Task
|
||||
from redis.lock import Lock as RedisLock
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.background.celery.apps.app_base import task_logger
|
||||
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS
|
||||
from onyx.configs.constants import OnyxCeleryPriority
|
||||
from onyx.configs.constants import OnyxCeleryQueues
|
||||
from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.db.engine import get_sqlalchemy_engine
|
||||
from onyx.db.models import AvailableTenant
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
from shared_configs.configs import TENANT_ID_PREFIX
|
||||
|
||||
# Default number of pre-provisioned tenants to maintain
|
||||
DEFAULT_TARGET_AVAILABLE_TENANTS = 5
|
||||
|
||||
# Soft time limit for tenant pre-provisioning tasks (in seconds)
|
||||
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
|
||||
# Hard time limit for tenant pre-provisioning tasks (in seconds)
|
||||
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes
|
||||
|
||||
|
||||
@shared_task(
|
||||
name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
|
||||
ignore_result=True,
|
||||
soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
|
||||
time_limit=_TENANT_PROVISIONING_TIME_LIMIT,
|
||||
queue=OnyxCeleryQueues.PRIMARY,
|
||||
bind=True,
|
||||
)
|
||||
def check_available_tenants(self: Task) -> None:
|
||||
"""
|
||||
Check if we have enough pre-provisioned tenants available.
|
||||
If not, trigger the pre-provisioning of new tenants.
|
||||
"""
|
||||
task_logger.warning("STARTING CHECK_AVAILABLE_TENANTS")
|
||||
if not MULTI_TENANT:
|
||||
task_logger.warning(
|
||||
"Multi-tenancy is not enabled, skipping tenant pre-provisioning"
|
||||
)
|
||||
return
|
||||
|
||||
r = get_redis_client()
|
||||
lock_check: RedisLock = r.lock(
|
||||
OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK,
|
||||
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
|
||||
)
|
||||
|
||||
# These tasks should never overlap
|
||||
if not lock_check.acquire(blocking=False):
|
||||
task_logger.warning(
|
||||
"Skipping check_available_tenants task because it is already running"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
# Get the current count of available tenants
|
||||
with Session(get_sqlalchemy_engine()) as db_session:
|
||||
available_tenants_count = db_session.query(AvailableTenant).count()
|
||||
|
||||
# Get the target number of available tenants
|
||||
target_available_tenants = getattr(
|
||||
TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS
|
||||
)
|
||||
|
||||
# Calculate how many new tenants we need to provision
|
||||
tenants_to_provision = max(
|
||||
0, target_available_tenants - available_tenants_count
|
||||
)
|
||||
|
||||
task_logger.warning(
|
||||
f"Available tenants: {available_tenants_count}, "
|
||||
f"Target: {target_available_tenants}, "
|
||||
f"To provision: {tenants_to_provision}"
|
||||
)
|
||||
|
||||
# Trigger pre-provisioning tasks for each tenant needed
|
||||
for _ in range(tenants_to_provision):
|
||||
pre_provision_tenant.apply_async(
|
||||
priority=OnyxCeleryPriority.LOW,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
task_logger.exception(f"Error in check_available_tenants task: {e}")
|
||||
|
||||
finally:
|
||||
lock_check.release()
|
||||
|
||||
|
||||
@shared_task(
|
||||
name=OnyxCeleryTask.PRE_PROVISION_TENANT,
|
||||
ignore_result=True,
|
||||
soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
|
||||
time_limit=_TENANT_PROVISIONING_TIME_LIMIT,
|
||||
queue=OnyxCeleryQueues.PRIMARY,
|
||||
bind=True,
|
||||
)
|
||||
def pre_provision_tenant(self: Task) -> None:
|
||||
"""
|
||||
Pre-provision a new tenant and store it in the NewAvailableTenant table.
|
||||
This function fully sets up the tenant with all necessary configurations,
|
||||
so it's ready to be assigned to a user immediately.
|
||||
"""
|
||||
task_logger.warning("STARTING PRE_PROVISION_TENANT")
|
||||
if not MULTI_TENANT:
|
||||
task_logger.warning(
|
||||
"Multi-tenancy is not enabled, skipping tenant pre-provisioning"
|
||||
)
|
||||
return
|
||||
r = get_redis_client()
|
||||
lock_provision: RedisLock = r.lock(
|
||||
OnyxRedisLocks.PRE_PROVISION_TENANT_LOCK,
|
||||
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
|
||||
)
|
||||
|
||||
# Allow multiple pre-provisioning tasks to run, but ensure they don't overlap
|
||||
if not lock_provision.acquire(blocking=False):
|
||||
task_logger.warning(
|
||||
"Skipping pre_provision_tenant task because it is already running"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
# Generate a new tenant ID
|
||||
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
|
||||
task_logger.warning(f"Starting pre-provisioning for tenant {tenant_id}")
|
||||
|
||||
# Import here to avoid circular imports
|
||||
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
|
||||
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
|
||||
from ee.onyx.server.tenants.provisioning import setup_tenant
|
||||
|
||||
# Create the schema for the new tenant
|
||||
schema_created = create_schema_if_not_exists(tenant_id)
|
||||
if schema_created:
|
||||
task_logger.warning(f"Created schema for tenant '{tenant_id}'")
|
||||
else:
|
||||
task_logger.warning(f"Schema already exists for tenant '{tenant_id}'")
|
||||
|
||||
# Set up the tenant with all necessary configurations
|
||||
task_logger.warning(f"Setting up tenant configuration for '{tenant_id}'")
|
||||
asyncio.run(setup_tenant(tenant_id))
|
||||
task_logger.warning(f"Tenant configuration completed for '{tenant_id}'")
|
||||
|
||||
# Get the current Alembic version
|
||||
alembic_version = get_current_alembic_version(tenant_id)
|
||||
task_logger.warning(
|
||||
f"Tenant '{tenant_id}' using Alembic version: {alembic_version}"
|
||||
)
|
||||
|
||||
# Store the pre-provisioned tenant in the database
|
||||
task_logger.warning(f"Storing pre-provisioned tenant '{tenant_id}' in database")
|
||||
with Session(get_sqlalchemy_engine()) as db_session:
|
||||
new_tenant = AvailableTenant(
|
||||
tenant_id=tenant_id,
|
||||
alembic_version=alembic_version,
|
||||
date_created=datetime.datetime.now(),
|
||||
)
|
||||
db_session.add(new_tenant)
|
||||
db_session.commit()
|
||||
|
||||
task_logger.warning(f"Successfully pre-provisioned tenant {tenant_id}")
|
||||
|
||||
except Exception as e:
|
||||
task_logger.exception(f"Error in pre_provision_tenant task: {e}")
|
||||
finally:
|
||||
lock_provision.release()
|
@ -139,8 +139,6 @@ def pre_provision_tenant(self: Task) -> None:
|
||||
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
|
||||
task_logger.info(f"Starting pre-provisioning for tenant {tenant_id}")
|
||||
|
||||
# Import here to avoid circular imports
|
||||
|
||||
# Create the schema for the new tenant
|
||||
schema_created = create_schema_if_not_exists(tenant_id)
|
||||
if schema_created:
|
||||
|
Loading…
x
Reference in New Issue
Block a user