validated approach

This commit is contained in:
pablodanswer 2024-10-31 15:17:35 -07:00
parent 4b152aa3a7
commit 187a7d2da2
3 changed files with 121 additions and 8 deletions

View File

@ -111,6 +111,7 @@ class SlackbotHandler:
self.slack_bot_tokens: Dict[str | None, SlackBotTokens] = {}
self.running = True
self.pod_id = self.get_pod_id()
self._shutdown_event = Event()
logger.info(f"Pod ID: {self.pod_id}")
# Set up signal handlers for graceful shutdown
@ -125,8 +126,15 @@ class SlackbotHandler:
# Start background threads
logger.info("Starting background threads")
threading.Thread(target=self.acquire_tenants_loop, daemon=True).start()
threading.Thread(target=self.heartbeat_loop, daemon=True).start()
self.acquire_thread = threading.Thread(
target=self.acquire_tenants_loop, daemon=True
)
self.heartbeat_thread = threading.Thread(
target=self.heartbeat_loop, daemon=True
)
self.acquire_thread.start()
self.heartbeat_thread.start()
logger.info("Background threads started")
def get_pod_id(self) -> str:
@ -135,31 +143,33 @@ class SlackbotHandler:
return pod_id
def acquire_tenants_loop(self) -> None:
    """Run the tenant-acquisition cycle until shutdown is requested.

    Each pass calls ``acquire_tenants()`` and then publishes the number of
    tenants currently held (``len(self.tenant_ids)``) to
    ``active_tenants_gauge``. Any exception raised during a pass is logged
    and the loop continues with the next pass.
    """
    logger.info("Starting acquisition loop")
    while not self._shutdown_event.is_set():
        try:
            self.acquire_tenants()
            active_tenants_gauge.set(len(self.tenant_ids))
            logger.debug(f"Current active tenants: {len(self.tenant_ids)}")
        except Exception as e:
            logger.exception(f"Error in Slack acquisition: {e}")

        # Sleep on the shared shutdown event rather than a throwaway
        # Event(): a fresh Event can never be set, so it always blocks for
        # the full interval, whereas this wait returns as soon as
        # shutdown() sets the event.
        self._shutdown_event.wait(timeout=TENANT_ACQUISITION_INTERVAL)
def heartbeat_loop(self) -> None:
    """Send tenant heartbeats periodically until shutdown is requested.

    Each pass calls ``send_heartbeats()``. Any exception raised during a
    pass is logged and the loop continues with the next pass.
    """
    logger.info("Starting heartbeat loop")
    while not self._shutdown_event.is_set():
        try:
            self.send_heartbeats()
            logger.debug(f"Sent heartbeats for {len(self.tenant_ids)} tenants")
        except Exception as e:
            logger.exception(f"Error in heartbeat loop: {e}")

        # Sleep on the shared shutdown event rather than a throwaway
        # Event(): a fresh Event can never be set, so it always blocks for
        # the full interval, whereas this wait returns as soon as
        # shutdown() sets the event.
        self._shutdown_event.wait(timeout=TENANT_HEARTBEAT_INTERVAL)
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
for tenant_id in tenant_ids:
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue
if len(self.tenant_ids) >= MAX_TENANTS_PER_POD:
logger.info(
f"Max tenants per pod reached ({MAX_TENANTS_PER_POD}) Not acquiring any more tenants"
@ -175,13 +185,22 @@ class SlackbotHandler:
ex=TENANT_LOCK_EXPIRATION,
)
if not acquired:
logger.debug(f"Another pod holds the lock for tenant {tenant_id}")
continue
logger.debug(f"Acquired lock for tenant {tenant_id}")
with get_session_with_tenant(tenant_id) as db_session:
try:
logger.debug(
f"Setting tenant ID context variable for tenant {tenant_id}"
)
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id or "public")
slack_bot_tokens = fetch_tokens()
logger.debug(f"Fetched Slack bot tokens for tenant {tenant_id}")
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
logger.debug(
f"Reset tenant ID context variable for tenant {tenant_id}"
)
if not slack_bot_tokens:
logger.debug(f"No Slack bot token found for tenant {tenant_id}")
@ -259,9 +278,22 @@ class SlackbotHandler:
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
def shutdown(self, signum: int | None, frame: FrameType | None) -> None:
if not self.running:
return
logger.info("Shutting down gracefully")
self.running = False
self._shutdown_event.set()
# Stop all socket clients
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
self.stop_socket_clients()
# Wait for background threads to finish (with timeout)
logger.info("Waiting for background threads to finish...")
self.acquire_thread.join(timeout=5)
self.heartbeat_thread.join(timeout=5)
logger.info("Shutdown complete")
sys.exit(0)

View File

@ -56,6 +56,7 @@ def create_tenant(
status_code=409, detail="User already belongs to an organization"
)
print(f"Ensuring schema exists for tenant {tenant_id}")
try:
if not ensure_schema_exists(tenant_id):
logger.info(f"Created schema for tenant {tenant_id}")

View File

@ -0,0 +1,80 @@
# Deployment running the Slack bot listener from the backend image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: slack-bot-deployment
  labels:
    app: slack-bot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: slack-bot
  template:
    metadata:
      labels:
        app: slack-bot
    spec:
      containers:
        - name: slack-bot
          # NOTE(review): a mutable `latest` tag combined with IfNotPresent
          # means a node that already cached the image will not pull a
          # newer one - confirm this is intended.
          image: danswer/danswer-backend:latest
          imagePullPolicy: IfNotPresent
          command: ["python", "danswer/danswerbot/slack/listener.py"]
          ports:
            - containerPort: 8000
          resources:
            requests:
              cpu: "100m"
              memory: "200Mi"
            limits:
              cpu: "500m"
              memory: "500Mi"
          # Both probes poll the /metrics endpoint on port 8000.
          readinessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 20
---
# Cluster-internal Service in front of the slack-bot pods.
apiVersion: v1
kind: Service
metadata:
  name: slack-bot-service
  labels:
    app: slack-bot
spec:
  selector:
    app: slack-bot
  ports:
    # Port exposed for Prometheus metrics
    - protocol: TCP
      port: 8000
      targetPort: 8000
  type: ClusterIP
---
# Scales slack-bot-deployment between 1 and 10 replicas, targeting an
# average of 40 for the per-pod `active_tenants` metric.
# autoscaling/v2 is the stable HPA API; autoscaling/v2beta2 is no longer
# served as of Kubernetes 1.26.
# NOTE(review): a `Pods` metric is read through the custom metrics API, so
# the cluster presumably needs an adapter exposing `active_tenants` - confirm.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: slack-bot-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: slack-bot-deployment
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Pods
      pods:
        metric:
          name: active_tenants
        target:
          type: AverageValue
          averageValue: "40"