diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index 6f09ca15f..20782ae7f 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -68,20 +68,22 @@ class Metric(BaseModel): task_logger.info(json.dumps(data)) def emit(self, tenant_id: str | None) -> None: - # Convert value to appropriate type - float_value = ( - float(self.value) if isinstance(self.value, (int, float)) else None - ) - int_value = int(self.value) if isinstance(self.value, int) else None - string_value = str(self.value) if isinstance(self.value, str) else None - bool_value = bool(self.value) if isinstance(self.value, bool) else None - - if ( - float_value is None - and int_value is None - and string_value is None - and bool_value is None - ): + # Convert value to appropriate type based on the input value + bool_value = None + float_value = None + int_value = None + string_value = None + # NOTE: have to do bool first, since `isinstance(True, int)` is true + # e.g. bool is a subclass of int + if isinstance(self.value, bool): + bool_value = self.value + elif isinstance(self.value, int): + int_value = self.value + elif isinstance(self.value, float): + float_value = self.value + elif isinstance(self.value, str): + string_value = self.value + else: task_logger.error( f"Invalid metric value type: {type(self.value)} " f"({self.value}) for metric {self.name}." @@ -183,35 +185,41 @@ def _build_connector_start_latency_metric( ) -def _build_run_success_metric( - cc_pair: ConnectorCredentialPair, recent_attempt: IndexAttempt, redis_std: Redis -) -> Metric | None: - metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format( - cc_pair_id=cc_pair.id, - index_attempt_id=recent_attempt.id, - ) - - if _has_metric_been_emitted(redis_std, metric_key): - task_logger.info( - f"Skipping metric for connector {cc_pair.connector.id} " - f"index attempt {recent_attempt.id} because it has already been " - "emitted" - ) - return None - - if recent_attempt.status in [ - IndexingStatus.SUCCESS, - IndexingStatus.FAILED, - IndexingStatus.CANCELED, - ]: - return Metric( - key=metric_key, - name="connector_run_succeeded", - value=recent_attempt.status == IndexingStatus.SUCCESS, - tags={"source": str(cc_pair.connector.source)}, +def _build_run_success_metrics( + cc_pair: ConnectorCredentialPair, + recent_attempts: list[IndexAttempt], + redis_std: Redis, +) -> list[Metric]: + metrics = [] + for attempt in recent_attempts: + metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=attempt.id, ) - return None + if _has_metric_been_emitted(redis_std, metric_key): + task_logger.info( + f"Skipping metric for connector {cc_pair.connector.id} " + f"index attempt {attempt.id} because it has already been " + "emitted" + ) + continue + + if attempt.status in [ + IndexingStatus.SUCCESS, + IndexingStatus.FAILED, + IndexingStatus.CANCELED, + ]: + metrics.append( + Metric( + key=metric_key, + name="connector_run_succeeded", + value=attempt.status == IndexingStatus.SUCCESS, + tags={"source": str(cc_pair.connector.source)}, + ) + ) + + return metrics def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: @@ -224,7 +232,7 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me metrics = [] for cc_pair in cc_pairs: - # Get most recent attempt in the last hour + # Get all attempts in the last hour recent_attempts = ( db_session.query(IndexAttempt) .filter( @@ -232,31 +240,29 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me IndexAttempt.time_created >= one_hour_ago, ) .order_by(IndexAttempt.time_created.desc()) - .limit(2) .all() ) - recent_attempt = recent_attempts[0] if recent_attempts else None + most_recent_attempt = recent_attempts[0] if recent_attempts else None second_most_recent_attempt = ( recent_attempts[1] if len(recent_attempts) > 1 else None ) # if no metric to emit, skip - if not recent_attempt: + if most_recent_attempt is None: continue # Connector start latency start_latency_metric = _build_connector_start_latency_metric( - cc_pair, recent_attempt, second_most_recent_attempt, redis_std + cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std ) if start_latency_metric: metrics.append(start_latency_metric) # Connector run success/failure - run_success_metric = _build_run_success_metric( - cc_pair, recent_attempt, redis_std + run_success_metrics = _build_run_success_metrics( + cc_pair, recent_attempts, redis_std ) - if run_success_metric: - metrics.append(run_success_metric) + metrics.extend(run_success_metrics) return metrics