diff --git a/backend/onyx/background/indexing/job_client.py b/backend/onyx/background/indexing/job_client.py index 772fb8d52..db8b5e3b4 100644 --- a/backend/onyx/background/indexing/job_client.py +++ b/backend/onyx/background/indexing/job_client.py @@ -16,7 +16,7 @@ from typing import Optional from onyx.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME from onyx.db.engine import SqlEngine -from onyx.utils.logger import setup_logger +from onyx.setup import setup_logger from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import TENANT_ID_PREFIX from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR diff --git a/backend/onyx/onyxbot/slack/blocks.py b/backend/onyx/onyxbot/slack/blocks.py index 2c2138253..9eae5d017 100644 --- a/backend/onyx/onyxbot/slack/blocks.py +++ b/backend/onyx/onyxbot/slack/blocks.py @@ -23,7 +23,7 @@ from onyx.configs.constants import SearchFeedbackType from onyx.configs.onyxbot_configs import DANSWER_BOT_NUM_DOCS_TO_DISPLAY from onyx.context.search.models import SavedSearchDoc from onyx.db.chat import get_chat_session_by_message_id -from onyx.db.engine import get_session_with_tenant +from onyx.db.engine import get_session_with_current_tenant from onyx.db.models import ChannelConfig from onyx.onyxbot.slack.constants import CONTINUE_IN_WEB_UI_ACTION_ID from onyx.onyxbot.slack.constants import DISLIKE_BLOCK_ACTION_ID @@ -410,12 +410,11 @@ def _build_qa_response_blocks( def _build_continue_in_web_ui_block( - tenant_id: str, message_id: int | None, ) -> Block: if message_id is None: raise ValueError("No message id provided to build continue in web ui block") - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: chat_session = get_chat_session_by_message_id( db_session=db_session, message_id=message_id, @@ -482,7 +481,6 @@ def build_follow_up_resolved_blocks( def build_slack_response_blocks( answer: ChatOnyxBotResponse, - tenant_id: str, message_info: SlackMessageInfo, channel_conf: ChannelConfig | None, use_citations: bool, @@ -517,7 +515,6 @@ def build_slack_response_blocks( if channel_conf and channel_conf.get("show_continue_in_web_ui"): web_follow_up_block.append( _build_continue_in_web_ui_block( - tenant_id=tenant_id, message_id=answer.chat_message_id, ) ) diff --git a/backend/onyx/onyxbot/slack/handlers/handle_buttons.py b/backend/onyx/onyxbot/slack/handlers/handle_buttons.py index 42428f231..548e3ebfc 100644 --- a/backend/onyx/onyxbot/slack/handlers/handle_buttons.py +++ b/backend/onyx/onyxbot/slack/handlers/handle_buttons.py @@ -11,7 +11,7 @@ from onyx.configs.constants import SearchFeedbackType from onyx.configs.onyxbot_configs import DANSWER_FOLLOWUP_EMOJI from onyx.connectors.slack.utils import expert_info_from_slack_id from onyx.connectors.slack.utils import make_slack_api_rate_limited -from onyx.db.engine import get_session_with_tenant +from onyx.db.engine import get_session_with_current_tenant from onyx.db.feedback import create_chat_message_feedback from onyx.db.feedback import create_doc_retrieval_feedback from onyx.onyxbot.slack.blocks import build_follow_up_resolved_blocks @@ -114,7 +114,7 @@ def handle_generate_answer_button( thread_ts=thread_ts, ) - with get_session_with_tenant(tenant_id=client.tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: slack_channel_config = get_slack_channel_config_for_bot_and_channel( db_session=db_session, slack_bot_id=client.slack_bot_id, @@ -136,7 +136,6 @@ def handle_generate_answer_button( slack_channel_config=slack_channel_config, receiver_ids=None, client=client.web_client, - tenant_id=client.tenant_id, channel=channel_id, logger=logger, feedback_reminder_id=None, @@ -151,11 +150,10 @@ def handle_slack_feedback( user_id_to_post_confirmation: str, channel_id_to_post_confirmation: str, thread_ts_to_post_confirmation: str, - tenant_id: str, ) -> None: message_id, doc_id, doc_rank = decompose_action_id(feedback_id) - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: if feedback_type in [LIKE_BLOCK_ACTION_ID, DISLIKE_BLOCK_ACTION_ID]: create_chat_message_feedback( is_positive=feedback_type == LIKE_BLOCK_ACTION_ID, @@ -246,7 +244,7 @@ def handle_followup_button( tag_ids: list[str] = [] group_ids: list[str] = [] - with get_session_with_tenant(tenant_id=client.tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: channel_name, is_dm = get_channel_name_from_id( client=client.web_client, channel_id=channel_id ) diff --git a/backend/onyx/onyxbot/slack/handlers/handle_message.py b/backend/onyx/onyxbot/slack/handlers/handle_message.py index 3d38417e2..96e79cb45 100644 --- a/backend/onyx/onyxbot/slack/handlers/handle_message.py +++ b/backend/onyx/onyxbot/slack/handlers/handle_message.py @@ -5,7 +5,7 @@ from slack_sdk.errors import SlackApiError from onyx.configs.onyxbot_configs import DANSWER_BOT_FEEDBACK_REMINDER from onyx.configs.onyxbot_configs import DANSWER_REACT_EMOJI -from onyx.db.engine import get_session_with_tenant +from onyx.db.engine import get_session_with_current_tenant from onyx.db.models import SlackChannelConfig from onyx.db.users import add_slack_user_if_not_exists from onyx.onyxbot.slack.blocks import get_feedback_reminder_blocks @@ -109,7 +109,6 @@ def handle_message( slack_channel_config: SlackChannelConfig, client: WebClient, feedback_reminder_id: str | None, - tenant_id: str, ) -> bool: """Potentially respond to the user message depending on filters and if an answer was generated @@ -135,9 +134,7 @@ def handle_message( action = "slack_tag_message" elif is_bot_dm: action = "slack_dm_message" - slack_usage_report( - action=action, sender_id=sender_id, client=client, tenant_id=tenant_id - ) + slack_usage_report(action=action, sender_id=sender_id, client=client) document_set_names: list[str] | None = None persona = slack_channel_config.persona if slack_channel_config else None @@ -218,7 +215,7 @@ def handle_message( except SlackApiError as e: logger.error(f"Was not able to react to user message due to: {e}") - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: if message_info.email: add_slack_user_if_not_exists(db_session, message_info.email) @@ -244,6 +241,5 @@ def handle_message( channel=channel, logger=logger, feedback_reminder_id=feedback_reminder_id, - tenant_id=tenant_id, ) return issue_with_regular_answer diff --git a/backend/onyx/onyxbot/slack/handlers/handle_regular_answer.py b/backend/onyx/onyxbot/slack/handlers/handle_regular_answer.py index f7c7d8f1a..73b123a32 100644 --- a/backend/onyx/onyxbot/slack/handlers/handle_regular_answer.py +++ b/backend/onyx/onyxbot/slack/handlers/handle_regular_answer.py @@ -24,7 +24,6 @@ from onyx.context.search.enums import OptionalSearchSetting from onyx.context.search.models import BaseFilters from onyx.context.search.models import RetrievalDetails from onyx.db.engine import get_session_with_current_tenant -from onyx.db.engine import get_session_with_tenant from onyx.db.models import SlackChannelConfig from onyx.db.models import User from onyx.db.persona import get_persona_by_id @@ -72,7 +71,6 @@ def handle_regular_answer( channel: str, logger: OnyxLoggingAdapter, feedback_reminder_id: str | None, - tenant_id: str, num_retries: int = DANSWER_BOT_NUM_RETRIES, thread_context_percent: float = MAX_THREAD_CONTEXT_PERCENTAGE, should_respond_with_error_msgs: bool = DANSWER_BOT_DISPLAY_ERROR_MSGS, @@ -87,7 +85,7 @@ def handle_regular_answer( user = None if message_info.is_bot_dm: if message_info.email: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: user = get_user_by_email(message_info.email, db_session) document_set_names: list[str] | None = None @@ -96,7 +94,7 @@ def handle_regular_answer( # This way slack flow always has a persona persona = slack_channel_config.persona if not persona: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: persona = get_persona_by_id(DEFAULT_PERSONA_ID, user, db_session) document_set_names = [ document_set.name for document_set in persona.document_sets @@ -157,7 +155,7 @@ def handle_regular_answer( def _get_slack_answer( new_message_request: CreateChatMessageRequest, onyx_user: User | None ) -> ChatOnyxBotResponse: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: packets = stream_chat_message_objects( new_msg_req=new_message_request, user=onyx_user, @@ -197,7 +195,7 @@ def handle_regular_answer( enable_auto_detect_filters=auto_detect_filters, ) - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: answer_request = prepare_chat_message_request( message_text=user_message.message, user=user, @@ -361,7 +359,6 @@ def handle_regular_answer( return True all_blocks = build_slack_response_blocks( - tenant_id=tenant_id, message_info=message_info, answer=answer, channel_conf=channel_conf, diff --git a/backend/onyx/onyxbot/slack/listener.py b/backend/onyx/onyxbot/slack/listener.py index 6fb590622..2cec1a66e 100644 --- a/backend/onyx/onyxbot/slack/listener.py +++ b/backend/onyx/onyxbot/slack/listener.py @@ -37,6 +37,7 @@ from onyx.context.search.retrieval.search_runner import ( download_nltk_data, ) from onyx.db.engine import get_all_tenant_ids +from onyx.db.engine import get_session_with_current_tenant from onyx.db.engine import get_session_with_tenant from onyx.db.models import SlackBot from onyx.db.search_settings import get_current_search_settings @@ -92,6 +93,7 @@ from shared_configs.configs import MODEL_SERVER_PORT from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import SLACK_CHANNEL_ID from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR +from shared_configs.contextvars import get_current_tenant_id logger = setup_logger() @@ -347,7 +349,7 @@ class SlackbotHandler: redis_client = get_redis_client(tenant_id=tenant_id) try: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: # Attempt to fetch Slack bots try: bots = list(fetch_slack_bots(db_session=db_session)) @@ -586,7 +588,7 @@ def prefilter_requests(req: SocketModeRequest, client: TenantSocketModeClient) - channel_name, _ = get_channel_name_from_id( client=client.web_client, channel_id=channel ) - with get_session_with_tenant(tenant_id=client.tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: slack_channel_config = get_slack_channel_config_for_bot_and_channel( db_session=db_session, slack_bot_id=client.slack_bot_id, @@ -680,7 +682,6 @@ def process_feedback(req: SocketModeRequest, client: TenantSocketModeClient) -> user_id_to_post_confirmation=user_id, channel_id_to_post_confirmation=channel_id, thread_ts_to_post_confirmation=thread_ts, - tenant_id=client.tenant_id, ) query_event_id, _, _ = decompose_action_id(feedback_id) @@ -796,8 +797,9 @@ def process_message( respond_every_channel: bool = DANSWER_BOT_RESPOND_EVERY_CHANNEL, notify_no_answer: bool = NOTIFY_SLACKBOT_NO_ANSWER, ) -> None: + tenant_id = get_current_tenant_id() logger.debug( - f"Received Slack request of type: '{req.type}' for tenant, {client.tenant_id}" + f"Received Slack request of type: '{req.type}' for tenant, {tenant_id}" ) # Throw out requests that can't or shouldn't be handled @@ -810,50 +812,39 @@ def process_message( client=client.web_client, channel_id=channel ) - token: Token[str | None] | None = None - # Set the current tenant ID at the beginning for all DB calls within this thread - if client.tenant_id: - logger.info(f"Setting tenant ID to {client.tenant_id}") - token = CURRENT_TENANT_ID_CONTEXTVAR.set(client.tenant_id) - try: - with get_session_with_tenant(tenant_id=client.tenant_id) as db_session: - slack_channel_config = get_slack_channel_config_for_bot_and_channel( - db_session=db_session, - slack_bot_id=client.slack_bot_id, - channel_name=channel_name, - ) + with get_session_with_current_tenant() as db_session: + slack_channel_config = get_slack_channel_config_for_bot_and_channel( + db_session=db_session, + slack_bot_id=client.slack_bot_id, + channel_name=channel_name, + ) - follow_up = bool( - slack_channel_config.channel_config - and slack_channel_config.channel_config.get("follow_up_tags") - is not None - ) + follow_up = bool( + slack_channel_config.channel_config + and slack_channel_config.channel_config.get("follow_up_tags") is not None + ) - feedback_reminder_id = schedule_feedback_reminder( - details=details, client=client.web_client, include_followup=follow_up - ) + feedback_reminder_id = schedule_feedback_reminder( + details=details, client=client.web_client, include_followup=follow_up + ) - failed = handle_message( - message_info=details, - slack_channel_config=slack_channel_config, - client=client.web_client, - feedback_reminder_id=feedback_reminder_id, - tenant_id=client.tenant_id, - ) + failed = handle_message( + message_info=details, + slack_channel_config=slack_channel_config, + client=client.web_client, + feedback_reminder_id=feedback_reminder_id, + ) - if failed: - if feedback_reminder_id: - remove_scheduled_feedback_reminder( - client=client.web_client, - channel=details.sender_id, - msg_id=feedback_reminder_id, - ) - # Skipping answering due to pre-filtering is not considered a failure - if notify_no_answer: - apologize_for_fail(details, client) - finally: - if token: - CURRENT_TENANT_ID_CONTEXTVAR.reset(token) + if failed: + if feedback_reminder_id: + remove_scheduled_feedback_reminder( + client=client.web_client, + channel=details.sender_id, + msg_id=feedback_reminder_id, + ) + # Skipping answering due to pre-filtering is not considered a failure + if notify_no_answer: + apologize_for_fail(details, client) def acknowledge_message(req: SocketModeRequest, client: TenantSocketModeClient) -> None: diff --git a/backend/onyx/onyxbot/slack/utils.py b/backend/onyx/onyxbot/slack/utils.py index f5d2209b0..1f85ca2da 100644 --- a/backend/onyx/onyxbot/slack/utils.py +++ b/backend/onyx/onyxbot/slack/utils.py @@ -4,6 +4,8 @@ import re import string import time import uuid +from collections.abc import Generator +from contextlib import contextmanager from typing import Any from typing import cast @@ -30,7 +32,7 @@ from onyx.configs.onyxbot_configs import ( ) from onyx.connectors.slack.utils import make_slack_api_rate_limited from onyx.connectors.slack.utils import SlackTextCleaner -from onyx.db.engine import get_session_with_tenant +from onyx.db.engine import get_session_with_current_tenant from onyx.db.users import get_user_by_email from onyx.llm.exceptions import GenAIDisabledException from onyx.llm.factory import get_default_llms @@ -43,6 +45,7 @@ from onyx.utils.logger import setup_logger from onyx.utils.telemetry import optional_telemetry from onyx.utils.telemetry import RecordType from onyx.utils.text_processing import replace_whitespaces_w_space +from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR logger = setup_logger() @@ -569,9 +572,7 @@ def read_slack_thread( return thread_messages -def slack_usage_report( - action: str, sender_id: str | None, client: WebClient, tenant_id: str -) -> None: +def slack_usage_report(action: str, sender_id: str | None, client: WebClient) -> None: if DISABLE_TELEMETRY: return @@ -583,14 +584,13 @@ def slack_usage_report( logger.warning("Unable to find sender email") if sender_email is not None: - with get_session_with_tenant(tenant_id=tenant_id) as db_session: + with get_session_with_current_tenant() as db_session: onyx_user = get_user_by_email(email=sender_email, db_session=db_session) optional_telemetry( record_type=RecordType.USAGE, data={"action": action}, user_id=str(onyx_user.id) if onyx_user else "Non-Onyx-Or-No-Auth-User", - tenant_id=tenant_id, ) @@ -665,5 +665,28 @@ def get_feedback_visibility() -> FeedbackVisibility: class TenantSocketModeClient(SocketModeClient): def __init__(self, tenant_id: str, slack_bot_id: int, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) - self.tenant_id = tenant_id + self._tenant_id = tenant_id self.slack_bot_id = slack_bot_id + + @contextmanager + def _set_tenant_context(self) -> Generator[None, None, None]: + token = None + try: + if self._tenant_id: + token = CURRENT_TENANT_ID_CONTEXTVAR.set(self._tenant_id) + yield + finally: + if token: + CURRENT_TENANT_ID_CONTEXTVAR.reset(token) + + def enqueue_message(self, message: str) -> None: + with self._set_tenant_context(): + super().enqueue_message(message) + + def process_message(self) -> None: + with self._set_tenant_context(): + super().process_message() + + def run_message_listeners(self, message: dict, raw_message: str) -> None: + with self._set_tenant_context(): + super().run_message_listeners(message, raw_message)