mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-04-01 16:38:11 +02:00
A new setting 'is_ephemeral' has been added to the Slack channel configurations. Key features/effects: - if is_ephemeral is set for standard channel (and a Search Assistant is chosen): - the answer is only shown to user as an ephemeral message - the user has access to his private documents for a search (as the answer is only shown to them) - the user has the ability to share the answer with the channel or keep private - a recipient list cannot be defined if the channel is set up as ephemeral - if is_ephemeral is set and DM with bot: - the user has access to private docs in searches - the message is not sent as ephemeral, as it is a 1:1 discussion with bot - if is_ephemeral is not set but recipient list is set: - the user search does *not* have access to their private documents as the information goes to the recipient list team members, and they may have different access rights - Overall: - Unless the channel is set to is_ephemeral or it is a direct conversation with the Bot, only public docs are accessible - The ACL is never bypassed, also not in cases where the admin explicitly attached a document set to the bot config.
701 lines
24 KiB
Python
701 lines
24 KiB
Python
import logging
|
|
import random
|
|
import re
|
|
import string
|
|
import time
|
|
import uuid
|
|
from collections.abc import Generator
|
|
from contextlib import contextmanager
|
|
from typing import Any
|
|
from typing import cast
|
|
|
|
from retry import retry
|
|
from slack_sdk import WebClient
|
|
from slack_sdk.errors import SlackApiError
|
|
from slack_sdk.models.blocks import Block
|
|
from slack_sdk.models.blocks import SectionBlock
|
|
from slack_sdk.models.metadata import Metadata
|
|
from slack_sdk.socket_mode import SocketModeClient
|
|
|
|
from onyx.configs.app_configs import DISABLE_TELEMETRY
|
|
from onyx.configs.constants import ID_SEPARATOR
|
|
from onyx.configs.constants import MessageType
|
|
from onyx.configs.onyxbot_configs import DANSWER_BOT_FEEDBACK_VISIBILITY
|
|
from onyx.configs.onyxbot_configs import DANSWER_BOT_MAX_QPM
|
|
from onyx.configs.onyxbot_configs import DANSWER_BOT_MAX_WAIT_TIME
|
|
from onyx.configs.onyxbot_configs import DANSWER_BOT_NUM_RETRIES
|
|
from onyx.configs.onyxbot_configs import (
|
|
DANSWER_BOT_RESPONSE_LIMIT_PER_TIME_PERIOD,
|
|
)
|
|
from onyx.configs.onyxbot_configs import (
|
|
DANSWER_BOT_RESPONSE_LIMIT_TIME_PERIOD_SECONDS,
|
|
)
|
|
from onyx.connectors.slack.utils import make_slack_api_rate_limited
|
|
from onyx.connectors.slack.utils import SlackTextCleaner
|
|
from onyx.db.engine import get_session_with_current_tenant
|
|
from onyx.db.users import get_user_by_email
|
|
from onyx.llm.exceptions import GenAIDisabledException
|
|
from onyx.llm.factory import get_default_llms
|
|
from onyx.llm.utils import dict_based_prompt_to_langchain_prompt
|
|
from onyx.llm.utils import message_to_string
|
|
from onyx.onyxbot.slack.constants import FeedbackVisibility
|
|
from onyx.onyxbot.slack.models import ThreadMessage
|
|
from onyx.prompts.miscellaneous_prompts import SLACK_LANGUAGE_REPHRASE_PROMPT
|
|
from onyx.utils.logger import setup_logger
|
|
from onyx.utils.telemetry import optional_telemetry
|
|
from onyx.utils.telemetry import RecordType
|
|
from onyx.utils.text_processing import replace_whitespaces_w_space
|
|
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
_DANSWER_BOT_SLACK_BOT_ID: str | None = None
|
|
_DANSWER_BOT_MESSAGE_COUNT: int = 0
|
|
_DANSWER_BOT_COUNT_START_TIME: float = time.time()
|
|
|
|
|
|
def get_onyx_bot_slack_bot_id(web_client: WebClient) -> Any:
|
|
global _DANSWER_BOT_SLACK_BOT_ID
|
|
if _DANSWER_BOT_SLACK_BOT_ID is None:
|
|
_DANSWER_BOT_SLACK_BOT_ID = web_client.auth_test().get("user_id")
|
|
return _DANSWER_BOT_SLACK_BOT_ID
|
|
|
|
|
|
def check_message_limit() -> bool:
|
|
"""
|
|
This isnt a perfect solution.
|
|
High traffic at the end of one period and start of another could cause
|
|
the limit to be exceeded.
|
|
"""
|
|
if DANSWER_BOT_RESPONSE_LIMIT_PER_TIME_PERIOD == 0:
|
|
return True
|
|
global _DANSWER_BOT_MESSAGE_COUNT
|
|
global _DANSWER_BOT_COUNT_START_TIME
|
|
time_since_start = time.time() - _DANSWER_BOT_COUNT_START_TIME
|
|
if time_since_start > DANSWER_BOT_RESPONSE_LIMIT_TIME_PERIOD_SECONDS:
|
|
_DANSWER_BOT_MESSAGE_COUNT = 0
|
|
_DANSWER_BOT_COUNT_START_TIME = time.time()
|
|
if (_DANSWER_BOT_MESSAGE_COUNT + 1) > DANSWER_BOT_RESPONSE_LIMIT_PER_TIME_PERIOD:
|
|
logger.error(
|
|
f"OnyxBot has reached the message limit {DANSWER_BOT_RESPONSE_LIMIT_PER_TIME_PERIOD}"
|
|
f" for the time period {DANSWER_BOT_RESPONSE_LIMIT_TIME_PERIOD_SECONDS} seconds."
|
|
" These limits are configurable in backend/onyx/configs/onyxbot_configs.py"
|
|
)
|
|
return False
|
|
_DANSWER_BOT_MESSAGE_COUNT += 1
|
|
return True
|
|
|
|
|
|
def rephrase_slack_message(msg: str) -> str:
|
|
def _get_rephrase_message() -> list[dict[str, str]]:
|
|
messages = [
|
|
{
|
|
"role": "user",
|
|
"content": SLACK_LANGUAGE_REPHRASE_PROMPT.format(query=msg),
|
|
},
|
|
]
|
|
|
|
return messages
|
|
|
|
try:
|
|
llm, _ = get_default_llms(timeout=5)
|
|
except GenAIDisabledException:
|
|
logger.warning("Unable to rephrase Slack user message, Gen AI disabled")
|
|
return msg
|
|
messages = _get_rephrase_message()
|
|
filled_llm_prompt = dict_based_prompt_to_langchain_prompt(messages)
|
|
model_output = message_to_string(llm.invoke(filled_llm_prompt))
|
|
logger.debug(model_output)
|
|
|
|
return model_output
|
|
|
|
|
|
def update_emote_react(
|
|
emoji: str,
|
|
channel: str,
|
|
message_ts: str | None,
|
|
remove: bool,
|
|
client: WebClient,
|
|
) -> None:
|
|
try:
|
|
if not message_ts:
|
|
logger.error(
|
|
f"Tried to remove a react in {channel} but no message specified"
|
|
)
|
|
return
|
|
|
|
func = client.reactions_remove if remove else client.reactions_add
|
|
slack_call = make_slack_api_rate_limited(func) # type: ignore
|
|
slack_call(
|
|
name=emoji,
|
|
channel=channel,
|
|
timestamp=message_ts,
|
|
)
|
|
except SlackApiError as e:
|
|
if remove:
|
|
logger.error(f"Failed to remove Reaction due to: {e}")
|
|
else:
|
|
logger.error(f"Was not able to react to user message due to: {e}")
|
|
|
|
|
|
def remove_onyx_bot_tag(message_str: str, client: WebClient) -> str:
|
|
bot_tag_id = get_onyx_bot_slack_bot_id(web_client=client)
|
|
return re.sub(rf"<@{bot_tag_id}>\s", "", message_str)
|
|
|
|
|
|
def _check_for_url_in_block(block: Block) -> bool:
|
|
"""
|
|
Check if the block has a key that contains "url" in it
|
|
"""
|
|
block_dict = block.to_dict()
|
|
|
|
def check_dict_for_url(d: dict) -> bool:
|
|
for key, value in d.items():
|
|
if "url" in key.lower():
|
|
return True
|
|
if isinstance(value, dict):
|
|
if check_dict_for_url(value):
|
|
return True
|
|
elif isinstance(value, list):
|
|
for item in value:
|
|
if isinstance(item, dict) and check_dict_for_url(item):
|
|
return True
|
|
return False
|
|
|
|
return check_dict_for_url(block_dict)
|
|
|
|
|
|
def _build_error_block(error_message: str) -> Block:
|
|
"""
|
|
Build an error block to display in slack so that the user can see
|
|
the error without completely breaking
|
|
"""
|
|
display_text = (
|
|
"There was an error displaying all of the Onyx answers."
|
|
f" Please let an admin or an onyx developer know. Error: {error_message}"
|
|
)
|
|
return SectionBlock(text=display_text)
|
|
|
|
|
|
@retry(
|
|
tries=DANSWER_BOT_NUM_RETRIES,
|
|
delay=0.25,
|
|
backoff=2,
|
|
logger=cast(logging.Logger, logger),
|
|
)
|
|
def respond_in_thread_or_channel(
|
|
client: WebClient,
|
|
channel: str,
|
|
thread_ts: str | None,
|
|
text: str | None = None,
|
|
blocks: list[Block] | None = None,
|
|
receiver_ids: list[str] | None = None,
|
|
metadata: Metadata | None = None,
|
|
unfurl: bool = True,
|
|
send_as_ephemeral: bool | None = True,
|
|
) -> list[str]:
|
|
if not text and not blocks:
|
|
raise ValueError("One of `text` or `blocks` must be provided")
|
|
|
|
message_ids: list[str] = []
|
|
if not receiver_ids:
|
|
slack_call = make_slack_api_rate_limited(client.chat_postMessage)
|
|
try:
|
|
response = slack_call(
|
|
channel=channel,
|
|
text=text,
|
|
blocks=blocks,
|
|
thread_ts=thread_ts,
|
|
metadata=metadata,
|
|
unfurl_links=unfurl,
|
|
unfurl_media=unfurl,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to post message: {e} \n blocks: {blocks}")
|
|
logger.warning("Trying again without blocks that have urls")
|
|
|
|
if not blocks:
|
|
raise e
|
|
|
|
blocks_without_urls = [
|
|
block for block in blocks if not _check_for_url_in_block(block)
|
|
]
|
|
blocks_without_urls.append(_build_error_block(str(e)))
|
|
|
|
# Try again wtihout blocks containing url
|
|
response = slack_call(
|
|
channel=channel,
|
|
text=text,
|
|
blocks=blocks_without_urls,
|
|
thread_ts=thread_ts,
|
|
metadata=metadata,
|
|
unfurl_links=unfurl,
|
|
unfurl_media=unfurl,
|
|
)
|
|
|
|
message_ids.append(response["message_ts"])
|
|
else:
|
|
slack_call = make_slack_api_rate_limited(client.chat_postEphemeral)
|
|
|
|
for receiver in receiver_ids:
|
|
try:
|
|
response = slack_call(
|
|
channel=channel,
|
|
user=receiver,
|
|
text=text,
|
|
blocks=blocks,
|
|
thread_ts=thread_ts,
|
|
metadata=metadata,
|
|
unfurl_links=unfurl,
|
|
unfurl_media=unfurl,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to post message: {e} \n blocks: {blocks}")
|
|
logger.warning("Trying again without blocks that have urls")
|
|
|
|
if not blocks:
|
|
raise e
|
|
|
|
blocks_without_urls = [
|
|
block for block in blocks if not _check_for_url_in_block(block)
|
|
]
|
|
blocks_without_urls.append(_build_error_block(str(e)))
|
|
|
|
# Try again wtihout blocks containing url
|
|
response = slack_call(
|
|
channel=channel,
|
|
user=receiver,
|
|
text=text,
|
|
blocks=blocks_without_urls,
|
|
thread_ts=thread_ts,
|
|
metadata=metadata,
|
|
unfurl_links=unfurl,
|
|
unfurl_media=unfurl,
|
|
)
|
|
|
|
message_ids.append(response["message_ts"])
|
|
|
|
return message_ids
|
|
|
|
|
|
def build_feedback_id(
|
|
message_id: int,
|
|
document_id: str | None = None,
|
|
document_rank: int | None = None,
|
|
) -> str:
|
|
unique_prefix = "".join(random.choice(string.ascii_letters) for _ in range(10))
|
|
if document_id is not None:
|
|
if not document_id or document_rank is None:
|
|
raise ValueError("Invalid document, missing information")
|
|
if ID_SEPARATOR in document_id:
|
|
raise ValueError(
|
|
"Separator pattern should not already exist in document id"
|
|
)
|
|
feedback_id = ID_SEPARATOR.join(
|
|
[str(message_id), document_id, str(document_rank)]
|
|
)
|
|
else:
|
|
feedback_id = str(message_id)
|
|
|
|
return unique_prefix + ID_SEPARATOR + feedback_id
|
|
|
|
|
|
def build_publish_ephemeral_message_id(
|
|
original_question_ts: str,
|
|
) -> str:
|
|
return "publish_ephemeral_message__" + original_question_ts
|
|
|
|
|
|
def build_continue_in_web_ui_id(
|
|
message_id: int,
|
|
) -> str:
|
|
unique_prefix = str(uuid.uuid4())[:10]
|
|
return unique_prefix + ID_SEPARATOR + str(message_id)
|
|
|
|
|
|
def decompose_action_id(feedback_id: str) -> tuple[int, str | None, int | None]:
|
|
"""Decompose into query_id, document_id, document_rank, see above function"""
|
|
try:
|
|
components = feedback_id.split(ID_SEPARATOR)
|
|
if len(components) != 2 and len(components) != 4:
|
|
raise ValueError("Feedback ID does not contain right number of elements")
|
|
|
|
if len(components) == 2:
|
|
return int(components[-1]), None, None
|
|
|
|
return int(components[1]), components[2], int(components[3])
|
|
|
|
except Exception as e:
|
|
logger.error(e)
|
|
raise ValueError("Received invalid Feedback Identifier")
|
|
|
|
|
|
def get_view_values(state_values: dict[str, Any]) -> dict[str, str]:
|
|
"""Extract view values
|
|
|
|
Args:
|
|
state_values (dict): The Slack view-submission values
|
|
|
|
Returns:
|
|
dict: keys/values of the view state content
|
|
"""
|
|
view_values = {}
|
|
for _, view_data in state_values.items():
|
|
for k, v in view_data.items():
|
|
if (
|
|
"selected_option" in v
|
|
and isinstance(v["selected_option"], dict)
|
|
and "value" in v["selected_option"]
|
|
):
|
|
view_values[k] = v["selected_option"]["value"]
|
|
elif "selected_options" in v and isinstance(v["selected_options"], list):
|
|
view_values[k] = [
|
|
x["value"] for x in v["selected_options"] if "value" in x
|
|
]
|
|
elif "selected_date" in v:
|
|
view_values[k] = v["selected_date"]
|
|
elif "value" in v:
|
|
view_values[k] = v["value"]
|
|
return view_values
|
|
|
|
|
|
def translate_vespa_highlight_to_slack(match_strs: list[str], used_chars: int) -> str:
|
|
def _replace_highlight(s: str) -> str:
|
|
s = re.sub(r"(?<=[^\s])<hi>(.*?)</hi>", r"\1", s)
|
|
s = s.replace("</hi>", "*").replace("<hi>", "*")
|
|
return s
|
|
|
|
final_matches = [
|
|
replace_whitespaces_w_space(_replace_highlight(match_str)).strip()
|
|
for match_str in match_strs
|
|
if match_str
|
|
]
|
|
combined = "... ".join(final_matches)
|
|
|
|
# Slack introduces "Show More" after 300 on desktop which is ugly
|
|
# But don't trim the message if there is still a highlight after 300 chars
|
|
remaining = 300 - used_chars
|
|
if len(combined) > remaining and "*" not in combined[remaining:]:
|
|
combined = combined[: remaining - 3] + "..."
|
|
|
|
return combined
|
|
|
|
|
|
def remove_slack_text_interactions(slack_str: str) -> str:
|
|
slack_str = SlackTextCleaner.replace_tags_basic(slack_str)
|
|
slack_str = SlackTextCleaner.replace_channels_basic(slack_str)
|
|
slack_str = SlackTextCleaner.replace_special_mentions(slack_str)
|
|
slack_str = SlackTextCleaner.replace_special_catchall(slack_str)
|
|
slack_str = SlackTextCleaner.add_zero_width_whitespace_after_tag(slack_str)
|
|
return slack_str
|
|
|
|
|
|
def get_channel_from_id(client: WebClient, channel_id: str) -> dict[str, Any]:
|
|
response = client.conversations_info(channel=channel_id)
|
|
response.validate()
|
|
return response["channel"]
|
|
|
|
|
|
def get_channel_name_from_id(
|
|
client: WebClient, channel_id: str
|
|
) -> tuple[str | None, bool]:
|
|
try:
|
|
channel_info = get_channel_from_id(client, channel_id)
|
|
name = channel_info.get("name")
|
|
is_dm = any([channel_info.get("is_im"), channel_info.get("is_mpim")])
|
|
return name, is_dm
|
|
except SlackApiError as e:
|
|
logger.exception(f"Couldn't fetch channel name from id: {channel_id}")
|
|
raise e
|
|
|
|
|
|
def fetch_slack_user_ids_from_emails(
|
|
user_emails: list[str], client: WebClient
|
|
) -> tuple[list[str], list[str]]:
|
|
user_ids: list[str] = []
|
|
failed_to_find: list[str] = []
|
|
for email in user_emails:
|
|
try:
|
|
user = client.users_lookupByEmail(email=email)
|
|
user_ids.append(user.data["user"]["id"]) # type: ignore
|
|
except Exception:
|
|
logger.error(f"Was not able to find slack user by email: {email}")
|
|
failed_to_find.append(email)
|
|
|
|
return user_ids, failed_to_find
|
|
|
|
|
|
def fetch_user_ids_from_groups(
|
|
given_names: list[str], client: WebClient
|
|
) -> tuple[list[str], list[str]]:
|
|
user_ids: list[str] = []
|
|
failed_to_find: list[str] = []
|
|
try:
|
|
response = client.usergroups_list()
|
|
if not isinstance(response.data, dict):
|
|
logger.error("Error fetching user groups")
|
|
return user_ids, given_names
|
|
|
|
all_group_data = response.data.get("usergroups", [])
|
|
name_id_map = {d["name"]: d["id"] for d in all_group_data}
|
|
handle_id_map = {d["handle"]: d["id"] for d in all_group_data}
|
|
for given_name in given_names:
|
|
group_id = name_id_map.get(given_name) or handle_id_map.get(
|
|
given_name.lstrip("@")
|
|
)
|
|
if not group_id:
|
|
failed_to_find.append(given_name)
|
|
continue
|
|
try:
|
|
response = client.usergroups_users_list(usergroup=group_id)
|
|
if isinstance(response.data, dict):
|
|
user_ids.extend(response.data.get("users", []))
|
|
else:
|
|
failed_to_find.append(given_name)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching user group ids: {str(e)}")
|
|
failed_to_find.append(given_name)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching user groups: {str(e)}")
|
|
failed_to_find = given_names
|
|
|
|
return user_ids, failed_to_find
|
|
|
|
|
|
def fetch_group_ids_from_names(
|
|
given_names: list[str], client: WebClient
|
|
) -> tuple[list[str], list[str]]:
|
|
group_data: list[str] = []
|
|
failed_to_find: list[str] = []
|
|
|
|
try:
|
|
response = client.usergroups_list()
|
|
if not isinstance(response.data, dict):
|
|
logger.error("Error fetching user groups")
|
|
return group_data, given_names
|
|
|
|
all_group_data = response.data.get("usergroups", [])
|
|
|
|
name_id_map = {d["name"]: d["id"] for d in all_group_data}
|
|
handle_id_map = {d["handle"]: d["id"] for d in all_group_data}
|
|
|
|
for given_name in given_names:
|
|
id = handle_id_map.get(given_name.lstrip("@"))
|
|
id = id or name_id_map.get(given_name)
|
|
if id:
|
|
group_data.append(id)
|
|
else:
|
|
failed_to_find.append(given_name)
|
|
except Exception as e:
|
|
failed_to_find = given_names
|
|
logger.error(f"Error fetching user groups: {str(e)}")
|
|
|
|
return group_data, failed_to_find
|
|
|
|
|
|
def fetch_user_semantic_id_from_id(
|
|
user_id: str | None, client: WebClient
|
|
) -> str | None:
|
|
if not user_id:
|
|
return None
|
|
|
|
response = make_slack_api_rate_limited(client.users_info)(user=user_id)
|
|
if not response["ok"]:
|
|
return None
|
|
|
|
user: dict = cast(dict[Any, dict], response.data).get("user", {})
|
|
|
|
return (
|
|
user.get("real_name")
|
|
or user.get("name")
|
|
or user.get("profile", {}).get("email")
|
|
)
|
|
|
|
|
|
def read_slack_thread(
|
|
channel: str, thread: str, client: WebClient
|
|
) -> list[ThreadMessage]:
|
|
thread_messages: list[ThreadMessage] = []
|
|
response = client.conversations_replies(channel=channel, ts=thread)
|
|
replies = cast(dict, response.data).get("messages", [])
|
|
for reply in replies:
|
|
if "user" in reply and "bot_id" not in reply:
|
|
message = reply["text"]
|
|
user_sem_id = (
|
|
fetch_user_semantic_id_from_id(reply.get("user"), client)
|
|
or "Unknown User"
|
|
)
|
|
message_type = MessageType.USER
|
|
else:
|
|
self_slack_bot_id = get_onyx_bot_slack_bot_id(client)
|
|
blocks: Any
|
|
if reply.get("user") == self_slack_bot_id:
|
|
# OnyxBot response
|
|
message_type = MessageType.ASSISTANT
|
|
user_sem_id = "Assistant"
|
|
|
|
# OnyxBot responses have both text and blocks
|
|
# The useful content is in the blocks, specifically the first block unless there are
|
|
# auto-detected filters
|
|
blocks = reply.get("blocks")
|
|
if not blocks:
|
|
logger.warning(f"OnyxBot response has no blocks: {reply}")
|
|
continue
|
|
|
|
message = blocks[0].get("text", {}).get("text")
|
|
|
|
# If auto-detected filters are on, use the second block for the actual answer
|
|
# The first block is the auto-detected filters
|
|
if message is not None and message.startswith("_Filters"):
|
|
if len(blocks) < 2:
|
|
logger.warning(f"Only filter blocks found: {reply}")
|
|
continue
|
|
# This is the OnyxBot answer format, if there is a change to how we respond,
|
|
# this will need to be updated to get the correct "answer" portion
|
|
message = reply["blocks"][1].get("text", {}).get("text")
|
|
else:
|
|
# Other bots are not counted as the LLM response which only comes from Onyx
|
|
message_type = MessageType.USER
|
|
bot_user_name = fetch_user_semantic_id_from_id(
|
|
reply.get("user"), client
|
|
)
|
|
user_sem_id = bot_user_name or "Unknown" + " Bot"
|
|
|
|
# For other bots, just use the text as we have no way of knowing that the
|
|
# useful portion is
|
|
message = reply.get("text")
|
|
if not message:
|
|
message = blocks[0].get("text", {}).get("text")
|
|
|
|
if not message:
|
|
logger.warning("Skipping Slack thread message, no text found")
|
|
continue
|
|
|
|
message = remove_onyx_bot_tag(message, client=client)
|
|
thread_messages.append(
|
|
ThreadMessage(message=message, sender=user_sem_id, role=message_type)
|
|
)
|
|
|
|
return thread_messages
|
|
|
|
|
|
def slack_usage_report(action: str, sender_id: str | None, client: WebClient) -> None:
|
|
if DISABLE_TELEMETRY:
|
|
return
|
|
|
|
onyx_user = None
|
|
sender_email = None
|
|
try:
|
|
sender_email = client.users_info(user=sender_id).data["user"]["profile"]["email"] # type: ignore
|
|
except Exception:
|
|
logger.warning("Unable to find sender email")
|
|
|
|
if sender_email is not None:
|
|
with get_session_with_current_tenant() as db_session:
|
|
onyx_user = get_user_by_email(email=sender_email, db_session=db_session)
|
|
|
|
optional_telemetry(
|
|
record_type=RecordType.USAGE,
|
|
data={"action": action},
|
|
user_id=str(onyx_user.id) if onyx_user else "Non-Onyx-Or-No-Auth-User",
|
|
)
|
|
|
|
|
|
class SlackRateLimiter:
|
|
def __init__(self) -> None:
|
|
self.max_qpm: int | None = DANSWER_BOT_MAX_QPM
|
|
self.max_wait_time = DANSWER_BOT_MAX_WAIT_TIME
|
|
self.active_question = 0
|
|
self.last_reset_time = time.time()
|
|
self.waiting_questions: list[int] = []
|
|
|
|
def refill(self) -> None:
|
|
# If elapsed time is greater than the period, reset the active question count
|
|
if (time.time() - self.last_reset_time) > 60:
|
|
self.active_question = 0
|
|
self.last_reset_time = time.time()
|
|
|
|
def notify(
|
|
self, client: WebClient, channel: str, position: int, thread_ts: str | None
|
|
) -> None:
|
|
respond_in_thread_or_channel(
|
|
client=client,
|
|
channel=channel,
|
|
receiver_ids=None,
|
|
text=f"Your question has been queued. You are in position {position}.\n"
|
|
f"Please wait a moment :hourglass_flowing_sand:",
|
|
thread_ts=thread_ts,
|
|
)
|
|
|
|
def is_available(self) -> bool:
|
|
if self.max_qpm is None:
|
|
return True
|
|
|
|
self.refill()
|
|
return self.active_question < self.max_qpm
|
|
|
|
def acquire_slot(self) -> None:
|
|
self.active_question += 1
|
|
|
|
def init_waiter(self) -> tuple[int, int]:
|
|
func_randid = random.getrandbits(128)
|
|
self.waiting_questions.append(func_randid)
|
|
position = self.waiting_questions.index(func_randid) + 1
|
|
|
|
return func_randid, position
|
|
|
|
def waiter(self, func_randid: int) -> None:
|
|
if self.max_qpm is None:
|
|
return
|
|
|
|
wait_time = 0
|
|
while (
|
|
self.active_question >= self.max_qpm
|
|
or self.waiting_questions[0] != func_randid
|
|
):
|
|
if wait_time > self.max_wait_time:
|
|
raise TimeoutError
|
|
time.sleep(2)
|
|
wait_time += 2
|
|
self.refill()
|
|
|
|
del self.waiting_questions[0]
|
|
|
|
|
|
def get_feedback_visibility() -> FeedbackVisibility:
|
|
try:
|
|
return FeedbackVisibility(DANSWER_BOT_FEEDBACK_VISIBILITY.lower())
|
|
except ValueError:
|
|
return FeedbackVisibility.PRIVATE
|
|
|
|
|
|
class TenantSocketModeClient(SocketModeClient):
|
|
def __init__(self, tenant_id: str, slack_bot_id: int, *args: Any, **kwargs: Any):
|
|
super().__init__(*args, **kwargs)
|
|
self._tenant_id = tenant_id
|
|
self.slack_bot_id = slack_bot_id
|
|
|
|
@contextmanager
|
|
def _set_tenant_context(self) -> Generator[None, None, None]:
|
|
token = None
|
|
try:
|
|
if self._tenant_id:
|
|
token = CURRENT_TENANT_ID_CONTEXTVAR.set(self._tenant_id)
|
|
yield
|
|
finally:
|
|
if token:
|
|
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
|
|
|
|
def enqueue_message(self, message: str) -> None:
|
|
with self._set_tenant_context():
|
|
super().enqueue_message(message)
|
|
|
|
def process_message(self) -> None:
|
|
with self._set_tenant_context():
|
|
super().process_message()
|
|
|
|
def run_message_listeners(self, message: dict, raw_message: str) -> None:
|
|
with self._set_tenant_context():
|
|
super().run_message_listeners(message, raw_message)
|