Major Agent Search Updates (#3994)

joachim-danswer 2025-02-14 11:40:21 -08:00 committed by GitHub
parent ec78f78f3c
commit 6687d5d499
36 changed files with 2115 additions and 431 deletions

View File

@ -9,7 +9,6 @@ class CoreState(BaseModel):
This is the core state that is shared across all subgraphs.
"""
base_question: str = ""
log_messages: Annotated[list[str], add] = []
@ -18,4 +17,4 @@ class SubgraphCoreState(BaseModel):
This is the core state that is shared across all subgraphs.
"""
log_messages: Annotated[list[str], add]
log_messages: Annotated[list[str], add] = []

View File

@ -1,8 +1,8 @@
from datetime import datetime
from typing import cast
from langchain_core.messages import BaseMessage
from langchain_core.messages import HumanMessage
from langchain_core.messages import merge_message_runs
from langchain_core.runnables.config import RunnableConfig
from onyx.agents.agent_search.deep_search.initial.generate_individual_sub_answer.states import (
@ -12,14 +12,43 @@ from onyx.agents.agent_search.deep_search.initial.generate_individual_sub_answer
SubQuestionAnswerCheckUpdate,
)
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
binary_string_test,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_POSITIVE_VALUE_STR,
)
from onyx.agents.agent_search.shared_graph_utils.constants import AgentLLMErrorType
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import parse_question_id
from onyx.configs.agent_configs import AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import SUB_ANSWER_CHECK_PROMPT
from onyx.prompts.agent_search import UNKNOWN_ANSWER
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="LLM Timeout Error. The sub-answer will be treated as 'relevant'",
rate_limit="LLM Rate Limit Error. The sub-answer will be treated as 'relevant'",
general_error="General LLM Error. The sub-answer will be treated as 'relevant'",
)
@log_function_time(print_only=True)
def check_sub_answer(
state: AnswerQuestionState, config: RunnableConfig
) -> SubQuestionAnswerCheckUpdate:
@ -53,14 +82,40 @@ def check_sub_answer(
graph_config = cast(GraphConfig, config["metadata"]["config"])
fast_llm = graph_config.tooling.fast_llm
response = list(
fast_llm.stream(
agent_error: AgentErrorLog | None = None
response: BaseMessage | None = None
try:
response = fast_llm.invoke(
prompt=msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK,
)
)
quality_str: str = merge_message_runs(response, chunk_separator="")[0].content
answer_quality = "yes" in quality_str.lower()
quality_str: str = cast(str, response.content)
answer_quality = binary_string_test(
text=quality_str, positive_value=AGENT_POSITIVE_VALUE_STR
)
log_result = f"Answer quality: {quality_str}"
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
answer_quality = True
log_result = agent_error.error_result
logger.error("LLM Timeout Error - check sub answer")
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
answer_quality = True
log_result = agent_error.error_result
logger.error("LLM Rate Limit Error - check sub answer")
return SubQuestionAnswerCheckUpdate(
answer_quality=answer_quality,
@ -69,7 +124,7 @@ def check_sub_answer(
graph_component="initial - generate individual sub answer",
node_name="check sub answer",
node_start_time=node_start_time,
result=f"Answer quality: {quality_str}",
result=log_result,
)
],
)
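
Editor's note: the check above replaces the earlier ad-hoc `"yes" in quality_str.lower()` test with binary_string_test(text=..., positive_value=AGENT_POSITIVE_VALUE_STR). The diff does not show that helper's body; the following is a minimal sketch of the behavior the call site relies on, assuming a simple case-insensitive containment check.

# Hedged sketch only; the real binary_string_test lives in
# onyx/agents/agent_search/shared_graph_utils/agent_prompt_ops.py.
def binary_string_test(text: str, positive_value: str = "yes") -> bool:
    # Case-insensitive check that the positive marker appears in the LLM output.
    return positive_value.lower() in text.lower()

# e.g. binary_string_test("Yes, the sub-answer addresses the question.", "yes") -> True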

View File

@ -16,6 +16,23 @@ from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
build_sub_question_answer_prompt,
)
from onyx.agents.agent_search.shared_graph_utils.calculations import (
dedup_sort_inference_section_list,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AgentLLMErrorType,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
LLM_ANSWER_ERROR_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import get_answer_citation_ids
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
@ -30,12 +47,23 @@ from onyx.chat.models import StreamStopInfo
from onyx.chat.models import StreamStopReason
from onyx.chat.models import StreamType
from onyx.configs.agent_configs import AGENT_MAX_ANSWER_CONTEXT_DOCS
from onyx.configs.agent_configs import AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_GENERATION
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import NO_RECOVERED_DOCS
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="LLM Timeout Error. A sub-answer could not be constructed and the sub-question will be ignored.",
rate_limit="LLM Rate Limit Error. A sub-answer could not be constructed and the sub-question will be ignored.",
general_error="General LLM Error. A sub-answer could not be constructed and the sub-question will be ignored.",
)
@log_function_time(print_only=True)
def generate_sub_answer(
state: AnswerQuestionState,
config: RunnableConfig,
@ -51,12 +79,17 @@ def generate_sub_answer(
state.verified_reranked_documents
level, question_num = parse_question_id(state.question_id)
context_docs = state.context_documents[:AGENT_MAX_ANSWER_CONTEXT_DOCS]
context_docs = dedup_sort_inference_section_list(context_docs)
persona_contextualized_prompt = get_persona_agent_prompt_expressions(
graph_config.inputs.search_request.persona
).contextualized_prompt
if len(context_docs) == 0:
answer_str = NO_RECOVERED_DOCS
cited_documents: list = []
log_results = "No documents retrieved"
write_custom_event(
"sub_answers",
AgentAnswerPiece(
@ -79,41 +112,67 @@ def generate_sub_answer(
response: list[str | list[str | dict[str, Any]]] = []
dispatch_timings: list[float] = []
for message in fast_llm.stream(
prompt=msg,
):
# TODO: in principle, the answer here COULD contain images, but we don't support that yet
content = message.content
if not isinstance(content, str):
raise ValueError(
f"Expected content to be a string, but got {type(content)}"
agent_error: AgentErrorLog | None = None
try:
for message in fast_llm.stream(
prompt=msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_GENERATION,
):
# TODO: in principle, the answer here COULD contain images, but we don't support that yet
content = message.content
if not isinstance(content, str):
raise ValueError(
f"Expected content to be a string, but got {type(content)}"
)
start_stream_token = datetime.now()
write_custom_event(
"sub_answers",
AgentAnswerPiece(
answer_piece=content,
level=level,
level_question_num=question_num,
answer_type="agent_sub_answer",
),
writer,
)
start_stream_token = datetime.now()
write_custom_event(
"sub_answers",
AgentAnswerPiece(
answer_piece=content,
level=level,
level_question_num=question_num,
answer_type="agent_sub_answer",
),
writer,
)
end_stream_token = datetime.now()
dispatch_timings.append(
(end_stream_token - start_stream_token).microseconds
)
response.append(content)
end_stream_token = datetime.now()
dispatch_timings.append(
(end_stream_token - start_stream_token).microseconds
)
response.append(content)
answer_str = merge_message_runs(response, chunk_separator="")[0].content
logger.debug(
f"Average dispatch time: {sum(dispatch_timings) / len(dispatch_timings)}"
)
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
logger.error("LLM Timeout Error - generate sub answer")
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
logger.error("LLM Rate Limit Error - generate sub answer")
answer_citation_ids = get_answer_citation_ids(answer_str)
cited_documents = [
context_docs[id] for id in answer_citation_ids if id < len(context_docs)
]
if agent_error:
answer_str = LLM_ANSWER_ERROR_MESSAGE
cited_documents = []
log_results = (
agent_error.error_result
or "Sub-answer generation failed due to LLM error"
)
else:
answer_str = merge_message_runs(response, chunk_separator="")[0].content
answer_citation_ids = get_answer_citation_ids(answer_str)
cited_documents = [
context_docs[id] for id in answer_citation_ids if id < len(context_docs)
]
log_results = None
stop_event = StreamStopInfo(
stop_reason=StreamStopReason.FINISHED,
@ -131,7 +190,7 @@ def generate_sub_answer(
graph_component="initial - generate individual sub answer",
node_name="generate sub answer",
node_start_time=node_start_time,
result="",
result=log_results or "",
)
],
)
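
Editor's note: the new try/except blocks in this node (and the others below) lean on two small models from shared_graph_utils/models.py: AgentErrorLog (error_type, error_message, error_result) and LLMNodeErrorStrings (timeout, rate_limit, general_error). Their definitions are not part of this diff; the sketch below is a plausible shape inferred from the call sites, with the base class and defaults as assumptions.

# Hedged sketch; field names come from the call sites in this diff, everything else is assumed.
from pydantic import BaseModel

class AgentErrorLog(BaseModel):
    error_type: str  # populated with AgentLLMErrorType values (likely an enum/str constant)
    error_message: str
    error_result: str

class LLMNodeErrorStrings(BaseModel):
    timeout: str = "LLM Timeout Error."
    rate_limit: str = "LLM Rate Limit Error."
    general_error: str = "General LLM Error."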

View File

@ -42,10 +42,8 @@ class SubQuestionRetrievalIngestionUpdate(LoggerUpdate, BaseModel):
class SubQuestionAnsweringInput(SubgraphCoreState):
question: str = ""
question_id: str = (
"" # 0_0 is original question, everything else is <level>_<question_num>.
)
question: str
question_id: str
# level 0 is original question and first decomposition, level 1 is follow up, etc
# question_num is a unique number per original question per level.
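
Editor's note: the "<level>_<question_num>" format documented here is what make_question_id and parse_question_id (used by the answer nodes above) encode and decode. A minimal sketch, assuming the straightforward string representation; the real helpers live in shared_graph_utils/utils.py.

# Illustrative only.
def make_question_id(level: int, question_num: int) -> str:
    # "0_0" is the original question; otherwise "<level>_<question_num>".
    return f"{level}_{question_num}"

def parse_question_id(question_id: str) -> tuple[int, int]:
    level, question_num = question_id.split("_")
    return int(level), int(question_num)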

View File

@ -26,14 +26,31 @@ from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
trim_prompt_piece,
)
from onyx.agents.agent_search.shared_graph_utils.calculations import (
get_answer_generation_documents,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AgentLLMErrorType,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import InitialAgentResultStats
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_sections,
dedup_inference_section_list,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
dispatch_main_answer_stop_info,
)
from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_deduplicated_structured_subquestion_documents,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
@ -42,12 +59,16 @@ from onyx.agents.agent_search.shared_graph_utils.utils import remove_document_ci
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import ExtendedToolResponse
from onyx.chat.models import StreamingError
from onyx.configs.agent_configs import AGENT_MAX_ANSWER_CONTEXT_DOCS
from onyx.configs.agent_configs import AGENT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER
from onyx.configs.agent_configs import AGENT_MIN_ORIG_QUESTION_DOCS
from onyx.context.search.models import InferenceSection
from onyx.prompts.agent_search import (
INITIAL_ANSWER_PROMPT_W_SUB_QUESTIONS,
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_INITIAL_ANSWER_GENERATION,
)
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import INITIAL_ANSWER_PROMPT_W_SUB_QUESTIONS
from onyx.prompts.agent_search import (
INITIAL_ANSWER_PROMPT_WO_SUB_QUESTIONS,
)
@ -56,8 +77,16 @@ from onyx.prompts.agent_search import (
)
from onyx.prompts.agent_search import UNKNOWN_ANSWER
from onyx.tools.tool_implementations.search.search_tool import yield_search_responses
from onyx.utils.timing import log_function_time
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="LLM Timeout Error. The initial answer could not be generated.",
rate_limit="LLM Rate Limit Error. The initial answer could not be generated.",
general_error="General LLM Error. The initial answer could not be generated.",
)
@log_function_time(print_only=True)
def generate_initial_answer(
state: SubQuestionRetrievalState,
config: RunnableConfig,
@ -73,15 +102,19 @@ def generate_initial_answer(
question = graph_config.inputs.search_request.query
prompt_enrichment_components = get_prompt_enrichment_components(graph_config)
sub_questions_cited_documents = state.cited_documents
# get all documents cited in sub-questions
structured_subquestion_docs = get_deduplicated_structured_subquestion_documents(
state.sub_question_results
)
orig_question_retrieval_documents = state.orig_question_retrieved_documents
consolidated_context_docs: list[InferenceSection] = sub_questions_cited_documents
consolidated_context_docs = structured_subquestion_docs.cited_documents
counter = 0
for original_doc_number, original_doc in enumerate(
orig_question_retrieval_documents
):
if original_doc_number not in sub_questions_cited_documents:
if original_doc_number not in structured_subquestion_docs.cited_documents:
if (
counter <= AGENT_MIN_ORIG_QUESTION_DOCS
or len(consolidated_context_docs) < AGENT_MAX_ANSWER_CONTEXT_DOCS
@ -90,15 +123,18 @@ def generate_initial_answer(
counter += 1
# sort docs by their scores - though the scores refer to different questions
relevant_docs = dedup_inference_sections(
consolidated_context_docs, consolidated_context_docs
)
relevant_docs = dedup_inference_section_list(consolidated_context_docs)
sub_questions: list[str] = []
streamed_documents = (
relevant_docs
if len(relevant_docs) > 0
else state.orig_question_retrieved_documents[:15]
# Create the list of documents to stream out. Start with the
# ones that will be in the context (or, if len == 0, use docs
# that were retrieved for the original question)
answer_generation_documents = get_answer_generation_documents(
relevant_docs=relevant_docs,
context_documents=structured_subquestion_docs.context_documents,
original_question_docs=orig_question_retrieval_documents,
max_docs=AGENT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER,
)
# Use the query info from the base document retrieval
@ -108,11 +144,13 @@ def generate_initial_answer(
graph_config.tooling.search_tool
), "search_tool must be provided for agentic search"
relevance_list = relevance_from_docs(relevant_docs)
relevance_list = relevance_from_docs(
answer_generation_documents.streaming_documents
)
for tool_response in yield_search_responses(
query=question,
reranked_sections=streamed_documents,
final_context_sections=streamed_documents,
reranked_sections=answer_generation_documents.streaming_documents,
final_context_sections=answer_generation_documents.context_documents,
search_query_info=query_info,
get_section_relevance=lambda: relevance_list,
search_tool=graph_config.tooling.search_tool,
@ -128,7 +166,7 @@ def generate_initial_answer(
writer,
)
if len(relevant_docs) == 0:
if len(answer_generation_documents.context_documents) == 0:
write_custom_event(
"initial_agent_answer",
AgentAnswerPiece(
@ -194,7 +232,7 @@ def generate_initial_answer(
model = graph_config.tooling.fast_llm
doc_context = format_docs(relevant_docs)
doc_context = format_docs(answer_generation_documents.context_documents)
doc_context = trim_prompt_piece(
config=model.config,
prompt_piece=doc_context,
@ -224,30 +262,82 @@ def generate_initial_answer(
streamed_tokens: list[str | list[str | dict[str, Any]]] = [""]
dispatch_timings: list[float] = []
for message in model.stream(msg):
# TODO: in principle, the answer here COULD contain images, but we don't support that yet
content = message.content
if not isinstance(content, str):
raise ValueError(
f"Expected content to be a string, but got {type(content)}"
)
start_stream_token = datetime.now()
agent_error: AgentErrorLog | None = None
try:
for message in model.stream(
msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_INITIAL_ANSWER_GENERATION,
):
# TODO: in principle, the answer here COULD contain images, but we don't support that yet
content = message.content
if not isinstance(content, str):
raise ValueError(
f"Expected content to be a string, but got {type(content)}"
)
start_stream_token = datetime.now()
write_custom_event(
"initial_agent_answer",
AgentAnswerPiece(
answer_piece=content,
level=0,
level_question_num=0,
answer_type="agent_level_answer",
),
writer,
)
end_stream_token = datetime.now()
dispatch_timings.append(
(end_stream_token - start_stream_token).microseconds
)
streamed_tokens.append(content)
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
logger.error("LLM Timeout Error - generate initial answer")
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
logger.error("LLM Rate Limit Error - generate initial answer")
if agent_error:
write_custom_event(
"initial_agent_answer",
AgentAnswerPiece(
answer_piece=content,
level=0,
level_question_num=0,
answer_type="agent_level_answer",
StreamingError(
error=AGENT_LLM_TIMEOUT_MESSAGE,
),
writer,
)
end_stream_token = datetime.now()
dispatch_timings.append(
(end_stream_token - start_stream_token).microseconds
return InitialAnswerUpdate(
initial_answer=None,
answer_error=AgentErrorLog(
error_message=agent_error.error_message or "An LLM error occurred",
error_type=agent_error.error_type,
error_result=agent_error.error_result,
),
initial_agent_stats=None,
generated_sub_questions=sub_questions,
agent_base_end_time=None,
agent_base_metrics=None,
log_messages=[
get_langgraph_node_log_string(
graph_component="initial - generate initial answer",
node_name="generate initial answer",
node_start_time=node_start_time,
result=agent_error.error_result or "An LLM error occurred",
)
],
)
streamed_tokens.append(content)
logger.debug(
f"Average dispatch time for initial answer: {sum(dispatch_timings) / len(dispatch_timings)}"

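Editor's note: get_answer_generation_documents replaces the previous ad-hoc streamed_documents selection. It separates the documents placed in the LLM context from the set streamed to the UI, capped at AGENT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER. Its body and return type are not shown in the diff; the sketch below is a rough guess at the shape, with the class name and fallback logic as assumptions.

# Hedged sketch; the real function and its return model live in
# shared_graph_utils/calculations.py.
from pydantic import BaseModel
from onyx.context.search.models import InferenceSection

class AnswerGenerationDocuments(BaseModel):  # hypothetical name
    context_documents: list[InferenceSection]
    streaming_documents: list[InferenceSection]

def get_answer_generation_documents(
    relevant_docs: list[InferenceSection],
    context_documents: list[InferenceSection],
    original_question_docs: list[InferenceSection],
    max_docs: int,
) -> AnswerGenerationDocuments:
    # Fall back to the original-question docs when nothing was cited by sub-questions,
    # and cap how many documents are streamed out.
    streaming = relevant_docs if relevant_docs else original_question_docs
    return AnswerGenerationDocuments(
        context_documents=context_documents,
        streaming_documents=streaming[:max_docs],
    )
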
View File

@ -10,8 +10,10 @@ from onyx.agents.agent_search.deep_search.main.states import (
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.utils.timing import log_function_time
@log_function_time(print_only=True)
def validate_initial_answer(
state: SubQuestionRetrievalState,
) -> InitialAnswerQualityUpdate:
@ -25,7 +27,7 @@ def validate_initial_answer(
f"--------{node_start_time}--------Checking for base answer validity - for now set True/False manually"
)
verdict = True
verdict = True # not actually required as already streamed out. Refinement will do similar
return InitialAnswerQualityUpdate(
initial_answer_quality_eval=verdict,

View File

@ -23,6 +23,8 @@ from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
build_history_prompt,
)
from onyx.agents.agent_search.shared_graph_utils.models import BaseMessage_Content
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import dispatch_separated
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
@ -33,17 +35,30 @@ from onyx.chat.models import StreamStopReason
from onyx.chat.models import StreamType
from onyx.chat.models import SubQuestionPiece
from onyx.configs.agent_configs import AGENT_NUM_DOCS_FOR_DECOMPOSITION
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_SUBQUESTION_GENERATION,
)
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import (
INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH,
INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH_ASSUMING_REFINEMENT,
)
from onyx.prompts.agent_search import (
INITIAL_QUESTION_DECOMPOSITION_PROMPT,
INITIAL_QUESTION_DECOMPOSITION_PROMPT_ASSUMING_REFINEMENT,
)
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="LLM Timeout Error. Sub-questions could not be generated.",
rate_limit="LLM Rate Limit Error. Sub-questions could not be generated.",
general_error="General LLM Error. Sub-questions could not be generated.",
)
@log_function_time(print_only=True)
def decompose_orig_question(
state: SubQuestionRetrievalState,
config: RunnableConfig,
@ -85,15 +100,15 @@ def decompose_orig_question(
]
)
decomposition_prompt = (
INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH.format(
question=question, sample_doc_str=sample_doc_str, history=history
)
decomposition_prompt = INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH_ASSUMING_REFINEMENT.format(
question=question, sample_doc_str=sample_doc_str, history=history
)
else:
decomposition_prompt = INITIAL_QUESTION_DECOMPOSITION_PROMPT.format(
question=question, history=history
decomposition_prompt = (
INITIAL_QUESTION_DECOMPOSITION_PROMPT_ASSUMING_REFINEMENT.format(
question=question, history=history
)
)
# Start decomposition
@ -112,32 +127,42 @@ def decompose_orig_question(
)
# dispatches custom events for subquestion tokens, adding in subquestion ids.
streamed_tokens = dispatch_separated(
model.stream(msg),
dispatch_subquestion(0, writer),
sep_callback=dispatch_subquestion_sep(0, writer),
)
stop_event = StreamStopInfo(
stop_reason=StreamStopReason.FINISHED,
stream_type=StreamType.SUB_QUESTIONS,
level=0,
)
write_custom_event("stream_finished", stop_event, writer)
streamed_tokens: list[BaseMessage_Content] = []
deomposition_response = merge_content(*streamed_tokens)
try:
streamed_tokens = dispatch_separated(
model.stream(
msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_SUBQUESTION_GENERATION,
),
dispatch_subquestion(0, writer),
sep_callback=dispatch_subquestion_sep(0, writer),
)
# this call should only return strings. Commenting out for efficiency
# assert [type(tok) == str for tok in streamed_tokens]
decomposition_response = merge_content(*streamed_tokens)
# use no-op cast() instead of str() which runs code
# list_of_subquestions = clean_and_parse_list_string(cast(str, response))
list_of_subqs = cast(str, deomposition_response).split("\n")
list_of_subqs = cast(str, decomposition_response).split("\n")
decomp_list: list[str] = [sq.strip() for sq in list_of_subqs if sq.strip() != ""]
initial_sub_questions = [sq.strip() for sq in list_of_subqs if sq.strip() != ""]
log_result = f"decomposed original question into {len(initial_sub_questions)} subquestions"
stop_event = StreamStopInfo(
stop_reason=StreamStopReason.FINISHED,
stream_type=StreamType.SUB_QUESTIONS,
level=0,
)
write_custom_event("stream_finished", stop_event, writer)
except LLMTimeoutError as e:
logger.error("LLM Timeout Error - decompose orig question")
raise e # fail loudly on this critical step
except LLMRateLimitError as e:
logger.error("LLM Rate Limit Error - decompose orig question")
raise e
return InitialQuestionDecompositionUpdate(
initial_sub_questions=decomp_list,
initial_sub_questions=initial_sub_questions,
agent_start_time=agent_start_time,
agent_refined_start_time=None,
agent_refined_end_time=None,
@ -151,7 +176,7 @@ def decompose_orig_question(
graph_component="initial - generate sub answers",
node_name="decompose original question",
node_start_time=node_start_time,
result=f"decomposed original question into {len(decomp_list)} subquestions",
result=log_result,
)
],
)
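
Editor's note: unlike the sub-answer nodes above, decomposition of the original question is treated as a critical step, so LLM timeout and rate-limit errors are logged and re-raised rather than swallowed. A condensed sketch of the two failure policies this commit applies; the exception and logger imports are the real ones used in the diff, while the wrapper functions are illustrative stand-ins only.

# Illustrative contrast only; not code from the commit.
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.utils.logger import setup_logger

logger = setup_logger()

def run_critical_llm_step(call):
    try:
        return call()
    except (LLMTimeoutError, LLMRateLimitError):
        logger.error("LLM error - decompose orig question")
        raise  # fail loudly: the graph cannot proceed without sub-questions

def run_supporting_llm_step(call, fallback):
    try:
        return call()
    except (LLMTimeoutError, LLMRateLimitError):
        logger.error("LLM error - supporting node, continuing with fallback")
        return fallback  # degrade gracefully, e.g. treat a sub-answer as relevant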

View File

@ -26,8 +26,8 @@ from onyx.agents.agent_search.deep_search.main.nodes.decide_refinement_need impo
from onyx.agents.agent_search.deep_search.main.nodes.extract_entities_terms import (
extract_entities_terms,
)
from onyx.agents.agent_search.deep_search.main.nodes.generate_refined_answer import (
generate_refined_answer,
from onyx.agents.agent_search.deep_search.main.nodes.generate_validate_refined_answer import (
generate_validate_refined_answer,
)
from onyx.agents.agent_search.deep_search.main.nodes.ingest_refined_sub_answers import (
ingest_refined_sub_answers,
@ -126,8 +126,8 @@ def main_graph_builder(test_mode: bool = False) -> StateGraph:
# Node to generate the refined answer
graph.add_node(
node="generate_refined_answer",
action=generate_refined_answer,
node="generate_validate_refined_answer",
action=generate_validate_refined_answer,
)
# Early node to extract the entities and terms from the initial answer,
@ -215,11 +215,11 @@ def main_graph_builder(test_mode: bool = False) -> StateGraph:
graph.add_edge(
start_key="ingest_refined_sub_answers",
end_key="generate_refined_answer",
end_key="generate_validate_refined_answer",
)
graph.add_edge(
start_key="generate_refined_answer",
start_key="generate_validate_refined_answer",
end_key="compare_answers",
)
graph.add_edge(
@ -252,9 +252,7 @@ if __name__ == "__main__":
db_session, primary_llm, fast_llm, search_request
)
inputs = MainInput(
base_question=graph_config.inputs.search_request.query, log_messages=[]
)
inputs = MainInput(log_messages=[])
for thing in compiled_graph.stream(
input=inputs,

View File

@ -1,6 +1,7 @@
from datetime import datetime
from typing import cast
from langchain_core.messages import BaseMessage
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langgraph.types import StreamWriter
@ -10,16 +11,51 @@ from onyx.agents.agent_search.deep_search.main.states import (
)
from onyx.agents.agent_search.deep_search.main.states import MainState
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
binary_string_test,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_POSITIVE_VALUE_STR,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AgentLLMErrorType,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import RefinedAnswerImprovement
from onyx.configs.agent_configs import AGENT_TIMEOUT_OVERRIDE_LLM_COMPARE_ANSWERS
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import (
INITIAL_REFINED_ANSWER_COMPARISON_PROMPT,
)
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="The LLM timed out, and the answers could not be compared.",
rate_limit="The LLM encountered a rate limit, and the answers could not be compared.",
general_error="The LLM encountered an error, and the answers could not be compared.",
)
_ANSWER_QUALITY_NOT_SUFFICIENT_MESSAGE = (
"Answer quality is not sufficient, so stay with the initial answer."
)
@log_function_time(print_only=True)
def compare_answers(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> InitialRefinedAnswerComparisonUpdate:
@ -34,21 +70,75 @@ def compare_answers(
initial_answer = state.initial_answer
refined_answer = state.refined_answer
# if answer quality is not sufficient, then stay with the initial answer
if not state.refined_answer_quality:
write_custom_event(
"refined_answer_improvement",
RefinedAnswerImprovement(
refined_answer_improvement=False,
),
writer,
)
return InitialRefinedAnswerComparisonUpdate(
refined_answer_improvement_eval=False,
log_messages=[
get_langgraph_node_log_string(
graph_component="main",
node_name="compare answers",
node_start_time=node_start_time,
result=_ANSWER_QUALITY_NOT_SUFFICIENT_MESSAGE,
)
],
)
compare_answers_prompt = INITIAL_REFINED_ANSWER_COMPARISON_PROMPT.format(
question=question, initial_answer=initial_answer, refined_answer=refined_answer
)
msg = [HumanMessage(content=compare_answers_prompt)]
agent_error: AgentErrorLog | None = None
# Get the rewritten queries in a defined format
model = graph_config.tooling.fast_llm
resp: BaseMessage | None = None
refined_answer_improvement: bool | None = None
# no need to stream this
resp = model.invoke(msg)
try:
resp = model.invoke(
msg, timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_COMPARE_ANSWERS
)
refined_answer_improvement = (
isinstance(resp.content, str) and "yes" in resp.content.lower()
)
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
logger.error("LLM Timeout Error - compare answers")
# continue as True in this support step
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
logger.error("LLM Rate Limit Error - compare answers")
# continue as True in this support step
if agent_error or resp is None:
refined_answer_improvement = True
if agent_error:
log_result = agent_error.error_result
else:
log_result = "An answer could not be generated."
else:
refined_answer_improvement = binary_string_test(
text=cast(str, resp.content),
positive_value=AGENT_POSITIVE_VALUE_STR,
)
log_result = f"Answer comparison: {refined_answer_improvement}"
write_custom_event(
"refined_answer_improvement",
@ -65,7 +155,7 @@ def compare_answers(
graph_component="main",
node_name="compare answers",
node_start_time=node_start_time,
result=f"Answer comparison: {refined_answer_improvement}",
result=log_result,
)
],
)

View File

@ -21,6 +21,18 @@ from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
build_history_prompt,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AgentLLMErrorType,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import BaseMessage_Content
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import dispatch_separated
from onyx.agents.agent_search.shared_graph_utils.utils import (
format_entity_term_extraction,
@ -30,12 +42,31 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
)
from onyx.agents.agent_search.shared_graph_utils.utils import make_question_id
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import StreamingError
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_SUBQUESTION_GENERATION,
)
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import (
REFINEMENT_QUESTION_DECOMPOSITION_PROMPT,
REFINEMENT_QUESTION_DECOMPOSITION_PROMPT_W_INITIAL_SUBQUESTION_ANSWERS,
)
from onyx.tools.models import ToolCallKickoff
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_ANSWERED_SUBQUESTIONS_DIVIDER = "\n\n---\n\n"
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="The LLM timed out. The sub-questions could not be generated.",
rate_limit="The LLM encountered a rate limit. The sub-questions could not be generated.",
general_error="The LLM encountered an error. The sub-questions could not be generated.",
)
@log_function_time(print_only=True)
def create_refined_sub_questions(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> RefinedQuestionDecompositionUpdate:
@ -72,8 +103,10 @@ def create_refined_sub_questions(
initial_question_answers = state.sub_question_results
addressed_question_list = [
x.question for x in initial_question_answers if x.verified_high_quality
addressed_subquestions_with_answers = [
f"Subquestion: {x.question}\nSubanswer:\n{x.answer}"
for x in initial_question_answers
if x.verified_high_quality and x.answer
]
failed_question_list = [
@ -82,12 +115,14 @@ def create_refined_sub_questions(
msg = [
HumanMessage(
content=REFINEMENT_QUESTION_DECOMPOSITION_PROMPT.format(
content=REFINEMENT_QUESTION_DECOMPOSITION_PROMPT_W_INITIAL_SUBQUESTION_ANSWERS.format(
question=question,
history=history,
entity_term_extraction_str=entity_term_extraction_str,
base_answer=base_answer,
answered_sub_questions="\n - ".join(addressed_question_list),
answered_subquestions_with_answers=_ANSWERED_SUBQUESTIONS_DIVIDER.join(
addressed_subquestions_with_answers
),
failed_sub_questions="\n - ".join(failed_question_list),
),
)
@ -96,29 +131,65 @@ def create_refined_sub_questions(
# Grader
model = graph_config.tooling.fast_llm
streamed_tokens = dispatch_separated(
model.stream(msg),
dispatch_subquestion(1, writer),
sep_callback=dispatch_subquestion_sep(1, writer),
)
response = merge_content(*streamed_tokens)
agent_error: AgentErrorLog | None = None
streamed_tokens: list[BaseMessage_Content] = []
try:
streamed_tokens = dispatch_separated(
model.stream(
msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_SUBQUESTION_GENERATION,
),
dispatch_subquestion(1, writer),
sep_callback=dispatch_subquestion_sep(1, writer),
)
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
logger.error("LLM Timeout Error - create refined sub questions")
if isinstance(response, str):
parsed_response = [q for q in response.split("\n") if q.strip() != ""]
else:
raise ValueError("LLM response is not a string")
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
logger.error("LLM Rate Limit Error - create refined sub questions")
refined_sub_question_dict = {}
for sub_question_num, sub_question in enumerate(parsed_response):
refined_sub_question = RefinementSubQuestion(
sub_question=sub_question,
sub_question_id=make_question_id(1, sub_question_num + 1),
verified=False,
answered=False,
answer="",
if agent_error:
refined_sub_question_dict: dict[int, RefinementSubQuestion] = {}
log_result = agent_error.error_result
write_custom_event(
"refined_sub_question_creation_error",
StreamingError(
error="Your LLM was not able to create refined sub questions in time and timed out. Please try again.",
),
writer,
)
refined_sub_question_dict[sub_question_num + 1] = refined_sub_question
else:
response = merge_content(*streamed_tokens)
if isinstance(response, str):
parsed_response = [q for q in response.split("\n") if q.strip() != ""]
else:
raise ValueError("LLM response is not a string")
refined_sub_question_dict = {}
for sub_question_num, sub_question in enumerate(parsed_response):
refined_sub_question = RefinementSubQuestion(
sub_question=sub_question,
sub_question_id=make_question_id(1, sub_question_num + 1),
verified=False,
answered=False,
answer="",
)
refined_sub_question_dict[sub_question_num + 1] = refined_sub_question
log_result = f"Created {len(refined_sub_question_dict)} refined sub questions"
return RefinedQuestionDecompositionUpdate(
refined_sub_questions=refined_sub_question_dict,
@ -128,7 +199,7 @@ def create_refined_sub_questions(
graph_component="main",
node_name="create refined sub questions",
node_start_time=node_start_time,
result=f"Created {len(refined_sub_question_dict)} refined sub questions",
result=log_result,
)
],
)

View File

@ -11,8 +11,10 @@ from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.utils.timing import log_function_time
@log_function_time(print_only=True)
def decide_refinement_need(
state: MainState, config: RunnableConfig
) -> RequireRefinemenEvalUpdate:
@ -26,6 +28,19 @@ def decide_refinement_need(
decision = True # TODO: just for current testing purposes
if state.answer_error:
return RequireRefinemenEvalUpdate(
require_refined_answer_eval=False,
log_messages=[
get_langgraph_node_log_string(
graph_component="main",
node_name="decide refinement need",
node_start_time=node_start_time,
result="Timeout Error",
)
],
)
log_messages = [
get_langgraph_node_log_string(
graph_component="main",

View File

@ -21,11 +21,16 @@ from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION,
)
from onyx.configs.constants import NUM_EXPLORATORY_DOCS
from onyx.prompts.agent_search import ENTITY_TERM_EXTRACTION_PROMPT
from onyx.prompts.agent_search import ENTITY_TERM_EXTRACTION_PROMPT_JSON_EXAMPLE
from onyx.utils.timing import log_function_time
@log_function_time(print_only=True)
def extract_entities_terms(
state: MainState, config: RunnableConfig
) -> EntityTermExtractionUpdate:
@ -81,6 +86,7 @@ def extract_entities_terms(
# Grader
llm_response = fast_llm.invoke(
prompt=msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION,
)
cleaned_response = (
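
Editor's note: extract_entities_terms only picks up the new per-node timeout override. The AGENT_TIMEOUT_OVERRIDE_LLM_* constants come from onyx/configs/agent_configs.py and are not defined in this diff; they are presumably plain numeric timeouts (seconds), perhaps read from the environment with a default, along the lines of the sketch below (default value and env-var wiring are assumptions).

# Hedged sketch only.
import os

AGENT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION = int(
    os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION", "25")
)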

View File

@ -11,27 +11,49 @@ from onyx.agents.agent_search.deep_search.main.models import (
AgentRefinedMetrics,
)
from onyx.agents.agent_search.deep_search.main.operations import get_query_info
from onyx.agents.agent_search.deep_search.main.operations import logger
from onyx.agents.agent_search.deep_search.main.states import MainState
from onyx.agents.agent_search.deep_search.main.states import (
RefinedAnswerUpdate,
)
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
binary_string_test_after_answer_separator,
)
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
get_prompt_enrichment_components,
)
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
trim_prompt_piece,
)
from onyx.agents.agent_search.shared_graph_utils.models import InferenceSection
from onyx.agents.agent_search.shared_graph_utils.calculations import (
get_answer_generation_documents,
)
from onyx.agents.agent_search.shared_graph_utils.constants import AGENT_ANSWER_SEPARATOR
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_POSITIVE_VALUE_STR,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AgentLLMErrorType,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.models import RefinedAgentStats
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_sections,
dedup_inference_section_list,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
dispatch_main_answer_stop_info,
)
from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_deduplicated_structured_subquestion_documents,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
@ -43,26 +65,50 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import ExtendedToolResponse
from onyx.chat.models import StreamingError
from onyx.configs.agent_configs import AGENT_MAX_ANSWER_CONTEXT_DOCS
from onyx.configs.agent_configs import AGENT_MAX_STREAMED_DOCS_FOR_REFINED_ANSWER
from onyx.configs.agent_configs import AGENT_MIN_ORIG_QUESTION_DOCS
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_GENERATION,
)
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_VALIDATION,
)
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import (
REFINED_ANSWER_PROMPT_W_SUB_QUESTIONS,
)
from onyx.prompts.agent_search import (
REFINED_ANSWER_PROMPT_WO_SUB_QUESTIONS,
)
from onyx.prompts.agent_search import (
REFINED_ANSWER_VALIDATION_PROMPT,
)
from onyx.prompts.agent_search import (
SUB_QUESTION_ANSWER_TEMPLATE_REFINED,
)
from onyx.prompts.agent_search import UNKNOWN_ANSWER
from onyx.tools.tool_implementations.search.search_tool import yield_search_responses
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="The LLM timed out. The refined answer could not be generated.",
rate_limit="The LLM encountered a rate limit. The refined answer could not be generated.",
general_error="The LLM encountered an error. The refined answer could not be generated.",
)
def generate_refined_answer(
@log_function_time(print_only=True)
def generate_validate_refined_answer(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> RefinedAnswerUpdate:
"""
LangGraph node to generate the refined answer.
LangGraph node to generate the refined answer and validate it.
"""
node_start_time = datetime.now()
@ -76,19 +122,24 @@ def generate_refined_answer(
)
verified_reranked_documents = state.verified_reranked_documents
sub_questions_cited_documents = state.cited_documents
# get all documents cited in sub-questions
structured_subquestion_docs = get_deduplicated_structured_subquestion_documents(
state.sub_question_results
)
original_question_verified_documents = (
state.orig_question_verified_reranked_documents
)
original_question_retrieved_documents = state.orig_question_retrieved_documents
consolidated_context_docs: list[InferenceSection] = sub_questions_cited_documents
consolidated_context_docs = structured_subquestion_docs.cited_documents
counter = 0
for original_doc_number, original_doc in enumerate(
original_question_verified_documents
):
if original_doc_number not in sub_questions_cited_documents:
if original_doc_number not in structured_subquestion_docs.cited_documents:
if (
counter <= AGENT_MIN_ORIG_QUESTION_DOCS
or len(consolidated_context_docs)
@ -99,14 +150,16 @@ def generate_refined_answer(
counter += 1
# sort docs by their scores - though the scores refer to different questions
relevant_docs = dedup_inference_sections(
consolidated_context_docs, consolidated_context_docs
)
relevant_docs = dedup_inference_section_list(consolidated_context_docs)
streaming_docs = (
relevant_docs
if len(relevant_docs) > 0
else original_question_retrieved_documents[:15]
# Create the list of documents to stream out. Start with the
# ones that will be in the context (or, if len == 0, use docs
# that were retrieved for the original question)
answer_generation_documents = get_answer_generation_documents(
relevant_docs=relevant_docs,
context_documents=structured_subquestion_docs.context_documents,
original_question_docs=original_question_retrieved_documents,
max_docs=AGENT_MAX_STREAMED_DOCS_FOR_REFINED_ANSWER,
)
query_info = get_query_info(state.orig_question_sub_query_retrieval_results)
@ -114,11 +167,13 @@ def generate_refined_answer(
graph_config.tooling.search_tool
), "search_tool must be provided for agentic search"
# stream refined answer docs, or original question docs if no relevant docs are found
relevance_list = relevance_from_docs(relevant_docs)
relevance_list = relevance_from_docs(
answer_generation_documents.streaming_documents
)
for tool_response in yield_search_responses(
query=question,
reranked_sections=streaming_docs,
final_context_sections=streaming_docs,
reranked_sections=answer_generation_documents.streaming_documents,
final_context_sections=answer_generation_documents.context_documents,
search_query_info=query_info,
get_section_relevance=lambda: relevance_list,
search_tool=graph_config.tooling.search_tool,
@ -199,7 +254,7 @@ def generate_refined_answer(
)
model = graph_config.tooling.fast_llm
relevant_docs_str = format_docs(relevant_docs)
relevant_docs_str = format_docs(answer_generation_documents.context_documents)
relevant_docs_str = trim_prompt_piece(
model.config,
relevant_docs_str,
@ -231,28 +286,80 @@ def generate_refined_answer(
streamed_tokens: list[str | list[str | dict[str, Any]]] = [""]
dispatch_timings: list[float] = []
for message in model.stream(msg):
# TODO: in principle, the answer here COULD contain images, but we don't support that yet
content = message.content
if not isinstance(content, str):
raise ValueError(
f"Expected content to be a string, but got {type(content)}"
)
agent_error: AgentErrorLog | None = None
start_stream_token = datetime.now()
try:
for message in model.stream(
msg, timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_GENERATION
):
# TODO: in principle, the answer here COULD contain images, but we don't support that yet
content = message.content
if not isinstance(content, str):
raise ValueError(
f"Expected content to be a string, but got {type(content)}"
)
start_stream_token = datetime.now()
write_custom_event(
"refined_agent_answer",
AgentAnswerPiece(
answer_piece=content,
level=1,
level_question_num=0,
answer_type="agent_level_answer",
),
writer,
)
end_stream_token = datetime.now()
dispatch_timings.append(
(end_stream_token - start_stream_token).microseconds
)
streamed_tokens.append(content)
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
logger.error("LLM Timeout Error - generate refined answer")
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
logger.error("LLM Rate Limit Error - generate refined answer")
if agent_error:
write_custom_event(
"refined_agent_answer",
AgentAnswerPiece(
answer_piece=content,
level=1,
level_question_num=0,
answer_type="agent_level_answer",
"initial_agent_answer",
StreamingError(
error=AGENT_LLM_TIMEOUT_MESSAGE,
),
writer,
)
end_stream_token = datetime.now()
dispatch_timings.append((end_stream_token - start_stream_token).microseconds)
streamed_tokens.append(content)
return RefinedAnswerUpdate(
refined_answer=None,
refined_answer_quality=False, # TODO: replace this with the actual check value
refined_agent_stats=None,
agent_refined_end_time=None,
agent_refined_metrics=AgentRefinedMetrics(
refined_doc_boost_factor=0.0,
refined_question_boost_factor=0.0,
duration_s=None,
),
log_messages=[
get_langgraph_node_log_string(
graph_component="main",
node_name="generate refined answer",
node_start_time=node_start_time,
result=agent_error.error_result or "An LLM error occurred",
)
],
)
logger.debug(
f"Average dispatch time for refined answer: {sum(dispatch_timings) / len(dispatch_timings)}"
@ -261,54 +368,43 @@ def generate_refined_answer(
response = merge_content(*streamed_tokens)
answer = cast(str, response)
# run a validation step for the refined answer only
msg = [
HumanMessage(
content=REFINED_ANSWER_VALIDATION_PROMPT.format(
question=question,
history=prompt_enrichment_components.history,
answered_sub_questions=sub_question_answer_str,
relevant_docs=relevant_docs_str,
proposed_answer=answer,
persona_specification=persona_contextualized_prompt,
)
)
]
try:
validation_response = model.invoke(
msg, timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_VALIDATION
)
refined_answer_quality = binary_string_test_after_answer_separator(
text=cast(str, validation_response.content),
positive_value=AGENT_POSITIVE_VALUE_STR,
separator=AGENT_ANSWER_SEPARATOR,
)
except LLMTimeoutError:
refined_answer_quality = True
logger.error("LLM Timeout Error - validate refined answer")
except LLMRateLimitError:
refined_answer_quality = True
logger.error("LLM Rate Limit Error - validate refined answer")
refined_agent_stats = RefinedAgentStats(
revision_doc_efficiency=refined_doc_effectiveness,
revision_question_efficiency=revision_question_efficiency,
)
logger.debug(f"\n\n---INITIAL ANSWER ---\n\n Answer:\n Agent: {initial_answer}")
logger.debug("-" * 10)
logger.debug(f"\n\n---REVISED AGENT ANSWER ---\n\n Answer:\n Agent: {answer}")
logger.debug("-" * 100)
if state.initial_agent_stats:
initial_doc_boost_factor = state.initial_agent_stats.agent_effectiveness.get(
"utilized_chunk_ratio", "--"
)
initial_support_boost_factor = (
state.initial_agent_stats.agent_effectiveness.get("support_ratio", "--")
)
num_initial_verified_docs = state.initial_agent_stats.original_question.get(
"num_verified_documents", "--"
)
initial_verified_docs_avg_score = (
state.initial_agent_stats.original_question.get("verified_avg_score", "--")
)
initial_sub_questions_verified_docs = (
state.initial_agent_stats.sub_questions.get("num_verified_documents", "--")
)
logger.debug("INITIAL AGENT STATS")
logger.debug(f"Document Boost Factor: {initial_doc_boost_factor}")
logger.debug(f"Support Boost Factor: {initial_support_boost_factor}")
logger.debug(f"Originally Verified Docs: {num_initial_verified_docs}")
logger.debug(
f"Originally Verified Docs Avg Score: {initial_verified_docs_avg_score}"
)
logger.debug(
f"Sub-Questions Verified Docs: {initial_sub_questions_verified_docs}"
)
if refined_agent_stats:
logger.debug("-" * 10)
logger.debug("REFINED AGENT STATS")
logger.debug(
f"Revision Doc Factor: {refined_agent_stats.revision_doc_efficiency}"
)
logger.debug(
f"Revision Question Factor: {refined_agent_stats.revision_question_efficiency}"
)
agent_refined_end_time = datetime.now()
if state.agent_refined_start_time:
agent_refined_duration = (
@ -325,7 +421,7 @@ def generate_refined_answer(
return RefinedAnswerUpdate(
refined_answer=answer,
refined_answer_quality=True, # TODO: replace this with the actual check value
refined_answer_quality=refined_answer_quality,
refined_agent_stats=refined_agent_stats,
agent_refined_end_time=agent_refined_end_time,
agent_refined_metrics=agent_refined_metrics,
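
Editor's note: the new validation step reads the verdict that follows AGENT_ANSWER_SEPARATOR in the validation response via binary_string_test_after_answer_separator. Its body is not shown in the diff; a minimal sketch of the assumed behavior:

# Assumption: only the text after the separator is checked, so the yes/no verdict is not
# confused with any reasoning the model emits before it. Real helper: agent_prompt_ops.py.
def binary_string_test_after_answer_separator(
    text: str, positive_value: str, separator: str
) -> bool:
    relevant_part = text.split(separator)[-1] if separator in text else text
    return positive_value.lower() in relevant_part.lower()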

View File

@ -17,6 +17,7 @@ from onyx.agents.agent_search.orchestration.states import ToolCallUpdate
from onyx.agents.agent_search.orchestration.states import ToolChoiceInput
from onyx.agents.agent_search.orchestration.states import ToolChoiceUpdate
from onyx.agents.agent_search.shared_graph_utils.models import AgentChunkRetrievalStats
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import (
EntityRelationshipTermExtraction,
)
@ -76,6 +77,7 @@ class InitialAnswerUpdate(LoggerUpdate):
"""
initial_answer: str | None = None
answer_error: AgentErrorLog | None = None
initial_agent_stats: InitialAgentResultStats | None = None
generated_sub_questions: list[str] = []
agent_base_end_time: datetime | None = None
@ -88,6 +90,7 @@ class RefinedAnswerUpdate(RefinedAgentEndStats, LoggerUpdate):
"""
refined_answer: str | None = None
answer_error: AgentErrorLog | None = None
refined_agent_stats: RefinedAgentStats | None = None
refined_answer_quality: bool = False

View File

@ -16,16 +16,44 @@ from onyx.agents.agent_search.deep_search.shared.expanded_retrieval.states impor
QueryExpansionUpdate,
)
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_RATELIMIT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_LLM_TIMEOUT_MESSAGE,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AgentLLMErrorType,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentErrorLog
from onyx.agents.agent_search.shared_graph_utils.models import BaseMessage_Content
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import dispatch_separated
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import parse_question_id
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_QUERY_REWRITING_GENERATION,
)
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import (
QUERY_REWRITING_PROMPT,
)
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="Query rewriting failed due to LLM timeout - the original question will be used.",
rate_limit="Query rewriting failed due to LLM rate limit - the original question will be used.",
general_error="Query rewriting failed due to LLM error - the original question will be used.",
)
@log_function_time(print_only=True)
def expand_queries(
state: ExpandedRetrievalInput,
config: RunnableConfig,
@ -54,13 +82,43 @@ def expand_queries(
)
]
llm_response_list = dispatch_separated(
llm.stream(prompt=msg), dispatch_subquery(level, question_num, writer)
)
agent_error: AgentErrorLog | None = None
llm_response_list: list[BaseMessage_Content] = []
llm_response = ""
rewritten_queries = []
llm_response = merge_message_runs(llm_response_list, chunk_separator="")[0].content
try:
llm_response_list = dispatch_separated(
llm.stream(
prompt=msg,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_QUERY_REWRITING_GENERATION,
),
dispatch_subquery(level, question_num, writer),
)
llm_response = merge_message_runs(llm_response_list, chunk_separator="")[
0
].content
rewritten_queries = llm_response.split("\n")
log_result = f"Number of expanded queries: {len(rewritten_queries)}"
rewritten_queries = llm_response.split("\n")
except LLMTimeoutError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.TIMEOUT,
error_message=AGENT_LLM_TIMEOUT_MESSAGE,
error_result=_llm_node_error_strings.timeout,
)
logger.error("LLM Timeout Error - expand queries")
log_result = agent_error.error_result
except LLMRateLimitError:
agent_error = AgentErrorLog(
error_type=AgentLLMErrorType.RATE_LIMIT,
error_message=AGENT_LLM_RATELIMIT_MESSAGE,
error_result=_llm_node_error_strings.rate_limit,
)
logger.error("LLM Rate Limit Error - expand queries")
log_result = agent_error.error_result
# use subquestion as query if query generation fails
return QueryExpansionUpdate(
expanded_queries=rewritten_queries,
@ -69,7 +127,7 @@ def expand_queries(
graph_component="shared - expanded retrieval",
node_name="expand queries",
node_start_time=node_start_time,
result=f"Number of expanded queries: {len(rewritten_queries)}",
result=log_result,
)
],
)

View File

@ -26,8 +26,10 @@ from onyx.context.search.postprocessing.postprocessing import rerank_sections
from onyx.context.search.postprocessing.postprocessing import should_rerank
from onyx.db.engine import get_session_context_manager
from onyx.db.search_settings import get_current_search_settings
from onyx.utils.timing import log_function_time
@log_function_time(print_only=True)
def rerank_documents(
state: ExpandedRetrievalState, config: RunnableConfig
) -> DocRerankingUpdate:

View File

@ -28,8 +28,10 @@ from onyx.tools.tool_implementations.search.search_tool import (
SEARCH_RESPONSE_SUMMARY_ID,
)
from onyx.tools.tool_implementations.search.search_tool import SearchResponseSummary
from onyx.utils.timing import log_function_time
@log_function_time(print_only=True)
def retrieve_documents(
state: RetrievalInput, config: RunnableConfig
) -> DocRetrievalUpdate:

View File

@ -1,5 +1,7 @@
from datetime import datetime
from typing import cast
from langchain_core.messages import BaseMessage
from langchain_core.messages import HumanMessage
from langchain_core.runnables.config import RunnableConfig
@ -10,14 +12,38 @@ from onyx.agents.agent_search.deep_search.shared.expanded_retrieval.states impor
DocVerificationUpdate,
)
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
binary_string_test,
)
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
trim_prompt_piece,
)
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_POSITIVE_VALUE_STR,
)
from onyx.agents.agent_search.shared_graph_utils.models import LLMNodeErrorStrings
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.configs.agent_configs import AGENT_TIMEOUT_OVERRIDE_LLM_DOCUMENT_VERIFICATION
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.prompts.agent_search import (
DOCUMENT_VERIFICATION_PROMPT,
)
from onyx.utils.logger import setup_logger
from onyx.utils.timing import log_function_time
logger = setup_logger()
_llm_node_error_strings = LLMNodeErrorStrings(
timeout="The LLM timed out. The document could not be verified. The document will be treated as 'relevant'",
rate_limit="The LLM encountered a rate limit. The document could not be verified. The document will be treated as 'relevant'",
general_error="The LLM encountered an error. The document could not be verified. The document will be treated as 'relevant'",
)
@log_function_time(print_only=True)
def verify_documents(
state: DocVerificationInput, config: RunnableConfig
) -> DocVerificationUpdate:
@ -26,12 +52,14 @@ def verify_documents(
Args:
state (DocVerificationInput): The current state
config (RunnableConfig): Configuration containing ProSearchConfig
config (RunnableConfig): Configuration containing AgentSearchConfig
Updates:
verified_documents: list[InferenceSection]
"""
node_start_time = datetime.now()
question = state.question
retrieved_document_to_verify = state.retrieved_document_to_verify
document_content = retrieved_document_to_verify.combined_content
@ -51,12 +79,40 @@ def verify_documents(
)
]
response = fast_llm.invoke(msg)
response: BaseMessage | None = None
verified_documents = []
if isinstance(response.content, str) and "yes" in response.content.lower():
verified_documents.append(retrieved_document_to_verify)
verified_documents = [
retrieved_document_to_verify
] # default is to treat document as relevant
try:
response = fast_llm.invoke(
msg, timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_DOCUMENT_VERIFICATION
)
assert isinstance(response.content, str)
if not binary_string_test(
text=response.content, positive_value=AGENT_POSITIVE_VALUE_STR
):
verified_documents = []
except LLMTimeoutError:
# In this case, we decide to continue and don't raise an error, as there
# is little harm in letting some less relevant docs through.
logger.error("LLM Timeout Error - verify documents")
except LLMRateLimitError:
# In this case, we decide to continue and don't raise an error, as there
# is little harm in letting some less relevant docs through.
logger.error("LLM Rate Limit Error - verify documents")
return DocVerificationUpdate(
verified_documents=verified_documents,
log_messages=[
get_langgraph_node_log_string(
graph_component="shared - expanded retrieval",
node_name="verify documents",
node_start_time=node_start_time,
)
],
)

View File

@ -21,9 +21,13 @@ from onyx.context.search.models import InferenceSection
class ExpandedRetrievalInput(SubgraphCoreState):
question: str = ""
base_search: bool = False
# Exception to the 'no default value' rule for LangGraph input states:
# here, a sub_question_id default of None implies usage for the
# original question. This is sometimes needed for nested sub-graphs.
sub_question_id: str | None = None
question: str
base_search: bool
## Update/Return States
@ -34,7 +38,7 @@ class QueryExpansionUpdate(LoggerUpdate, BaseModel):
log_messages: list[str] = []
class DocVerificationUpdate(BaseModel):
class DocVerificationUpdate(LoggerUpdate, BaseModel):
verified_documents: Annotated[list[InferenceSection], dedup_inference_sections] = []
@ -88,4 +92,4 @@ class DocVerificationInput(ExpandedRetrievalInput):
class RetrievalInput(ExpandedRetrievalInput):
query_to_retrieve: str = ""
query_to_retrieve: str

View File

@ -12,7 +12,7 @@ from onyx.agents.agent_search.deep_search.main.graph_builder import (
main_graph_builder as main_graph_builder_a,
)
from onyx.agents.agent_search.deep_search.main.states import (
MainInput as MainInput_a,
MainInput as MainInput,
)
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config
@ -21,6 +21,7 @@ from onyx.chat.models import AnswerPacket
from onyx.chat.models import AnswerStream
from onyx.chat.models import ExtendedToolResponse
from onyx.chat.models import RefinedAnswerImprovement
from onyx.chat.models import StreamingError
from onyx.chat.models import StreamStopInfo
from onyx.chat.models import SubQueryPiece
from onyx.chat.models import SubQuestionPiece
@ -33,6 +34,7 @@ from onyx.llm.factory import get_default_llms
from onyx.tools.tool_runner import ToolCallKickoff
from onyx.utils.logger import setup_logger
logger = setup_logger()
_COMPILED_GRAPH: CompiledStateGraph | None = None
@ -72,13 +74,15 @@ def _parse_agent_event(
return cast(AnswerPacket, event["data"])
elif event["name"] == "refined_answer_improvement":
return cast(RefinedAnswerImprovement, event["data"])
elif event["name"] == "refined_sub_question_creation_error":
return cast(StreamingError, event["data"])
return None
def manage_sync_streaming(
compiled_graph: CompiledStateGraph,
config: GraphConfig,
graph_input: BasicInput | MainInput_a,
graph_input: BasicInput | MainInput,
) -> Iterable[StreamEvent]:
message_id = config.persistence.message_id if config.persistence else None
for event in compiled_graph.stream(
@ -92,7 +96,7 @@ def manage_sync_streaming(
def run_graph(
compiled_graph: CompiledStateGraph,
config: GraphConfig,
input: BasicInput | MainInput_a,
input: BasicInput | MainInput,
) -> AnswerStream:
config.behavior.perform_initial_search_decomposition = (
INITIAL_SEARCH_DECOMPOSITION_ENABLED
@ -123,9 +127,7 @@ def run_main_graph(
) -> AnswerStream:
compiled_graph = load_compiled_graph()
input = MainInput_a(
base_question=config.inputs.search_request.query, log_messages=[]
)
input = MainInput(log_messages=[])
# Agent search is not a Tool per se, but this is helpful for the frontend
yield ToolCallKickoff(
@ -172,9 +174,7 @@ if __name__ == "__main__":
# search_request.persona = get_persona_by_id(1, None, db_session)
# config.perform_initial_search_path_decision = False
config.behavior.perform_initial_search_decomposition = True
input = MainInput_a(
base_question=config.inputs.search_request.query, log_messages=[]
)
input = MainInput(log_messages=[])
tool_responses: list = []
for output in run_graph(compiled_graph, config, input):

View File

@ -7,6 +7,7 @@ from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.models import (
AgentPromptEnrichmentComponents,
)
from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_persona_agent_prompt_expressions,
)
@ -40,13 +41,7 @@ def build_sub_question_answer_prompt(
date_str = build_date_time_string()
# TODO: This should include document metadata and title
docs_format_list = [
f"Document Number: [D{doc_num + 1}]\nContent: {doc.combined_content}\n\n"
for doc_num, doc in enumerate(docs)
]
docs_str = "\n\n".join(docs_format_list)
docs_str = format_docs(docs)
docs_str = trim_prompt_piece(
config,
@ -150,3 +145,38 @@ def get_prompt_enrichment_components(
history=history,
date_str=date_str,
)
def binary_string_test(text: str, positive_value: str = "yes") -> bool:
"""
Tests if a string contains a positive value (case-insensitive).
Args:
text: The string to test
positive_value: The value to look for (defaults to "yes")
Returns:
True if the positive value is found in the text
"""
return positive_value.lower() in text.lower()
def binary_string_test_after_answer_separator(
text: str, positive_value: str = "yes", separator: str = "Answer:"
) -> bool:
"""
Tests if the text after the last occurrence of the separator contains a
positive value (case-insensitive).
Args:
text: The string to test
positive_value: The value to look for (defaults to "yes")
separator: The marker after which to look for the positive value (defaults to "Answer:")
Returns:
True if the separator is present and the positive value is found after it
"""
if separator not in text:
return False
relevant_text = text.split(f"{separator}")[-1]
return binary_string_test(relevant_text, positive_value)
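# Illustrative behavior of the two helpers above (a minimal sketch with made-up
# strings, not taken from any real call site):
#
#     binary_string_test("Yes, this is correct.")                         # -> True
#     binary_string_test("This is wrong.", positive_value="yes")          # -> False
#     binary_string_test_after_answer_separator(
#         "Reasoning: unclear at first.\nAnswer: yes"
#     )                                                                   # -> True
#     binary_string_test_after_answer_separator("yes, but no separator")  # -> False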

View File

@ -1,7 +1,11 @@
import numpy as np
from onyx.agents.agent_search.shared_graph_utils.models import AnswerGenerationDocuments
from onyx.agents.agent_search.shared_graph_utils.models import RetrievalFitScoreMetrics
from onyx.agents.agent_search.shared_graph_utils.models import RetrievalFitStats
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_section_list,
)
from onyx.chat.models import SectionRelevancePiece
from onyx.context.search.models import InferenceSection
from onyx.utils.logger import setup_logger
@ -96,3 +100,106 @@ def get_fit_scores(
)
return fit_eval
def get_answer_generation_documents(
relevant_docs: list[InferenceSection],
context_documents: list[InferenceSection],
original_question_docs: list[InferenceSection],
max_docs: int,
) -> AnswerGenerationDocuments:
"""
Create a deduplicated list of documents to stream, prioritizing relevant docs.
Args:
relevant_docs: Primary documents to include
context_documents: Additional context documents to append
original_question_docs: Original question documents to append
max_docs: Maximum number of documents to return
Returns:
AnswerGenerationDocuments with deduplicated streaming and context documents, each limited to max_docs
"""
# get relevant_doc ids
relevant_doc_ids = [doc.center_chunk.document_id for doc in relevant_docs]
# Start with the relevant docs
streaming_documents = relevant_docs.copy()
# Use a set for O(1) lookups of document IDs
seen_doc_ids = {doc.center_chunk.document_id for doc in streaming_documents}
# Combine additional documents to check in one iteration
additional_docs = context_documents + original_question_docs
for doc_idx, doc in enumerate(additional_docs):
doc_id = doc.center_chunk.document_id
if doc_id not in seen_doc_ids:
streaming_documents.append(doc)
seen_doc_ids.add(doc_id)
streaming_documents = dedup_inference_section_list(streaming_documents)
relevant_streaming_docs = [
doc
for doc in streaming_documents
if doc.center_chunk.document_id in relevant_doc_ids
]
relevant_streaming_docs = dedup_sort_inference_section_list(relevant_streaming_docs)
additional_streaming_docs = [
doc
for doc in streaming_documents
if doc.center_chunk.document_id not in relevant_doc_ids
]
additional_streaming_docs = dedup_sort_inference_section_list(
additional_streaming_docs
)
for doc in additional_streaming_docs:
if doc.center_chunk.score:
doc.center_chunk.score += -2.0
else:
doc.center_chunk.score = -2.0
sorted_streaming_documents = relevant_streaming_docs + additional_streaming_docs
return AnswerGenerationDocuments(
streaming_documents=sorted_streaming_documents[:max_docs],
context_documents=relevant_streaming_docs[:max_docs],
)
def dedup_sort_inference_section_list(
sections: list[InferenceSection],
) -> list[InferenceSection]:
"""Deduplicates InferenceSections by document_id and sorts by score.
Args:
sections: List of InferenceSections to deduplicate and sort
Returns:
Deduplicated list of InferenceSections sorted by score in descending order
"""
# dedupe/merge with existing framework
sections = dedup_inference_section_list(sections)
# Use dict to deduplicate by document_id, keeping highest scored version
unique_sections: dict[str, InferenceSection] = {}
for section in sections:
doc_id = section.center_chunk.document_id
if doc_id not in unique_sections:
unique_sections[doc_id] = section
continue
# Keep version with higher score
existing_score = unique_sections[doc_id].center_chunk.score or 0
new_score = section.center_chunk.score or 0
if new_score > existing_score:
unique_sections[doc_id] = section
# Sort by score in descending order, handling None scores
sorted_sections = sorted(
unique_sections.values(), key=lambda x: x.center_chunk.score or 0, reverse=True
)
return sorted_sections
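# Minimal usage sketch for get_answer_generation_documents above (illustrative
# only; assumes `relevant_sections`, `context_sections` and `orig_question_sections`
# are lists of InferenceSection produced by the retrieval pipeline, not built here):
#
#     answer_docs = get_answer_generation_documents(
#         relevant_docs=relevant_sections,
#         context_documents=context_sections,
#         original_question_docs=orig_question_sections,
#         max_docs=25,
#     )
#     answer_docs.streaming_documents  # relevant docs first, then down-weighted extras
#     answer_docs.context_documents    # deduplicated, score-sorted relevant docs only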

View File

@ -0,0 +1,19 @@
from enum import Enum
AGENT_LLM_TIMEOUT_MESSAGE = "The agent timed out. Please try again."
AGENT_LLM_ERROR_MESSAGE = "The agent encountered an error. Please try again."
AGENT_LLM_RATELIMIT_MESSAGE = (
"The agent encountered a rate limit error. Please try again."
)
LLM_ANSWER_ERROR_MESSAGE = "The question was not answered due to an LLM error."
AGENT_POSITIVE_VALUE_STR = "yes"
AGENT_NEGATIVE_VALUE_STR = "no"
AGENT_ANSWER_SEPARATOR = "Answer:"
class AgentLLMErrorType(str, Enum):
TIMEOUT = "timeout"
RATE_LIMIT = "rate_limit"
GENERAL_ERROR = "general_error"

View File

@ -1,3 +1,5 @@
from typing import Any
from pydantic import BaseModel
from onyx.agents.agent_search.deep_search.main.models import (
@ -56,6 +58,12 @@ class InitialAgentResultStats(BaseModel):
agent_effectiveness: dict[str, float | int | None]
class AgentErrorLog(BaseModel):
error_message: str
error_type: str
error_result: str
class RefinedAgentStats(BaseModel):
revision_doc_efficiency: float | None
revision_question_efficiency: float | None
@ -110,6 +118,11 @@ class SubQuestionAnswerResults(BaseModel):
sub_question_retrieval_stats: AgentChunkRetrievalStats
class StructuredSubquestionDocuments(BaseModel):
cited_documents: list[InferenceSection]
context_documents: list[InferenceSection]
class CombinedAgentMetrics(BaseModel):
timings: AgentTimings
base_metrics: AgentBaseMetrics | None
@ -126,3 +139,17 @@ class AgentPromptEnrichmentComponents(BaseModel):
persona_prompts: PersonaPromptExpressions
history: str
date_str: str
class LLMNodeErrorStrings(BaseModel):
timeout: str = "LLM Timeout Error"
rate_limit: str = "LLM Rate Limit Error"
general_error: str = "General LLM Error"
class AnswerGenerationDocuments(BaseModel):
streaming_documents: list[InferenceSection]
context_documents: list[InferenceSection]
BaseMessage_Content = str | list[str | dict[str, Any]]

View File

@ -12,6 +12,13 @@ def dedup_inference_sections(
return deduped
def dedup_inference_section_list(
list: list[InferenceSection],
) -> list[InferenceSection]:
deduped = _merge_sections(list)
return deduped
def dedup_question_answer_results(
question_answer_results_1: list[SubQuestionAnswerResults],
question_answer_results_2: list[SubQuestionAnswerResults],

View File

@ -20,10 +20,18 @@ from onyx.agents.agent_search.models import GraphInputs
from onyx.agents.agent_search.models import GraphPersistence
from onyx.agents.agent_search.models import GraphSearchConfig
from onyx.agents.agent_search.models import GraphTooling
from onyx.agents.agent_search.shared_graph_utils.models import BaseMessage_Content
from onyx.agents.agent_search.shared_graph_utils.models import (
EntityRelationshipTermExtraction,
)
from onyx.agents.agent_search.shared_graph_utils.models import PersonaPromptExpressions
from onyx.agents.agent_search.shared_graph_utils.models import (
StructuredSubquestionDocuments,
)
from onyx.agents.agent_search.shared_graph_utils.models import SubQuestionAnswerResults
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_section_list,
)
from onyx.chat.models import AnswerPacket
from onyx.chat.models import AnswerStyleConfig
from onyx.chat.models import CitationConfig
@ -34,6 +42,9 @@ from onyx.chat.models import StreamStopInfo
from onyx.chat.models import StreamStopReason
from onyx.chat.models import StreamType
from onyx.chat.prompt_builder.answer_prompt_builder import AnswerPromptBuilder
from onyx.configs.agent_configs import (
AGENT_TIMEOUT_OVERRIDE_LLM_HISTORY_SUMMARY_GENERATION,
)
from onyx.configs.chat_configs import CHAT_TARGET_CHUNK_PERCENTAGE
from onyx.configs.chat_configs import MAX_CHUNKS_FED_TO_CHAT
from onyx.configs.constants import DEFAULT_PERSONA_ID
@ -46,6 +57,8 @@ from onyx.context.search.models import SearchRequest
from onyx.db.engine import get_session_context_manager
from onyx.db.persona import get_persona_by_id
from onyx.db.persona import Persona
from onyx.llm.chat_llm import LLMRateLimitError
from onyx.llm.chat_llm import LLMTimeoutError
from onyx.llm.interfaces import LLM
from onyx.prompts.agent_search import (
ASSISTANT_SYSTEM_PROMPT_DEFAULT,
@ -66,8 +79,9 @@ from onyx.tools.tool_implementations.search.search_tool import (
from onyx.tools.tool_implementations.search.search_tool import SearchResponseSummary
from onyx.tools.tool_implementations.search.search_tool import SearchTool
from onyx.tools.utils import explicit_tool_calling_supported
from onyx.utils.logger import setup_logger
BaseMessage_Content = str | list[str | dict[str, Any]]
logger = setup_logger()
# Post-processing
@ -380,8 +394,24 @@ def summarize_history(
)
)
history_response = llm.invoke(history_context_prompt)
try:
history_response = llm.invoke(
history_context_prompt,
timeout_override=AGENT_TIMEOUT_OVERRIDE_LLM_HISTORY_SUMMARY_GENERATION,
)
except LLMTimeoutError:
logger.error("LLM Timeout Error - summarize history")
return (
history # this is what is done at this point anyway, so we default to this
)
except LLMRateLimitError:
logger.error("LLM Rate Limit Error - summarize history")
return (
history # this is what is done at this point anyway, so we default to this
)
assert isinstance(history_response.content, str)
return history_response.content
@ -447,3 +477,27 @@ def remove_document_citations(text: str) -> str:
# \d+ - one or more digits
# \] - literal ] character
return re.sub(r"\[(?:D|Q)?\d+\]", "", text)
def get_deduplicated_structured_subquestion_documents(
sub_question_results: list[SubQuestionAnswerResults],
) -> StructuredSubquestionDocuments:
"""
Extract and deduplicate all cited documents from sub-question results.
Args:
sub_question_results: List of sub-question results containing cited documents
Returns:
StructuredSubquestionDocuments with deduplicated cited and context documents
"""
cited_docs = [
doc for result in sub_question_results for doc in result.cited_documents
]
context_docs = [
doc for result in sub_question_results for doc in result.context_documents
]
return StructuredSubquestionDocuments(
cited_documents=dedup_inference_section_list(cited_docs),
context_documents=dedup_inference_section_list(context_docs),
)
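# Minimal usage sketch (illustrative; assumes `sub_question_results` is the list of
# SubQuestionAnswerResults collected from the answered sub-questions):
#
#     structured_docs = get_deduplicated_structured_subquestion_documents(
#         sub_question_results
#     )
#     structured_docs.cited_documents    # deduplicated cited sections
#     structured_docs.context_documents  # deduplicated context sections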

View File

@ -3,6 +3,24 @@ import os
INITIAL_SEARCH_DECOMPOSITION_ENABLED = True
ALLOW_REFINEMENT = True
AGENT_DEFAULT_RETRIEVAL_HITS = 15
AGENT_DEFAULT_RERANKING_HITS = 10
AGENT_DEFAULT_SUB_QUESTION_MAX_CONTEXT_HITS = 8
AGENT_DEFAULT_NUM_DOCS_FOR_INITIAL_DECOMPOSITION = 3
AGENT_DEFAULT_NUM_DOCS_FOR_REFINED_DECOMPOSITION = 5
AGENT_DEFAULT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER = 25
AGENT_DEFAULT_MAX_STREAMED_DOCS_FOR_REFINED_ANSWER = 35
AGENT_DEFAULT_EXPLORATORY_SEARCH_RESULTS = 5
AGENT_DEFAULT_MIN_ORIG_QUESTION_DOCS = 3
AGENT_DEFAULT_MAX_ANSWER_CONTEXT_DOCS = 10
AGENT_DEFAULT_MAX_STATIC_HISTORY_WORD_LENGTH = 2000
INITIAL_SEARCH_DECOMPOSITION_ENABLED = True
ALLOW_REFINEMENT = True
AGENT_DEFAULT_RETRIEVAL_HITS = 15
AGENT_DEFAULT_RERANKING_HITS = 10
AGENT_DEFAULT_SUB_QUESTION_MAX_CONTEXT_HITS = 8
@ -13,9 +31,21 @@ AGENT_DEFAULT_MIN_ORIG_QUESTION_DOCS = 3
AGENT_DEFAULT_MAX_ANSWER_CONTEXT_DOCS = 10
AGENT_DEFAULT_MAX_STATIC_HISTORY_WORD_LENGTH = 2000
#####
# Agent Configs
#####
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_GENERAL_GENERATION = 30 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_HISTORY_SUMMARY_GENERATION = 10 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION = 25 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_QUERY_REWRITING_GENERATION = 4 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_DOCUMENT_VERIFICATION = 1 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_SUBQUESTION_GENERATION = 3 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_GENERATION = 12 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK = 8 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_INITIAL_ANSWER_GENERATION = 25 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_REFINED_SUBQUESTION_GENERATION = 6 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_GENERATION = 25 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_VALIDATION = 8 # in seconds
AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_COMPARE_ANSWERS = 8 # in seconds
AGENT_RETRIEVAL_STATS = (
@ -77,4 +107,151 @@ AGENT_MAX_STATIC_HISTORY_WORD_LENGTH = int(
or AGENT_DEFAULT_MAX_STATIC_HISTORY_WORD_LENGTH
) # 2000
AGENT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER = int(
os.environ.get("AGENT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER")
or AGENT_DEFAULT_MAX_STREAMED_DOCS_FOR_INITIAL_ANSWER
) # 25
AGENT_MAX_STREAMED_DOCS_FOR_REFINED_ANSWER = int(
os.environ.get("AGENT_MAX_STREAMED_DOCS_FOR_REFINED_ANSWER")
or AGENT_DEFAULT_MAX_STREAMED_DOCS_FOR_REFINED_ANSWER
) # 35
AGENT_RETRIEVAL_STATS = (
os.environ.get("AGENT_RETRIEVAL_STATS") != "False"
)  # default True
AGENT_MAX_QUERY_RETRIEVAL_RESULTS = int(
os.environ.get("AGENT_MAX_QUERY_RETRIEVAL_RESULTS") or AGENT_DEFAULT_RETRIEVAL_HITS
) # 15
# Reranking agent configs
# Reranking stats - no influence on flow outside of stats collection
AGENT_RERANKING_STATS = (
os.environ.get("AGENT_RERANKING_STATS") == "True"
)  # default False
AGENT_RERANKING_MAX_QUERY_RETRIEVAL_RESULTS = int(
os.environ.get("AGENT_RERANKING_MAX_QUERY_RETRIEVAL_RESULTS")
or AGENT_DEFAULT_RERANKING_HITS
) # 10
AGENT_NUM_DOCS_FOR_DECOMPOSITION = int(
os.environ.get("AGENT_NUM_DOCS_FOR_DECOMPOSITION")
or AGENT_DEFAULT_NUM_DOCS_FOR_INITIAL_DECOMPOSITION
) # 3
AGENT_NUM_DOCS_FOR_REFINED_DECOMPOSITION = int(
os.environ.get("AGENT_NUM_DOCS_FOR_REFINED_DECOMPOSITION")
or AGENT_DEFAULT_NUM_DOCS_FOR_REFINED_DECOMPOSITION
) # 5
AGENT_EXPLORATORY_SEARCH_RESULTS = int(
os.environ.get("AGENT_EXPLORATORY_SEARCH_RESULTS")
or AGENT_DEFAULT_EXPLORATORY_SEARCH_RESULTS
) # 5
AGENT_MIN_ORIG_QUESTION_DOCS = int(
os.environ.get("AGENT_MIN_ORIG_QUESTION_DOCS")
or AGENT_DEFAULT_MIN_ORIG_QUESTION_DOCS
) # 3
AGENT_MAX_ANSWER_CONTEXT_DOCS = int(
os.environ.get("AGENT_MAX_ANSWER_CONTEXT_DOCS")
or AGENT_DEFAULT_SUB_QUESTION_MAX_CONTEXT_HITS
) # 8
AGENT_MAX_STATIC_HISTORY_WORD_LENGTH = int(
os.environ.get("AGENT_MAX_STATIC_HISTORY_WORD_LENGTH")
or AGENT_DEFAULT_MAX_STATIC_HISTORY_WORD_LENGTH
) # 2000
AGENT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_ENTITY_TERM_EXTRACTION
) # 25
AGENT_TIMEOUT_OVERRIDE_LLM_DOCUMENT_VERIFICATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_DOCUMENT_VERIFICATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_DOCUMENT_VERIFICATION
) # 1
AGENT_TIMEOUT_OVERRIDE_LLM_GENERAL_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_GENERAL_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_GENERAL_GENERATION
) # 30
AGENT_TIMEOUT_OVERRIDE_LLM_SUBQUESTION_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_SUBQUESTION_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_SUBQUESTION_GENERATION
) # 3
AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_GENERATION
) # 12
AGENT_TIMEOUT_OVERRIDE_LLM_INITIAL_ANSWER_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_INITIAL_ANSWER_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_INITIAL_ANSWER_GENERATION
) # 25
AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_GENERATION
) # 25
AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK
) # 8
AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_SUBQUESTION_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_SUBQUESTION_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_REFINED_SUBQUESTION_GENERATION
) # 6
AGENT_TIMEOUT_OVERRIDE_LLM_QUERY_REWRITING_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_QUERY_REWRITING_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_QUERY_REWRITING_GENERATION
) # 4
AGENT_TIMEOUT_OVERRIDE_LLM_HISTORY_SUMMARY_GENERATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_HISTORY_SUMMARY_GENERATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_HISTORY_SUMMARY_GENERATION
) # 10
AGENT_TIMEOUT_OVERRIDE_LLM_COMPARE_ANSWERS = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_COMPARE_ANSWERS")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_COMPARE_ANSWERS
) # 8
AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_VALIDATION = int(
os.environ.get("AGENT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_VALIDATION")
or AGENT_DEFAULT_TIMEOUT_OVERRIDE_LLM_REFINED_ANSWER_VALIDATION
) # 8
GRAPH_VERSION_NAME: str = "a"
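# Illustrative override (the value 5 is an arbitrary example, not a recommendation):
# the settings above are read once at import time, so the environment variable must
# be set before this module is imported, e.g.
#
#     import os
#     os.environ["AGENT_TIMEOUT_OVERRIDE_LLM_SUBANSWER_CHECK"] = "5"  # seconds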

View File

@ -893,14 +893,18 @@ def translate_db_sub_questions_to_server_objects(
question=sub_question.sub_question,
answer=sub_question.sub_answer,
sub_queries=sub_queries,
context_docs=get_retrieval_docs_from_search_docs(verified_docs),
context_docs=get_retrieval_docs_from_search_docs(
verified_docs, sort_by_score=False
),
)
)
return sub_questions
def get_retrieval_docs_from_search_docs(
search_docs: list[SearchDoc], remove_doc_content: bool = False
search_docs: list[SearchDoc],
remove_doc_content: bool = False,
sort_by_score: bool = True,
) -> RetrievalDocs:
top_documents = [
translate_db_search_doc_to_server_search_doc(
@ -908,7 +912,8 @@ def get_retrieval_docs_from_search_docs(
)
for db_doc in search_docs
]
top_documents = sorted(top_documents, key=lambda doc: doc.score, reverse=True) # type: ignore
if sort_by_score:
top_documents = sorted(top_documents, key=lambda doc: doc.score, reverse=True) # type: ignore
return RetrievalDocs(top_documents=top_documents)
@ -1018,7 +1023,7 @@ def log_agent_sub_question_results(
sub_question = sub_question_answer_result.question
sub_answer = sub_question_answer_result.answer
sub_document_results = _create_citation_format_list(
sub_question_answer_result.verified_reranked_documents
sub_question_answer_result.context_documents
)
sub_question_object = AgentSubQuestion(

View File

@ -52,6 +52,18 @@ litellm.telemetry = False
_LLM_PROMPT_LONG_TERM_LOG_CATEGORY = "llm_prompt"
class LLMTimeoutError(Exception):
"""
Exception raised when an LLM call times out.
"""
class LLMRateLimitError(Exception):
"""
Exception raised when an LLM call is rate limited.
"""
def _base_msg_to_role(msg: BaseMessage) -> str:
if isinstance(msg, HumanMessage) or isinstance(msg, HumanMessageChunk):
return "user"
@ -389,6 +401,7 @@ class DefaultMultiLLM(LLM):
tool_choice: ToolChoiceOptions | None,
stream: bool,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> litellm.ModelResponse | litellm.CustomStreamWrapper:
# litellm doesn't accept LangChain BaseMessage objects, so we need to convert them
# to a dict representation
@ -419,7 +432,7 @@ class DefaultMultiLLM(LLM):
stream=stream,
# model params
temperature=0,
timeout=self._timeout,
timeout=timeout_override or self._timeout,
# For now, we don't support parallel tool calls
# NOTE: we can't pass this in if tools are not specified
# or else OpenAI throws an error
@ -438,6 +451,12 @@ class DefaultMultiLLM(LLM):
except Exception as e:
self._record_error(processed_prompt, e)
# for breakpointing while debugging
if isinstance(e, litellm.Timeout):
raise LLMTimeoutError(e)
elif isinstance(e, litellm.RateLimitError):
raise LLMRateLimitError(e)
raise e
@property
@ -458,6 +477,7 @@ class DefaultMultiLLM(LLM):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> BaseMessage:
if LOG_DANSWER_MODEL_INTERACTIONS:
self.log_model_configs()
@ -465,7 +485,12 @@ class DefaultMultiLLM(LLM):
response = cast(
litellm.ModelResponse,
self._completion(
prompt, tools, tool_choice, False, structured_response_format
prompt=prompt,
tools=tools,
tool_choice=tool_choice,
stream=False,
structured_response_format=structured_response_format,
timeout_override=timeout_override,
),
)
choice = response.choices[0]
@ -483,19 +508,31 @@ class DefaultMultiLLM(LLM):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> Iterator[BaseMessage]:
if LOG_DANSWER_MODEL_INTERACTIONS:
self.log_model_configs()
if DISABLE_LITELLM_STREAMING:
yield self.invoke(prompt, tools, tool_choice, structured_response_format)
yield self.invoke(
prompt,
tools,
tool_choice,
structured_response_format,
timeout_override,
)
return
output = None
response = cast(
litellm.CustomStreamWrapper,
self._completion(
prompt, tools, tool_choice, True, structured_response_format
prompt=prompt,
tools=tools,
tool_choice=tool_choice,
stream=True,
structured_response_format=structured_response_format,
timeout_override=timeout_override,
),
)
try:

View File

@ -81,6 +81,7 @@ class CustomModelServer(LLM):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> BaseMessage:
return self._execute(prompt)
@ -90,5 +91,6 @@ class CustomModelServer(LLM):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> Iterator[BaseMessage]:
yield self._execute(prompt)

View File

@ -90,12 +90,13 @@ class LLM(abc.ABC):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> BaseMessage:
self._precall(prompt)
# TODO add a postcall to log model outputs independent of concrete class
# implementation
return self._invoke_implementation(
prompt, tools, tool_choice, structured_response_format
prompt, tools, tool_choice, structured_response_format, timeout_override
)
@abc.abstractmethod
@ -105,6 +106,7 @@ class LLM(abc.ABC):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> BaseMessage:
raise NotImplementedError
@ -114,12 +116,13 @@ class LLM(abc.ABC):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> Iterator[BaseMessage]:
self._precall(prompt)
# TODO add a postcall to log model outputs independent of concrete class
# implementation
messages = self._stream_implementation(
prompt, tools, tool_choice, structured_response_format
prompt, tools, tool_choice, structured_response_format, timeout_override
)
tokens = []
@ -138,5 +141,6 @@ class LLM(abc.ABC):
tools: list[dict] | None = None,
tool_choice: ToolChoiceOptions | None = None,
structured_response_format: dict | None = None,
timeout_override: int | None = None,
) -> Iterator[BaseMessage]:
raise NotImplementedError
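# Caller-side sketch for the per-call override (illustrative; assumes `llm` is any
# concrete LLM implementation and `msg` is a prepared prompt; the exception types
# are the ones raised by DefaultMultiLLM in chat_llm.py):
#
#     from onyx.llm.chat_llm import LLMRateLimitError, LLMTimeoutError
#
#     try:
#         response = llm.invoke(msg, timeout_override=8)  # seconds, this call only
#     except LLMTimeoutError:
#         ...  # fall back to a default result instead of failing the node
#     except LLMRateLimitError:
#         ...  # fall back or surface a rate limit message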

View File

@ -1,3 +1,7 @@
from onyx.agents.agent_search.shared_graph_utils.constants import (
AGENT_ANSWER_SEPARATOR,
)
# Standards
SEPARATOR_LINE = "-------"
SEPARATOR_LINE_LONG = "---------------"
@ -5,8 +9,6 @@ UNKNOWN_ANSWER = "I do not have enough information to answer this question."
NO_RECOVERED_DOCS = "No relevant information recovered"
YES = "yes"
NO = "no"
# Framing/Support/Template Prompts
HISTORY_FRAMING_PROMPT = f"""
For more context, here is the history of the conversation so far that preceded this question:
@ -16,6 +18,43 @@ For more context, here is the history of the conversation so far that preceded t
""".strip()
COMMON_RAG_RULES = f"""
IMPORTANT RULES:
- If you cannot reliably answer the question solely using the provided information, say that you cannot reliably answer. \
You may give some additional facts you learned, but do not try to invent an answer.
- If the information is empty or irrelevant, just say "{UNKNOWN_ANSWER}".
- If the information is relevant but not fully conclusive, provide an answer to the extent you can but also specify that \
the information is not conclusive and why.
- When constructing/considering categories, focus less on the question and more on the context actually provided! \
Example: if the question is about the products of company A, and the content provided lists a number of products, \
do NOT automatically ASSUME that those belong to company A! So you cannot list those as products of company A, despite \
the fact that the question is about company A's products. What you should say instead is maybe something like \
"Here are a number of products, but I cannot say whether some or all of them belong to company A: \
<proceed with listing the products>". It is ABSOLUTELY ESSENTIAL that the answer constructed reflects \
actual knowledge. For that matter, also consider the title of the document and other information that may be \
provided. If that does not make it clear that - in the example above - the products belong to company A, \
then do not list them as products of company A, just maybe as "A list of products that may not necessarily \
belong to company A". THIS IS IMPORTANT!
- Related, if the context provides a list of items with associated data or other information that seems \
to align with the categories in the question, but does not specify whether the items or the information is \
specific to the exact requested category, then present the information with a disclaimer. Use a title such as \
"I am not sure whether these items (or the information provided) is specific to [relevant category] or whether \
these are all [specific group], but I found this information may be helpful:" \
followed by the list of items and the associated data and/or information discovered.
- Do not group together items under one headline where not all items belong to the category of the headline! \
(Example: "Products used by Company A" where some products listed are not built by Company A, but by other companies,
or it is not clear that the products are built by Company A). Only state what you know for sure!
- Do NOT perform any calculations in the answer! Just report on facts.
- If appropriate, organizing your answer in bullet points is often useful.
""".strip()
ASSISTANT_SYSTEM_PROMPT_DEFAULT = "You are an assistant for question-answering tasks."
ASSISTANT_SYSTEM_PROMPT_PERSONA = f"""
@ -129,20 +168,44 @@ History summary:
# Sub-question
# Intentionally left a copy in case we want to modify this one differently
INITIAL_QUESTION_DECOMPOSITION_PROMPT = f"""
Decompose the initial user question into no more than 3 appropriate sub-questions that help to answer the \
original question. The purpose for this decomposition may be to:
1) isolate individual entities (i.e., 'compare sales of company A and company B' -> \
Please create a list of no more than 3 sub-questions whose answers would help to inform the answer \
to the initial question.
The purpose for these sub-questions could be:
1) decomposition to isolate individual entities (i.e., 'compare sales of company A and company B' -> \
['what are sales for company A', 'what are sales for company B'])
2) clarify or disambiguate ambiguous terms (i.e., 'what is our success with company A' -> \
2) clarification and/or disambiguation of ambiguous terms (i.e., 'what is our success with company A' -> \
['what are our sales with company A','what is our market share with company A', \
'is company A a reference customer for us', etc.])
3) if a term or a metric is essentially clear, but it could relate to various components of an entity and you \
are generally familiar with the entity, then you can decompose the question into sub-questions that are more \
specific to components (i.e., 'what do we do to improve scalability of product X', 'what do we to to improve \
scalability of product X', 'what do we do to improve stability of product X', ...])
4) research an area that could really help to answer the question.
Here is the initial question to decompose:
3) if a term or a metric is essentially clear, but it could relate to various aspects of an entity and you \
are generally familiar with the entity, then you can create sub-questions that are more \
specific (i.e., 'what do we do to improve product X' -> 'what do we do to improve scalability of product X', \
'what do we do to improve performance of product X', 'what do we do to improve stability of product X', ...)
4) research individual questions and areas that should really help to ultimately answer the question.
Important:
- Each sub-question should lend itself to be answered by a RAG system. Correspondingly, phrase the question \
in a way that is amenable to that. An example set of sub-questions based on an initial question could look like this:
'what can I do to improve the performance of workflow X' -> \
'what are the settings affecting performance for workflow X', 'are there complaints and bugs related to \
workflow X performance', 'what are performance benchmarks for workflow X', ...
- Consequently, again, don't just decompose, but make sure that the sub-questions have the proper form. I.e., no \
first-person phrasing like 'I', etc.
- Do not(!) create sub-questions that are clarifying questions to the person who asked the question, \
like making suggestions or asking the user for more information! This is not useful for the actual \
question-answering process! You need to take the information from the user as it is given to you! \
For example, should the question be of the type 'why does product X perform poorly for customer A', DO NOT create a \
sub-question of the type 'what are the settings that customer A uses for product X?'! A valid sub-question \
could rather be 'which settings for product X have been shown to lead to poor performance for customers?'
And here is the initial question to create sub-questions for, so that you have the full context:
{SEPARATOR_LINE}
{{question}}
{SEPARATOR_LINE}
@ -150,7 +213,79 @@ Here is the initial question to decompose:
{{history}}
Do NOT include any text in your answer outside of the list of sub-questions!
Please formulate your answer as a newline-separated list of questions like so:
Please formulate your answer as a newline-separated list of questions like so (and please ONLY ANSWER WITH THIS LIST! Do not \
add any explanations or other text!):
<sub-question>
<sub-question>
<sub-question>
...
Answer:
""".strip()
# INITIAL PHASE - AWARE OF REFINEMENT
# Sub-question
# Suggests augmenting question generation as well, so that a future refinement phase
# could use it to generate new questions
# Intentionally left a copy in case we want to modify this one differently
INITIAL_QUESTION_DECOMPOSITION_PROMPT_ASSUMING_REFINEMENT = f"""
Please create a list of no more than 3 sub-questions whose answers would help to inform the answer \
to the initial question.
The purpose for these sub-questions could be:
1) decomposition to isolate individual entities (i.e., 'compare sales of company A and company B' -> \
['what are sales for company A', 'what are sales for company B'])
2) clarification and/or disambiguation of ambiguous terms (i.e., 'what is our success with company A' -> \
['what are our sales with company A','what is our market share with company A', \
'is company A a reference customer for us', etc.])
3) if a term or a metric is essentially clear, but it could relate to various aspects of an entity and you \
are generally familiar with the entity, then you can create sub-questions that are more \
specific (i.e., 'what do we do to improve product X' -> 'what do we do to improve scalability of product X', \
'what do we do to improve performance of product X', 'what do we do to improve stability of product X', ...)
4) research individual questions and areas that should really help to ultimately answer the question.
5) if meaningful, find relevant facts that may inform another set of sub-questions generated after the set you \
create now is answered. Example: 'which products have we implemented at company A, and is this different to \
its competitors?' could potentially create sub-questions 'what products have we implemented at company A', \
and 'who are the competitors of company A'. The additional round of sub-question generation which sees the \
answers for this initial round of sub-question creation could then use the answer to the second sub-question \
(which could be 'company B and C are competitors of company A') to then ask 'which products have we implemented \
at company B', 'which products have we implemented at company C'...
Important:
- Each sub-question should lend itself to be answered by a RAG system. Correspondingly, phrase the question \
in a way that is amenable to that. An example set of sub-questions based on an initial question could look like this:
'what can I do to improve the performance of workflow X' -> \
'what are the settings affecting performance for workflow X', 'are there complaints and bugs related to \
workflow X performance', 'what are performance benchmarks for workflow X', ...
- Consequently, again, don't just decompose, but make sure that the sub-questions have the proper form. I.e., no \
first-person phrasing like 'I', etc.
- Do not(!) create sub-questions that are clarifying questions to the person who asked the question, \
like making suggestions or asking the user for more information! This is not useful for the actual \
question-answering process! You need to take the information from the user as it is given to you! \
For example, should the question be of the type 'why does product X perform poorly for customer A', DO NOT create a \
sub-question of the type 'what are the settings that customer A uses for product X?'! A valid sub-question \
could rather be 'which settings for product X have been shown to lead to poor performance for customers?'
And here is the initial question to create sub-questions for:
{SEPARATOR_LINE}
{{question}}
{SEPARATOR_LINE}
{{history}}
Do NOT include any text in your answer outside of the list of sub-questions!
Please formulate your answer as a newline-separated list of questions like so (and please ONLY ANSWER WITH THIS LIST! Do not \
add any explanations or other text!):
<sub-question>
<sub-question>
<sub-question>
@ -162,23 +297,47 @@ Answer:
# TODO: combine shared pieces with INITIAL_QUESTION_DECOMPOSITION_PROMPT
INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH = f"""
Decompose the initial user question into no more than 3 appropriate sub-questions that help to answer the \
original question. The purpose for this decomposition may be to:
1) isolate individual entities (i.e., 'compare sales of company A and company B' -> \
Please create a list of no more than 3 sub-questions whose answers would help to inform the answer \
to the initial question.
The purpose for these sub-questions could be:
1) decomposition to isolate individual entities (i.e., 'compare sales of company A and company B' -> \
['what are sales for company A', 'what are sales for company B'])
2) clarify or disambiguate ambiguous terms (i.e., 'what is our success with company A' -> \
2) clarification and/or disambiguation of ambiguous terms (i.e., 'what is our success with company A' -> \
['what are our sales with company A','what is our market share with company A', \
'is company A a reference customer for us', etc.])
3) if a term or a metric is essentially clear, but it could relate to various components of an entity and you \
are generally familiar with the entity, then you can decompose the question into sub-questions that are more \
specific to components (i.e., 'what do we do to improve scalability of product X', 'what do we to to improve \
scalability of product X', 'what do we do to improve stability of product X', ...])
4) research an area that could really help to answer the question.
3) if a term or a metric is essentially clear, but it could relate to various aspects of an entity and you \
are generally familiar with the entity, then you can create sub-questions that are more \
specific (i.e., 'what do we do to improve product X' -> 'what do we do to improve scalability of product X', \
'what do we do to improve performance of product X', 'what do we do to improve stability of product X', ...)
4) research individual questions and areas that should really help to ultimately answer the question.
Important:
- Each sub-question should lend itself to be answered by a RAG system. Correspondingly, phrase the question \
in a way that is amenable to that. An example set of sub-questions based on an initial question could look like this:
'what can I do to improve the performance of workflow X' -> \
'what are the settings affecting performance for workflow X', 'are there complaints and bugs related to \
workflow X performance', 'what are performance benchmarks for workflow X', ...
- Consequently, again, don't just decompose, but make sure that the sub-questions have the proper form. I.e., no \
first-person phrasing like 'I', etc.
- Do not(!) create sub-questions that are clarifying questions to the person who asked the question, \
like making suggestions or asking the user for more information! This is not useful for the actual \
question-answering process! You need to take the information from the user as it is given to you! \
For example, should the question be of the type 'why does product X perform poorly for customer A', DO NOT create a \
sub-question of the type 'what are the settings that customer A uses for product X?'! A valid sub-question \
could rather be 'which settings for product X have been shown to lead to poor performance for customers?'
To give you some context, you will see below also some documents that may relate to the question. Please only \
use this information to learn what the question is approximately asking about, but do not focus on the details \
to construct the sub-questions! Also, some of the entities, relationships and terms that are in the dataset may \
not be in these few documents, so DO NOT focussed too much on the documents when constructing the sub-questions! \
not be in these few documents, so DO NOT focus too much on the documents when constructing the sub-questions! \
Decomposition and disambiguations are most important!
Here are the sample docs to give you some context:
@ -186,7 +345,7 @@ Here are the sample docs to give you some context:
{{sample_doc_str}}
{SEPARATOR_LINE}
And here is the initial question to decompose:
And here is the initial question to create sub-questions for, so that you have the full context:
{SEPARATOR_LINE}
{{question}}
{SEPARATOR_LINE}
@ -194,7 +353,9 @@ And here is the initial question to decompose:
{{history}}
Do NOT include any text in your answer outside of the list of sub-questions!\
Please formulate your answer as a newline-separated list of questions like so:
Please formulate your answer as a newline-separated list of questions like so (and please ONLY ANSWER WITH THIS LIST! Do not \
add any explanations or other text!):
<sub-question>
<sub-question>
<sub-question>
@ -203,6 +364,84 @@ Please formulate your answer as a newline-separated list of questions like so:
Answer:
""".strip()
INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH_ASSUMING_REFINEMENT = f"""
Please create a list of no more than 3 sub-questions whose answers would help to inform the answer \
to the initial question.
The purpose for these sub-questions could be:
1) decomposition to isolate individual entities (i.e., 'compare sales of company A and company B' -> \
['what are sales for company A', 'what are sales for company B'])
2) clarification and/or disambiguation of ambiguous terms (i.e., 'what is our success with company A' -> \
['what are our sales with company A','what is our market share with company A', \
'is company A a reference customer for us', etc.])
3) if a term or a metric is essentially clear, but it could relate to various aspects of an entity and you \
are generally familiar with the entity, then you can create sub-questions that are more \
specific (i.e., 'what do we do to improve product X' -> 'what do we do to improve scalability of product X', \
'what do we do to improve performance of product X', 'what do we do to improve stability of product X', ...)
4) research individual questions and areas that should really help to ultimately answer the question.
5) if applicable and useful, consider using sub-questions to gather relevant information that can inform a \
subsequent set of sub-questions. The answers to your initial sub-questions will be available when generating \
the next set.
For example, if you start with the question, "Which products have we implemented at Company A, and how does \
this compare to its competitors?" you might first create sub-questions like "What products have we implemented \
at Company A?" and "Who are the competitors of Company A?"
The answer to the second sub-question, such as "Company B and C are competitors of Company A," can then be used \
to generate more specific sub-questions in the next round, like "Which products have we implemented at Company B?" \
and "Which products have we implemented at Company C?"
You'll be the judge!
Important:
- Each sub-question should lend itself to be answered by a RAG system. Correspondingly, phrase the question \
in a way that is amenable to that. An example set of sub-questions based on an initial question could look like this:
'what can I do to improve the performance of workflow X' -> \
'what are the settings affecting performance for workflow X', 'are there complaints and bugs related to \
workflow X performance', 'what are performance benchmarks for workflow X', ...
- Consequently, again, don't just decompose, but make sure that the sub-questions have the proper form. I.e., no \
first-person phrasing like 'I', etc.
- Do not(!) create sub-questions that are clarifying questions to the person who asked the question, \
like making suggestions or asking the user for more information! This is not useful for the actual \
question-answering process! You need to take the information from the user as it is given to you! \
For example, should the question be of the type 'why does product X perform poorly for customer A', DO NOT create a \
sub-question of the type 'what are the settings that customer A uses for product X?'! A valid sub-question \
could rather be 'which settings for product X have been shown to lead to poor performance for customers?'
To give you some context, you will see below also some documents that may relate to the question. Please only \
use this information to learn what the question is approximately asking about, but do not focus on the details \
to construct the sub-questions! Also, some of the entities, relationships and terms that are in the dataset may \
not be in these few documents, so DO NOT focus too much on the documents when constructing the sub-questions! \
Decomposition and disambiguations are most important!
Here are the sample docs to give you some context:
{SEPARATOR_LINE}
{{sample_doc_str}}
{SEPARATOR_LINE}
And here is the initial question to create sub-questions for, so that you have the full context:
{SEPARATOR_LINE}
{{question}}
{SEPARATOR_LINE}
{{history}}
Do NOT include any text in your answer outside of the list of sub-questions!\
Please formulate your answer as a newline-separated list of questions like so (and please ONLY ANSWER WITH THIS LIST! Do not \
add any explanations or other text!):
<sub-question>
<sub-question>
<sub-question>
...
Answer:
""".strip()
# Retrieval
QUERY_REWRITING_PROMPT = f"""
@ -257,23 +496,35 @@ Answer:
""".strip()
# Sub-Question Anser Generation
# Sub-Question Answer Generation
SUB_QUESTION_RAG_PROMPT = f"""
Use the context provided below - and only the provided context - to answer the given question. \
(Note that the answer is in service of answering a broader question, given below as 'motivation').
Again, only use the provided context and do not use your internal knowledge! If you cannot answer the \
question based on the context, say "{UNKNOWN_ANSWER}". It is a matter of life and death that you do NOT \
use your internal knowledge, just the provided information!
Make sure that you keep all relevant information, specifically as it concerns to the ultimate goal. \
Make sure that you keep all relevant information, specifically as it concerns the ultimate goal. \
(But keep other details as well.)
It is critical that you provide inline citations in the format [D1], [D2], [D3], etc! \
{COMMON_RAG_RULES}
- Make sure that you only state what you actually can positively learn from the provided context! Particularly \
don't make assumptions! Example: if i) a question you should answer is asking for products of companies that \
are competitors of company A, and ii) the context mentions products of companies A, B, C, D, E, etc., do NOT assume \
that B, C, D, E, etc. are competitors of A! All you know is that these are products of a number of companies, and you \
would have to rely on another question - that you do not have access to - to learn which companies are competitors of A.
Correspondingly, you should not say that these are the products of competitors of A, but rather something like \
"Here are some products of various companies".
It is critical that you provide inline citations in the format [D1], [D2], [D3], etc! Please use format [D1][D2] and NOT \
[D1, D2] format if you cite two or more documents together! \
It is important that the citation is close to the information it supports. \
Proper citations are very important to the user!
For your general information, here is the ultimate motivation:
Here is the document context for you to consider:
{SEPARATOR_LINE}
{{context}}
{SEPARATOR_LINE}
For your general information, here is the ultimate motivation for the question you need to answer:
{SEPARATOR_LINE}
{{original_question}}
{SEPARATOR_LINE}
@ -283,12 +534,8 @@ And here is the actual question I want you to answer based on the context above
{{question}}
{SEPARATOR_LINE}
Here is the context:
{SEPARATOR_LINE}
{{context}}
{SEPARATOR_LINE}
Please keep your answer brief and concise, and focus on facts and data.
Please keep your answer brief and concise, and focus on facts and data. (Again, only state what you see in the documents \
for sure and communicate if/in what way this may or may not relate to the question you need to answer!)
Answer:
""".strip()
@ -321,22 +568,18 @@ Use the information provided below - and only the provided information - to answ
The information provided below consists of:
1) a number of answered sub-questions - these are very important to help you organize your thoughts and your answer
2) a number of documents that deemed relevant for the question.
2) a number of documents that are deemed relevant for the question.
{{history}}
It is critical that you provide prover inline citations to documents in the format [D1], [D2], [D3], etc.! \
It is critical that you provide proper inline citations to documents in the format [D1], [D2], [D3], etc.! \
It is important that the citation is close to the information it supports. If you have multiple citations that support \
a fact, please cite for example as [D1][D3], or [D2][D4], etc. \
Feel free to also cite sub-questions in addition to documents, but make sure that you have documents cited with the \
sub-question citation. If you want to cite both a document and a sub-question, please use [D1][Q3], or [D2][D7][Q4], etc. \
Again, please NEVER cite sub-questions without a document citation! Proper citations are very important for the user!
IMPORTANT RULES:
- If you cannot reliably answer the question solely using the provided information, say that you cannot reliably answer. \
You may give some additional facts you learned, but do not try to invent an answer.
- If the information is empty or irrelevant, just say "{UNKNOWN_ANSWER}".
- If the information is relevant but not fully conclusive, specify that the information is not conclusive and say why.
{COMMON_RAG_RULES}
Again, you should be sure that the answer is supported by the information provided!
@ -361,7 +604,9 @@ And here is the question I want you to answer based on the information above:
{{question}}
{SEPARATOR_LINE}
Please keep your answer brief and concise, and focus on facts and data.
Please keep your answer brief and concise, and focus on facts and data. (Again, only state what you see in the documents for \
sure and communicate if/in what way this may or may not relate to the question you need to answer! Use the answered \
sub-questions as well, but be cautious and reconsider the documents again for validation.)
Answer:
""".strip()
@ -376,11 +621,7 @@ The information provided below consists of a number of documents that were deeme
{{history}}
IMPORTANT RULES:
- If you cannot reliably answer the question solely using the provided information, say that you cannot reliably answer. \
You may give some additional facts you learned, but do not try to invent an answer.
- If the information is irrelevant, just say "{UNKNOWN_ANSWER}".
- If the information is relevant but not fully conclusive, specify that the information is not conclusive and say why.
{COMMON_RAG_RULES}
Again, you should be sure that the answer is supported by the information provided!
@ -399,7 +640,8 @@ And here is the question I want you to answer based on the context above:
{{question}}
{SEPARATOR_LINE}
Please keep your answer brief and concise, and focus on facts and data.
Please keep your answer brief and concise, and focus on facts and data. (Again, only state what you see in the documents \
for sure and communicate if/in what way this may or may not relate to the question you need to answer!)
Answer:
""".strip()
@ -439,6 +681,12 @@ independently without the original question available
- For each sub-question, please also provide a search term that can be used to retrieve relevant documents from a document store.
- Consider specifically the sub-questions that were suggested but not answered. This is a sign that they are not answerable \
with the available context, and you should not ask similar questions.
- Do not(!) create sub-questions that are clarifying questions to the person who asked the question, \
like making suggestions or asking the user for more information! This is not useful for the actual \
question-answering process! You need to take the information from the user as it is given to you! \
For example, should the question be of the type 'why does product X perform poorly for customer A', DO NOT create a \
sub-question of the type 'what are the settings that customer A uses for product X?'! A valid sub-question \
could rather be 'which settings for product X have been shown to lead to poor performance for customers?'
Here is the initial question:
{SEPARATOR_LINE}
@ -474,7 +722,111 @@ objects/relationships/terms you can ask about! Do not ask about entities, terms
Again, please find questions that are NOT overlapping too much with the already answered sub-questions or those that \
already were suggested and failed. In other words - what can we try in addition to what has been tried so far?
Generate the list of questions separated by one new line like this:
Generate the list of questions separated by one new line like this (and please ONLY ANSWER WITH THIS LIST! Do not \
add any explanations or other text!):
<sub-question 1>
<sub-question 2>
<sub-question 3>
...""".strip()
REFINEMENT_QUESTION_DECOMPOSITION_PROMPT_W_INITIAL_SUBQUESTION_ANSWERS = f"""
An initial user question needs to be answered. An initial answer has been provided but it wasn't quite good enough. \
Also, some sub-questions had been answered and this information has been used to provide the initial answer. \
Some other sub-questions may have been suggested based on little knowledge, but they were not directly answerable. \
Also, some entities, relationships and terms are given to you so that you have an idea of what the available data looks like.
Your role is to generate 2-4 new sub-questions that would help to answer the initial question, considering:
1) The initial question
2) The initial answer that was found to be unsatisfactory
3) The sub-questions that were answered AND their answers
4) The sub-questions that were suggested but not answered (and that you should not repeat!)
5) The entities, relationships and terms that were extracted from the context
The individual questions should be answerable by a good RAG system. So a good idea would be to use the sub-questions to \
resolve ambiguities and/or to separate the question for different entities that may be involved in the original question, \
but in a way that does not duplicate questions that were already tried.
Additional Guidelines:
- The new sub-questions should be specific to the question and provide richer context for the question, resolve ambiguities, \
or address shortcomings of the initial answer
- Each new sub-question - when answered - should be relevant for the answer to the original question
- The new sub-questions should be free from comparisons, ambiguities, judgements, aggregations, or any other complications that \
may require extra context
- The new sub-questions MUST have the full context of the original question so that it can be executed by a RAG system \
independently without the original question available
Example:
- initial question: "What is the capital of France?"
- bad sub-question: "What is the name of the river there?"
- good sub-question: "What is the name of the river that flows through Paris?"
- For each new sub-question, please also provide a search term that can be used to retrieve relevant documents \
from a document store.
- Consider specifically the sub-questions that were suggested but not answered. This is a sign that they are not answerable \
with the available context, and you should not ask similar questions.
- Pay attention to the answers of previous sub-question to make your sub-questions more specific! \
Often the initial sub-questions were set up to give you critical information that you should use to generate new sub-questions.\
For example, if the answer to an earlier sub-question is \
'Company B and C are competitors of Company A', you should not now ask a new sub-question involving the term 'competitors', \
as you already have the information to create a more precise question - you should instead explicitly reference \
'Company B' and 'Company C' in your new sub-questions, as these are the competitors based on the previously answered question.
- Be precise(!) and don't make inferences you cannot be sure about! For example, in the previous example \
where Company B and Company C were identified as competitors of Company A, and then you also get information on \
companies D and E, do not make the inference that these are also competitors of Company A! Stick to the information you have!
(Also, don't assume that companies B and C are the only competitors of A, unless stated!)
- Do not(!) create sub-questions that are clarifying questions *to the person who asked the question*, \
like making suggestions or asking the user for more information! This is not useful for the actual \
question-answering process! You need to take the information from the user as it is given to you! \
For example, should the question be of the type 'why does product X perform poorly for customer A', DO NOT create a \
sub-question of the type 'what are the settings that customer A uses for product X?'! A valid sub-question \
could rather be 'which settings for product X have been shown to lead to poor performance for customers?'
Here is the initial question:
{SEPARATOR_LINE}
{{question}}
{SEPARATOR_LINE}
{{history}}
Here is the initial sub-optimal answer:
{SEPARATOR_LINE}
{{base_answer}}
{SEPARATOR_LINE}
Here are the sub-questions that were answered:
{SEPARATOR_LINE}
{{answered_subquestions_with_answers}}
{SEPARATOR_LINE}
Here are the sub-questions that were suggested but not answered:
{SEPARATOR_LINE}
{{failed_sub_questions}}
{SEPARATOR_LINE}
And here are the entities, relationships and terms extracted from the context:
{SEPARATOR_LINE}
{{entity_term_extraction_str}}
{SEPARATOR_LINE}
Please generate the list of good, fully contextualized sub-questions that would help to address the main question. \
Specifically pay attention also to the entities, relationships and terms extracted, as these indicate what type of \
objects/relationships/terms you can ask about! Do not ask about entities, terms or relationships that are not mentioned \
in the 'entities, relationships and terms' section.
Again, please find questions that are NOT overlapping too much with the already answered sub-questions or those that \
already were suggested and failed. In other words - what can we try in addition to what has been tried so far?
Generate the list of questions separated by one new line like this (and please ONLY ANSWER WITH THIS LIST! Do not \
add any explanations or other text!):
<sub-question 1>
<sub-question 2>
<sub-question 3>
@ -489,7 +841,7 @@ Your task is to improve on a given answer to a question, as the initial answer w
Use the information provided below - and only the provided information - to write your new and improved answer.
The information provided below consists of:
1) an initial answer that was given but found to be lacking in some way.
1) an initial answer that was given but likely found to be lacking in some way.
2) a number of answered sub-questions - these are very important(!) and definitely should help you to answer the main \
question. Note that the sub-questions have a type, 'initial' and 'refined'. The 'initial' ones were available for the \
creation of the initial answer, but the 'refined' were not, they are new. So please use the 'refined' sub-questions in \
@ -499,6 +851,7 @@ particular to update/extend/correct/enrich the initial answer and to add more de
the relevant document for a fact!
It is critical that you provide proper inline citations to documents in the format [D1], [D2], [D3], etc! \
Please use format [D1][D2] and NOT [D1, D2] format if you cite two or more documents together! \
It is important that the citation is close to the information it supports. \
DO NOT just list all of the citations at the very end. \
Feel free to also cite sub-questions in addition to documents, \
@ -509,14 +862,7 @@ Proper citations are very important for the user!
{{history}}
IMPORTANT RULES:
- If you cannot reliably answer the question solely using the provided information, say that you cannot reliably answer. \
You may give some additional facts you learned, but do not try to invent an answer.
- If the information is empty or irrelevant, just say "{UNKNOWN_ANSWER}".
- If the information is relevant but not fully conclusive, provide an answer to the extent you can but also specify that \
the information is not conclusive and why.
- Ignore any existing citations within the answered sub-questions, like [D1]... and [Q2]! The citations you will need to \
use will need to refer to the documents (and sub-questions) that you are explicitly presented with below!
{COMMON_RAG_RULES}
Again, you should be sure that the answer is supported by the information provided!
@ -545,7 +891,9 @@ Lastly, here is the main question I want you to answer based on the information
{{question}}
{SEPARATOR_LINE}
Please keep your answer brief and concise, and focus on facts and data.
Please keep your answer brief and concise, and focus on facts and data. (Again, only state what you see in the documents for \
sure and communicate if/in what way this may or may not relate to the question you need to answer! Use the answered \
sub-questions as well, but be cautious and reconsider the documents again for validation.)
Answer:
""".strip()
@ -561,18 +909,13 @@ The information provided below consists of:
2) a number of documents that were also deemed relevant for the question.
It is critical that you provide proper inline citations to documents in the format [D1], [D2], [D3], etc! \
Please use format [D1][D2] and NOT [D1, D2] format if you cite two or more documents together! \
It is important that the citation is close to the information it supports. \
DO NOT just list all of the citations at the very end of your response. Citations are very important for the user!
{{history}}
IMPORTANT RULES:
- If you cannot reliably answer the question solely using the provided information, say that you cannot reliably answer. \
You may give some additional facts you learned, but do not try to invent an answer.
- If the information is empty or irrelevant, just say "{UNKNOWN_ANSWER}".
- If the information is relevant but not fully conclusive, provide an answer to the extent you can but also specify that \
the information is not conclusive and why.
{COMMON_RAG_RULES}
Again, you should be sure that the answer is supported by the information provided!
Try to keep your answer concise. But also highlight uncertainties you may have should there be substantial ones, \
@ -597,11 +940,103 @@ Lastly, here is the question I want you to answer based on the information above
{{question}}
{SEPARATOR_LINE}
Please keep your answer brief and concise, and focus on facts and data.
Please keep your answer brief and concise, and focus on facts and data. (Again, only state what you see in the documents for \
sure and communicate if/in what way this may or may not relate to the question you need to answer!)
Answer:
""".strip()
REFINED_ANSWER_VALIDATION_PROMPT = f"""
{{persona_specification}}
Your task is to verify whether a given answer is truthful and accurate, and supported by the facts that you \
will be provided with.
The information provided below consists of:
1) a question that needed to be answered
2) a proposed answer to the question, whose accuracy you should assess
3) potentially, a brief summary of the history of the conversation thus far, as it may give more context \
to the question. Note that the statements in the history are NOT considered as facts, but ONLY serve to \
give context to the question.
4) a number of answered sub-questions - you can take the answers as facts for these purposes.
5) a number of relevant documents that should support the answer and that you should use as fact, \
i.e., if a statement in the document backs up a statement in the answer, then that statement in the answer \
should be considered as true.
IMPORTANT RULES AND CONSIDERATIONS:
- Please consider the statements made in the proposed answer and assess whether they are truthful and accurate, based \
on the provided sub-answers and the documents. (Again, the history is NOT considered as facts!)
- Look in particular for:
* material statements that are not supported by the sub-answers or the documents
* assignments and groupings that are not supported, like stating that company A is a competitor of company B when this is not \
explicitly supported by documents or sub-answers, as well as guesses or interpretations unless explicitly asked for
- Look also at the citations in the proposed answer and assess whether they are appropriate given the statements \
made in the proposed answer that cite the document.
- Are items grouped together under one headline where not all items belong to the category of the headline? \
(Example: "Products used by Company A" where some products listed are not used by Company A)
- Does the proposed answer address the question in full?
- Is the answer specific to the question? Example: if the question asks for the prices for products by Company A, \
but the answer lists the prices for products by Company A and Company B, or products it cannot be sure are by \
Company A, then this is not quite specific enough to the question and the answer should be rejected.
- Similarly, if the question asks for properties of a certain class but the proposed answer lists or includes entities \
that are not of that class without very explicitly saying so, then the answer should be considered inaccurate.
- If there are any calculations in the proposed answer that are not supported by the documents, they need to be tested. \
If any calculation is wrong, the proposed answer should be considered as not trustworthy.
Here is the information:
{SEPARATOR_LINE_LONG}
QUESTION:
{SEPARATOR_LINE}
{{question}}
{SEPARATOR_LINE}
PROPOSED ANSWER:
{SEPARATOR_LINE}
{{proposed_answer}}
{SEPARATOR_LINE}
Here is the additional contextual information:
{SEPARATOR_LINE_LONG}
{{history}}
Sub-questions and their answers (to be considered as facts):
{SEPARATOR_LINE}
{{answered_sub_questions}}
{SEPARATOR_LINE}
And here are the relevant documents that support the sub-question answers, and that are relevant for the actual question:
{SEPARATOR_LINE}
{{relevant_docs}}
{SEPARATOR_LINE}
Please think through this step by step. Format your response just as a string in the following format:
Analysis: <think through your reasoning as outlined in the 'IMPORTANT RULES AND CONSIDERATIONS' section above, \
but keep it short. Come to a conclusion whether the proposed answer can be trusted>
Comments: <state your condensed comments you would give to a user reading the proposed answer, regarding the accuracy and \
specificity.>
{AGENT_ANSWER_SEPARATOR} <answer here only with yes or no, whether the proposed answer can be trusted. Base this on your \
analysis, but only say 'yes' (trustworthy) or 'no' (not trustworthy)>
""".strip()
INITIAL_REFINED_ANSWER_COMPARISON_PROMPT = f"""
For the given question, please compare the initial answer and the refined answer and determine if the refined answer is \
View File
@ -1,18 +1,39 @@
import csv
import datetime
import json
import os
from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from typing import Any
import yaml
from onyx.agents.agent_search.deep_search.main.graph_builder import (
main_graph_builder,
)
from onyx.agents.agent_search.deep_search.main.states import MainInput
from onyx.agents.agent_search.deep_search.main.graph_builder import (
main_graph_builder as main_graph_builder_a,
)
from onyx.agents.agent_search.deep_search.main.states import (
MainInput as MainInput_a,
)
from onyx.agents.agent_search.run_graph import run_basic_graph
from onyx.agents.agent_search.run_graph import run_main_graph
from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import OnyxAnswerPiece
from onyx.chat.models import RefinedAnswerImprovement
from onyx.chat.models import StreamStopInfo
from onyx.chat.models import StreamType
from onyx.chat.models import SubQuestionPiece
from onyx.context.search.models import SearchRequest
from onyx.db.engine import get_session_context_manager
from onyx.llm.factory import get_default_llms
from onyx.tools.force import ForceUseTool
from onyx.tools.tool_implementations.search.search_tool import SearchTool
from onyx.utils.logger import setup_logger
logger = setup_logger()
cwd = os.getcwd()
@ -35,95 +56,183 @@ input_file_object = open(
)
output_file = f"{OUTPUT_DIR}/agent_test_output.csv"
csv_output_data: list[list[str]] = []
test_data = json.load(input_file_object)
example_data = test_data["examples"]
example_ids = test_data["example_ids"]
failed_example_ids: list[int] = []
with get_session_context_manager() as db_session:
output_data = []
output_data: dict[str, Any] = {}
primary_llm, fast_llm = get_default_llms()
for example in example_data:
example_id = example["id"]
query_start_time: datetime = datetime.now()
example_id: int = int(example.get("id"))
example_question: str = example.get("question")
if not example_question or not example_id:
continue
if len(example_ids) > 0 and example_id not in example_ids:
continue
example_question = example["question"]
target_sub_questions = example.get("target_sub_questions", [])
num_target_sub_questions = len(target_sub_questions)
search_request = SearchRequest(query=example_question)
logger.info(f"{query_start_time} -- Processing example {example_id}")
config, search_tool = get_test_config(
db_session=db_session,
primary_llm=primary_llm,
fast_llm=fast_llm,
search_request=search_request,
)
try:
example_question = example["question"]
target_sub_questions = example.get("target_sub_questions", [])
num_target_sub_questions = len(target_sub_questions)
search_request = SearchRequest(query=example_question)
inputs = MainInput()
initial_answer_duration: timedelta | None = None
refined_answer_duration: timedelta | None = None
base_answer_duration: timedelta | None = None
start_time = datetime.datetime.now()
logger.debug("\n\nTEST QUERY START\n\n")
question_result = compiled_graph.invoke(
input=inputs, config={"metadata": {"config": config}}
)
end_time = datetime.datetime.now()
graph = main_graph_builder_a()
compiled_graph = graph.compile()
query_end_time = datetime.now()
duration = end_time - start_time
if num_target_sub_questions > 0:
chunk_expansion_ratio = (
question_result["initial_agent_stats"]
.get("agent_effectiveness", {})
.get("utilized_chunk_ratio", None)
search_request = SearchRequest(
# query="what can you do with gitlab?",
# query="What are the guiding principles behind the development of cockroachDB",
# query="What are the temperatures in Munich, Hawaii, and New York?",
# query="When was Washington born?",
# query="What is Onyx?",
# query="What is the difference between astronomy and astrology?",
query=example_question,
)
support_effectiveness_ratio = (
question_result["initial_agent_stats"]
.get("agent_effectiveness", {})
.get("support_ratio", None)
)
else:
chunk_expansion_ratio = None
support_effectiveness_ratio = None
generated_sub_questions = question_result.get("generated_sub_questions", [])
num_generated_sub_questions = len(generated_sub_questions)
base_answer = question_result["initial_base_answer"].split("==")[-1]
agent_answer = question_result["initial_answer"].split("==")[-1]
answer_tokens: dict[str, list[str]] = defaultdict(list)
output_point = {
"example_id": example_id,
"question": example_question,
"duration": duration,
"target_sub_questions": target_sub_questions,
"generated_sub_questions": generated_sub_questions,
"num_target_sub_questions": num_target_sub_questions,
"num_generated_sub_questions": num_generated_sub_questions,
"chunk_expansion_ratio": chunk_expansion_ratio,
"support_effectiveness_ratio": support_effectiveness_ratio,
"base_answer": base_answer,
"agent_answer": agent_answer,
}
with get_session_context_manager() as db_session:
config = get_test_config(
db_session, primary_llm, fast_llm, search_request
)
assert (
config.persistence is not None
), "set a chat session id to run this test"
output_data.append(output_point)
# search_request.persona = get_persona_by_id(1, None, db_session)
# config.perform_initial_search_path_decision = False
config.behavior.perform_initial_search_decomposition = True
input = MainInput_a()
# Base Flow
base_flow_start_time: datetime = datetime.now()
for output in run_basic_graph(config):
if isinstance(output, OnyxAnswerPiece):
answer_tokens["base_answer"].append(output.answer_piece or "")
output_data["base_answer"] = "".join(answer_tokens["base_answer"])
output_data["base_answer_duration"] = (
datetime.now() - base_flow_start_time
)
# Agent Flow
agent_flow_start_time: datetime = datetime.now()
config = get_test_config(
db_session,
primary_llm,
fast_llm,
search_request,
use_agentic_search=True,
)
config.tooling.force_use_tool = ForceUseTool(
force_use=True, tool_name=SearchTool._NAME
)
tool_responses: list = []
sub_question_dict_tokens: dict[int, dict[int, str]] = defaultdict(
lambda: defaultdict(str)
)
for output in run_main_graph(config):
if isinstance(output, AgentAnswerPiece):
if output.level == 0 and output.level_question_num == 0:
answer_tokens["initial"].append(output.answer_piece)
elif output.level == 1 and output.level_question_num == 0:
answer_tokens["refined"].append(output.answer_piece)
elif isinstance(output, SubQuestionPiece):
if (
output.level is not None
and output.level_question_num is not None
):
sub_question_dict_tokens[output.level][
output.level_question_num
] += output.sub_question
elif isinstance(output, StreamStopInfo):
if (
output.stream_type == StreamType.MAIN_ANSWER
and output.level == 0
):
initial_answer_duration = (
datetime.now() - agent_flow_start_time
)
elif isinstance(output, RefinedAnswerImprovement):
output_data["refined_answer_improves_on_initial_answer"] = str(
output.refined_answer_improvement
)
refined_answer_duration = datetime.now() - agent_flow_start_time
output_data["example_id"] = example_id
output_data["question"] = example_question
output_data["initial_answer"] = "".join(answer_tokens["initial"])
output_data["refined_answer"] = "".join(answer_tokens["refined"])
output_data["initial_answer_duration"] = initial_answer_duration or ""
output_data["refined_answer_duration"] = refined_answer_duration
output_data["initial_sub_questions"] = "\n---\n".join(
[x for x in sub_question_dict_tokens[0].values()]
)
output_data["refined_sub_questions"] = "\n---\n".join(
[x for x in sub_question_dict_tokens[1].values()]
)
csv_output_data.append(
[
str(example_id),
example_question,
output_data["base_answer"],
output_data["base_answer_duration"],
output_data["initial_sub_questions"],
output_data["initial_answer"],
output_data["initial_answer_duration"],
output_data["refined_sub_questions"],
output_data["refined_answer"],
output_data["refined_answer_duration"],
output_data["refined_answer_improves_on_initial_answer"],
]
)
except Exception as e:
logger.error(f"Error processing example {example_id}: {e}")
failed_example_ids.append(example_id)
continue
with open(output_file, "w", newline="") as csvfile:
fieldnames = [
"example_id",
"question",
"duration",
"target_sub_questions",
"generated_sub_questions",
"num_target_sub_questions",
"num_generated_sub_questions",
"chunk_expansion_ratio",
"support_effectiveness_ratio",
"base_answer",
"agent_answer",
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter="\t")
writer.writeheader()
writer.writerows(output_data)
writer = csv.writer(csvfile, delimiter="\t")
writer.writerow(
[
"example_id",
"question",
"base_answer",
"base_answer_duration",
"initial_sub_questions",
"initial_answer",
"initial_answer_duration",
"refined_sub_questions",
"refined_answer",
"refined_answer_duration",
"refined_answer_improves_on_initial_answer",
]
)
writer.writerows(csv_output_data)
print("DONE")
View File
@ -1122,7 +1122,7 @@ export function ChatPage({
"Continue Generating (pick up exactly where you left off)",
});
};
const [gener, setFinishedStreaming] = useState(false);
const [uncaughtError, setUncaughtError] = useState<string | null>(null);
const onSubmit = async ({
messageIdToResend,
@ -1445,7 +1445,6 @@ export function ChatPage({
Object.hasOwn(packet, "level_question_num")
) {
if ((packet as StreamStopInfo).stream_type == "main_answer") {
setFinishedStreaming(true);
updateChatState("streaming", frozenSessionId);
}
if (
@ -1563,8 +1562,23 @@ export function ChatPage({
}
);
} else if (Object.hasOwn(packet, "error")) {
error = (packet as StreamingError).error;
stackTrace = (packet as StreamingError).stack_trace;
if (
sub_questions.length > 0 &&
sub_questions
.filter((q) => q.level === 0)
.every((q) => q.is_stopped === true)
) {
setUncaughtError((packet as StreamingError).error);
updateChatState("input");
setAgenticGenerating(false);
setAlternativeGeneratingAssistant(null);
setSubmittedMessage("");
return;
// throw new Error((packet as StreamingError).error);
} else {
error = (packet as StreamingError).error;
stackTrace = (packet as StreamingError).stack_trace;
}
} else if (Object.hasOwn(packet, "message_id")) {
finalMessage = packet as BackendMessage;
} else if (Object.hasOwn(packet, "stop_reason")) {
@ -2054,6 +2068,7 @@ export function ChatPage({
}
const data = await response.json();
router.push(data.redirect_url);
} catch (error) {
console.error("Error seeding chat from Slack:", error);
@ -2649,6 +2664,7 @@ export function ChatPage({
{message.sub_questions &&
message.sub_questions.length > 0 ? (
<AgenticMessage
error={uncaughtError}
isStreamingQuestions={
message.isStreamingQuestions ?? false
}
View File
@ -81,6 +81,7 @@ export const AgenticMessage = ({
agenticDocs,
secondLevelSubquestions,
toggleDocDisplay,
error,
}: {
isStreamingQuestions: boolean;
isGenerating: boolean;
@ -113,6 +114,7 @@ export const AgenticMessage = ({
regenerate?: (modelOverRide: LlmOverride) => Promise<void>;
setPresentingDocument?: (document: OnyxDocument) => void;
toggleDocDisplay?: (agentic: boolean) => void;
error?: string | null;
}) => {
const [noShowingMessage, setNoShowingMessage] = useState(isComplete);
@ -491,11 +493,28 @@ export const AgenticMessage = ({
) : (
content
)}
{error && (
<p className="mt-2 text-red-700 text-sm my-auto">
{error}
</p>
)}
</div>
</div>
</>
) : isComplete ? null : (
<></>
) : isComplete ? (
error && (
<p className="mt-2 mx-4 text-red-700 text-sm my-auto">
{error}
</p>
)
) : (
<>
{error && (
<p className="mt-2 mx-4 text-red-700 text-sm my-auto">
{error}
</p>
)}
</>
)}
{handleFeedback &&
(isActive ? (
View File
@ -186,6 +186,7 @@ export const AIMessage = ({
setPresentingDocument,
index,
documentSidebarVisible,
removePadding,
}: {
index?: number;
shared?: boolean;
@ -214,6 +215,7 @@ export const AIMessage = ({
overriddenModel?: string;
regenerate?: (modelOverRide: LlmOverride) => Promise<void>;
setPresentingDocument: (document: OnyxDocument) => void;
removePadding?: boolean;
}) => {
const toolCallGenerating = toolCall && !toolCall.tool_result;
@ -402,7 +404,9 @@ export const AIMessage = ({
<div
id={isComplete ? "onyx-ai-message" : undefined}
ref={trackedElementRef}
className={`py-5 text-text ml-4 lg:px-5 relative flex `}
className={`py-5 ml-4 lg:px-5 relative flex
${removePadding && "!pl-24 -mt-12"}`}
>
<div
className={`mx-auto ${
@ -411,11 +415,13 @@ export const AIMessage = ({
>
<div className={`lg:mr-12 ${!shared && "mobile:ml-0 md:ml-8"}`}>
<div className="flex items-start">
<AssistantIcon
className="mobile:hidden"
size={24}
assistant={alternativeAssistant || currentPersona}
/>
{!removePadding && (
<AssistantIcon
className="mobile:hidden"
size={24}
assistant={alternativeAssistant || currentPersona}
/>
)}
<div className="w-full">
<div className="max-w-message-max break-words">
@ -596,7 +602,8 @@ export const AIMessage = ({
)}
</div>
{handleFeedback &&
{!removePadding &&
handleFeedback &&
(isActive ? (
<div
className={`