mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-27 20:38:32 +02:00
alert if revisions are null or query fails (#3910)
* alert if revisions are null or query fails
* comment
* mypy

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
@@ -728,6 +728,10 @@ def cloud_check_alembic() -> bool | None:
|
|||||||
TODO: have the cloud migration script set an activity signal that this check
|
TODO: have the cloud migration script set an activity signal that this check
|
||||||
uses to know it doesn't make sense to run a check at the present time.
|
uses to know it doesn't make sense to run a check at the present time.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Used as a placeholder if the alembic revision cannot be retrieved
|
||||||
|
ALEMBIC_NULL_REVISION = "000000000000"
|
||||||
|
|
||||||
time_start = time.monotonic()
|
time_start = time.monotonic()
|
||||||
|
|
||||||
redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||||
@@ -743,14 +747,14 @@ def cloud_check_alembic() -> bool | None:
|
|||||||
|
|
||||||
last_lock_time = time.monotonic()
|
last_lock_time = time.monotonic()
|
||||||
|
|
||||||
tenant_to_revision: dict[str, str | None] = {}
|
tenant_to_revision: dict[str, str] = {}
|
||||||
revision_counts: dict[str, int] = {}
|
revision_counts: dict[str, int] = {}
|
||||||
out_of_date_tenants: dict[str, str | None] = {}
|
out_of_date_tenants: dict[str, str] = {}
|
||||||
top_revision: str = ""
|
top_revision: str = ""
|
||||||
tenant_ids: list[str] | list[None] = []
|
tenant_ids: list[str] | list[None] = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# map each tenant_id to its revision
|
# map tenant_id to revision (or ALEMBIC_NULL_REVISION if the query fails)
|
||||||
tenant_ids = get_all_tenant_ids()
|
tenant_ids = get_all_tenant_ids()
|
||||||
for tenant_id in tenant_ids:
|
for tenant_id in tenant_ids:
|
||||||
current_time = time.monotonic()
|
current_time = time.monotonic()
|
||||||
@@ -762,43 +766,53 @@ def cloud_check_alembic() -> bool | None:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
with get_session_with_tenant(tenant_id=None) as session:
|
with get_session_with_tenant(tenant_id=None) as session:
|
||||||
result = session.execute(
|
try:
|
||||||
text(f'SELECT * FROM "{tenant_id}".alembic_version LIMIT 1')
|
result = session.execute(
|
||||||
)
|
text(f'SELECT * FROM "{tenant_id}".alembic_version LIMIT 1')
|
||||||
|
)
|
||||||
|
|
||||||
result_scalar: str | None = result.scalar_one_or_none()
|
result_scalar: str | None = result.scalar_one_or_none()
|
||||||
tenant_to_revision[tenant_id] = result_scalar
|
if result_scalar is None:
|
||||||
|
raise ValueError("Alembic version should not be None.")
|
||||||
|
|
||||||
|
tenant_to_revision[tenant_id] = result_scalar
|
||||||
|
except Exception:
|
||||||
|
task_logger.warning(f"Tenant {tenant_id} has no revision!")
|
||||||
|
tenant_to_revision[tenant_id] = ALEMBIC_NULL_REVISION
|
||||||
|
|
||||||
# get the total count of each revision
|
# get the total count of each revision
|
||||||
for k, v in tenant_to_revision.items():
|
for k, v in tenant_to_revision.items():
|
||||||
if v is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
revision_counts[v] = revision_counts.get(v, 0) + 1
|
revision_counts[v] = revision_counts.get(v, 0) + 1
|
||||||
|
|
||||||
|
# error if any null revision tenants are found
|
||||||
|
if ALEMBIC_NULL_REVISION in revision_counts:
|
||||||
|
num_null_revisions = revision_counts[ALEMBIC_NULL_REVISION]
|
||||||
|
raise ValueError(f"No revision was found for {num_null_revisions} tenants!")
|
||||||
|
|
||||||
# get the revision with the most counts
|
# get the revision with the most counts
|
||||||
sorted_revision_counts = sorted(
|
sorted_revision_counts = sorted(
|
||||||
revision_counts.items(), key=lambda item: item[1], reverse=True
|
revision_counts.items(), key=lambda item: item[1], reverse=True
|
||||||
)
|
)
|
||||||
|
|
||||||
if len(sorted_revision_counts) == 0:
|
if len(sorted_revision_counts) == 0:
|
||||||
task_logger.error(
|
raise ValueError(
|
||||||
f"cloud_check_alembic - No revisions found for {len(tenant_ids)} tenant ids!"
|
f"cloud_check_alembic - No revisions found for {len(tenant_ids)} tenant ids!"
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
top_revision, _ = sorted_revision_counts[0]
|
|
||||||
|
|
||||||
# build a list of out of date tenants
|
top_revision, _ = sorted_revision_counts[0]
|
||||||
for k, v in tenant_to_revision.items():
|
|
||||||
if v == top_revision:
|
|
||||||
continue
|
|
||||||
|
|
||||||
out_of_date_tenants[k] = v
|
# build a list of out of date tenants
|
||||||
|
for k, v in tenant_to_revision.items():
|
||||||
|
if v == top_revision:
|
||||||
|
continue
|
||||||
|
|
||||||
|
out_of_date_tenants[k] = v
|
||||||
|
|
||||||
except SoftTimeLimitExceeded:
|
except SoftTimeLimitExceeded:
|
||||||
task_logger.info(
|
task_logger.info(
|
||||||
"Soft time limit exceeded, task is being terminated gracefully."
|
"Soft time limit exceeded, task is being terminated gracefully."
|
||||||
)
|
)
|
||||||
|
raise
|
||||||
except Exception:
|
except Exception:
|
||||||
task_logger.exception("Unexpected exception during cloud alembic check")
|
task_logger.exception("Unexpected exception during cloud alembic check")
|
||||||
raise
|
raise
|
||||||
@@ -816,6 +830,11 @@ def cloud_check_alembic() -> bool | None:
|
|||||||
f"num_tenants={len(tenant_ids)} "
|
f"num_tenants={len(tenant_ids)} "
|
||||||
f"revision={top_revision}"
|
f"revision={top_revision}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
num_to_log = min(5, len(out_of_date_tenants))
|
||||||
|
task_logger.info(
|
||||||
|
f"Logging {num_to_log}/{len(out_of_date_tenants)} out of date tenants."
|
||||||
|
)
|
||||||
for k, v in islice(out_of_date_tenants.items(), 5):
|
for k, v in islice(out_of_date_tenants.items(), 5):
|
||||||
task_logger.info(f"Out of date tenant: tenant={k} revision={v}")
|
task_logger.info(f"Out of date tenant: tenant={k} revision={v}")
|
||||||
else:
|
else:
|
||||||
|
Reference in New Issue
Block a user