From 187a7d2da2a02ed986a9cc587c659ae0f94fb2b2 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 31 Oct 2024 15:17:35 -0700 Subject: [PATCH] validated approach --- backend/danswer/danswerbot/slack/listener.py | 48 +++++++++-- backend/ee/danswer/server/tenants/api.py | 1 + .../slackbot-service-deployment.yaml | 80 +++++++++++++++++++ 3 files changed, 121 insertions(+), 8 deletions(-) create mode 100644 deployment/kubernetes/slackbot-service-deployment.yaml diff --git a/backend/danswer/danswerbot/slack/listener.py b/backend/danswer/danswerbot/slack/listener.py index 2c4358cdf..9f8a854f0 100644 --- a/backend/danswer/danswerbot/slack/listener.py +++ b/backend/danswer/danswerbot/slack/listener.py @@ -111,6 +111,7 @@ class SlackbotHandler: self.slack_bot_tokens: Dict[str | None, SlackBotTokens] = {} self.running = True self.pod_id = self.get_pod_id() + self._shutdown_event = Event() logger.info(f"Pod ID: {self.pod_id}") # Set up signal handlers for graceful shutdown @@ -125,8 +126,15 @@ class SlackbotHandler: # Start background threads logger.info("Starting background threads") - threading.Thread(target=self.acquire_tenants_loop, daemon=True).start() - threading.Thread(target=self.heartbeat_loop, daemon=True).start() + self.acquire_thread = threading.Thread( + target=self.acquire_tenants_loop, daemon=True + ) + self.heartbeat_thread = threading.Thread( + target=self.heartbeat_loop, daemon=True + ) + + self.acquire_thread.start() + self.heartbeat_thread.start() logger.info("Background threads started") def get_pod_id(self) -> str: @@ -135,31 +143,33 @@ class SlackbotHandler: return pod_id def acquire_tenants_loop(self) -> None: - logger.info("Starting acquisition loop") - while self.running: + while not self._shutdown_event.is_set(): try: self.acquire_tenants() active_tenants_gauge.set(len(self.tenant_ids)) logger.debug(f"Current active tenants: {len(self.tenant_ids)}") except Exception as e: logger.exception(f"Error in Slack acquisition: {e}") - 
Event().wait(timeout=TENANT_ACQUISITION_INTERVAL) + self._shutdown_event.wait(timeout=TENANT_ACQUISITION_INTERVAL) def heartbeat_loop(self) -> None: - logger.info("Starting heartbeat loop") - while self.running: + while not self._shutdown_event.is_set(): try: self.send_heartbeats() logger.debug(f"Sent heartbeats for {len(self.tenant_ids)} tenants") except Exception as e: logger.exception(f"Error in heartbeat loop: {e}") - Event().wait(timeout=TENANT_HEARTBEAT_INTERVAL) + self._shutdown_event.wait(timeout=TENANT_HEARTBEAT_INTERVAL) def acquire_tenants(self) -> None: tenant_ids = get_all_tenant_ids() logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres") for tenant_id in tenant_ids: + if tenant_id in self.tenant_ids: + logger.debug(f"Tenant {tenant_id} already in self.tenant_ids") + continue + if len(self.tenant_ids) >= MAX_TENANTS_PER_POD: logger.info( f"Max tenants per pod reached ({MAX_TENANTS_PER_POD}) Not acquiring any more tenants" @@ -175,13 +185,22 @@ class SlackbotHandler: ex=TENANT_LOCK_EXPIRATION, ) if not acquired: + logger.debug(f"Another pod holds the lock for tenant {tenant_id}") continue + logger.debug(f"Acquired lock for tenant {tenant_id}") with get_session_with_tenant(tenant_id) as db_session: try: + logger.debug( + f"Setting tenant ID context variable for tenant {tenant_id}" + ) token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id or "public") slack_bot_tokens = fetch_tokens() + logger.debug(f"Fetched Slack bot tokens for tenant {tenant_id}") CURRENT_TENANT_ID_CONTEXTVAR.reset(token) + logger.debug( + f"Reset tenant ID context variable for tenant {tenant_id}" + ) if not slack_bot_tokens: logger.debug(f"No Slack bot token found for tenant {tenant_id}") @@ -259,9 +278,22 @@ class SlackbotHandler: logger.info(f"Stopped SocketModeClient for tenant {tenant_id}") def shutdown(self, signum: int | None, frame: FrameType | None) -> None: + if not self.running: + return + logger.info("Shutting down gracefully") self.running = False + 
self._shutdown_event.set() + + # Stop all socket clients + logger.info(f"Stopping {len(self.socket_clients)} socket clients") self.stop_socket_clients() + + # Wait for background threads to finish (with timeout) + logger.info("Waiting for background threads to finish...") + self.acquire_thread.join(timeout=5) + self.heartbeat_thread.join(timeout=5) + logger.info("Shutdown complete") sys.exit(0) diff --git a/backend/ee/danswer/server/tenants/api.py b/backend/ee/danswer/server/tenants/api.py index 8e79c0b37..de2a6e47a 100644 --- a/backend/ee/danswer/server/tenants/api.py +++ b/backend/ee/danswer/server/tenants/api.py @@ -56,6 +56,7 @@ def create_tenant( status_code=409, detail="User already belongs to an organization" ) + print(f"Ensuring schema exists for tenant {tenant_id}") try: if not ensure_schema_exists(tenant_id): logger.info(f"Created schema for tenant {tenant_id}") diff --git a/deployment/kubernetes/slackbot-service-deployment.yaml b/deployment/kubernetes/slackbot-service-deployment.yaml new file mode 100644 index 000000000..2986525c2 --- /dev/null +++ b/deployment/kubernetes/slackbot-service-deployment.yaml @@ -0,0 +1,80 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: slack-bot-deployment + labels: + app: slack-bot +spec: + replicas: 1 + selector: + matchLabels: + app: slack-bot + template: + metadata: + labels: + app: slack-bot + spec: + containers: + - name: slack-bot + image: danswer/danswer-backend:latest + imagePullPolicy: IfNotPresent + command: ["python", "danswer/danswerbot/slack/listener.py"] + ports: + - containerPort: 8000 + resources: + requests: + cpu: "100m" + memory: "200Mi" + limits: + cpu: "500m" + memory: "500Mi" + readinessProbe: + httpGet: + path: /metrics + port: 8000 + initialDelaySeconds: 10 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /metrics + port: 8000 + initialDelaySeconds: 15 + periodSeconds: 20 + +--- +apiVersion: v1 +kind: Service +metadata: + name: slack-bot-service + labels: + app: slack-bot +spec: + 
selector: + app: slack-bot + ports: + # Port exposed for Prometheus metrics + - protocol: TCP + port: 8000 + targetPort: 8000 + type: ClusterIP + +--- +apiVersion: autoscaling/v2beta2 +kind: HorizontalPodAutoscaler +metadata: + name: slack-bot-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: slack-bot-deployment + minReplicas: 1 + maxReplicas: 10 + metrics: + - type: Pods + pods: + metric: + name: active_tenants + target: + type: AverageValue + averageValue: "40"