add some return types to distinguish when the task is actually performing work

This commit is contained in:
Richard Kuo (Danswer) 2024-12-31 00:10:33 -08:00
parent 1955c1d67b
commit b43a8e48c6
5 changed files with 22 additions and 12 deletions

View File

@ -34,7 +34,9 @@ class TaskDependencyError(RuntimeError):
trail=False,
bind=True,
)
def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None:
def check_for_connector_deletion_task(
self: Task, *, tenant_id: str | None
) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock(
@ -45,7 +47,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None
# collect cc_pair_ids
cc_pair_ids: list[int] = []
@ -81,6 +83,8 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
if lock_beat.owned():
lock_beat.release()
return True
def try_generate_document_cc_pair_cleanup_tasks(
app: Celery,

View File

@ -91,7 +91,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None:
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock(
@ -102,7 +102,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None
# get all cc pairs that need to be synced
cc_pair_ids_to_sync: list[int] = []
@ -131,6 +131,8 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
if lock_beat.owned():
lock_beat.release()
return True
def try_creating_permissions_sync_task(
app: Celery,

View File

@ -94,7 +94,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
lock_beat: RedisLock = r.lock(
@ -105,7 +105,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None
cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
@ -149,6 +149,8 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned():
lock_beat.release()
return True
def try_creating_external_group_sync_task(
app: Celery,

View File

@ -81,10 +81,10 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
lock_beat = r.lock(
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)
@ -92,7 +92,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
@ -127,6 +127,8 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned():
lock_beat.release()
return True
def try_creating_prune_generator_task(
celery_app: Celery,

View File

@ -88,7 +88,7 @@ logger = setup_logger()
trail=False,
bind=True,
)
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
"""Runs periodically to check if any document needs syncing.
Generates sets of tasks for Celery if syncing is needed."""
time_start = time.monotonic()
@ -103,7 +103,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None
with get_session_with_tenant(tenant_id) as db_session:
try_generate_stale_document_sync_tasks(
@ -166,7 +166,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
return
return True
def try_generate_stale_document_sync_tasks(