log attempt id, log elapsed since task execution start, remove log spam (#3539)

* log attempt id, log elapsed since task execution start, remove log spam

* diagnostic lock logs

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
rkuo-danswer authored 2024-12-21 15:03:50 -08:00 (committed by GitHub)
parent b9567eabd7
commit eb369caefb
2 changed files with 43 additions and 8 deletions

View File

@@ -3,6 +3,7 @@ from datetime import datetime
 from datetime import timezone
 from http import HTTPStatus
 from time import sleep
+from typing import cast

 import redis
 import sentry_sdk
@@ -100,13 +101,37 @@ class IndexingCallback(IndexingHeartbeatInterface):
            self.last_lock_reacquire = datetime.now(timezone.utc)
        except LockError:
            logger.exception(
-                f"IndexingCallback - lock.reacquire exceptioned. "
+                f"IndexingCallback - lock.reacquire exceptioned: "
                f"lock_timeout={self.redis_lock.timeout} "
                f"start={self.started} "
                f"last_tag={self.last_tag} "
                f"last_reacquired={self.last_lock_reacquire} "
                f"now={datetime.now(timezone.utc)}"
            )
+
+            # diagnostic logging for lock errors
+            name = self.redis_lock.name
+            ttl = self.redis_client.ttl(name)
+            locked = self.redis_lock.locked()
+            owned = self.redis_lock.owned()
+            local_token: str | None = self.redis_lock.local.token  # type: ignore
+
+            remote_token_raw = self.redis_client.get(self.redis_lock.name)
+            if remote_token_raw:
+                remote_token_bytes = cast(bytes, remote_token_raw)
+                remote_token = remote_token_bytes.decode("utf-8")
+            else:
+                remote_token = None
+
+            logger.warning(
+                f"IndexingCallback - lock diagnostics: "
+                f"name={name} "
+                f"locked={locked} "
+                f"owned={owned} "
+                f"local_token={local_token} "
+                f"remote_token={remote_token} "
+                f"ttl={ttl}"
+            )
+
            raise

    self.redis_client.incrby(self.generator_progress_key, amount)
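
Note: the diagnostics above lean on redis-py Lock internals: the token this process holds lives in lock.local.token, while the authoritative token lives at the lock's key in Redis, so comparing the two distinguishes "lock expired / key gone" from "lock taken over by another owner". A minimal standalone sketch of the same comparison (lock name and connection details are illustrative, not from this commit):

    import redis
    from redis.lock import Lock

    client = redis.Redis(host="localhost", port=6379)  # assumed local Redis
    lock = Lock(client, "example_lock", timeout=60)

    if lock.acquire(blocking=False):
        local_token = lock.local.token  # bytes token held by this process
        remote_raw = client.get("example_lock")  # token currently stored at the key
        remote_token = remote_raw.decode("utf-8") if remote_raw else None
        # locked(): the key exists; owned(): the key exists AND its token matches ours.
        # A ttl of -2 means the key is gone, i.e. the lock timed out under us.
        print(
            f"locked={lock.locked()} owned={lock.owned()} "
            f"ttl={client.ttl('example_lock')} "
            f"local={local_token!r} remote={remote_token!r}"
        )
        lock.release()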
@@ -325,7 +350,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
        # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
        # or be currently executing
        try:
-            task_logger.info("Validating indexing fences...")
            validate_indexing_fences(
                tenant_id, self.app, redis_client, redis_client_celery, lock_beat
            )
@@ -363,7 +387,7 @@ def validate_indexing_fences(
    lock_beat: RedisLock,
) -> None:
    reserved_indexing_tasks = celery_get_unacked_task_ids(
-        "connector_indexing", r_celery
+        OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
    )

    # validate all existing indexing jobs
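
Note: swapping the "connector_indexing" literal for OnyxCeleryQueues.CONNECTOR_INDEXING keeps the enqueue path and this validation path pointing at the same queue name. A rough sketch of the pattern (the class body here is a hypothetical mirror of the constants module the diff imports from, not copied from the repo):

    import redis

    class OnyxCeleryQueues:
        # hypothetical mirror of the shared constants this diff references
        CONNECTOR_INDEXING = "connector_indexing"

    def queue_length(r: redis.Redis, queue: str) -> int:
        # Celery's Redis broker keeps ready (not-yet-prefetched) tasks in a list
        return r.llen(queue)

    # queue_length(r_celery, OnyxCeleryQueues.CONNECTOR_INDEXING)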

View File

@@ -165,7 +165,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
        lock_beat.release()

    time_elapsed = time.monotonic() - time_start
-    task_logger.info(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
+    task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
    return
@@ -637,15 +637,23 @@ def monitor_ccpair_indexing_taskset(
    if not payload:
        return

+    elapsed_started_str = None
+    if payload.started:
+        elapsed_started = datetime.now(timezone.utc) - payload.started
+        elapsed_started_str = f"{elapsed_started.total_seconds():.2f}"
+
    elapsed_submitted = datetime.now(timezone.utc) - payload.submitted

    progress = redis_connector_index.get_progress()
    if progress is not None:
        task_logger.info(
-            f"Connector indexing progress: cc_pair={cc_pair_id} "
+            f"Connector indexing progress: "
+            f"attempt={payload.index_attempt_id} "
+            f"cc_pair={cc_pair_id} "
            f"search_settings={search_settings_id} "
            f"progress={progress} "
-            f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
+            f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
+            f"elapsed_started={elapsed_started_str}"
        )

    if payload.index_attempt_id is None or payload.celery_task_id is None:
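
Note: per the commit message, payload.started marks when task execution actually began, so elapsed_started stays None (and is logged as "None") for attempts that are still queued, while elapsed_submitted counts from enqueue time. A small standalone sketch of the formatting (helper name illustrative; assumes the payload stores timezone-aware UTC datetimes):

    from datetime import datetime, timezone

    def elapsed_str(since: datetime | None) -> str | None:
        # both operands must be timezone-aware, or the subtraction raises TypeError
        if since is None:
            return None
        return f"{(datetime.now(timezone.utc) - since).total_seconds():.2f}"

    submitted = datetime.now(timezone.utc)
    print(elapsed_str(submitted))  # e.g. "0.00" seconds since submission
    print(elapsed_str(None))       # None -> rendered as "None": not yet started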
@@ -716,11 +724,14 @@ def monitor_ccpair_indexing_taskset(
        status_enum = HTTPStatus(status_int)

        task_logger.info(
-            f"Connector indexing finished: cc_pair={cc_pair_id} "
+            f"Connector indexing finished: "
+            f"attempt={payload.index_attempt_id} "
+            f"cc_pair={cc_pair_id} "
            f"search_settings={search_settings_id} "
            f"progress={progress} "
            f"status={status_enum.name} "
-            f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
+            f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
+            f"elapsed_started={elapsed_started_str}"
        )

        redis_connector_index.reset()