mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-07-15 23:53:19 +02:00
Add option to control # of slack threads (#4310)
This commit is contained in:
@ -420,6 +420,9 @@ EGNYTE_CLIENT_SECRET = os.getenv("EGNYTE_CLIENT_SECRET")
|
|||||||
LINEAR_CLIENT_ID = os.getenv("LINEAR_CLIENT_ID")
|
LINEAR_CLIENT_ID = os.getenv("LINEAR_CLIENT_ID")
|
||||||
LINEAR_CLIENT_SECRET = os.getenv("LINEAR_CLIENT_SECRET")
|
LINEAR_CLIENT_SECRET = os.getenv("LINEAR_CLIENT_SECRET")
|
||||||
|
|
||||||
|
# Slack specific configs
|
||||||
|
SLACK_NUM_THREADS = int(os.getenv("SLACK_NUM_THREADS") or 2)
|
||||||
|
|
||||||
DASK_JOB_CLIENT_ENABLED = (
|
DASK_JOB_CLIENT_ENABLED = (
|
||||||
os.environ.get("DASK_JOB_CLIENT_ENABLED", "").lower() == "true"
|
os.environ.get("DASK_JOB_CLIENT_ENABLED", "").lower() == "true"
|
||||||
)
|
)
|
||||||
|
@ -18,6 +18,7 @@ from typing_extensions import override
|
|||||||
|
|
||||||
from onyx.configs.app_configs import ENABLE_EXPENSIVE_EXPERT_CALLS
|
from onyx.configs.app_configs import ENABLE_EXPENSIVE_EXPERT_CALLS
|
||||||
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
||||||
|
from onyx.configs.app_configs import SLACK_NUM_THREADS
|
||||||
from onyx.configs.constants import DocumentSource
|
from onyx.configs.constants import DocumentSource
|
||||||
from onyx.connectors.exceptions import ConnectorValidationError
|
from onyx.connectors.exceptions import ConnectorValidationError
|
||||||
from onyx.connectors.exceptions import CredentialExpiredError
|
from onyx.connectors.exceptions import CredentialExpiredError
|
||||||
@ -486,7 +487,6 @@ def _process_message(
|
|||||||
|
|
||||||
|
|
||||||
class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]):
|
class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]):
|
||||||
MAX_WORKERS = 2
|
|
||||||
FAST_TIMEOUT = 1
|
FAST_TIMEOUT = 1
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -496,10 +496,12 @@ class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]):
|
|||||||
# regexes, and will only index channels that fully match the regexes
|
# regexes, and will only index channels that fully match the regexes
|
||||||
channel_regex_enabled: bool = False,
|
channel_regex_enabled: bool = False,
|
||||||
batch_size: int = INDEX_BATCH_SIZE,
|
batch_size: int = INDEX_BATCH_SIZE,
|
||||||
|
num_threads: int = SLACK_NUM_THREADS,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.channels = channels
|
self.channels = channels
|
||||||
self.channel_regex_enabled = channel_regex_enabled
|
self.channel_regex_enabled = channel_regex_enabled
|
||||||
self.batch_size = batch_size
|
self.batch_size = batch_size
|
||||||
|
self.num_threads = num_threads
|
||||||
self.client: WebClient | None = None
|
self.client: WebClient | None = None
|
||||||
self.fast_client: WebClient | None = None
|
self.fast_client: WebClient | None = None
|
||||||
# just used for efficiency
|
# just used for efficiency
|
||||||
@ -593,7 +595,7 @@ class SlackConnector(SlimConnector, CheckpointConnector[SlackCheckpoint]):
|
|||||||
new_latest = message_batch[-1]["ts"] if message_batch else latest
|
new_latest = message_batch[-1]["ts"] if message_batch else latest
|
||||||
|
|
||||||
# Process messages in parallel using ThreadPoolExecutor
|
# Process messages in parallel using ThreadPoolExecutor
|
||||||
with ThreadPoolExecutor(max_workers=SlackConnector.MAX_WORKERS) as executor:
|
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
|
||||||
futures: list[Future[ProcessedSlackMessage]] = []
|
futures: list[Future[ProcessedSlackMessage]] = []
|
||||||
for message in message_batch:
|
for message in message_batch:
|
||||||
# Capture the current context so that the thread gets the current tenant ID
|
# Capture the current context so that the thread gets the current tenant ID
|
||||||
|
Reference in New Issue
Block a user