Small background telemetry fix

This commit is contained in:
Weves
2025-01-18 11:32:08 -08:00
committed by Chris Weaver
parent 896e716d02
commit a72bd31f5d

View File

@@ -68,20 +68,22 @@ class Metric(BaseModel):
task_logger.info(json.dumps(data)) task_logger.info(json.dumps(data))
def emit(self, tenant_id: str | None) -> None: def emit(self, tenant_id: str | None) -> None:
# Convert value to appropriate type # Convert value to appropriate type based on the input value
float_value = ( bool_value = None
float(self.value) if isinstance(self.value, (int, float)) else None float_value = None
) int_value = None
int_value = int(self.value) if isinstance(self.value, int) else None string_value = None
string_value = str(self.value) if isinstance(self.value, str) else None # NOTE: have to do bool first, since `isinstance(True, int)` is true
bool_value = bool(self.value) if isinstance(self.value, bool) else None # e.g. bool is a subclass of int
if isinstance(self.value, bool):
if ( bool_value = self.value
float_value is None elif isinstance(self.value, int):
and int_value is None int_value = self.value
and string_value is None elif isinstance(self.value, float):
and bool_value is None float_value = self.value
): elif isinstance(self.value, str):
string_value = self.value
else:
task_logger.error( task_logger.error(
f"Invalid metric value type: {type(self.value)} " f"Invalid metric value type: {type(self.value)} "
f"({self.value}) for metric {self.name}." f"({self.value}) for metric {self.name}."
@@ -183,35 +185,41 @@ def _build_connector_start_latency_metric(
) )
def _build_run_success_metric( def _build_run_success_metrics(
cc_pair: ConnectorCredentialPair, recent_attempt: IndexAttempt, redis_std: Redis cc_pair: ConnectorCredentialPair,
) -> Metric | None: recent_attempts: list[IndexAttempt],
redis_std: Redis,
) -> list[Metric]:
metrics = []
for attempt in recent_attempts:
metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format( metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format(
cc_pair_id=cc_pair.id, cc_pair_id=cc_pair.id,
index_attempt_id=recent_attempt.id, index_attempt_id=attempt.id,
) )
if _has_metric_been_emitted(redis_std, metric_key): if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info( task_logger.info(
f"Skipping metric for connector {cc_pair.connector.id} " f"Skipping metric for connector {cc_pair.connector.id} "
f"index attempt {recent_attempt.id} because it has already been " f"index attempt {attempt.id} because it has already been "
"emitted" "emitted"
) )
return None continue
if recent_attempt.status in [ if attempt.status in [
IndexingStatus.SUCCESS, IndexingStatus.SUCCESS,
IndexingStatus.FAILED, IndexingStatus.FAILED,
IndexingStatus.CANCELED, IndexingStatus.CANCELED,
]: ]:
return Metric( metrics.append(
Metric(
key=metric_key, key=metric_key,
name="connector_run_succeeded", name="connector_run_succeeded",
value=recent_attempt.status == IndexingStatus.SUCCESS, value=attempt.status == IndexingStatus.SUCCESS,
tags={"source": str(cc_pair.connector.source)}, tags={"source": str(cc_pair.connector.source)},
) )
)
return None return metrics
def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]:
@@ -224,7 +232,7 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me
metrics = [] metrics = []
for cc_pair in cc_pairs: for cc_pair in cc_pairs:
# Get most recent attempt in the last hour # Get all attempts in the last hour
recent_attempts = ( recent_attempts = (
db_session.query(IndexAttempt) db_session.query(IndexAttempt)
.filter( .filter(
@@ -232,31 +240,29 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me
IndexAttempt.time_created >= one_hour_ago, IndexAttempt.time_created >= one_hour_ago,
) )
.order_by(IndexAttempt.time_created.desc()) .order_by(IndexAttempt.time_created.desc())
.limit(2)
.all() .all()
) )
recent_attempt = recent_attempts[0] if recent_attempts else None most_recent_attempt = recent_attempts[0] if recent_attempts else None
second_most_recent_attempt = ( second_most_recent_attempt = (
recent_attempts[1] if len(recent_attempts) > 1 else None recent_attempts[1] if len(recent_attempts) > 1 else None
) )
# if no metric to emit, skip # if no metric to emit, skip
if not recent_attempt: if most_recent_attempt is None:
continue continue
# Connector start latency # Connector start latency
start_latency_metric = _build_connector_start_latency_metric( start_latency_metric = _build_connector_start_latency_metric(
cc_pair, recent_attempt, second_most_recent_attempt, redis_std cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
) )
if start_latency_metric: if start_latency_metric:
metrics.append(start_latency_metric) metrics.append(start_latency_metric)
# Connector run success/failure # Connector run success/failure
run_success_metric = _build_run_success_metric( run_success_metrics = _build_run_success_metrics(
cc_pair, recent_attempt, redis_std cc_pair, recent_attempts, redis_std
) )
if run_success_metric: metrics.extend(run_success_metrics)
metrics.append(run_success_metric)
return metrics return metrics