diff --git a/backend/onyx/configs/app_configs.py b/backend/onyx/configs/app_configs.py index c3a0b4e80e1..14da1066448 100644 --- a/backend/onyx/configs/app_configs.py +++ b/backend/onyx/configs/app_configs.py @@ -420,6 +420,9 @@ EGNYTE_CLIENT_SECRET = os.getenv("EGNYTE_CLIENT_SECRET") LINEAR_CLIENT_ID = os.getenv("LINEAR_CLIENT_ID") LINEAR_CLIENT_SECRET = os.getenv("LINEAR_CLIENT_SECRET") +# Slack specific configs +SLACK_NUM_THREADS = int(os.getenv("SLACK_NUM_THREADS") or 2) + DASK_JOB_CLIENT_ENABLED = ( os.environ.get("DASK_JOB_CLIENT_ENABLED", "").lower() == "true" ) diff --git a/backend/onyx/connectors/slack/connector.py b/backend/onyx/connectors/slack/connector.py index d220020b9fc..83e52c410dc 100644 --- a/backend/onyx/connectors/slack/connector.py +++ b/backend/onyx/connectors/slack/connector.py @@ -18,6 +18,7 @@ from typing_extensions import override from onyx.configs.app_configs import ENABLE_EXPENSIVE_EXPERT_CALLS from onyx.configs.app_configs import INDEX_BATCH_SIZE +from onyx.configs.app_configs import SLACK_NUM_THREADS from onyx.configs.constants import DocumentSource from onyx.connectors.exceptions import ConnectorValidationError from onyx.connectors.exceptions import CredentialExpiredError @@ -486,7 +487,6 @@ def _process_message( class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]): - MAX_WORKERS = 2 FAST_TIMEOUT = 1 def __init__( @@ -496,10 +496,12 @@ class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]): # regexes, and will only index channels that fully match the regexes channel_regex_enabled: bool = False, batch_size: int = INDEX_BATCH_SIZE, + num_threads: int = SLACK_NUM_THREADS, ) -> None: self.channels = channels self.channel_regex_enabled = channel_regex_enabled self.batch_size = batch_size + self.num_threads = num_threads self.client: WebClient | None = None self.fast_client: WebClient | None = None # just used for efficiency @@ -593,7 +595,7 @@ class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]): new_latest = message_batch[-1]["ts"] if message_batch else latest # Process messages in parallel using ThreadPoolExecutor - with ThreadPoolExecutor(max_workers=SlackConnector.MAX_WORKERS) as executor: + with ThreadPoolExecutor(max_workers=self.num_threads) as executor: futures: list[Future[ProcessedSlackMessage]] = [] for message in message_batch: # Capture the current context so that the thread gets the current tenant ID