mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-25 19:37:29 +02:00
Fe for feedback (#346)
This commit is contained in:
@@ -23,7 +23,6 @@ from danswer.db.connector_credential_pair import update_connector_credential_pai
|
||||
from danswer.db.credentials import backend_update_credential_json
|
||||
from danswer.db.engine import get_db_current_time
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.db.feedback import create_document_metadata
|
||||
from danswer.db.index_attempt import create_index_attempt
|
||||
from danswer.db.index_attempt import get_index_attempt
|
||||
from danswer.db.index_attempt import get_inprogress_index_attempts
|
||||
|
155
backend/danswer/bots/slack/blocks.py
Normal file
155
backend/danswer/bots/slack/blocks.py
Normal file
@@ -0,0 +1,155 @@
|
||||
from slack_sdk.models.blocks import ActionsBlock
|
||||
from slack_sdk.models.blocks import Block
|
||||
from slack_sdk.models.blocks import ButtonElement
|
||||
from slack_sdk.models.blocks import SectionBlock
|
||||
|
||||
from danswer.bots.slack.constants import DISLIKE_BLOCK_ACTION_ID
|
||||
from danswer.bots.slack.constants import LIKE_BLOCK_ACTION_ID
|
||||
from danswer.bots.slack.utils import build_block_id_from_query_event_id
|
||||
from danswer.configs.app_configs import DANSWER_BOT_NUM_DOCS_TO_DISPLAY
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.slack.utils import UserIdReplacer
|
||||
from danswer.direct_qa.interfaces import DanswerQuote
|
||||
from danswer.server.models import SearchDoc
|
||||
|
||||
|
||||
def build_feedback_block(query_event_id: int) -> Block:
    """Build the row of like / dislike buttons shown under a QA answer.

    The block ID is produced by `build_block_id_from_query_event_id`, so it
    carries `query_event_id` and the button click can later be tied back to
    the query event it refers to.
    """
    feedback_block_id = build_block_id_from_query_event_id(query_event_id)
    like_button = ButtonElement(
        action_id=LIKE_BLOCK_ACTION_ID,
        text="👍",
        style="primary",
    )
    dislike_button = ButtonElement(
        action_id=DISLIKE_BLOCK_ACTION_ID,
        text="👎",
        style="danger",
    )
    return ActionsBlock(
        block_id=feedback_block_id,
        elements=[like_button, dislike_button],
    )
|
||||
|
||||
|
||||
_MAX_BLURB_LEN = 75
|
||||
|
||||
|
||||
def _build_custom_semantic_identifier(
    semantic_identifier: str, blurb: str, source: str
) -> str:
    """
    Build the text displayed for a document link in Slack.

    Slack only shows the semantic identifier (not identifier + blurb), so for
    Slack-sourced documents a truncated blurb is appended to keep the text unique
    and meaningful. Documents from every other source keep their identifier unchanged.
    """
    if source != DocumentSource.SLACK.value:
        return semantic_identifier

    snippet = blurb
    if len(snippet) > _MAX_BLURB_LEN:
        snippet = f"{snippet[:_MAX_BLURB_LEN]}..."

    # NOTE: tags are removed so that we don't accidentally tag users in Slack +
    # so that the result can be used as part of a <link|text> link
    for strip_mentions in (
        UserIdReplacer.replace_tags_basic,
        UserIdReplacer.replace_channels_basic,
        UserIdReplacer.replace_special_mentions,
    ):
        snippet = strip_mentions(snippet)

    if not snippet:
        return f"#{semantic_identifier}"
    return f"#{semantic_identifier}: {snippet}"
|
||||
|
||||
|
||||
def build_documents_block(
    documents: list[SearchDoc],
    already_displayed_doc_identifiers: list[str],
    num_docs_to_display: int = DANSWER_BOT_NUM_DOCS_TO_DISPLAY,
) -> SectionBlock:
    """Build the "Other potentially relevant docs" section.

    Documents are taken in the given order. A document is skipped when its ID is
    in `already_displayed_doc_identifiers` or was already emitted by this call.
    Collection stops once `num_docs_to_display` lines have been gathered (the
    limit is checked after each line is added).
    """
    shown_doc_ids = set(already_displayed_doc_identifiers)
    doc_lines: list[str] = []
    for doc in documents:
        if doc.document_id not in shown_doc_ids:
            shown_doc_ids.add(doc.document_id)

            link_text = _build_custom_semantic_identifier(
                semantic_identifier=doc.semantic_identifier,
                blurb=doc.blurb,
                source=doc.source_type,
            )
            # Slack link syntax: <url|displayed text>
            doc_lines.append(f"- <{doc.link}|{link_text}>")

            if len(doc_lines) >= num_docs_to_display:
                break

    return SectionBlock(fields=["*Other potentially relevant docs:*"] + doc_lines)
|
||||
|
||||
|
||||
def build_quotes_block(
    quotes: list[DanswerQuote],
) -> tuple[list[Block], list[str]]:
    """Build the "Sources" section from the quotes backing an answer.

    Each quoted document is listed at most once; quotes lacking a link, a
    semantic identifier or a document ID are skipped.

    Returns a tuple of (blocks, IDs of the documents listed). Both are empty
    lists when no quote produced a line.
    """
    listed_doc_ids: list[str] = []
    source_lines: list[str] = []
    for quote in quotes:
        doc_id = quote.document_id
        doc_link = quote.link
        doc_name = quote.semantic_identifier
        if not (doc_link and doc_name and doc_id):
            continue
        if doc_id in listed_doc_ids:
            continue

        listed_doc_ids.append(doc_id)
        link_text = _build_custom_semantic_identifier(
            semantic_identifier=doc_name,
            blurb=quote.blurb,
            source=quote.source_type,
        )
        source_lines.append(f"- <{doc_link}|{link_text}>")

    if not source_lines:
        return [], []

    sources_block = SectionBlock(fields=["*Sources:*", *source_lines])
    return [sources_block], listed_doc_ids
|
||||
|
||||
|
||||
def build_qa_response_blocks(
    query_event_id: int,
    answer: str | None,
    quotes: list[DanswerQuote] | None,
    documents: list[SearchDoc],
) -> list[Block]:
    """Assemble the full list of Slack blocks for a QA response.

    The result is, in order: the answer (or an apology when there is none), the
    quoted sources (or a warning when an answer has no usable quotes), the other
    relevant documents, and the feedback buttons.
    """
    quoted_doc_ids: list[str] = []
    sources_blocks: list[Block] = []

    if answer:
        answer_block = SectionBlock(text=answer)
        if quotes:
            sources_blocks, quoted_doc_ids = build_quotes_block(quotes)

        # either there were no quotes OR `build_quotes_block()` gave back no blocks
        if not sources_blocks:
            no_sources_warning = SectionBlock(
                text="*Warning*: no sources were quoted for this answer, so it may be unreliable 😔"
            )
            sources_blocks = [no_sources_warning]
    else:
        answer_block = SectionBlock(
            text="Sorry, I was unable to find an answer, but I did find some potentially relevant docs 🤓"
        )

    # documents already listed as quoted sources are not repeated here
    documents_block = build_documents_block(documents, quoted_doc_ids)
    feedback_block = build_feedback_block(query_event_id=query_event_id)
    return [answer_block, *sources_blocks, documents_block, feedback_block]
|
2
backend/danswer/bots/slack/constants.py
Normal file
2
backend/danswer/bots/slack/constants.py
Normal file
@@ -0,0 +1,2 @@
|
||||
LIKE_BLOCK_ACTION_ID = "feedback-like"
|
||||
DISLIKE_BLOCK_ACTION_ID = "feedback-dislike"
|
16
backend/danswer/bots/slack/handlers/handle_feedback.py
Normal file
16
backend/danswer/bots/slack/handlers/handle_feedback.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from danswer.configs.constants import QAFeedbackType
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.db.feedback import update_query_event_feedback
|
||||
|
||||
|
||||
def handle_qa_feedback(query_id: int, feedback_type: QAFeedbackType) -> None:
    """Record feedback given through the Slack bot on the query event `query_id`.

    A dedicated DB session is opened for the update and closed afterwards.
    """
    with Session(get_sqlalchemy_engine()) as db_session:
        update_query_event_feedback(
            feedback=feedback_type,
            query_id=query_id,
            # no "user" for Slack bot for now
            user_id=None,
            db_session=db_session,
        )
|
115
backend/danswer/bots/slack/handlers/handle_message.py
Normal file
115
backend/danswer/bots/slack/handlers/handle_message.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import logging
|
||||
|
||||
from retry import retry
|
||||
from slack_sdk import WebClient
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from danswer.bots.slack.blocks import build_qa_response_blocks
|
||||
from danswer.bots.slack.utils import respond_in_thread
|
||||
from danswer.configs.app_configs import DANSWER_BOT_ANSWER_GENERATION_TIMEOUT
|
||||
from danswer.configs.app_configs import DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER
|
||||
from danswer.configs.app_configs import DANSWER_BOT_DISPLAY_ERROR_MSGS
|
||||
from danswer.configs.app_configs import DANSWER_BOT_NUM_RETRIES
|
||||
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.direct_qa.answer_question import answer_qa_query
|
||||
from danswer.server.models import QAResponse
|
||||
from danswer.server.models import QuestionRequest
|
||||
|
||||
|
||||
def handle_message(
    msg: str,
    channel: str,
    message_ts_to_respond_to: str,
    client: WebClient,
    logger: logging.Logger,
    num_retries: int = DANSWER_BOT_NUM_RETRIES,
    answer_generation_timeout: int = DANSWER_BOT_ANSWER_GENERATION_TIMEOUT,
    should_respond_with_error_msgs: bool = DANSWER_BOT_DISPLAY_ERROR_MSGS,
    disable_docs_only_answer: bool = DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER,
) -> None:
    """Answer the Slack message `msg` in the thread rooted at `message_ts_to_respond_to`.

    The question is run through `answer_qa_query` (with retries), the result is
    converted to Slack blocks and posted in the thread. Nothing is posted when:
    - answering fails, or no documents were found (an error message is posted
      instead, but only if `should_respond_with_error_msgs` is set)
    - there is no answer and `disable_docs_only_answer` is set
    """

    def _respond_with_error_if_enabled(error_text: str) -> None:
        # Optionally, respond in thread with the error message, Used primarily
        # for debugging purposes
        if should_respond_with_error_msgs:
            respond_in_thread(
                client=client,
                channel=channel,
                text=error_text,
                thread_ts=message_ts_to_respond_to,
            )

    @retry(
        tries=num_retries,
        delay=0.25,
        backoff=2,
        logger=logger,
    )
    def _get_answer(question: QuestionRequest) -> QAResponse:
        with Session(get_sqlalchemy_engine(), expire_on_commit=False) as db_session:
            qa_response = answer_qa_query(
                question=question,
                user=None,
                db_session=db_session,
                answer_generation_timeout=answer_generation_timeout,
            )
            # an error reported by the QA flow is raised so that `retry` kicks in
            if qa_response.error_msg:
                raise RuntimeError(qa_response.error_msg)
            return qa_response

    try:
        answer = _get_answer(
            QuestionRequest(
                query=msg,
                collection=DOCUMENT_INDEX_NAME,
                use_keyword=None,
                filters=None,
                offset=None,
            )
        )
    except Exception as e:
        logger.exception(
            f"Unable to process message - did not successfully answer in {num_retries} attempts"
        )
        _respond_with_error_if_enabled(
            f"Encountered exception when trying to answer: \n\n```{e}```"
        )
        return

    if not answer.top_ranked_docs:
        logger.error(f"Unable to answer question: '{msg}' - no documents found")
        _respond_with_error_if_enabled(
            "Found no documents when trying to answer. Did you index any documents?"
        )
        return

    if disable_docs_only_answer and not answer.answer:
        logger.info(
            "Unable to find answer - not responding since the "
            "`DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER` env variable is set"
        )
        return

    # turn the raw response into a "nicely" formatted Slack message
    response_blocks = build_qa_response_blocks(
        query_event_id=answer.query_event_id,
        answer=answer.answer,
        documents=answer.top_ranked_docs,
        quotes=answer.quotes,
    )
    try:
        respond_in_thread(
            client=client,
            channel=channel,
            blocks=response_blocks,
            thread_ts=message_ts_to_respond_to,
        )
    except Exception:
        logger.exception(
            f"Unable to process message - could not respond in slack in {num_retries} attempts"
        )
        return
|
184
backend/danswer/bots/slack/listener.py
Normal file
184
backend/danswer/bots/slack/listener.py
Normal file
@@ -0,0 +1,184 @@
|
||||
import logging
|
||||
import os
|
||||
from collections.abc import MutableMapping
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
from slack_sdk import WebClient
|
||||
from slack_sdk.socket_mode import SocketModeClient
|
||||
from slack_sdk.socket_mode.request import SocketModeRequest
|
||||
from slack_sdk.socket_mode.response import SocketModeResponse
|
||||
|
||||
from danswer.bots.slack.constants import DISLIKE_BLOCK_ACTION_ID
|
||||
from danswer.bots.slack.constants import LIKE_BLOCK_ACTION_ID
|
||||
from danswer.bots.slack.handlers.handle_feedback import handle_qa_feedback
|
||||
from danswer.bots.slack.handlers.handle_message import handle_message
|
||||
from danswer.bots.slack.utils import get_query_event_id_from_block_id
|
||||
from danswer.configs.constants import QAFeedbackType
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
_CHANNEL_ID = "channel_id"


class _ChannelIdAdapter(logging.LoggerAdapter):
    """Logger adapter which prefixes every log message with the channel ID
    passed in via `extra` (messages are left untouched when there is none)"""

    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> tuple[str, MutableMapping[str, Any]]:
        extra = self.extra
        channel_id = extra.get(_CHANNEL_ID) if extra else None
        if not channel_id:
            return msg, kwargs
        return f"[Channel ID: {channel_id}] {msg}", kwargs
|
||||
|
||||
|
||||
def _get_socket_client() -> SocketModeClient:
    """Create the Socket Mode client from the tokens found in the environment.

    Raises:
        RuntimeError: if `DANSWER_BOT_SLACK_APP_TOKEN` or
            `DANSWER_BOT_SLACK_BOT_TOKEN` is unset or empty.
    """
    # For more info on how to set this up, checkout the docs:
    # https://docs.danswer.dev/slack_bot_setup
    tokens: dict[str, str] = {}
    for env_var in ("DANSWER_BOT_SLACK_APP_TOKEN", "DANSWER_BOT_SLACK_BOT_TOKEN"):
        token = os.environ.get(env_var)
        if not token:
            raise RuntimeError(f"{env_var} is not set")
        tokens[env_var] = token

    return SocketModeClient(
        # The app-level token is only used to establish the connection
        app_token=tokens["DANSWER_BOT_SLACK_APP_TOKEN"],
        web_client=WebClient(token=tokens["DANSWER_BOT_SLACK_BOT_TOKEN"]),
    )
|
||||
|
||||
|
||||
def _process_slack_event(client: SocketModeClient, req: SocketModeRequest) -> None:
    """Handle a single Socket Mode request received from Slack.

    Two kinds of requests are acted upon:
    - `events_api` requests carrying a new, root-level, non-bot message: the
      message text is passed to `handle_message`
    - `interactive` requests of type `block_actions` (feedback button clicks):
      the feedback is passed to `handle_qa_feedback`

    Both are acknowledged before any processing happens. Every other request is
    only logged.
    """
    logger.info(f"Received request of type: '{req.type}', with paylod: '{req.payload}'")
    if req.type == "events_api":
        # Acknowledge the request immediately
        response = SocketModeResponse(envelope_id=req.envelope_id)
        client.send_socket_mode_response(response)

        event = cast(dict[str, Any], req.payload.get("event", {}))
        channel = cast(str | None, event.get("channel"))
        channel_specific_logger = _ChannelIdAdapter(
            logger, extra={_CHANNEL_ID: channel}
        )

        # Ensure that the message is a new message + of expected type
        event_type = event.get("type")
        if event_type != "message":
            channel_specific_logger.info(
                f"Ignoring non-message event of type '{event_type}' for channel '{channel}'"
            )
            # actually stop here - without this, non-message events that carry
            # a channel + text would fall through and be answered like a message
            return

        # this should never happen, but we can't continue without a channel since
        # we can't send a response without it
        if not channel:
            channel_specific_logger.error("Found message without channel - skipping")
            return

        message_subtype = event.get("subtype")
        # ignore things like channel_join, channel_leave, etc.
        # NOTE: "file_share" is just a message with a file attachment, so we
        # should not ignore it
        if message_subtype not in [None, "file_share"]:
            channel_specific_logger.info(
                f"Ignoring message with subtype '{message_subtype}' since is is a special message type"
            )
            return

        if event.get("bot_profile"):
            channel_specific_logger.info("Ignoring message from bot")
            return

        message_ts = event.get("ts")
        thread_ts = event.get("thread_ts")
        # pick the root of the thread (if a thread exists)
        message_ts_to_respond_to = cast(str, thread_ts or message_ts)
        if thread_ts and message_ts != thread_ts:
            channel_specific_logger.info(
                "Skipping message since it is not the root of a thread"
            )
            return

        msg = cast(str | None, event.get("text"))
        if not msg:
            channel_specific_logger.error("Unable to process empty message")
            return

        # TODO: message should be enqueued and processed elsewhere,
        # but doing it here for now for simplicity
        handle_message(
            msg=msg,
            channel=channel,
            message_ts_to_respond_to=message_ts_to_respond_to,
            client=client.web_client,
            logger=cast(logging.Logger, channel_specific_logger),
        )

        channel_specific_logger.info(
            f"Successfully processed message with ts: '{message_ts}'"
        )

    # handle button clicks
    if req.type == "interactive" and req.payload.get("type") == "block_actions":
        # Acknowledge the request immediately
        response = SocketModeResponse(envelope_id=req.envelope_id)
        client.send_socket_mode_response(response)

        actions = req.payload.get("actions")
        if not actions:
            logger.error("Unable to process block actions - no actions found")
            return

        # only the first action of the request is looked at
        action = cast(dict[str, Any], actions[0])
        action_id = action.get("action_id")
        if action_id == LIKE_BLOCK_ACTION_ID:
            feedback_type = QAFeedbackType.LIKE
        elif action_id == DISLIKE_BLOCK_ACTION_ID:
            feedback_type = QAFeedbackType.DISLIKE
        else:
            logger.error(
                f"Unable to process block action - unknown action_id: '{action_id}'"
            )
            return

        # the block ID carries the ID of the query event the feedback is about;
        # without it there is nothing to attach the feedback to
        block_id = cast(str | None, action.get("block_id"))
        if not block_id:
            logger.error("Unable to process block action - no block_id found")
            return

        query_event_id = get_query_event_id_from_block_id(block_id)
        handle_qa_feedback(
            query_id=query_event_id,
            feedback_type=feedback_type,
        )

        logger.info(f"Successfully handled QA feedback for event: {query_event_id}")
|
||||
|
||||
|
||||
def process_slack_event(client: SocketModeClient, req: SocketModeRequest) -> None:
    """Socket Mode request listener: delegates to `_process_slack_event` and logs
    any exception it raises instead of letting it propagate."""
    try:
        _process_slack_event(client, req)
    except Exception:
        logger.exception("Failed to process slack event")
|
||||
|
||||
|
||||
# To use the Slack bot, set it up in your workspace by following the guide
# (https://docs.danswer.dev/slack_bot_setup) and add the bot to every channel it
# should try to answer questions in. Running this file makes Danswer listen to all
# messages in those channels and attempt to answer them. For now, only messages sent
# directly in the channel get a response - messages sent within a thread do not.
#
# NOTE: Web Sockets are used so that this can run from within a firewalled VPC
# without issue.
if __name__ == "__main__":
    from threading import Event

    socket_client = _get_socket_client()
    socket_client.socket_mode_request_listeners.append(process_slack_event)  # type: ignore

    logger.info("Listening for messages from Slack...")
    # Open the WebSocket connection to the Socket Mode servers
    socket_client.connect()

    # Wait forever, just so that this process does not stop
    Event().wait()
|
58
backend/danswer/bots/slack/utils.py
Normal file
58
backend/danswer/bots/slack/utils.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import logging
|
||||
import random
|
||||
import string
|
||||
from typing import cast
|
||||
|
||||
from retry import retry
|
||||
from slack_sdk import WebClient
|
||||
from slack_sdk.models.blocks import Block
|
||||
from slack_sdk.models.metadata import Metadata
|
||||
|
||||
from danswer.configs.app_configs import DANSWER_BOT_NUM_RETRIES
|
||||
from danswer.connectors.slack.utils import make_slack_api_rate_limited
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
@retry(
    tries=DANSWER_BOT_NUM_RETRIES,
    delay=0.25,
    backoff=2,
    logger=cast(logging.Logger, logger),
)
def respond_in_thread(
    client: WebClient,
    channel: str,
    thread_ts: str,
    text: str | None = None,
    blocks: list[Block] | None = None,
    metadata: Metadata | None = None,
) -> None:
    """Post a message (plain `text` and / or `blocks`) into the thread `thread_ts`
    of `channel`.

    The whole function is wrapped in `retry`, configured above with
    `DANSWER_BOT_NUM_RETRIES` tries.

    Raises:
        ValueError: if neither `text` nor `blocks` is given.
        RuntimeError: if the Slack response is not marked as "ok".
    """
    if not (text or blocks):
        raise ValueError("One of `text` or `blocks` must be provided")

    if text:
        logger.info(f"Trying to send message: {text}")
    if blocks:
        logger.info(f"Trying to send blocks: {blocks}")

    post_message = make_slack_api_rate_limited(client.chat_postMessage)
    slack_response = post_message(
        channel=channel,
        text=text,
        blocks=blocks,
        thread_ts=thread_ts,
        metadata=metadata,
    )
    if slack_response.get("ok"):
        return
    raise RuntimeError(f"Unable to post message: {slack_response}")
|
||||
|
||||
|
||||
def build_block_id_from_query_event_id(query_event_id: int) -> str:
    """Return a block ID of the form `<5 random ASCII letters>:<query_event_id>`."""
    random_letters = [random.choice(string.ascii_letters) for _ in range(5)]
    random_prefix = "".join(random_letters)
    return f"{random_prefix}:{query_event_id}"
|
||||
|
||||
|
||||
def get_query_event_id_from_block_id(block_id: str) -> int:
    """Extract the query event ID, i.e. the integer after the last ":" of `block_id`.

    Raises:
        ValueError: if the text after the last ":" is not an integer.
    """
    _, _, raw_query_event_id = block_id.rpartition(":")
    return int(raw_query_event_id)
|
@@ -27,15 +27,15 @@ def fetch_query_event_by_id(query_id: int, db_session: Session) -> QueryEvent:
|
||||
return query_event
|
||||
|
||||
|
||||
def fetch_doc_m_by_id(doc_id: str, db_session: Session) -> DbDocument:
|
||||
def fetch_docs_by_id(doc_id: str, db_session: Session) -> DbDocument:
|
||||
stmt = select(DbDocument).where(DbDocument.id == doc_id)
|
||||
result = db_session.execute(stmt)
|
||||
doc_m = result.scalar_one_or_none()
|
||||
doc = result.scalar_one_or_none()
|
||||
|
||||
if not doc_m:
|
||||
if not doc:
|
||||
raise ValueError("Invalid Document provided for updating")
|
||||
|
||||
return doc_m
|
||||
return doc
|
||||
|
||||
|
||||
def fetch_docs_ranked_by_boost(
|
||||
@@ -44,29 +44,19 @@ def fetch_docs_ranked_by_boost(
|
||||
order_func = asc if ascending else desc
|
||||
stmt = select(DbDocument).order_by(order_func(DbDocument.boost)).limit(limit)
|
||||
result = db_session.execute(stmt)
|
||||
doc_m_list = result.scalars().all()
|
||||
doc_list = result.scalars().all()
|
||||
|
||||
return list(doc_m_list)
|
||||
return list(doc_list)
|
||||
|
||||
|
||||
def create_document_metadata(
|
||||
doc_id: str,
|
||||
semantic_id: str,
|
||||
link: str | None,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
try:
|
||||
fetch_doc_m_by_id(doc_id, db_session)
|
||||
return
|
||||
except ValueError:
|
||||
# Document already exists, don't reset its data
|
||||
pass
|
||||
def update_document_boost(db_session: Session, document_id: str, boost: int) -> None:
|
||||
stmt = select(DbDocument).where(DbDocument.id == document_id)
|
||||
result = db_session.execute(stmt).scalar_one_or_none()
|
||||
if result is None:
|
||||
raise ValueError(f"No document found with ID: '{document_id}'")
|
||||
|
||||
DbDocument(
|
||||
id=doc_id,
|
||||
semantic_id=semantic_id,
|
||||
link=link,
|
||||
)
|
||||
result.boost = boost
|
||||
db_session.commit()
|
||||
|
||||
|
||||
def create_query_event(
|
||||
@@ -121,7 +111,7 @@ def create_doc_retrieval_feedback(
|
||||
if user_id != query_event.user_id:
|
||||
raise ValueError("User trying to give feedback on a query run by another user.")
|
||||
|
||||
doc_m = fetch_doc_m_by_id(document_id, db_session)
|
||||
doc_m = fetch_docs_by_id(document_id, db_session)
|
||||
|
||||
retrieval_feedback = DocumentRetrievalFeedback(
|
||||
qa_event_id=qa_event_id,
|
||||
|
@@ -1,332 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
from collections.abc import MutableMapping
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
from retry import retry
|
||||
from slack_sdk import WebClient
|
||||
from slack_sdk.socket_mode import SocketModeClient
|
||||
from slack_sdk.socket_mode.request import SocketModeRequest
|
||||
from slack_sdk.socket_mode.response import SocketModeResponse
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from danswer.configs.app_configs import DANSWER_BOT_ANSWER_GENERATION_TIMEOUT
|
||||
from danswer.configs.app_configs import DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER
|
||||
from danswer.configs.app_configs import DANSWER_BOT_DISPLAY_ERROR_MSGS
|
||||
from danswer.configs.app_configs import DANSWER_BOT_NUM_DOCS_TO_DISPLAY
|
||||
from danswer.configs.app_configs import DANSWER_BOT_NUM_RETRIES
|
||||
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.slack.utils import make_slack_api_rate_limited
|
||||
from danswer.connectors.slack.utils import UserIdReplacer
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.direct_qa.answer_question import answer_qa_query
|
||||
from danswer.direct_qa.interfaces import DanswerQuote
|
||||
from danswer.server.models import QAResponse
|
||||
from danswer.server.models import QuestionRequest
|
||||
from danswer.server.models import SearchDoc
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
_CHANNEL_ID = "channel_id"


class _ChannelIdAdapter(logging.LoggerAdapter):
    """Adds the channel ID (taken from `extra`) in front of all log messages
    emitted through this adapter; messages pass through as-is without one"""

    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> tuple[str, MutableMapping[str, Any]]:
        channel_id = None
        if self.extra:
            channel_id = self.extra.get(_CHANNEL_ID)

        prefixed_msg = f"[Channel ID: {channel_id}] {msg}" if channel_id else msg
        return prefixed_msg, kwargs
|
||||
|
||||
|
||||
def _get_socket_client() -> SocketModeClient:
    """Build the Socket Mode client using the Slack tokens from the environment.

    Raises:
        RuntimeError: if `DANSWER_BOT_SLACK_APP_TOKEN` or
            `DANSWER_BOT_SLACK_BOT_TOKEN` is unset or empty.
    """

    def _required_env(name: str) -> str:
        value = os.environ.get(name)
        if not value:
            raise RuntimeError(f"{name} is not set")
        return value

    # For more info on how to set this up, checkout the docs:
    # https://docs.danswer.dev/slack_bot_setup
    app_token = _required_env("DANSWER_BOT_SLACK_APP_TOKEN")
    bot_token = _required_env("DANSWER_BOT_SLACK_BOT_TOKEN")
    bot_web_client = WebClient(token=bot_token)
    # The app-level token is used only for establishing a connection
    return SocketModeClient(app_token=app_token, web_client=bot_web_client)
|
||||
|
||||
|
||||
_MAX_BLURB_LEN = 25
|
||||
|
||||
|
||||
def _build_custom_semantic_identifier(
    semantic_identifier: str, blurb: str, source: str
) -> str:
    """
    Build the text displayed for a document link in Slack.

    Slack only shows the semantic identifier (not identifier + blurb), so for
    Slack-sourced documents a truncated blurb is appended to keep the text unique
    and meaningful. Documents from every other source keep their identifier unchanged.
    """
    is_slack_doc = source == DocumentSource.SLACK.value
    if not is_slack_doc:
        return semantic_identifier

    needs_truncation = len(blurb) > _MAX_BLURB_LEN
    short_blurb = f"{blurb[:_MAX_BLURB_LEN]}..." if needs_truncation else blurb

    # NOTE: tags are removed so that we don't accidentally tag users in Slack +
    # so that the result can be used as part of a <link|text> link
    short_blurb = UserIdReplacer.replace_special_mentions(
        UserIdReplacer.replace_channels_basic(
            UserIdReplacer.replace_tags_basic(short_blurb)
        )
    )

    suffix = f": {short_blurb}" if short_blurb else ""
    return f"#{semantic_identifier}{suffix}"
|
||||
|
||||
|
||||
def _process_quotes(quotes: list[DanswerQuote] | None) -> tuple[str | None, list[str]]:
    """Render the quoted sources as newline-separated Slack link lines.

    Each quoted document is listed at most once; quotes lacking a link, a
    semantic identifier or a document ID are skipped.

    Returns a tuple of (rendered text, IDs of the documents listed), or
    `(None, [])` when there is nothing to list.
    """
    listed_doc_ids: list[str] = []
    source_lines: list[str] = []
    for quote in quotes or []:
        doc_id = quote.document_id
        doc_link = quote.link
        doc_name = quote.semantic_identifier
        if not (doc_link and doc_name and doc_id) or doc_id in listed_doc_ids:
            continue

        listed_doc_ids.append(doc_id)
        link_text = _build_custom_semantic_identifier(
            semantic_identifier=doc_name,
            blurb=quote.blurb,
            source=quote.source_type,
        )
        source_lines.append(f"- <{doc_link}|{link_text}>")

    if source_lines:
        return "\n".join(source_lines), listed_doc_ids
    return None, []
|
||||
|
||||
|
||||
def _process_documents(
    documents: list[SearchDoc] | None,
    already_displayed_doc_identifiers: list[str],
    num_docs_to_display: int = DANSWER_BOT_NUM_DOCS_TO_DISPLAY,
) -> str | None:
    """Render the top documents as newline-separated Slack link lines.

    Documents are taken in the given order. A document is skipped when its ID is
    in `already_displayed_doc_identifiers` or was already emitted by this call.
    Collection stops once `num_docs_to_display` lines have been gathered (the
    limit is checked after each line is added).

    Returns `None` when `documents` is empty or `None`.
    """
    if not documents:
        return None

    shown_doc_ids = set(already_displayed_doc_identifiers)
    doc_lines: list[str] = []
    for doc in documents:
        if doc.document_id not in shown_doc_ids:
            shown_doc_ids.add(doc.document_id)

            link_text = _build_custom_semantic_identifier(
                semantic_identifier=doc.semantic_identifier,
                blurb=doc.blurb,
                source=doc.source_type,
            )
            doc_lines.append(f"- <{doc.link}|{link_text}>")

            if len(doc_lines) >= num_docs_to_display:
                break

    return "\n".join(doc_lines)
|
||||
|
||||
|
||||
@retry(
    tries=DANSWER_BOT_NUM_RETRIES,
    delay=0.25,
    backoff=2,
    logger=cast(logging.Logger, logger),
)
def _respond_in_thread(
    client: SocketModeClient,
    channel: str,
    text: str,
    thread_ts: str,
) -> None:
    """Post `text` into the thread `thread_ts` of `channel` using the client's
    web client.

    The whole function is wrapped in `retry`, configured above with
    `DANSWER_BOT_NUM_RETRIES` tries.

    Raises:
        RuntimeError: if the Slack response is not marked as "ok".
    """
    logger.info(f"Trying to send message: {text}")
    post_message = make_slack_api_rate_limited(client.web_client.chat_postMessage)
    slack_response = post_message(
        channel=channel,
        text=text,
        thread_ts=thread_ts,
    )
    if slack_response.get("ok"):
        return
    raise RuntimeError(f"Unable to post message: {slack_response}")
|
||||
|
||||
|
||||
def process_slack_event(client: SocketModeClient, req: SocketModeRequest) -> None:
|
||||
if req.type == "events_api":
|
||||
# Acknowledge the request anyway
|
||||
response = SocketModeResponse(envelope_id=req.envelope_id)
|
||||
client.send_socket_mode_response(response)
|
||||
|
||||
event = cast(dict[str, Any], req.payload.get("event", {}))
|
||||
channel = cast(str | None, event.get("channel"))
|
||||
channel_specific_logger = _ChannelIdAdapter(
|
||||
logger, extra={_CHANNEL_ID: channel}
|
||||
)
|
||||
|
||||
# Ensure that the message is a new message + of expected type
|
||||
event_type = event.get("type")
|
||||
if event_type != "message":
|
||||
channel_specific_logger.info(
|
||||
f"Ignoring non-message event of type '{event_type}' for channel '{channel}'"
|
||||
)
|
||||
|
||||
# this should never happen, but we can't continue without a channel since
|
||||
# we can't send a response without it
|
||||
if not channel:
|
||||
channel_specific_logger.error(f"Found message without channel - skipping")
|
||||
return
|
||||
|
||||
message_subtype = event.get("subtype")
|
||||
# ignore things like channel_join, channel_leave, etc.
|
||||
# NOTE: "file_share" is just a message with a file attachment, so we
|
||||
# should not ignore it
|
||||
if message_subtype not in [None, "file_share"]:
|
||||
channel_specific_logger.info(
|
||||
f"Ignoring message with subtype '{message_subtype}' since is is a special message type"
|
||||
)
|
||||
return
|
||||
|
||||
if event.get("bot_profile"):
|
||||
channel_specific_logger.info("Ignoring message from bot")
|
||||
return
|
||||
|
||||
message_ts = event.get("ts")
|
||||
thread_ts = event.get("thread_ts")
|
||||
# pick the root of the thread (if a thread exists)
|
||||
message_ts_to_respond_to = cast(str, thread_ts or message_ts)
|
||||
if thread_ts and message_ts != thread_ts:
|
||||
channel_specific_logger.info(
|
||||
"Skipping message since it is not the root of a thread"
|
||||
)
|
||||
return
|
||||
|
||||
msg = cast(str | None, event.get("text"))
|
||||
if not msg:
|
||||
channel_specific_logger.error("Unable to process empty message")
|
||||
return
|
||||
|
||||
# TODO: message should be enqueued and processed elsewhere,
|
||||
# but doing it here for now for simplicity
|
||||
|
||||
@retry(
    tries=DANSWER_BOT_NUM_RETRIES,
    delay=0.25,
    backoff=2,
    logger=cast(logging.Logger, logger),
)
def _get_answer(question: QuestionRequest) -> QAResponse:
    """Run the QA flow for `question` and return the response.

    A new SQLAlchemy session is opened for each call. If the response carries
    an `error_msg`, a `RuntimeError` with that message is raised instead of
    returning, so a failed attempt surfaces as an exception to the `retry`
    wrapper above.
    """
    # expire_on_commit=False keeps loaded attributes readable after a commit
    with Session(get_sqlalchemy_engine(), expire_on_commit=False) as db_session:
        qa_response = answer_qa_query(
            question=question,
            user=None,
            db_session=db_session,
            answer_generation_timeout=DANSWER_BOT_ANSWER_GENERATION_TIMEOUT,
        )
        # Guard clause: an error message means this attempt failed
        if qa_response.error_msg:
            raise RuntimeError(qa_response.error_msg)
        return qa_response
|
||||
|
||||
try:
|
||||
answer = _get_answer(
|
||||
QuestionRequest(
|
||||
query=msg,
|
||||
collection=DOCUMENT_INDEX_NAME,
|
||||
use_keyword=None,
|
||||
filters=None,
|
||||
offset=None,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
channel_specific_logger.exception(
|
||||
f"Unable to process message - did not successfully answer "
|
||||
f"in {DANSWER_BOT_NUM_RETRIES} attempts"
|
||||
)
|
||||
# Optionally, respond in thread with the error message. Used primarily
|
||||
# for debugging purposes
|
||||
if DANSWER_BOT_DISPLAY_ERROR_MSGS:
|
||||
_respond_in_thread(
|
||||
client=client,
|
||||
channel=channel,
|
||||
text=f"Encountered exception when trying to answer: \n\n```{e}```",
|
||||
thread_ts=message_ts_to_respond_to,
|
||||
)
|
||||
return
|
||||
|
||||
# convert raw response into "nicely" formatted Slack message
|
||||
quote_str, doc_identifiers = _process_quotes(answer.quotes)
|
||||
top_documents_str = _process_documents(answer.top_ranked_docs, doc_identifiers)
|
||||
|
||||
if not answer.answer:
|
||||
if DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER:
|
||||
logger.info(
|
||||
"Unable to find answer - not responding since the "
|
||||
"`DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER` env variable is set"
|
||||
)
|
||||
return
|
||||
|
||||
text = f"Sorry, I was unable to find an answer, but I did find some potentially relevant docs 🤓\n\n{top_documents_str}"
|
||||
else:
|
||||
top_documents_str_with_header = (
|
||||
f"*Other potentially relevant docs:*\n{top_documents_str}"
|
||||
)
|
||||
if quote_str:
|
||||
text = f"{answer.answer}\n\n*Sources:*\n{quote_str}\n\n{top_documents_str_with_header}"
|
||||
else:
|
||||
text = f"{answer.answer}\n\n*Warning*: no sources were quoted for this answer, so it may be unreliable 😔\n\n{top_documents_str_with_header}"
|
||||
|
||||
try:
|
||||
_respond_in_thread(
|
||||
client=client,
|
||||
channel=channel,
|
||||
text=text,
|
||||
thread_ts=message_ts_to_respond_to,
|
||||
)
|
||||
except Exception:
|
||||
channel_specific_logger.exception(
|
||||
f"Unable to process message - could not respond in slack in {DANSWER_BOT_NUM_RETRIES} attempts"
|
||||
)
|
||||
return
|
||||
|
||||
channel_specific_logger.info(
|
||||
f"Successfully processed message with ts: '{message_ts}'"
|
||||
)
|
||||
|
||||
|
||||
# Follow the guide (https://docs.danswer.dev/slack_bot_setup) to set up
|
||||
# the slack bot in your workspace, and then add the bot to any channels you want to
|
||||
# try and answer questions for. Running this file will set up Danswer to listen to all
|
||||
# messages in those channels and attempt to answer them. As of now, it will only respond
|
||||
# to messages sent directly in the channel - it will not respond to messages sent within a
|
||||
# thread.
|
||||
#
|
||||
# NOTE: we are using Web Sockets so that you can run this from within a firewalled VPC
|
||||
# without issue.
|
||||
if __name__ == "__main__":
    from threading import Event

    # Register the event handler on a Socket Mode client before connecting
    slack_socket = _get_socket_client()
    slack_socket.socket_mode_request_listeners.append(process_slack_event)  # type: ignore

    # Open the WebSocket connection to the Socket Mode servers
    logger.info("Listening for messages from Slack...")
    slack_socket.connect()

    # Block the main thread forever so the process keeps running;
    # this Event is never set
    keep_alive = Event()
    keep_alive.wait()
|
@@ -85,6 +85,7 @@ def retrieve_ranked_documents(
|
||||
f"Semantic search returned no results with filters: {filters_log_msg}"
|
||||
)
|
||||
return None, None
|
||||
logger.info(top_chunks)
|
||||
ranked_chunks = semantic_reranking(query, top_chunks[:num_rerank])
|
||||
|
||||
top_docs = [
|
||||
|
@@ -50,6 +50,7 @@ from danswer.db.deletion_attempt import create_deletion_attempt
|
||||
from danswer.db.deletion_attempt import get_deletion_attempts
|
||||
from danswer.db.engine import get_session
|
||||
from danswer.db.feedback import fetch_docs_ranked_by_boost
|
||||
from danswer.db.feedback import update_document_boost
|
||||
from danswer.db.index_attempt import create_index_attempt
|
||||
from danswer.db.index_attempt import get_latest_index_attempts
|
||||
from danswer.db.models import DeletionAttempt
|
||||
@@ -63,6 +64,7 @@ from danswer.server.models import ApiKey
|
||||
from danswer.server.models import AuthStatus
|
||||
from danswer.server.models import AuthUrl
|
||||
from danswer.server.models import BoostDoc
|
||||
from danswer.server.models import BoostUpdateRequest
|
||||
from danswer.server.models import ConnectorBase
|
||||
from danswer.server.models import ConnectorCredentialPairIdentifier
|
||||
from danswer.server.models import ConnectorIndexingStatus
|
||||
@@ -104,6 +106,7 @@ def get_most_boosted_docs(
|
||||
BoostDoc(
|
||||
document_id=doc.id,
|
||||
semantic_id=doc.semantic_id,
|
||||
# source=doc.source,
|
||||
link=doc.link or "",
|
||||
boost=doc.boost,
|
||||
hidden=doc.hidden,
|
||||
@@ -112,6 +115,22 @@ def get_most_boosted_docs(
|
||||
]
|
||||
|
||||
|
||||
@router.post("/admin/doc-boosts")
def document_boost_update(
    boost_update: BoostUpdateRequest,
    _: User | None = Depends(current_admin_user),
    db_session: Session = Depends(get_session),
) -> None:
    """Update the boost value of a single document.

    The request is guarded by the `current_admin_user` dependency; its result
    is not otherwise used here.

    Args:
        boost_update: Document id and the new boost value to store.
        _: Resolved admin user (unused, present only to enforce the dependency).
        db_session: Database session supplied by `get_session`.

    Raises:
        HTTPException: With status 400 when `update_document_boost` raises a
            `ValueError`; the error text is passed through as the detail.
    """
    try:
        update_document_boost(
            db_session=db_session,
            document_id=boost_update.document_id,
            boost=boost_update.boost,
        )
    except ValueError as e:
        # Chain explicitly so the original ValueError is kept as the cause
        raise HTTPException(status_code=400, detail=str(e)) from e
|
||||
|
||||
|
||||
@router.get("/admin/connector/google-drive/app-credential")
|
||||
def check_google_app_credentials_exist(
|
||||
_: User = Depends(current_admin_user),
|
||||
|
@@ -115,6 +115,11 @@ class BoostDoc(BaseModel):
|
||||
hidden: bool
|
||||
|
||||
|
||||
class BoostUpdateRequest(BaseModel):
    """Request payload pairing a document id with a boost value."""

    # Identifier of the document the boost applies to
    document_id: str
    # Integer boost value for that document
    boost: int
|
||||
|
||||
|
||||
class SearchDoc(BaseModel):
|
||||
document_id: str
|
||||
semantic_identifier: str
|
||||
|
@@ -29,7 +29,7 @@ autorestart=true
|
||||
# If not set up, this will just fail 5 times and then stop.
|
||||
# More details on setup here: https://docs.danswer.dev/slack_bot_setup
|
||||
[program:slack_bot_listener]
|
||||
command=python danswer/listeners/slack_listener.py
|
||||
command=python danswer/bots/slack/listener.py
|
||||
stdout_logfile=/var/log/slack_bot_listener.log
|
||||
stdout_logfile_maxbytes=52428800
|
||||
redirect_stderr=true
|
||||
|
Reference in New Issue
Block a user