From 0292ca2445a8d0fb8089d1f1e9e67d9cadb826d6 Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Thu, 20 Mar 2025 09:56:05 -0700 Subject: [PATCH] Add option to control # of slack threads (#4310) --- backend/onyx/configs/app_configs.py | 3 +++ backend/onyx/connectors/slack/connector.py | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/onyx/configs/app_configs.py b/backend/onyx/configs/app_configs.py index c3a0b4e80e..14da106644 100644 --- a/backend/onyx/configs/app_configs.py +++ b/backend/onyx/configs/app_configs.py @@ -420,6 +420,9 @@ EGNYTE_CLIENT_SECRET = os.getenv("EGNYTE_CLIENT_SECRET") LINEAR_CLIENT_ID = os.getenv("LINEAR_CLIENT_ID") LINEAR_CLIENT_SECRET = os.getenv("LINEAR_CLIENT_SECRET") +# Slack specific configs +SLACK_NUM_THREADS = int(os.getenv("SLACK_NUM_THREADS") or 2) + DASK_JOB_CLIENT_ENABLED = ( os.environ.get("DASK_JOB_CLIENT_ENABLED", "").lower() == "true" ) diff --git a/backend/onyx/connectors/slack/connector.py b/backend/onyx/connectors/slack/connector.py index d220020b9f..83e52c410d 100644 --- a/backend/onyx/connectors/slack/connector.py +++ b/backend/onyx/connectors/slack/connector.py @@ -18,6 +18,7 @@ from typing_extensions import override from onyx.configs.app_configs import ENABLE_EXPENSIVE_EXPERT_CALLS from onyx.configs.app_configs import INDEX_BATCH_SIZE +from onyx.configs.app_configs import SLACK_NUM_THREADS from onyx.configs.constants import DocumentSource from onyx.connectors.exceptions import ConnectorValidationError from onyx.connectors.exceptions import CredentialExpiredError @@ -486,7 +487,6 @@ def _process_message( class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]): - MAX_WORKERS = 2 FAST_TIMEOUT = 1 def __init__( @@ -496,10 +496,12 @@ class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]): # regexes, and will only index channels that fully match the regexes channel_regex_enabled: bool = False, batch_size: int = INDEX_BATCH_SIZE, + num_threads: int = SLACK_NUM_THREADS, ) -> None: self.channels = channels self.channel_regex_enabled = channel_regex_enabled self.batch_size = batch_size + self.num_threads = num_threads self.client: WebClient | None = None self.fast_client: WebClient | None = None # just used for efficiency @@ -593,7 +595,7 @@ class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]): new_latest = message_batch[-1]["ts"] if message_batch else latest # Process messages in parallel using ThreadPoolExecutor - with ThreadPoolExecutor(max_workers=SlackConnector.MAX_WORKERS) as executor: + with ThreadPoolExecutor(max_workers=self.num_threads) as executor: futures: list[Future[ProcessedSlackMessage]] = [] for message in message_batch: # Capture the current context so that the thread gets the current tenant ID