mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-07-09 14:11:33 +02:00
clear indexing fences with no celery tasks queued (#3482)
* allow beat tasks to expire. it isn't important that they all run * validate fences are in a good state and cancel/fail them if not * add function timings for important beat tasks * optimize lookups, add lots of comments * review changes --------- Co-authored-by: Richard Kuo <rkuo@rkuo.com> Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
@ -31,6 +31,10 @@ class RedisConnectorIndex:
|
||||
|
||||
TERMINATE_PREFIX = PREFIX + "_terminate" # connectorindexing_terminate
|
||||
|
||||
# used to signal the overall workflow is still active
|
||||
# it's difficult to prevent
|
||||
ACTIVE_PREFIX = PREFIX + "_active"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tenant_id: str | None,
|
||||
@ -54,6 +58,7 @@ class RedisConnectorIndex:
|
||||
f"{self.GENERATOR_LOCK_PREFIX}_{id}/{search_settings_id}"
|
||||
)
|
||||
self.terminate_key = f"{self.TERMINATE_PREFIX}_{id}/{search_settings_id}"
|
||||
self.active_key = f"{self.ACTIVE_PREFIX}_{id}/{search_settings_id}"
|
||||
|
||||
@classmethod
|
||||
def fence_key_with_ids(cls, cc_pair_id: int, search_settings_id: int) -> str:
|
||||
@ -107,6 +112,26 @@ class RedisConnectorIndex:
|
||||
# 10 minute TTL is good.
|
||||
self.redis.set(f"{self.terminate_key}_{celery_task_id}", 0, ex=600)
|
||||
|
||||
def set_active(self) -> None:
|
||||
"""This sets a signal to keep the indexing flow from getting cleaned up within
|
||||
the expiration time.
|
||||
|
||||
The slack in timing is needed to avoid race conditions where simply checking
|
||||
the celery queue and task status could result in race conditions."""
|
||||
self.redis.set(self.active_key, 0, ex=300)
|
||||
|
||||
def active(self) -> bool:
|
||||
if self.redis.exists(self.active_key):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def generator_locked(self) -> bool:
|
||||
if self.redis.exists(self.generator_lock_key):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def set_generator_complete(self, payload: int | None) -> None:
|
||||
if not payload:
|
||||
self.redis.delete(self.generator_complete_key)
|
||||
@ -138,6 +163,7 @@ class RedisConnectorIndex:
|
||||
return status
|
||||
|
||||
def reset(self) -> None:
|
||||
self.redis.delete(self.active_key)
|
||||
self.redis.delete(self.generator_lock_key)
|
||||
self.redis.delete(self.generator_progress_key)
|
||||
self.redis.delete(self.generator_complete_key)
|
||||
|
Reference in New Issue
Block a user