This commit is contained in:
pablonyx 2025-03-06 10:22:14 -08:00
parent 2b1ec9c454
commit 55b0b02068
2 changed files with 107 additions and 128 deletions

View File

@ -68,11 +68,42 @@ async def get_or_provision_tenant(
await submit_to_hubspot(email, referral_source, request)
try:
# First, check if the user already has a tenant
tenant_id = get_tenant_id_for_email(email)
return tenant_id
except exceptions.UserNotExists:
# If tenant does not exist and in Multi tenant mode, provision a new tenant
# User doesn't exist, so we need to create a new tenant or assign an existing one
try:
tenant_id = await create_tenant(email, referral_source)
# Try to get a pre-provisioned tenant
tenant_id = await get_available_tenant()
if tenant_id:
# If we have a pre-provisioned tenant, just add the user to it
add_users_to_tenant([email], tenant_id)
# Create milestone record
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
create_milestone_and_report(
user=None,
distinct_id=tenant_id,
event_type=MilestoneRecordType.TENANT_CREATED,
properties={
"email": email,
},
db_session=db_session,
)
# Notify control plane
if not DEV_MODE:
await notify_control_plane(tenant_id, email, referral_source)
logger.info(
f"Assigned pre-provisioned tenant {tenant_id} to user {email}"
)
else:
# If no pre-provisioned tenant is available, create a new one on-demand
tenant_id = await create_tenant(email, referral_source)
except Exception as e:
logger.error(f"Tenant provisioning failed: {e}")
raise HTTPException(status_code=500, detail="Failed to provision tenant.")
@ -86,31 +117,10 @@ async def get_or_provision_tenant(
async def create_tenant(email: str, referral_source: str | None = None) -> str:
# Try to get a pre-provisioned tenant first
tenant_id = await get_available_tenant()
if tenant_id:
try:
# Complete the setup of the pre-provisioned tenant
await complete_tenant_setup(tenant_id, email)
# Notify control plane
if not DEV_MODE:
await notify_control_plane(tenant_id, email, referral_source)
logger.info(
f"Successfully assigned pre-provisioned tenant {tenant_id} to user {email}"
)
return tenant_id
except Exception as e:
logger.error(
f"Error assigning pre-provisioned tenant {tenant_id} to user {email}: {e}"
)
# If there's an error, try the traditional provisioning method
# Fall back to traditional provisioning if no pre-provisioned tenant is available
# or if there was an error assigning the pre-provisioned tenant
"""
Create a new tenant on-demand when no pre-provisioned tenants are available.
This is the fallback method when we can't use a pre-provisioned tenant.
"""
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
try:
# Provision tenant on data plane
@ -138,46 +148,29 @@ async def provision_tenant(tenant_id: str, email: str) -> None:
token = None
try:
# Check if this is a pre-provisioned tenant
is_pre_provisioned = False
with Session(get_sqlalchemy_engine()) as db_session:
pre_provisioned = (
db_session.query(NewAvailableTenant)
.filter(NewAvailableTenant.tenant_id == tenant_id)
if not create_schema_if_not_exists(tenant_id):
logger.debug(f"Created schema for tenant {tenant_id}")
else:
logger.debug(f"Schema already exists for tenant {tenant_id}")
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
# Await the Alembic migrations
await asyncio.to_thread(run_alembic_migrations, tenant_id)
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
configure_default_api_keys(db_session)
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
if pre_provisioned:
is_pre_provisioned = True
# Remove from available tenants
db_session.delete(pre_provisioned)
db_session.commit()
# If not pre-provisioned, create the schema and run migrations
if not is_pre_provisioned:
if not create_schema_if_not_exists(tenant_id):
logger.debug(f"Created schema for tenant {tenant_id}")
else:
logger.debug(f"Schema already exists for tenant {tenant_id}")
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
# Await the Alembic migrations
await asyncio.to_thread(run_alembic_migrations, tenant_id)
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
configure_default_api_keys(db_session)
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type
== EmbeddingProvider.COHERE
)
setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type == EmbeddingProvider.COHERE
)
setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled)
# Add the user to the tenant
add_users_to_tenant([email], tenant_id)
@ -429,52 +422,3 @@ async def get_available_tenant() -> str | None:
logger.error(f"Error getting available tenant: {e}")
return None
async def complete_tenant_setup(tenant_id: str, email: str) -> None:
    """
    Complete the setup of a pre-provisioned tenant.

    Runs with CURRENT_TENANT_ID_CONTEXTVAR set to ``tenant_id`` and performs,
    in order:

    1. Configure default API keys and run ``setup_onyx`` inside a
       tenant-scoped session. Cohere is enabled when a ``SearchSettings`` row
       with status ``FUTURE`` exists and its provider is Cohere.
    2. Add ``email`` to the tenant.
    3. Record a ``TENANT_CREATED`` milestone in a fresh tenant-scoped session.

    Args:
        tenant_id: ID of the pre-provisioned tenant to finish setting up.
        email: Email of the user being assigned to the tenant.

    Raises:
        HTTPException: 500 if any step fails; the original exception is
            attached as the cause.
    """
    token = None
    try:
        token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)

        # Configure default API keys and setup Onyx
        with get_session_with_tenant(tenant_id=tenant_id) as db_session:
            configure_default_api_keys(db_session)

            current_search_settings = (
                db_session.query(SearchSettings)
                .filter_by(status=IndexModelStatus.FUTURE)
                .first()
            )
            # Evaluated while the session is still open, since it reads an
            # attribute of the queried row.
            cohere_enabled = (
                current_search_settings is not None
                and current_search_settings.provider_type == EmbeddingProvider.COHERE
            )
            setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled)

        # Add the user to the tenant
        add_users_to_tenant([email], tenant_id)

        # Create milestone record
        with get_session_with_tenant(tenant_id=tenant_id) as db_session:
            create_milestone_and_report(
                user=None,
                distinct_id=tenant_id,
                event_type=MilestoneRecordType.TENANT_CREATED,
                properties={
                    "email": email,
                },
                db_session=db_session,
            )
    except Exception as e:
        logger.exception(f"Failed to complete setup for tenant {tenant_id}")
        # NOTE(review): the exception text is included in the HTTP detail and
        # is therefore returned to the caller -- confirm this is intended.
        # Chain explicitly so the traceback shows the root cause as the
        # direct cause rather than as incidental context.
        raise HTTPException(
            status_code=500, detail=f"Failed to complete tenant setup: {e}"
        ) from e
    finally:
        # Only reset when set() actually ran; token stays None otherwise.
        if token is not None:
            CURRENT_TENANT_ID_CONTEXTVAR.reset(token)

View File

@ -21,6 +21,7 @@ from onyx.db.engine import get_sqlalchemy_engine
from onyx.db.models import NewAvailableTenant
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import TENANT_ID_PREFIX
from shared_configs.enums import EmbeddingProvider
# Default number of pre-provisioned tenants to maintain
DEFAULT_TARGET_AVAILABLE_TENANTS = 5
@ -107,6 +108,8 @@ def check_available_tenants(self: Task) -> None:
def pre_provision_tenant(self: Task) -> None:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.
This function fully sets up the tenant with all necessary configurations,
so it's ready to be assigned to a user immediately.
"""
if not MULTI_TENANT:
logger.debug("Multi-tenancy is not enabled, skipping tenant pre-provisioning")
@ -126,11 +129,17 @@ def pre_provision_tenant(self: Task) -> None:
try:
# Generate a new tenant ID
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
token = None
# Import here to avoid circular imports
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
from ee.onyx.server.tenants.schema_management import run_alembic_migrations
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
from ee.onyx.server.tenants.provisioning import configure_default_api_keys
from onyx.setup import setup_onyx
from onyx.db.models import SearchSettings, IndexModelStatus
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from onyx.db.engine import get_session_with_tenant
# Create the schema for the new tenant
if not create_schema_if_not_exists(tenant_id):
@ -138,23 +147,49 @@ def pre_provision_tenant(self: Task) -> None:
else:
logger.debug(f"Schema already exists for tenant {tenant_id}")
# Run Alembic migrations
asyncio.run(asyncio.to_thread(run_alembic_migrations, tenant_id))
try:
# Set the tenant context
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
# Get the current Alembic version
alembic_version = get_current_alembic_version(tenant_id)
# Run Alembic migrations
asyncio.run(asyncio.to_thread(run_alembic_migrations, tenant_id))
# Store the pre-provisioned tenant in the database
with Session(get_sqlalchemy_engine()) as db_session:
new_tenant = NewAvailableTenant(
tenant_id=tenant_id,
alembic_version=alembic_version,
date_created=datetime.datetime.now(),
)
db_session.add(new_tenant)
db_session.commit()
# Configure the tenant with default settings
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
# Configure default API keys
configure_default_api_keys(db_session)
logger.info(f"Successfully pre-provisioned tenant {tenant_id}")
# Set up Onyx with appropriate settings
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type
== EmbeddingProvider.COHERE
)
setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled)
# Get the current Alembic version
alembic_version = get_current_alembic_version(tenant_id)
# Store the pre-provisioned tenant in the database
with Session(get_sqlalchemy_engine()) as db_session:
new_tenant = NewAvailableTenant(
tenant_id=tenant_id,
alembic_version=alembic_version,
date_created=datetime.datetime.now(),
)
db_session.add(new_tenant)
db_session.commit()
logger.info(f"Successfully pre-provisioned tenant {tenant_id}")
finally:
if token is not None:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
except Exception as e:
logger.exception(f"Error in pre_provision_tenant task: {e}")