Merge pull request #3566 from onyx-dot-app/bugfix/primary_task_timings

re-enable celery task execution logging in primary worker
This commit is contained in:
rkuo-danswer
2024-12-31 01:29:05 -08:00
committed by GitHub
7 changed files with 38 additions and 13 deletions

View File

@@ -414,11 +414,21 @@ def on_setup_logging(
task_logger.setLevel(loglevel) task_logger.setLevel(loglevel)
task_logger.propagate = False task_logger.propagate = False
# Hide celery task received and succeeded/failed messages # hide celery task received spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received"
strategy.logger.setLevel(logging.WARNING) strategy.logger.setLevel(logging.WARNING)
# uncomment this to hide celery task succeeded/failed spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None"
trace.logger.setLevel(logging.WARNING) trace.logger.setLevel(logging.WARNING)
def set_task_finished_log_level(logLevel: int) -> None:
"""call this to override the setLevel in on_setup_logging. We are interested
in the task timings in the cloud but it can be spammy for self hosted."""
trace.logger.setLevel(logLevel)
class TenantContextFilter(logging.Filter): class TenantContextFilter(logging.Filter):
"""Logging filter to inject tenant ID into the logger's name.""" """Logging filter to inject tenant ID into the logger's name."""

View File

@@ -1,3 +1,4 @@
import logging
import multiprocessing import multiprocessing
from typing import Any from typing import Any
from typing import cast from typing import cast
@@ -194,6 +195,10 @@ def on_setup_logging(
) -> None: ) -> None:
app_base.on_setup_logging(loglevel, logfile, format, colorize, **kwargs) app_base.on_setup_logging(loglevel, logfile, format, colorize, **kwargs)
# this can be spammy, so just enable it in the cloud for now
if MULTI_TENANT:
app_base.set_task_finished_log_level(logging.INFO)
class HubPeriodicTask(bootsteps.StartStopStep): class HubPeriodicTask(bootsteps.StartStopStep):
"""Regularly reacquires the primary worker lock outside of the task queue. """Regularly reacquires the primary worker lock outside of the task queue.

View File

@@ -34,7 +34,9 @@ class TaskDependencyError(RuntimeError):
trail=False, trail=False,
bind=True, bind=True,
) )
def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None: def check_for_connector_deletion_task(
self: Task, *, tenant_id: str | None
) -> bool | None:
r = get_redis_client(tenant_id=tenant_id) r = get_redis_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock( lock_beat: RedisLock = r.lock(
@@ -45,7 +47,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
try: try:
# these tasks should never overlap # these tasks should never overlap
if not lock_beat.acquire(blocking=False): if not lock_beat.acquire(blocking=False):
return return None
# collect cc_pair_ids # collect cc_pair_ids
cc_pair_ids: list[int] = [] cc_pair_ids: list[int] = []
@@ -81,6 +83,8 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
return True
def try_generate_document_cc_pair_cleanup_tasks( def try_generate_document_cc_pair_cleanup_tasks(
app: Celery, app: Celery,

View File

@@ -91,7 +91,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b
soft_time_limit=JOB_TIMEOUT, soft_time_limit=JOB_TIMEOUT,
bind=True, bind=True,
) )
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None: def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id) r = get_redis_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock( lock_beat: RedisLock = r.lock(
@@ -102,7 +102,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
try: try:
# these tasks should never overlap # these tasks should never overlap
if not lock_beat.acquire(blocking=False): if not lock_beat.acquire(blocking=False):
return return None
# get all cc pairs that need to be synced # get all cc pairs that need to be synced
cc_pair_ids_to_sync: list[int] = [] cc_pair_ids_to_sync: list[int] = []
@@ -131,6 +131,8 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
return True
def try_creating_permissions_sync_task( def try_creating_permissions_sync_task(
app: Celery, app: Celery,

View File

@@ -94,7 +94,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT, soft_time_limit=JOB_TIMEOUT,
bind=True, bind=True,
) )
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None: def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id) r = get_redis_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock( lock_beat: RedisLock = r.lock(
@@ -105,7 +105,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
try: try:
# these tasks should never overlap # these tasks should never overlap
if not lock_beat.acquire(blocking=False): if not lock_beat.acquire(blocking=False):
return return None
cc_pair_ids_to_sync: list[int] = [] cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
@@ -149,6 +149,8 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
return True
def try_creating_external_group_sync_task( def try_creating_external_group_sync_task(
app: Celery, app: Celery,

View File

@@ -81,10 +81,10 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT, soft_time_limit=JOB_TIMEOUT,
bind=True, bind=True,
) )
def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id) r = get_redis_client(tenant_id=tenant_id)
lock_beat = r.lock( lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK, OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
) )
@@ -92,7 +92,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
try: try:
# these tasks should never overlap # these tasks should never overlap
if not lock_beat.acquire(blocking=False): if not lock_beat.acquire(blocking=False):
return return None
cc_pair_ids: list[int] = [] cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
@@ -127,6 +127,8 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned(): if lock_beat.owned():
lock_beat.release() lock_beat.release()
return True
def try_creating_prune_generator_task( def try_creating_prune_generator_task(
celery_app: Celery, celery_app: Celery,

View File

@@ -88,7 +88,7 @@ logger = setup_logger()
trail=False, trail=False,
bind=True, bind=True,
) )
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
"""Runs periodically to check if any document needs syncing. """Runs periodically to check if any document needs syncing.
Generates sets of tasks for Celery if syncing is needed.""" Generates sets of tasks for Celery if syncing is needed."""
time_start = time.monotonic() time_start = time.monotonic()
@@ -103,7 +103,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
try: try:
# these tasks should never overlap # these tasks should never overlap
if not lock_beat.acquire(blocking=False): if not lock_beat.acquire(blocking=False):
return return None
with get_session_with_tenant(tenant_id) as db_session: with get_session_with_tenant(tenant_id) as db_session:
try_generate_stale_document_sync_tasks( try_generate_stale_document_sync_tasks(
@@ -166,7 +166,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
time_elapsed = time.monotonic() - time_start time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}") task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
return return True
def try_generate_stale_document_sync_tasks( def try_generate_stale_document_sync_tasks(