diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index fe25c1d80405..dbedc1396ac5 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -728,6 +728,10 @@ def cloud_check_alembic() -> bool | None: TODO: have the cloud migration script set an activity signal that this check uses to know it doesn't make sense to run a check at the present time. """ + + # Used as a placeholder if the alembic revision cannot be retrieved + ALEMBIC_NULL_REVISION = "000000000000" + time_start = time.monotonic() redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID) @@ -743,14 +747,14 @@ def cloud_check_alembic() -> bool | None: last_lock_time = time.monotonic() - tenant_to_revision: dict[str, str | None] = {} + tenant_to_revision: dict[str, str] = {} revision_counts: dict[str, int] = {} - out_of_date_tenants: dict[str, str | None] = {} + out_of_date_tenants: dict[str, str] = {} top_revision: str = "" tenant_ids: list[str] | list[None] = [] try: - # map each tenant_id to its revision + # map tenant_id to revision (or ALEMBIC_NULL_REVISION if the query fails) tenant_ids = get_all_tenant_ids() for tenant_id in tenant_ids: current_time = time.monotonic() @@ -762,43 +766,53 @@ def cloud_check_alembic() -> bool | None: continue with get_session_with_tenant(tenant_id=None) as session: - result = session.execute( - text(f'SELECT * FROM "{tenant_id}".alembic_version LIMIT 1') - ) + try: + result = session.execute( + text(f'SELECT * FROM "{tenant_id}".alembic_version LIMIT 1') + ) - result_scalar: str | None = result.scalar_one_or_none() - tenant_to_revision[tenant_id] = result_scalar + result_scalar: str | None = result.scalar_one_or_none() + if result_scalar is None: + raise ValueError("Alembic version should not be None.") + + tenant_to_revision[tenant_id] = result_scalar + except Exception: + task_logger.warning(f"Tenant {tenant_id} has no revision!") + tenant_to_revision[tenant_id] = ALEMBIC_NULL_REVISION # get the total count of each revision for k, v in tenant_to_revision.items(): - if v is None: - continue - revision_counts[v] = revision_counts.get(v, 0) + 1 + # error if any null revision tenants are found + if ALEMBIC_NULL_REVISION in revision_counts: + num_null_revisions = revision_counts[ALEMBIC_NULL_REVISION] + raise ValueError(f"No revision was found for {num_null_revisions} tenants!") + # get the revision with the most counts sorted_revision_counts = sorted( revision_counts.items(), key=lambda item: item[1], reverse=True ) if len(sorted_revision_counts) == 0: - task_logger.error( + raise ValueError( f"cloud_check_alembic - No revisions found for {len(tenant_ids)} tenant ids!" ) - else: - top_revision, _ = sorted_revision_counts[0] - # build a list of out of date tenants - for k, v in tenant_to_revision.items(): - if v == top_revision: - continue + top_revision, _ = sorted_revision_counts[0] - out_of_date_tenants[k] = v + # build a list of out of date tenants + for k, v in tenant_to_revision.items(): + if v == top_revision: + continue + + out_of_date_tenants[k] = v except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." ) + raise except Exception: task_logger.exception("Unexpected exception during cloud alembic check") raise @@ -816,6 +830,11 @@ def cloud_check_alembic() -> bool | None: f"num_tenants={len(tenant_ids)} " f"revision={top_revision}" ) + + num_to_log = min(5, len(out_of_date_tenants)) + task_logger.info( + f"Logging {num_to_log}/{len(out_of_date_tenants)} out of date tenants." + ) for k, v in islice(out_of_date_tenants.items(), 5): task_logger.info(f"Out of date tenant: tenant={k} revision={v}") else: