diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py index af8ee833d27e..561c96960c36 100644 --- a/backend/danswer/background/celery/apps/primary.py +++ b/backend/danswer/background/celery/apps/primary.py @@ -1,5 +1,6 @@ import multiprocessing from typing import Any +from typing import cast from celery import bootsteps # type: ignore from celery import Celery @@ -95,6 +96,15 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: # by the primary worker. This is unnecessary in the multi tenant scenario r = get_redis_client(tenant_id=None) + # Log the role and slave count - being connected to a slave or slave count > 0 could be problematic + info: dict[str, Any] = cast(dict, r.info("replication")) + role: str = cast(str, info.get("role")) + connected_slaves: int = info.get("connected_slaves", 0) + + logger.info( + f"Redis INFO REPLICATION: role={role} connected_slaves={connected_slaves}" + ) + # For the moment, we're assuming that we are the only primary worker # that should be running. # TODO: maybe check for or clean up another zombie primary worker if we detect it diff --git a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py index 9b2bee8e4cca..9413dd978545 100644 --- a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py @@ -19,7 +19,7 @@ from danswer.db.engine import get_session_with_tenant from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.search_settings import get_all_search_settings from danswer.redis.redis_connector import RedisConnector -from danswer.redis.redis_connector_delete import RedisConnectorDeletionFenceData +from danswer.redis.redis_connector_delete import RedisConnectorDeletePayload from danswer.redis.redis_pool import get_redis_client @@ -118,7 +118,7 @@ def try_generate_document_cc_pair_cleanup_tasks( return None # set a basic fence to start - fence_payload = RedisConnectorDeletionFenceData( + fence_payload = RedisConnectorDeletePayload( num_tasks=None, submitted=datetime.now(timezone.utc), ) diff --git a/backend/danswer/redis/redis_connector_delete.py b/backend/danswer/redis/redis_connector_delete.py index 51b59ca92ed2..1b7a440b2e52 100644 --- a/backend/danswer/redis/redis_connector_delete.py +++ b/backend/danswer/redis/redis_connector_delete.py @@ -17,7 +17,7 @@ from danswer.db.document import construct_document_select_for_connector_credenti from danswer.db.models import Document as DbDocument -class RedisConnectorDeletionFenceData(BaseModel): +class RedisConnectorDeletePayload(BaseModel): num_tasks: int | None submitted: datetime @@ -54,20 +54,18 @@ class RedisConnectorDelete: return False @property - def payload(self) -> RedisConnectorDeletionFenceData | None: + def payload(self) -> RedisConnectorDeletePayload | None: # read related data and evaluate/print task progress fence_bytes = cast(bytes, self.redis.get(self.fence_key)) if fence_bytes is None: return None fence_str = fence_bytes.decode("utf-8") - payload = RedisConnectorDeletionFenceData.model_validate_json( - cast(str, fence_str) - ) + payload = RedisConnectorDeletePayload.model_validate_json(cast(str, fence_str)) return payload - def set_fence(self, payload: RedisConnectorDeletionFenceData | None) -> None: + def set_fence(self, payload: RedisConnectorDeletePayload | None) -> None: if not payload: self.redis.delete(self.fence_key) return