mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-07-12 22:23:01 +02:00
Telemetry Revision (#868)
This commit is contained in:
@ -258,7 +258,7 @@ def translate_citations(
|
||||
|
||||
|
||||
@log_generator_function_time()
|
||||
def stream_chat_packets(
|
||||
def stream_chat_message(
|
||||
new_msg_req: CreateChatMessageRequest,
|
||||
user: User | None,
|
||||
db_session: Session,
|
||||
|
@ -25,13 +25,15 @@ from danswer.danswerbot.slack.utils import fetch_userids_from_emails
|
||||
from danswer.danswerbot.slack.utils import respond_in_thread
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.db.models import SlackBotConfig
|
||||
from danswer.one_shot_answer.answer_question import get_one_shot_answer
|
||||
from danswer.one_shot_answer.answer_question import get_search_answer
|
||||
from danswer.one_shot_answer.models import DirectQARequest
|
||||
from danswer.one_shot_answer.models import OneShotQAResponse
|
||||
from danswer.search.models import BaseFilters
|
||||
from danswer.search.models import OptionalSearchSetting
|
||||
from danswer.search.models import RetrievalDetails
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.telemetry import optional_telemetry
|
||||
from danswer.utils.telemetry import RecordType
|
||||
|
||||
logger_base = setup_logger()
|
||||
|
||||
@ -96,8 +98,9 @@ def handle_message(
|
||||
messages = message_info.thread_messages
|
||||
message_ts_to_respond_to = message_info.msg_to_respond
|
||||
sender_id = message_info.sender
|
||||
bipass_filters = message_info.bipass_filters
|
||||
bypass_filters = message_info.bypass_filters
|
||||
is_bot_msg = message_info.is_bot_msg
|
||||
is_bot_dm = message_info.is_bot_dm
|
||||
|
||||
engine = get_sqlalchemy_engine()
|
||||
|
||||
@ -129,7 +132,7 @@ def handle_message(
|
||||
|
||||
if channel_config and channel_config.channel_config:
|
||||
channel_conf = channel_config.channel_config
|
||||
if not bipass_filters and "answer_filters" in channel_conf:
|
||||
if not bypass_filters and "answer_filters" in channel_conf:
|
||||
reflexion = "well_answered_postfilter" in channel_conf["answer_filters"]
|
||||
|
||||
if (
|
||||
@ -150,7 +153,7 @@ def handle_message(
|
||||
respond_tag_only = channel_conf.get("respond_tag_only") or False
|
||||
respond_team_member_list = channel_conf.get("respond_team_member_list") or None
|
||||
|
||||
if respond_tag_only and not bipass_filters:
|
||||
if respond_tag_only and not bypass_filters:
|
||||
logger.info(
|
||||
"Skipping message since the channel is configured such that "
|
||||
"DanswerBot only responds to tags"
|
||||
@ -184,9 +187,21 @@ def handle_message(
|
||||
logger=logger,
|
||||
)
|
||||
def _get_answer(new_message_request: DirectQARequest) -> OneShotQAResponse:
|
||||
action = "slack_message"
|
||||
if is_bot_msg:
|
||||
action = "slack_slash_message"
|
||||
elif bypass_filters:
|
||||
action = "slack_tag_message"
|
||||
elif is_bot_dm:
|
||||
action = "slack_dm_message"
|
||||
optional_telemetry(
|
||||
record_type=RecordType.USAGE,
|
||||
data={"action": action},
|
||||
)
|
||||
|
||||
with Session(engine, expire_on_commit=False) as db_session:
|
||||
# This also handles creating the query event in postgres
|
||||
answer = get_one_shot_answer(
|
||||
answer = get_search_answer(
|
||||
query_req=new_message_request,
|
||||
user=None,
|
||||
db_session=db_session,
|
||||
|
@ -204,8 +204,9 @@ def build_request_details(
|
||||
channel_to_respond=channel,
|
||||
msg_to_respond=cast(str, message_ts or thread_ts),
|
||||
sender=event.get("user") or None,
|
||||
bipass_filters=tagged,
|
||||
bypass_filters=tagged,
|
||||
is_bot_msg=False,
|
||||
is_bot_dm=event.get("channel_type") == "im",
|
||||
)
|
||||
|
||||
elif req.type == "slash_commands":
|
||||
@ -220,8 +221,9 @@ def build_request_details(
|
||||
channel_to_respond=channel,
|
||||
msg_to_respond=None,
|
||||
sender=sender,
|
||||
bipass_filters=True,
|
||||
bypass_filters=True,
|
||||
is_bot_msg=True,
|
||||
is_bot_dm=False,
|
||||
)
|
||||
|
||||
raise RuntimeError("Programming fault, this should never happen.")
|
||||
@ -270,8 +272,9 @@ def process_message(
|
||||
and not respond_every_channel
|
||||
# Can't have configs for DMs so don't toss them out
|
||||
and not is_dm
|
||||
# If @DanswerBot or /DanswerBot, always respond with the default configs
|
||||
and not (details.is_bot_msg or details.bipass_filters)
|
||||
# If /DanswerBot (is_bot_msg) or @DanswerBot (bypass_filters)
|
||||
# always respond with the default configs
|
||||
and not (details.is_bot_msg or details.bypass_filters)
|
||||
):
|
||||
return
|
||||
|
||||
|
@ -8,5 +8,6 @@ class SlackMessageInfo(BaseModel):
|
||||
channel_to_respond: str
|
||||
msg_to_respond: str | None
|
||||
sender: str | None
|
||||
bipass_filters: bool
|
||||
is_bot_msg: bool
|
||||
bypass_filters: bool # User has tagged @DanswerBot
|
||||
is_bot_msg: bool # User is using /DanswerBot
|
||||
is_bot_dm: bool # User is direct messaging to DanswerBot
|
||||
|
@ -10,12 +10,10 @@ from danswer.indexing.models import IndexChunk
|
||||
from danswer.search.models import Embedder
|
||||
from danswer.search.search_nlp_models import EmbeddingModel
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.timing import log_function_time
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
@log_function_time()
|
||||
def embed_chunks(
|
||||
chunks: list[DocAwareChunk],
|
||||
embedding_model: SentenceTransformer | None = None,
|
||||
|
@ -27,6 +27,7 @@ from danswer.indexing.models import DocAwareChunk
|
||||
from danswer.indexing.models import DocMetadataAwareIndexChunk
|
||||
from danswer.search.models import Embedder
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.timing import log_function_time
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@ -66,7 +67,8 @@ def upsert_documents_in_db(
|
||||
)
|
||||
|
||||
|
||||
def _indexing_pipeline(
|
||||
@log_function_time()
|
||||
def index_doc_batch(
|
||||
*,
|
||||
chunker: Chunker,
|
||||
embedder: Embedder,
|
||||
@ -200,7 +202,7 @@ def build_indexing_pipeline(
|
||||
document_index = document_index or get_default_document_index()
|
||||
|
||||
return partial(
|
||||
_indexing_pipeline,
|
||||
index_doc_batch,
|
||||
chunker=chunker,
|
||||
embedder=embedder,
|
||||
document_index=document_index,
|
||||
|
@ -46,7 +46,6 @@ from danswer.utils.timing import log_generator_function_time
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
@log_generator_function_time()
|
||||
def stream_answer_objects(
|
||||
query_req: DirectQARequest,
|
||||
user: User | None,
|
||||
@ -250,7 +249,8 @@ def stream_answer_objects(
|
||||
yield msg_detail_response
|
||||
|
||||
|
||||
def stream_one_shot_answer(
|
||||
@log_generator_function_time()
|
||||
def stream_search_answer(
|
||||
query_req: DirectQARequest,
|
||||
user: User | None,
|
||||
db_session: Session,
|
||||
@ -262,7 +262,7 @@ def stream_one_shot_answer(
|
||||
yield get_json_line(obj.dict())
|
||||
|
||||
|
||||
def get_one_shot_answer(
|
||||
def get_search_answer(
|
||||
query_req: DirectQARequest,
|
||||
user: User | None,
|
||||
db_session: Session,
|
||||
|
@ -7,7 +7,6 @@ from danswer.search.search_nlp_models import IntentModel
|
||||
from danswer.search.search_runner import remove_stop_words_and_punctuation
|
||||
from danswer.server.query_and_chat.models import HelperResponse
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.timing import log_function_time
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@ -23,7 +22,6 @@ def count_unk_tokens(text: str, tokenizer: AutoTokenizer) -> int:
|
||||
return num_unk_tokens
|
||||
|
||||
|
||||
@log_function_time()
|
||||
def query_intent(query: str) -> tuple[SearchType, QueryFlow]:
|
||||
intent_model = IntentModel()
|
||||
class_probs = intent_model.predict(query)
|
||||
|
@ -134,7 +134,7 @@ def combine_retrieval_results(
|
||||
return sorted_chunks
|
||||
|
||||
|
||||
@log_function_time()
|
||||
@log_function_time(print_only=True)
|
||||
def doc_index_retrieval(
|
||||
query: SearchQuery,
|
||||
document_index: DocumentIndex,
|
||||
@ -171,7 +171,7 @@ def doc_index_retrieval(
|
||||
return top_chunks
|
||||
|
||||
|
||||
@log_function_time()
|
||||
@log_function_time(print_only=True)
|
||||
def semantic_reranking(
|
||||
query: str,
|
||||
chunks: list[InferenceChunk],
|
||||
|
@ -14,7 +14,6 @@ from danswer.prompts.filter_extration import SOURCE_FILTER_PROMPT
|
||||
from danswer.prompts.filter_extration import WEB_SOURCE_WARNING
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.text_processing import extract_embedded_json
|
||||
from danswer.utils.timing import log_function_time
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@ -42,7 +41,6 @@ def _sample_document_sources(
|
||||
return random.sample(valid_sources, num_sample)
|
||||
|
||||
|
||||
@log_function_time()
|
||||
def extract_source_filter(
|
||||
query: str, db_session: Session
|
||||
) -> list[DocumentSource] | None:
|
||||
|
@ -10,7 +10,6 @@ from danswer.llm.utils import dict_based_prompt_to_langchain_prompt
|
||||
from danswer.prompts.filter_extration import TIME_FILTER_PROMPT
|
||||
from danswer.prompts.prompt_utils import get_current_llm_day_time
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.timing import log_function_time
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@ -40,7 +39,6 @@ def best_match_time(time_str: str) -> datetime | None:
|
||||
return None
|
||||
|
||||
|
||||
@log_function_time()
|
||||
def extract_time_filter(query: str) -> tuple[datetime | None, bool]:
|
||||
"""Returns a datetime if a hard time filter should be applied for the given query
|
||||
Additionally returns a bool, True if more recently updated Documents should be
|
||||
|
@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from danswer.auth.users import current_user
|
||||
from danswer.chat.chat_utils import create_chat_chain
|
||||
from danswer.chat.process_message import stream_chat_packets
|
||||
from danswer.chat.process_message import stream_chat_message
|
||||
from danswer.db.chat import create_chat_session
|
||||
from danswer.db.chat import delete_chat_session
|
||||
from danswer.db.chat import get_chat_message
|
||||
@ -37,7 +37,6 @@ from danswer.server.query_and_chat.models import RenameChatSessionResponse
|
||||
from danswer.server.query_and_chat.models import SearchFeedbackRequest
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
router = APIRouter(prefix="/chat")
|
||||
@ -172,7 +171,7 @@ def handle_new_chat_message(
|
||||
if not chat_message_req.message and chat_message_req.prompt_id is not None:
|
||||
raise HTTPException(status_code=400, detail="Empty chat message is invalid")
|
||||
|
||||
packets = stream_chat_packets(
|
||||
packets = stream_chat_message(
|
||||
new_msg_req=chat_message_req,
|
||||
user=user,
|
||||
db_session=db_session,
|
||||
|
@ -11,7 +11,7 @@ from danswer.db.engine import get_session
|
||||
from danswer.db.models import User
|
||||
from danswer.document_index.factory import get_default_document_index
|
||||
from danswer.document_index.vespa.index import VespaIndex
|
||||
from danswer.one_shot_answer.answer_question import stream_one_shot_answer
|
||||
from danswer.one_shot_answer.answer_question import stream_search_answer
|
||||
from danswer.one_shot_answer.models import DirectQARequest
|
||||
from danswer.search.access_filters import build_access_filters_for_user
|
||||
from danswer.search.danswer_helper import recommend_search_flow
|
||||
@ -165,7 +165,7 @@ def get_answer_with_quote(
|
||||
) -> StreamingResponse:
|
||||
query = query_request.messages[0].message
|
||||
logger.info(f"Received query for one shot answer with quotes: {query}")
|
||||
packets = stream_one_shot_answer(
|
||||
packets = stream_search_answer(
|
||||
query_req=query_request, user=user, db_session=db_session
|
||||
)
|
||||
return StreamingResponse(packets, media_type="application/json")
|
||||
|
@ -16,6 +16,7 @@ DANSWER_TELEMETRY_ENDPOINT = "https://telemetry.danswer.ai/anonymous_telemetry"
|
||||
class RecordType(str, Enum):
|
||||
VERSION = "version"
|
||||
SIGN_UP = "sign_up"
|
||||
USAGE = "usage"
|
||||
LATENCY = "latency"
|
||||
FAILURE = "failure"
|
||||
|
||||
|
@ -17,7 +17,9 @@ F = TypeVar("F", bound=Callable)
|
||||
FG = TypeVar("FG", bound=Callable[..., Generator | Iterator])
|
||||
|
||||
|
||||
def log_function_time(func_name: str | None = None) -> Callable[[F], F]:
|
||||
def log_function_time(
|
||||
func_name: str | None = None, print_only: bool = False
|
||||
) -> Callable[[F], F]:
|
||||
def decorator(func: F) -> F:
|
||||
@wraps(func)
|
||||
def wrapped_func(*args: Any, **kwargs: Any) -> Any:
|
||||
@ -26,10 +28,13 @@ def log_function_time(func_name: str | None = None) -> Callable[[F], F]:
|
||||
elapsed_time_str = str(time.time() - start_time)
|
||||
log_name = func_name or func.__name__
|
||||
logger.info(f"{log_name} took {elapsed_time_str} seconds")
|
||||
optional_telemetry(
|
||||
record_type=RecordType.LATENCY,
|
||||
data={"function": log_name, "latency": str(elapsed_time_str)},
|
||||
)
|
||||
|
||||
if not print_only:
|
||||
optional_telemetry(
|
||||
record_type=RecordType.LATENCY,
|
||||
data={"function": log_name, "latency": str(elapsed_time_str)},
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
return cast(F, wrapped_func)
|
||||
@ -37,7 +42,9 @@ def log_function_time(func_name: str | None = None) -> Callable[[F], F]:
|
||||
return decorator
|
||||
|
||||
|
||||
def log_generator_function_time(func_name: str | None = None) -> Callable[[FG], FG]:
|
||||
def log_generator_function_time(
|
||||
func_name: str | None = None, print_only: bool = False
|
||||
) -> Callable[[FG], FG]:
|
||||
def decorator(func: FG) -> FG:
|
||||
@wraps(func)
|
||||
def wrapped_func(*args: Any, **kwargs: Any) -> Any:
|
||||
@ -54,10 +61,11 @@ def log_generator_function_time(func_name: str | None = None) -> Callable[[FG],
|
||||
elapsed_time_str = str(time.time() - start_time)
|
||||
log_name = func_name or func.__name__
|
||||
logger.info(f"{log_name} took {elapsed_time_str} seconds")
|
||||
optional_telemetry(
|
||||
record_type=RecordType.LATENCY,
|
||||
data={"function": log_name, "latency": str(elapsed_time_str)},
|
||||
)
|
||||
if not print_only:
|
||||
optional_telemetry(
|
||||
record_type=RecordType.LATENCY,
|
||||
data={"function": log_name, "latency": str(elapsed_time_str)},
|
||||
)
|
||||
|
||||
return cast(FG, wrapped_func)
|
||||
|
||||
|
@ -4,14 +4,12 @@ from fastapi import APIRouter
|
||||
|
||||
from danswer.search.search_nlp_models import get_intent_model_tokenizer
|
||||
from danswer.search.search_nlp_models import get_local_intent_model
|
||||
from danswer.utils.timing import log_function_time
|
||||
from shared_models.model_server_models import IntentRequest
|
||||
from shared_models.model_server_models import IntentResponse
|
||||
|
||||
router = APIRouter(prefix="/custom")
|
||||
|
||||
|
||||
@log_function_time()
|
||||
def classify_intent(query: str) -> list[float]:
|
||||
tokenizer = get_intent_model_tokenizer()
|
||||
intent_model = get_local_intent_model()
|
||||
|
@ -20,7 +20,7 @@ WARM_UP_STRING = "Danswer is amazing"
|
||||
router = APIRouter(prefix="/encoder")
|
||||
|
||||
|
||||
@log_function_time()
|
||||
@log_function_time(print_only=True)
|
||||
def embed_text(
|
||||
texts: list[str],
|
||||
normalize_embeddings: bool = NORMALIZE_EMBEDDINGS,
|
||||
@ -34,7 +34,7 @@ def embed_text(
|
||||
return embeddings
|
||||
|
||||
|
||||
@log_function_time()
|
||||
@log_function_time(print_only=True)
|
||||
def calc_sim_scores(query: str, docs: list[str]) -> list[list[float]]:
|
||||
cross_encoders = get_local_reranking_model_ensemble()
|
||||
sim_scores = [
|
||||
|
@ -11,7 +11,7 @@ from sqlalchemy.orm import Session
|
||||
from danswer.chat.models import LLMMetricsContainer
|
||||
from danswer.configs.constants import MessageType
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.one_shot_answer.answer_question import get_one_shot_answer
|
||||
from danswer.one_shot_answer.answer_question import get_search_answer
|
||||
from danswer.one_shot_answer.models import DirectQARequest
|
||||
from danswer.one_shot_answer.models import ThreadMessage
|
||||
from danswer.search.models import IndexFilters
|
||||
@ -105,7 +105,7 @@ def get_answer_for_question(
|
||||
rerank_metrics = MetricsHander[RerankMetricsContainer]()
|
||||
llm_metrics = MetricsHander[LLMMetricsContainer]()
|
||||
|
||||
answer = get_one_shot_answer(
|
||||
answer = get_search_answer(
|
||||
query_req=new_message_request,
|
||||
user=None,
|
||||
db_session=db_session,
|
||||
|
Reference in New Issue
Block a user