update slack

This commit is contained in:
pablodanswer 2024-10-31 13:38:43 -07:00
parent 06f937cf93
commit 4b152aa3a7
2 changed files with 19 additions and 12 deletions

View File

@ -1,3 +1,5 @@
import os
from sqlalchemy.orm import Session
from danswer.db.models import SlackBotConfig
@ -59,3 +61,5 @@ TENANT_HEARTBEAT_EXPIRATION = 180 # How long before a tenant's heartbeat expire
TENANT_ACQUISITION_INTERVAL = (
60 # How often pods attempt to acquire unprocessed tenants
)
MAX_TENANTS_PER_POD = int(os.getenv("MAX_TENANTS_PER_POD", 50))

View File

@ -1,4 +1,5 @@
import asyncio
import os
import signal
import sys
import threading
@ -23,6 +24,7 @@ from danswer.configs.danswerbot_configs import DANSWER_BOT_RESPOND_EVERY_CHANNEL
from danswer.configs.danswerbot_configs import NOTIFY_SLACKBOT_NO_ANSWER
from danswer.connectors.slack.utils import expert_info_from_slack_id
from danswer.danswerbot.slack.config import get_slack_bot_config_for_channel
from danswer.danswerbot.slack.config import MAX_TENANTS_PER_POD
from danswer.danswerbot.slack.config import TENANT_ACQUISITION_INTERVAL
from danswer.danswerbot.slack.config import TENANT_HEARTBEAT_EXPIRATION
from danswer.danswerbot.slack.config import TENANT_HEARTBEAT_INTERVAL
@ -104,9 +106,9 @@ _OFFICIAL_SLACKBOT_USER_ID = "USLACKBOT"
class SlackbotHandler:
def __init__(self) -> None:
logger.info("Initializing SlackbotHandler")
self.tenant_ids: Set[str] = set()
self.socket_clients: Dict[str, TenantSocketModeClient] = {}
self.slack_bot_tokens: Dict[str, SlackBotTokens] = {}
self.tenant_ids: Set[str | None] = set()
self.socket_clients: Dict[str | None, TenantSocketModeClient] = {}
self.slack_bot_tokens: Dict[str | None, SlackBotTokens] = {}
self.running = True
self.pod_id = self.get_pod_id()
logger.info(f"Pod ID: {self.pod_id}")
@ -128,21 +130,19 @@ class SlackbotHandler:
logger.info("Background threads started")
def get_pod_id(self) -> str:
import os
pod_id = os.environ.get("HOSTNAME", "unknown_pod")
logger.info(f"Retrieved pod ID: {pod_id}")
return pod_id
def acquire_tenants_loop(self) -> None:
logger.info("Starting tenant acquisition loop")
logger.info("Starting acquisition loop")
while self.running:
try:
self.acquire_tenants()
active_tenants_gauge.set(len(self.tenant_ids))
logger.debug(f"Current active tenants: {len(self.tenant_ids)}")
except Exception as e:
logger.exception(f"Error in tenant acquisition: {e}")
logger.exception(f"Error in Slack acquisition: {e}")
Event().wait(timeout=TENANT_ACQUISITION_INTERVAL)
def heartbeat_loop(self) -> None:
@ -160,6 +160,12 @@ class SlackbotHandler:
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
for tenant_id in tenant_ids:
if len(self.tenant_ids) >= MAX_TENANTS_PER_POD:
logger.info(
f"Max tenants per pod reached ({MAX_TENANTS_PER_POD}) Not acquiring any more tenants"
)
break
redis_client = get_redis_client(tenant_id=tenant_id)
pod_id = self.pod_id
acquired = redis_client.set(
@ -169,7 +175,7 @@ class SlackbotHandler:
ex=TENANT_LOCK_EXPIRATION,
)
if not acquired:
continue # Another pod holds the lock
continue
with get_session_with_tenant(tenant_id) as db_session:
try:
@ -194,7 +200,6 @@ class SlackbotHandler:
f"Slack Bot tokens have changed for tenant {tenant_id} - reconnecting"
)
else:
# Initial setup for this tenant
search_settings = get_current_search_settings(db_session)
embedding_model = EmbeddingModel.from_db_model(
search_settings=search_settings,
@ -206,10 +211,8 @@ class SlackbotHandler:
self.slack_bot_tokens[tenant_id] = slack_bot_tokens
if tenant_id in self.socket_clients:
# Close the existing socket client
asyncio.run(self.socket_clients[tenant_id].close())
# Start a new socket client for the tenant
self.start_socket_client(tenant_id, slack_bot_tokens)
except KvKeyNotFoundError:
@ -234,7 +237,7 @@ class SlackbotHandler:
)
def start_socket_client(
self, tenant_id: str, slack_bot_tokens: SlackBotTokens
self, tenant_id: str | None, slack_bot_tokens: SlackBotTokens
) -> None:
logger.info(f"Starting socket client for tenant {tenant_id}")
socket_client = _get_socket_client(slack_bot_tokens, tenant_id)