mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-27 20:38:32 +02:00
Bugfix/redis wait (#3169)
* rename to payload * log redis info replication on primary worker startup * fix mypy --------- Co-authored-by: Richard Kuo <rkuo@rkuo.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import multiprocessing
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
from celery import bootsteps # type: ignore
|
||||
from celery import Celery
|
||||
@@ -95,6 +96,15 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
|
||||
# by the primary worker. This is unnecessary in the multi tenant scenario
|
||||
r = get_redis_client(tenant_id=None)
|
||||
|
||||
# Log the role and slave count - being connected to a slave or slave count > 0 could be problematic
|
||||
info: dict[str, Any] = cast(dict, r.info("replication"))
|
||||
role: str = cast(str, info.get("role"))
|
||||
connected_slaves: int = info.get("connected_slaves", 0)
|
||||
|
||||
logger.info(
|
||||
f"Redis INFO REPLICATION: role={role} connected_slaves={connected_slaves}"
|
||||
)
|
||||
|
||||
# For the moment, we're assuming that we are the only primary worker
|
||||
# that should be running.
|
||||
# TODO: maybe check for or clean up another zombie primary worker if we detect it
|
||||
|
@@ -19,7 +19,7 @@ from danswer.db.engine import get_session_with_tenant
|
||||
from danswer.db.enums import ConnectorCredentialPairStatus
|
||||
from danswer.db.search_settings import get_all_search_settings
|
||||
from danswer.redis.redis_connector import RedisConnector
|
||||
from danswer.redis.redis_connector_delete import RedisConnectorDeletionFenceData
|
||||
from danswer.redis.redis_connector_delete import RedisConnectorDeletePayload
|
||||
from danswer.redis.redis_pool import get_redis_client
|
||||
|
||||
|
||||
@@ -118,7 +118,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
|
||||
return None
|
||||
|
||||
# set a basic fence to start
|
||||
fence_payload = RedisConnectorDeletionFenceData(
|
||||
fence_payload = RedisConnectorDeletePayload(
|
||||
num_tasks=None,
|
||||
submitted=datetime.now(timezone.utc),
|
||||
)
|
||||
|
@@ -17,7 +17,7 @@ from danswer.db.document import construct_document_select_for_connector_credenti
|
||||
from danswer.db.models import Document as DbDocument
|
||||
|
||||
|
||||
class RedisConnectorDeletionFenceData(BaseModel):
|
||||
class RedisConnectorDeletePayload(BaseModel):
|
||||
num_tasks: int | None
|
||||
submitted: datetime
|
||||
|
||||
@@ -54,20 +54,18 @@ class RedisConnectorDelete:
|
||||
return False
|
||||
|
||||
@property
|
||||
def payload(self) -> RedisConnectorDeletionFenceData | None:
|
||||
def payload(self) -> RedisConnectorDeletePayload | None:
|
||||
# read related data and evaluate/print task progress
|
||||
fence_bytes = cast(bytes, self.redis.get(self.fence_key))
|
||||
if fence_bytes is None:
|
||||
return None
|
||||
|
||||
fence_str = fence_bytes.decode("utf-8")
|
||||
payload = RedisConnectorDeletionFenceData.model_validate_json(
|
||||
cast(str, fence_str)
|
||||
)
|
||||
payload = RedisConnectorDeletePayload.model_validate_json(cast(str, fence_str))
|
||||
|
||||
return payload
|
||||
|
||||
def set_fence(self, payload: RedisConnectorDeletionFenceData | None) -> None:
|
||||
def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None:
|
||||
if not payload:
|
||||
self.redis.delete(self.fence_key)
|
||||
return
|
||||
|
Reference in New Issue
Block a user