Sync status improvements (#3782)

* minor improvments / clarity

* additional comment for clarity

* typing

* quick updates to monitoring

* connector deletion

* quick nit

* fix typing

* update values

* quick nit

* functioning

* improvements to monitoring

* update

* minutes -> seconds
This commit is contained in:
pablonyx 2025-01-26 09:35:26 -08:00 committed by GitHub
parent d8a17a7238
commit 70795a4047
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 355 additions and 164 deletions

View File

@ -227,7 +227,7 @@ if not MULTI_TENANT:
{
"name": "monitor-background-processes",
"task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
"schedule": timedelta(minutes=5),
"schedule": timedelta(minutes=15),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,

View File

@ -139,13 +139,6 @@ def try_generate_document_cc_pair_cleanup_tasks(
submitted=datetime.now(timezone.utc),
)
# create before setting fence to avoid race condition where the monitoring
# task updates the sync record before it is created
insert_sync_record(
db_session=db_session,
entity_id=cc_pair_id,
sync_type=SyncType.CONNECTOR_DELETION,
)
redis_connector.delete.set_fence(fence_payload)
try:
@ -184,6 +177,13 @@ def try_generate_document_cc_pair_cleanup_tasks(
)
if tasks_generated is None:
raise ValueError("RedisConnectorDeletion.generate_tasks returned None")
insert_sync_record(
db_session=db_session,
entity_id=cc_pair_id,
sync_type=SyncType.CONNECTOR_DELETION,
)
except TaskDependencyError:
redis_connector.delete.set_fence(None)
raise

View File

@ -4,6 +4,7 @@ from collections.abc import Callable
from datetime import timedelta
from itertools import islice
from typing import Any
from typing import Literal
from celery import shared_task
from celery import Task
@ -26,6 +27,7 @@ from onyx.db.engine import get_all_tenant_ids
from onyx.db.engine import get_db_current_time
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import IndexingStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import DocumentSet
@ -38,6 +40,7 @@ from onyx.redis.redis_pool import redis_lock_dump
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
_MONITORING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
_MONITORING_TIME_LIMIT = _MONITORING_SOFT_TIME_LIMIT + 60 # 6 minutes
@ -49,6 +52,12 @@ _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT = (
"monitoring_connector_index_attempt_run_success:{cc_pair_id}:{index_attempt_id}"
)
_FINAL_METRIC_KEY_FMT = "sync_final_metrics:{sync_type}:{entity_id}:{sync_record_id}"
_SYNC_START_LATENCY_KEY_FMT = (
"sync_start_latency:{sync_type}:{entity_id}:{sync_record_id}"
)
def _mark_metric_as_emitted(redis_std: Redis, key: str) -> None:
"""Mark a metric as having been emitted by setting a Redis key with expiration"""
@ -111,6 +120,7 @@ class Metric(BaseModel):
}.items()
if v is not None
}
task_logger.info(f"Emitting metric: {data}")
optional_telemetry(
record_type=RecordType.METRIC,
data=data,
@ -189,237 +199,368 @@ def _build_connector_start_latency_metric(
f"Start latency for index attempt {recent_attempt.id}: {start_latency:.2f}s "
f"(desired: {desired_start_time}, actual: {recent_attempt.time_started})"
)
job_id = build_job_id("connector", str(cc_pair.id), str(recent_attempt.id))
return Metric(
key=metric_key,
name="connector_start_latency",
value=start_latency,
tags={},
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
},
)
def _build_run_success_metrics(
def _build_connector_final_metrics(
cc_pair: ConnectorCredentialPair,
recent_attempts: list[IndexAttempt],
redis_std: Redis,
) -> list[Metric]:
"""
Final metrics for connector index attempts:
- Boolean success/fail metric
- If success, emit:
* duration (seconds)
* doc_count
"""
metrics = []
for attempt in recent_attempts:
metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format(
cc_pair_id=cc_pair.id,
index_attempt_id=attempt.id,
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info(
f"Skipping metric for connector {cc_pair.connector.id} "
f"index attempt {attempt.id} because it has already been "
"emitted"
f"Skipping final metrics for connector {cc_pair.connector.id} "
f"index attempt {attempt.id}, already emitted."
)
continue
if attempt.status in [
# We only emit final metrics if the attempt is in a terminal state
if attempt.status not in [
IndexingStatus.SUCCESS,
IndexingStatus.FAILED,
IndexingStatus.CANCELED,
]:
task_logger.info(
f"Adding run success metric for index attempt {attempt.id} with status {attempt.status}"
# Not finished; skip
continue
job_id = build_job_id("connector", str(cc_pair.id), str(attempt.id))
success = attempt.status == IndexingStatus.SUCCESS
metrics.append(
Metric(
key=metric_key, # We'll mark the same key for any final metrics
name="connector_run_succeeded",
value=success,
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
"status": attempt.status.value,
},
)
)
if success:
# Make sure we have valid time_started
if attempt.time_started and attempt.time_updated:
duration_seconds = (
attempt.time_updated - attempt.time_started
).total_seconds()
metrics.append(
Metric(
key=None, # No need for a new key, or you can reuse the same if you prefer
name="connector_index_duration_seconds",
value=duration_seconds,
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
},
)
)
else:
task_logger.error(
f"Index attempt {attempt.id} succeeded but has missing time "
f"(time_started={attempt.time_started}, time_updated={attempt.time_updated})."
)
# For doc counts, choose whichever field is more relevant
doc_count = attempt.total_docs_indexed or 0
metrics.append(
Metric(
key=metric_key,
name="connector_run_succeeded",
value=attempt.status == IndexingStatus.SUCCESS,
tags={"source": str(cc_pair.connector.source)},
key=None,
name="connector_index_doc_count",
value=doc_count,
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
},
)
)
_mark_metric_as_emitted(redis_std, metric_key)
return metrics
def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]:
"""Collect metrics about connector runs from the past hour"""
# NOTE: use get_db_current_time since the IndexAttempt times are set based on DB time
one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)
# Get all connector credential pairs
cc_pairs = db_session.scalars(select(ConnectorCredentialPair)).all()
# Might be more than one search setting, or just one
active_search_settings = get_active_search_settings(db_session)
metrics = []
for cc_pair, search_settings in zip(cc_pairs, active_search_settings):
recent_attempts = (
db_session.query(IndexAttempt)
.filter(
IndexAttempt.connector_credential_pair_id == cc_pair.id,
IndexAttempt.search_settings_id == search_settings.id,
# If you want to process each cc_pair against each search setting:
for cc_pair in cc_pairs:
for search_settings in active_search_settings:
recent_attempts = (
db_session.query(IndexAttempt)
.filter(
IndexAttempt.connector_credential_pair_id == cc_pair.id,
IndexAttempt.search_settings_id == search_settings.id,
)
.order_by(IndexAttempt.time_created.desc())
.limit(2)
.all()
)
.order_by(IndexAttempt.time_created.desc())
.limit(2)
.all()
)
if not recent_attempts:
continue
most_recent_attempt = recent_attempts[0]
second_most_recent_attempt = (
recent_attempts[1] if len(recent_attempts) > 1 else None
)
if not recent_attempts:
continue
if one_hour_ago > most_recent_attempt.time_created:
continue
most_recent_attempt = recent_attempts[0]
second_most_recent_attempt = (
recent_attempts[1] if len(recent_attempts) > 1 else None
)
# Connector start latency
start_latency_metric = _build_connector_start_latency_metric(
cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
)
if start_latency_metric:
metrics.append(start_latency_metric)
if one_hour_ago > most_recent_attempt.time_created:
continue
# Connector run success/failure
run_success_metrics = _build_run_success_metrics(
cc_pair, recent_attempts, redis_std
)
metrics.extend(run_success_metrics)
# Connector start latency
start_latency_metric = _build_connector_start_latency_metric(
cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
)
if start_latency_metric:
metrics.append(start_latency_metric)
# Connector run success/failure
final_metrics = _build_connector_final_metrics(
cc_pair, recent_attempts, redis_std
)
metrics.extend(final_metrics)
return metrics
def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]:
"""Collect metrics about document set and group syncing speed"""
# NOTE: use get_db_current_time since the SyncRecord times are set based on DB time
"""
Collect metrics for document set and group syncing:
- Success/failure status
- Start latency (always)
- Duration & doc count (only if success)
- Throughput (docs/min) (only if success)
"""
one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)
# Get all sync records from the last hour
# Get all sync records that ended in the last hour
recent_sync_records = db_session.scalars(
select(SyncRecord)
.where(SyncRecord.sync_start_time >= one_hour_ago)
.order_by(SyncRecord.sync_start_time.desc())
.where(SyncRecord.sync_end_time.isnot(None))
.where(SyncRecord.sync_end_time >= one_hour_ago)
.order_by(SyncRecord.sync_end_time.desc())
).all()
task_logger.info(
f"Collecting sync metrics for {len(recent_sync_records)} sync records"
)
metrics = []
for sync_record in recent_sync_records:
# Skip if no end time (sync still in progress)
if not sync_record.sync_end_time:
continue
# Build a job_id for correlation
job_id = build_job_id("sync_record", str(sync_record.id))
# Check if we already emitted a metric for this sync record
metric_key = (
f"sync_speed:{sync_record.sync_type}:"
f"{sync_record.entity_id}:{sync_record.id}"
# Emit a SUCCESS/FAIL boolean metric
# Use a single Redis key to avoid re-emitting final metrics
final_metric_key = _FINAL_METRIC_KEY_FMT.format(
sync_type=sync_record.sync_type,
entity_id=sync_record.entity_id,
sync_record_id=sync_record.id,
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info(
f"Skipping metric for sync record {sync_record.id} "
"because it has already been emitted"
if not _has_metric_been_emitted(redis_std, final_metric_key):
# Evaluate success
sync_succeeded = sync_record.sync_status == SyncStatus.SUCCESS
metrics.append(
Metric(
key=final_metric_key,
name="sync_run_succeeded",
value=sync_succeeded,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
"status": str(sync_record.sync_status),
},
)
)
continue
# Calculate sync duration in minutes
sync_duration_mins = (
sync_record.sync_end_time - sync_record.sync_start_time
).total_seconds() / 60.0
# If successful, emit additional metrics
if sync_succeeded:
if sync_record.sync_end_time and sync_record.sync_start_time:
duration_seconds = (
sync_record.sync_end_time - sync_record.sync_start_time
).total_seconds()
else:
task_logger.error(
f"Invalid times for sync record {sync_record.id}: "
f"start={sync_record.sync_start_time}, end={sync_record.sync_end_time}"
)
duration_seconds = None
# Calculate sync speed (docs/min) - avoid division by zero
sync_speed = (
sync_record.num_docs_synced / sync_duration_mins
if sync_duration_mins > 0
else None
doc_count = sync_record.num_docs_synced or 0
sync_speed = None
if duration_seconds and duration_seconds > 0:
duration_mins = duration_seconds / 60.0
sync_speed = (
doc_count / duration_mins if duration_mins > 0 else None
)
# Emit duration, doc count, speed
if duration_seconds is not None:
metrics.append(
Metric(
key=None,
name="sync_duration_seconds",
value=duration_seconds,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
else:
task_logger.error(
f"Invalid sync record {sync_record.id} with no duration"
)
metrics.append(
Metric(
key=None,
name="sync_doc_count",
value=doc_count,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
if sync_speed is not None:
metrics.append(
Metric(
key=None,
name="sync_speed_docs_per_min",
value=sync_speed,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
else:
task_logger.error(
f"Invalid sync record {sync_record.id} with no duration"
)
# Mark final metrics as emitted so we don't re-emit
_mark_metric_as_emitted(redis_std, final_metric_key)
# Emit start latency
start_latency_key = _SYNC_START_LATENCY_KEY_FMT.format(
sync_type=sync_record.sync_type,
entity_id=sync_record.entity_id,
sync_record_id=sync_record.id,
)
if not _has_metric_been_emitted(redis_std, start_latency_key):
# Get the entity's last update time based on sync type
entity: DocumentSet | UserGroup | None = None
if sync_record.sync_type == SyncType.DOCUMENT_SET:
entity = db_session.scalar(
select(DocumentSet).where(DocumentSet.id == sync_record.entity_id)
)
elif sync_record.sync_type == SyncType.USER_GROUP:
entity = db_session.scalar(
select(UserGroup).where(UserGroup.id == sync_record.entity_id)
)
else:
task_logger.info(
f"Skipping sync record {sync_record.id} of type {sync_record.sync_type}."
)
continue
if sync_speed is None:
task_logger.error(
f"Something went wrong with sync speed calculation. "
f"Sync record: {sync_record.id}, duration: {sync_duration_mins}, "
f"docs synced: {sync_record.num_docs_synced}"
)
continue
if entity is None:
task_logger.error(
f"Could not find entity for sync record {sync_record.id} "
f"(type={sync_record.sync_type}, id={sync_record.entity_id})."
)
continue
task_logger.info(
f"Calculated sync speed for record {sync_record.id}: {sync_speed} docs/min"
)
metrics.append(
Metric(
key=metric_key,
name="sync_speed_docs_per_min",
value=sync_speed,
tags={
"sync_type": str(sync_record.sync_type),
"status": str(sync_record.sync_status),
},
)
)
# Calculate start latency in seconds:
# (actual sync start) - (last modified time)
if entity.time_last_modified_by_user and sync_record.sync_start_time:
start_latency = (
sync_record.sync_start_time - entity.time_last_modified_by_user
).total_seconds()
# Add sync start latency metric
start_latency_key = (
f"sync_start_latency:{sync_record.sync_type}"
f":{sync_record.entity_id}:{sync_record.id}"
)
if _has_metric_been_emitted(redis_std, start_latency_key):
task_logger.info(
f"Skipping start latency metric for sync record {sync_record.id} "
"because it has already been emitted"
)
continue
if start_latency < 0:
task_logger.error(
f"Negative start latency for sync record {sync_record.id} "
f"(start={sync_record.sync_start_time}, entity_modified={entity.time_last_modified_by_user})"
)
continue
# Get the entity's last update time based on sync type
entity: DocumentSet | UserGroup | None = None
if sync_record.sync_type == SyncType.DOCUMENT_SET:
entity = db_session.scalar(
select(DocumentSet).where(DocumentSet.id == sync_record.entity_id)
)
elif sync_record.sync_type == SyncType.USER_GROUP:
entity = db_session.scalar(
select(UserGroup).where(UserGroup.id == sync_record.entity_id)
)
else:
# Skip other sync types
task_logger.info(
f"Skipping sync record {sync_record.id} "
f"with type {sync_record.sync_type} "
f"and id {sync_record.entity_id} "
"because it is not a document set or user group"
)
continue
metrics.append(
Metric(
key=start_latency_key,
name="sync_start_latency_seconds",
value=start_latency,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
if entity is None:
task_logger.error(
f"Could not find entity for sync record {sync_record.id} "
f"with type {sync_record.sync_type} and id {sync_record.entity_id}"
)
continue
# Calculate start latency in seconds
start_latency = (
sync_record.sync_start_time - entity.time_last_modified_by_user
).total_seconds()
task_logger.info(
f"Calculated start latency for sync record {sync_record.id}: {start_latency} seconds"
)
if start_latency < 0:
task_logger.error(
f"Start latency is negative for sync record {sync_record.id} "
f"with type {sync_record.sync_type} and id {sync_record.entity_id}. "
f"Sync start time: {sync_record.sync_start_time}, "
f"Entity last modified: {entity.time_last_modified_by_user}"
)
continue
metrics.append(
Metric(
key=start_latency_key,
name="sync_start_latency_seconds",
value=start_latency,
tags={
"sync_type": str(sync_record.sync_type),
},
)
)
_mark_metric_as_emitted(redis_std, start_latency_key)
return metrics
def build_job_id(
job_type: Literal["connector", "sync_record"],
primary_id: str,
secondary_id: str | None = None,
) -> str:
if job_type == "connector":
if secondary_id is None:
raise ValueError(
"secondary_id (attempt_id) is required for connector job_type"
)
return f"connector:{primary_id}:attempt:{secondary_id}"
elif job_type == "sync_record":
return f"sync_record:{primary_id}"
@shared_task(
name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
soft_time_limit=_MONITORING_SOFT_TIME_LIMIT,
@ -459,6 +600,7 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None:
lambda: _collect_connector_metrics(db_session, redis_std),
lambda: _collect_sync_metrics(db_session, redis_std),
]
# Collect and log each metric
with get_session_with_tenant(tenant_id) as db_session:
for metric_fn in metric_functions:

View File

@ -8,20 +8,64 @@ from sqlalchemy.orm import Session
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import SyncRecord
from onyx.setup import setup_logger
logger = setup_logger()
def insert_sync_record(
db_session: Session,
entity_id: int | None,
entity_id: int,
sync_type: SyncType,
) -> SyncRecord:
"""Insert a new sync record into the database.
"""Insert a new sync record into the database, cancelling any existing in-progress records.
Args:
db_session: The database session to use
entity_id: The ID of the entity being synced (document set ID, user group ID, etc.)
sync_type: The type of sync operation
"""
# If an existing in-progress sync record exists, mark as cancelled
existing_in_progress_sync_record = fetch_latest_sync_record(
db_session, entity_id, sync_type, sync_status=SyncStatus.IN_PROGRESS
)
if existing_in_progress_sync_record is not None:
logger.info(
f"Cancelling existing in-progress sync record {existing_in_progress_sync_record.id} "
f"for entity_id={entity_id} sync_type={sync_type}"
)
mark_sync_records_as_cancelled(db_session, entity_id, sync_type)
return _create_sync_record(db_session, entity_id, sync_type)
def mark_sync_records_as_cancelled(
db_session: Session,
entity_id: int | None,
sync_type: SyncType,
) -> None:
stmt = (
update(SyncRecord)
.where(
and_(
SyncRecord.entity_id == entity_id,
SyncRecord.sync_type == sync_type,
SyncRecord.sync_status == SyncStatus.IN_PROGRESS,
)
)
.values(sync_status=SyncStatus.CANCELED)
)
db_session.execute(stmt)
db_session.commit()
def _create_sync_record(
db_session: Session,
entity_id: int | None,
sync_type: SyncType,
) -> SyncRecord:
"""Create and insert a new sync record into the database."""
sync_record = SyncRecord(
entity_id=entity_id,
sync_type=sync_type,
@ -39,6 +83,7 @@ def fetch_latest_sync_record(
db_session: Session,
entity_id: int,
sync_type: SyncType,
sync_status: SyncStatus | None = None,
) -> SyncRecord | None:
"""Fetch the most recent sync record for a given entity ID and status.
@ -59,6 +104,9 @@ def fetch_latest_sync_record(
.limit(1)
)
if sync_status is not None:
stmt = stmt.where(SyncRecord.sync_status == sync_status)
result = db_session.execute(stmt)
return result.scalar_one_or_none()

View File

@ -123,6 +123,7 @@ def optional_telemetry(
headers={"Content-Type": "application/json"},
json=payload,
)
except Exception:
# This way it silences all thread level logging as well
pass

View File