more aggressive handling of tasks blocking deletion (#4093)

* more aggressive handling of tasks blocking deletion

* comment updated

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
rkuo-danswer authored 2025-02-24 14:41:13 -08:00, committed by GitHub
parent 7d40676398
commit c9a3b45ad4
2 changed files with 100 additions and 4 deletions


@@ -52,6 +52,51 @@ class TaskDependencyError(RuntimeError):
with connector deletion."""
def revoke_tasks_blocking_deletion(
redis_connector: RedisConnector, db_session: Session, app: Celery
) -> None:
search_settings_list = get_all_search_settings(db_session)
for search_settings in search_settings_list:
redis_connector_index = redis_connector.new_index(search_settings.id)
try:
index_payload = redis_connector_index.payload
if index_payload and index_payload.celery_task_id:
app.control.revoke(index_payload.celery_task_id)
task_logger.info(
f"Revoked indexing task {index_payload.celery_task_id}."
)
except Exception:
task_logger.exception("Exception while revoking indexing task")
try:
permissions_sync_payload = redis_connector.permissions.payload
if permissions_sync_payload and permissions_sync_payload.celery_task_id:
app.control.revoke(permissions_sync_payload.celery_task_id)
task_logger.info(
f"Revoked permissions sync task {permissions_sync_payload.celery_task_id}."
)
except Exception:
task_logger.exception("Exception while revoking permissions sync task")
try:
prune_payload = redis_connector.prune.payload
if prune_payload and prune_payload.celery_task_id:
app.control.revoke(prune_payload.celery_task_id)
task_logger.info(f"Revoked pruning task {prune_payload.celery_task_id}.")
except Exception:
task_logger.exception("Exception while revoking pruning task")
try:
external_group_sync_payload = redis_connector.external_group_sync.payload
if external_group_sync_payload and external_group_sync_payload.celery_task_id:
app.control.revoke(external_group_sync_payload.celery_task_id)
task_logger.info(
f"Revoked external group sync task {external_group_sync_payload.celery_task_id}."
)
except Exception:
task_logger.exception("Exception while revoking external group sync task")
@shared_task(
name=OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
ignore_result=True,
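Each try block in revoke_tasks_blocking_deletion above repeats the same shape: read a payload from redis and, if it carries a Celery task id, revoke that task, logging any failure so the remaining revokes still run. A minimal sketch of that shared pattern, assuming a hypothetical payload object with a celery_task_id attribute and a standard Celery app (the committed code keeps the four blocks inline so each one logs its own message):

```python
import logging

from celery import Celery

logger = logging.getLogger(__name__)


def revoke_if_running(app: Celery, payload, description: str) -> None:
    # Revoke by task id when the payload references a queued or running Celery
    # task; swallow errors so one failed revoke does not stop the others.
    try:
        if payload and payload.celery_task_id:
            app.control.revoke(payload.celery_task_id)
            logger.info(f"Revoked {description} task {payload.celery_task_id}.")
    except Exception:
        logger.exception(f"Exception while revoking {description} task")
```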
@@ -70,7 +115,7 @@ def check_for_connector_deletion_task(
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)
# these tasks should never overlap
# Prevent this task from overlapping with itself
if not lock_beat.acquire(blocking=False):
return None
@@ -92,9 +137,38 @@ def check_for_connector_deletion_task(
)
except TaskDependencyError as e:
# this means we wanted to start deleting but dependent tasks were running
# Leave a stop signal to clear indexing and pruning tasks more quickly
# on the first error, we set a stop signal and revoke the dependent tasks
# on subsequent errors, we hard reset blocking fences after our specified timeout
# is exceeded
task_logger.info(str(e))
redis_connector.stop.set_fence(True)
if not redis_connector.stop.fenced:
# one time revoke of celery tasks
task_logger.info("Revoking any tasks blocking deletion.")
revoke_tasks_blocking_deletion(
redis_connector, db_session, self.app
)
redis_connector.stop.set_fence(True)
redis_connector.stop.set_timeout()
else:
# stop signal already set
if redis_connector.stop.timed_out:
# waiting too long, just reset blocking fences
task_logger.info(
"Timed out waiting for tasks blocking deletion. Resetting blocking fences."
)
search_settings_list = get_all_search_settings(db_session)
for search_settings in search_settings_list:
redis_connector_index = redis_connector.new_index(
search_settings.id
)
redis_connector_index.reset()
redis_connector.prune.reset()
redis_connector.permissions.reset()
redis_connector.external_group_sync.reset()
else:
# just wait
pass
else:
# clear the stop signal if it exists ... no longer needed
redis_connector.stop.set_fence(False)
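The new except branch escalates in two stages. A minimal sketch of the decision flow, using hypothetical stop, revoke_blockers, and reset_blocking_fences stand-ins for RedisConnectorStop and the per-connector fence resets above:

```python
from typing import Callable


def handle_blocked_deletion(
    stop,  # stands in for RedisConnectorStop: fenced, timed_out, set_fence, set_timeout
    revoke_blockers: Callable[[], None],
    reset_blocking_fences: Callable[[], None],
) -> None:
    # First failure: signal blockers to stop, revoke their Celery tasks once,
    # and start the timeout clock.
    if not stop.fenced:
        revoke_blockers()
        stop.set_fence(True)
        stop.set_timeout()
        return
    # Subsequent failures: once the timeout key has expired, hard-reset the
    # fences blocking deletion; otherwise just wait for the next beat.
    if stop.timed_out:
        reset_blocking_fences()
```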


@@ -5,7 +5,13 @@ class RedisConnectorStop:
"""Manages interactions with redis for stop signaling. Should only be accessed
through RedisConnector."""
FENCE_PREFIX = "connectorstop_fence"
PREFIX = "connectorstop"
FENCE_PREFIX = f"{PREFIX}_fence"
# if this timeout is exceeded, the caller may decide to take more
# drastic measures
TIMEOUT_PREFIX = f"{PREFIX}_timeout"
TIMEOUT_TTL = 300
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
self.tenant_id: str | None = tenant_id
@@ -13,6 +19,7 @@ class RedisConnectorStop:
self.redis = redis
self.fence_key: str = f"{self.FENCE_PREFIX}_{id}"
self.timeout_key: str = f"{self.TIMEOUT_PREFIX}_{id}"
@property
def fenced(self) -> bool:
@@ -28,7 +35,22 @@
self.redis.set(self.fence_key, 0)
@property
def timed_out(self) -> bool:
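# set_timeout writes the timeout key with a TTL; while the key still exists
# we have not timed out, and once redis expires it we have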
if self.redis.exists(self.timeout_key):
return False
return True
def set_timeout(self) -> None:
"""After calling this, call timed_out to determine if the timeout has been
exceeded."""
self.redis.set(self.timeout_key, 0, ex=self.TIMEOUT_TTL)
@staticmethod
def reset_all(r: redis.Redis) -> None:
for key in r.scan_iter(RedisConnectorStop.FENCE_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorStop.TIMEOUT_PREFIX + "*"):
r.delete(key)
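The timeout itself is just a redis key with a TTL: set_timeout writes it with ex=TIMEOUT_TTL, and timed_out reports True only after redis has expired it. A small illustration of that mechanic with redis-py, assuming a local redis instance and a throwaway key name rather than the real per-connector key:

```python
import time

import redis

r = redis.Redis()
key = "connectorstop_timeout_demo"  # illustrative key, not the real per-id key

r.set(key, 0, ex=2)            # start the clock; redis keeps the key for 2 seconds
print(bool(r.exists(key)))     # True  -> timeout not yet exceeded
time.sleep(3)
print(bool(r.exists(key)))     # False -> key expired, so the caller has timed out
```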