Improved logging through agent_state plus some default fixes

joachim-danswer 2025-01-31 11:31:38 -08:00 committed by Evan Lohn
parent 23ae4547ca
commit 1a2760edee
28 changed files with 251 additions and 233 deletions

View File

@ -15,8 +15,7 @@ logger = setup_logger()
def send_to_expanded_retrieval(state: AnswerQuestionInput) -> Send | Hashable:
logger.debug("sending to expanded retrieval via edge")
now_start = datetime.now()
edge_start_time = datetime.now()
return Send(
"initial_sub_question_expanded_retrieval",
@ -24,6 +23,6 @@ def send_to_expanded_retrieval(state: AnswerQuestionInput) -> Send | Hashable:
question=state.question,
base_search=False,
sub_question_id=state.question_id,
log_messages=[f"{now_start} -- Sending to expanded retrieval"],
log_messages=[f"{edge_start_time} -- Sending to expanded retrieval"],
),
)
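
A side note on the rename pattern above: edges stamp edge_start_time where nodes use node_start_time. A minimal self-contained sketch of the edge side follows; the dict payload and the _State stand-in are illustrative (the real code passes an AnswerQuestionInput model), and it assumes langgraph is installed:

from dataclasses import dataclass
from datetime import datetime

from langgraph.types import Send


@dataclass
class _State:  # minimal stand-in for AnswerQuestionInput
    question: str


def send_to_expanded_retrieval(state: _State) -> Send:
    edge_start_time = datetime.now()  # renamed from now_start in this commit
    return Send(
        "initial_sub_question_expanded_retrieval",
        {
            "question": state.question,
            "log_messages": [f"{edge_start_time} -- Sending to expanded retrieval"],
        },
    )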

View File

@ -14,21 +14,28 @@ from onyx.agents.agent_search.deep_search_a.initial.generate_individual_sub_answ
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.prompts import SUB_CHECK_PROMPT
from onyx.agents.agent_search.shared_graph_utils.prompts import UNKNOWN_ANSWER
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import parse_question_id
def check_sub_answer(
state: AnswerQuestionState, config: RunnableConfig
) -> QACheckUpdate:
now_start = datetime.now()
node_start_time = datetime.now()
level, question_num = parse_question_id(state.question_id)
if state.answer == UNKNOWN_ANSWER:
now_end = datetime.now()
return QACheckUpdate(
answer_quality=False,
log_messages=[
f"{now_start} -- Answer check SQ-{level}-{question_num} - unknown answer, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="initial - generate individual sub answer",
node_name="check sub answer",
node_start_time=node_start_time,
result="unknown answer",
)
],
)
msg = [
@ -51,11 +58,14 @@ def check_sub_answer(
quality_str: str = merge_message_runs(response, chunk_separator="")[0].content
answer_quality = "yes" in quality_str.lower()
now_end = datetime.now()
return QACheckUpdate(
answer_quality=answer_quality,
log_messages=[
f"""{now_start} -- Answer check SQ-{level}-{question_num} - Answer quality: {quality_str},
Time taken: {now_end - now_start}"""
get_langgraph_node_log_string(
graph_component="initial - generate individual sub answer",
node_name="check sub answer",
node_start_time=node_start_time,
result=f"Answer quality: {quality_str}",
)
],
)
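
The hunk above is the template the rest of the commit follows: stamp node_start_time on entry, do the work, and return the update with one structured log line built by get_langgraph_node_log_string (defined at the bottom of this diff). A condensed, self-contained sketch of that recipe; MyCheckUpdate and the node body are illustrative, and the import assumes the onyx package is on the path:

from datetime import datetime

from pydantic import BaseModel

from onyx.agents.agent_search.shared_graph_utils.utils import (
    get_langgraph_node_log_string,
)


class MyCheckUpdate(BaseModel):  # stand-in for a LoggerUpdate subclass
    answer_quality: bool = False
    log_messages: list[str] = []


def check_sub_answer_sketch(answer: str) -> MyCheckUpdate:
    node_start_time = datetime.now()  # stamped once, on entry
    answer_quality = "yes" in answer.lower()  # placeholder for the LLM check
    return MyCheckUpdate(
        answer_quality=answer_quality,
        log_messages=[
            get_langgraph_node_log_string(
                graph_component="initial - generate individual sub answer",
                node_name="check sub answer",
                node_start_time=node_start_time,
                result=f"Answer quality: {answer_quality}",
            )
        ],
    )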

View File

@ -18,6 +18,9 @@ from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
)
from onyx.agents.agent_search.shared_graph_utils.prompts import NO_RECOVERED_DOCS
from onyx.agents.agent_search.shared_graph_utils.utils import get_answer_citation_ids
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_persona_agent_prompt_expressions,
)
@ -37,8 +40,7 @@ def generate_sub_answer(
config: RunnableConfig,
writer: StreamWriter = lambda _: None,
) -> QAGenerationUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------START ANSWER GENERATION---")
node_start_time = datetime.now()
agent_search_config = cast(AgentSearchConfig, config["metadata"]["config"])
question = state.question
@ -62,8 +64,6 @@ def generate_sub_answer(
writer,
)
else:
logger.debug(f"Number of verified retrieval docs: {len(context_docs)}")
fast_llm = agent_search_config.fast_llm
msg = build_sub_question_answer_prompt(
question=question,
@ -119,15 +119,15 @@ def generate_sub_answer(
)
write_custom_event("stream_finished", stop_event, writer)
now_end = datetime.now()
logger.info(
f"{now_start} -- Answer generation SQ-{level} - Q{question_nr} - Time taken: {now_end - now_start}"
)
return QAGenerationUpdate(
answer=answer_str,
cited_docs=cited_docs,
log_messages=[
f"{now_start} -- Answer generation SQ-{level} - Q{question_nr} - Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="initial - generate individual sub answer",
node_name="generate sub answer",
node_start_time=node_start_time,
result="",
)
],
)

View File

@ -4,6 +4,7 @@ from typing import Annotated
from pydantic import BaseModel
from onyx.agents.agent_search.core_state import SubgraphCoreState
from onyx.agents.agent_search.deep_search_a.main.states import LoggerUpdate
from onyx.agents.agent_search.shared_graph_utils.models import AgentChunkStats
from onyx.agents.agent_search.shared_graph_utils.models import QueryResult
from onyx.agents.agent_search.shared_graph_utils.models import (
@ -16,19 +17,19 @@ from onyx.context.search.models import InferenceSection
## Update States
class QACheckUpdate(BaseModel):
class QACheckUpdate(LoggerUpdate, BaseModel):
answer_quality: bool = False
log_messages: list[str] = []
class QAGenerationUpdate(BaseModel):
class QAGenerationUpdate(LoggerUpdate, BaseModel):
answer: str = ""
log_messages: list[str] = []
cited_docs: Annotated[list[InferenceSection], dedup_inference_sections] = []
# answer_stat: AnswerStats
class RetrievalIngestionUpdate(BaseModel):
class RetrievalIngestionUpdate(LoggerUpdate, BaseModel):
expanded_retrieval_results: list[QueryResult] = []
documents: Annotated[list[InferenceSection], dedup_inference_sections] = []
context_documents: Annotated[list[InferenceSection], dedup_inference_sections] = []
@ -62,7 +63,7 @@ class AnswerQuestionState(
## Graph Output State
class AnswerQuestionOutput(BaseModel):
class AnswerQuestionOutput(LoggerUpdate, BaseModel):
"""
This is a list of results even though each call of this subgraph only returns one result.
This is because if we parallelize the answer query subgraph, there will be multiple
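
LoggerUpdate, imported above from onyx.agents.agent_search.deep_search_a.main.states, is not itself shown in this diff. Judging from how the update states mix it in, a plausible minimal reconstruction is sketched below; the add reducer is an assumption modeled on the Annotated[list[QueryResult], add] fields elsewhere in this commit, not something the diff confirms:

from operator import add
from typing import Annotated

from pydantic import BaseModel


class LoggerUpdate(BaseModel):
    # Hypothetical reconstruction: one shared field so each update state
    # stops redeclaring log_messages; the reducer would let LangGraph
    # concatenate messages from parallel nodes instead of overwriting them.
    log_messages: Annotated[list[str], add] = []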

View File

@ -18,7 +18,7 @@ from onyx.agents.agent_search.shared_graph_utils.utils import make_question_id
def parallelize_initial_sub_question_answering(
state: SearchSQState,
) -> list[Send | Hashable]:
now_start = datetime.now()
edge_start_time = datetime.now()
if len(state.initial_decomp_questions) > 0:
# sub_question_record_ids = [subq_record.id for subq_record in state["sub_question_records"]]
# if len(state["sub_question_records"]) == 0:
@ -37,7 +37,7 @@ def parallelize_initial_sub_question_answering(
question=question,
question_id=make_question_id(0, question_nr + 1),
log_messages=[
f"{now_start} -- Main Edge - Parallelize Initial Sub-question Answering"
f"{edge_start_time} -- Main Edge - Parallelize Initial Sub-question Answering"
],
),
)

View File

@ -9,6 +9,6 @@ from onyx.agents.agent_search.deep_search_a.main.states import LoggerUpdate
def consolidate_retrieved_documents(
state: SearchSQState,
) -> LoggerUpdate:
now_start = datetime.now()
node_start_time = datetime.now()
return LoggerUpdate(log_messages=[f"{now_start} -- Retrieval consolidation"])
return LoggerUpdate(log_messages=[f"{node_start_time} -- Retrieval consolidation"])

View File

@ -45,6 +45,9 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
dispatch_main_answer_stop_info,
)
from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import relevance_from_docs
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import AgentAnswerPiece
@ -58,9 +61,7 @@ from onyx.tools.tool_implementations.search.search_tool import yield_search_resp
def generate_initial_answer(
state: SearchSQState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> InitialAnswerUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------GENERATE INITIAL---")
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
question = agent_a_config.search_request.query
@ -240,8 +241,6 @@ def generate_initial_answer(
logger.debug(initial_agent_stats.sub_questions)
logger.debug(initial_agent_stats.agent_effectiveness)
now_end = datetime.now()
agent_base_end_time = datetime.now()
if agent_base_end_time and state.agent_start_time:
@ -268,10 +267,6 @@ def generate_initial_answer(
duration__s=duration__s,
)
logger.info(
f"{now_start} -- Main - Initial Answer generation, Time taken: {now_end - now_start}"
)
return InitialAnswerUpdate(
initial_answer=answer,
initial_agent_stats=initial_agent_stats,
@ -279,6 +274,11 @@ def generate_initial_answer(
agent_base_end_time=agent_base_end_time,
agent_base_metrics=agent_base_metrics,
log_messages=[
f"{now_start} -- Main - Initial Answer generation, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="initial - generate initial answer",
node_name="generate initial answer",
node_start_time=node_start_time,
result="",
)
],
)

View File

@ -3,19 +3,19 @@ from datetime import datetime
from onyx.agents.agent_search.deep_search_a.initial.retrieve_orig_question_docs.states import (
BaseRawSearchOutput,
)
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import (
ExpandedRetrievalUpdate,
)
from onyx.agents.agent_search.shared_graph_utils.models import AgentChunkStats
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
def ingest_retrieved_documents(
state: BaseRawSearchOutput,
) -> ExpandedRetrievalUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------INGEST INITIAL RETRIEVAL---")
node_start_time = datetime.now()
sub_question_retrieval_stats = (
state.base_expanded_retrieval_result.sub_question_retrieval_stats
@ -27,17 +27,16 @@ def ingest_retrieved_documents(
sub_question_retrieval_stats = sub_question_retrieval_stats or AgentChunkStats()
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------INGEST INITIAL RETRIEVAL END---"
)
return ExpandedRetrievalUpdate(
original_question_retrieval_results=state.base_expanded_retrieval_result.expanded_queries_results,
all_original_question_documents=state.base_expanded_retrieval_result.context_documents,
original_question_retrieval_stats=sub_question_retrieval_stats,
log_messages=[
f"{now_start} -- Main - Ingestion base retrieval, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="initial - generate initial answer",
node_name="ingest retrieved documents",
node_start_time=node_start_time,
result="",
)
],
)

View File

@ -7,6 +7,9 @@ from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import (
InitialAnswerQualityUpdate,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
def validate_initial_answer(state: SearchSQState) -> InitialAnswerQualityUpdate:
@ -20,23 +23,22 @@ def validate_initial_answer(state: SearchSQState) -> InitialAnswerQualityUpdate:
InitialAnswerQualityUpdate
"""
now_start = datetime.now()
node_start_time = datetime.now()
logger.debug(
f"--------{now_start}--------Checking for base answer validity - for not set True/False manually"
f"--------{node_start_time}--------Checking for base answer validity - for not set True/False manually"
)
verdict = True
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------INITIAL ANSWER QUALITY CHECK END---"
)
return InitialAnswerQualityUpdate(
initial_answer_quality_eval=verdict,
log_messages=[
f"{now_start} -- Main - Initial answer quality check, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="initial - generate initial answer",
node_name="validate initial answer",
node_start_time=node_start_time,
result="",
)
],
)

View File

@ -18,7 +18,7 @@ from onyx.agents.agent_search.shared_graph_utils.utils import make_question_id
def parallelize_initial_sub_question_answering(
state: SearchSQState,
) -> list[Send | Hashable]:
now_start = datetime.now()
edge_start_time = datetime.now()
if len(state.initial_decomp_questions) > 0:
# sub_question_record_ids = [subq_record.id for subq_record in state["sub_question_records"]]
# if len(state["sub_question_records"]) == 0:
@ -37,7 +37,7 @@ def parallelize_initial_sub_question_answering(
question=question,
question_id=make_question_id(0, question_nr + 1),
log_messages=[
f"{now_start} -- Main Edge - Parallelize Initial Sub-question Answering"
f"{edge_start_time} -- Main Edge - Parallelize Initial Sub-question Answering"
],
),
)

View File

@ -15,7 +15,6 @@ from onyx.agents.agent_search.deep_search_a.main.models import (
from onyx.agents.agent_search.deep_search_a.main.operations import (
dispatch_subquestion,
)
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import BaseDecompUpdate
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
@ -28,6 +27,9 @@ from onyx.agents.agent_search.shared_graph_utils.prompts import (
INITIAL_DECOMPOSITION_PROMPT_QUESTIONS_AFTER_SEARCH,
)
from onyx.agents.agent_search.shared_graph_utils.utils import dispatch_separated
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import StreamStopInfo
from onyx.chat.models import StreamStopReason
@ -38,9 +40,7 @@ from onyx.configs.agent_configs import AGENT_NUM_DOCS_FOR_DECOMPOSITION
def decompose_orig_question(
state: SearchSQState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> BaseDecompUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------BASE DECOMP START---")
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
question = agent_a_config.search_request.query
@ -123,12 +123,6 @@ def decompose_orig_question(
decomp_list: list[str] = [sq.strip() for sq in list_of_subqs if sq.strip() != ""]
now_end = datetime.now()
logger.info(
f"{now_start} -- INITIAL SUBQUESTION ANSWERING - Base Decomposition, Time taken: {now_end - now_start}"
)
return BaseDecompUpdate(
initial_decomp_questions=decomp_list,
agent_start_time=agent_start_time,
@ -139,4 +133,12 @@ def decompose_orig_question(
refined_question_boost_factor=None,
duration__s=None,
),
log_messages=[
get_langgraph_node_log_string(
graph_component="initial - generate sub answers",
node_name="decompose original question",
node_start_time=node_start_time,
result=f"decomposed original question into {len(decomp_list)} subquestions",
)
],
)

View File

@ -3,21 +3,22 @@ from datetime import datetime
from onyx.agents.agent_search.deep_search_a.initial.generate_individual_sub_answer.states import (
AnswerQuestionOutput,
)
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import (
DecompAnswersUpdate,
)
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_sections,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
def format_initial_sub_answers(
state: AnswerQuestionOutput,
) -> DecompAnswersUpdate:
now_start = datetime.now()
node_start_time = datetime.now()
logger.info(f"--------{now_start}--------INGEST ANSWERS---")
documents = []
context_documents = []
cited_docs = []
@ -26,11 +27,6 @@ def format_initial_sub_answers(
documents.extend(answer_result.documents)
context_documents.extend(answer_result.context_documents)
cited_docs.extend(answer_result.cited_docs)
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------INGEST ANSWERS END---"
)
return DecompAnswersUpdate(
# Deduping is done by the documents operator for the main graph
@ -40,6 +36,11 @@ def format_initial_sub_answers(
cited_docs=dedup_inference_sections(cited_docs, []),
sub_question_results=answer_results,
log_messages=[
f"{now_start} -- Main - Ingest initial processed sub questions, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="initial - generate sub answers",
node_name="format initial sub answers",
node_start_time=node_start_time,
result="",
)
],
)

View File

@ -43,7 +43,7 @@ def route_initial_tool_choice(
def parallelize_initial_sub_question_answering(
state: MainState,
) -> list[Send | Hashable]:
now_start = datetime.now()
edge_start_time = datetime.now()
if len(state.initial_decomp_questions) > 0:
# sub_question_record_ids = [subq_record.id for subq_record in state["sub_question_records"]]
# if len(state["sub_question_records"]) == 0:
@ -62,7 +62,7 @@ def parallelize_initial_sub_question_answering(
question=question,
question_id=make_question_id(0, question_nr + 1),
log_messages=[
f"{now_start} -- Main Edge - Parallelize Initial Sub-question Answering"
f"{edge_start_time} -- Main Edge - Parallelize Initial Sub-question Answering"
],
),
)
@ -93,7 +93,7 @@ def continue_to_refined_answer_or_end(
def parallelize_refined_sub_question_answering(
state: MainState,
) -> list[Send | Hashable]:
now_start = datetime.now()
edge_start_time = datetime.now()
if len(state.refined_sub_questions) > 0:
return [
Send(
@ -102,7 +102,7 @@ def parallelize_refined_sub_question_answering(
question=question_data.sub_question,
question_id=make_question_id(1, question_nr),
log_messages=[
f"{now_start} -- Main Edge - Parallelize Refined Sub-question Answering"
f"{edge_start_time} -- Main Edge - Parallelize Refined Sub-question Answering"
],
),
)

View File

@ -5,11 +5,13 @@ from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langgraph.types import StreamWriter
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import AnswerComparison
from onyx.agents.agent_search.deep_search_a.main.states import MainState
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.prompts import ANSWER_COMPARISON_PROMPT
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import RefinedAnswerImprovement
@ -17,15 +19,13 @@ from onyx.chat.models import RefinedAnswerImprovement
def compare_answers(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> AnswerComparison:
now_start = datetime.now()
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
question = agent_a_config.search_request.query
initial_answer = state.initial_answer
refined_answer = state.refined_answer
logger.info(f"--------{now_start}--------ANSWER COMPARISON STARTED--")
compare_answers_prompt = ANSWER_COMPARISON_PROMPT.format(
question=question, initial_answer=initial_answer, refined_answer=refined_answer
)
@ -50,15 +50,14 @@ def compare_answers(
writer,
)
now_end = datetime.now()
logger.info(
f"{now_start} -- MAIN - Answer comparison, Time taken: {now_end - now_start}"
)
return AnswerComparison(
refined_answer_improvement_eval=refined_answer_improvement,
log_messages=[
f"{now_start} -- Answer comparison: {refined_answer_improvement}, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="compare answers",
node_start_time=node_start_time,
result=f"Answer comparison: {refined_answer_improvement}",
)
],
)

View File

@ -12,7 +12,6 @@ from onyx.agents.agent_search.deep_search_a.main.models import (
from onyx.agents.agent_search.deep_search_a.main.operations import (
dispatch_subquestion,
)
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import (
FollowUpSubQuestionsUpdate,
)
@ -28,6 +27,9 @@ from onyx.agents.agent_search.shared_graph_utils.utils import dispatch_separated
from onyx.agents.agent_search.shared_graph_utils.utils import (
format_entity_term_extraction,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import make_question_id
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.tools.models import ToolCallKickoff
@ -50,9 +52,7 @@ def create_refined_sub_questions(
writer,
)
now_start = datetime.now()
logger.info(f"--------{now_start}--------FOLLOW UP DECOMPOSE---")
node_start_time = datetime.now()
agent_refined_start_time = datetime.now()
@ -114,13 +114,15 @@ def create_refined_sub_questions(
refined_sub_question_dict[sub_question_nr + 1] = refined_sub_question
now_end = datetime.now()
logger.info(
f"{now_start} -- MAIN - Refined sub question creation, Time taken: {now_end - now_start}"
)
return FollowUpSubQuestionsUpdate(
refined_sub_questions=refined_sub_question_dict,
agent_refined_start_time=agent_refined_start_time,
log_messages=[
get_langgraph_node_log_string(
graph_component="main",
node_name="create refined sub questions",
node_start_time=node_start_time,
result=f"Created {len(refined_sub_question_dict)} refined sub questions",
)
],
)

View File

@ -3,39 +3,39 @@ from typing import cast
from langchain_core.runnables import RunnableConfig
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import MainState
from onyx.agents.agent_search.deep_search_a.main.states import (
RequireRefinedAnswerUpdate,
)
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
def decide_refinement_need(
state: MainState, config: RunnableConfig
) -> RequireRefinedAnswerUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------REFINED ANSWER DECISION---")
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
decision = True # TODO: just for current testing purposes
now_end = datetime.now()
logger.info(
f"{now_start} -- MAIN - Refined answer decision, Time taken: {now_end - now_start}"
)
log_messages = [
f"{now_start} -- Main - Refined answer decision: {decision}, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="decide refinement need",
node_start_time=node_start_time,
result=f"Refinement decision: {decision}",
)
]
if agent_a_config.allow_refinement:
return RequireRefinedAnswerUpdate(
require_refined_answer_eval=decision,
log_messages=log_messages,
)
else:
return RequireRefinedAnswerUpdate(
require_refined_answer_eval=False,

View File

@ -23,18 +23,18 @@ from onyx.agents.agent_search.shared_graph_utils.models import Relationship
from onyx.agents.agent_search.shared_graph_utils.models import Term
from onyx.agents.agent_search.shared_graph_utils.prompts import ENTITY_TERM_PROMPT
from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
def extract_entities_terms(
state: MainState, config: RunnableConfig
) -> EntityTermExtractionUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------GENERATE ENTITIES & TERMS---")
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
if not agent_a_config.allow_refinement:
now_end = datetime.now()
return EntityTermExtractionUpdate(
entity_relation_term_extractions=EntityRelationshipTermExtraction(
entities=[],
@ -42,7 +42,12 @@ def extract_entities_terms(
terms=[],
),
log_messages=[
f"{now_start} -- Main - ETR Extraction, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="extract entities terms",
node_start_time=node_start_time,
result="Refinement is not allowed",
)
],
)
@ -115,12 +120,6 @@ def extract_entities_terms(
)
)
now_end = datetime.now()
logger.info(
f"{now_start} -- MAIN - Entity term extraction, Time taken: {now_end - now_start}"
)
return EntityTermExtractionUpdate(
entity_relation_term_extractions=EntityRelationshipTermExtraction(
entities=entities,
@ -128,6 +127,10 @@ def extract_entities_terms(
terms=terms,
),
log_messages=[
f"{now_start} -- Main - ETR Extraction, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="extract entities terms",
node_start_time=node_start_time,
)
],
)

View File

@ -43,6 +43,9 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
dispatch_main_answer_stop_info,
)
from onyx.agents.agent_search.shared_graph_utils.utils import format_docs
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import parse_question_id
from onyx.agents.agent_search.shared_graph_utils.utils import relevance_from_docs
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
@ -56,9 +59,7 @@ from onyx.tools.tool_implementations.search.search_tool import yield_search_resp
def generate_refined_answer(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> RefinedAnswerUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------GENERATE REFINED ANSWER---")
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
question = agent_a_config.search_request.query
@ -180,7 +181,7 @@ def generate_refined_answer(
# original answer
initial_answer = state.initial_answer
initial_answer = state.initial_answer or ""
# Determine which persona-specification prompt to use
@ -212,7 +213,9 @@ def generate_refined_answer(
sub_question_answer_str
),
relevant_docs=relevant_docs,
initial_answer=remove_document_citations(initial_answer),
initial_answer=remove_document_citations(initial_answer)
if initial_answer
else None,
persona_specification=persona_contextualized_prompt,
date_prompt=prompt_enrichment_components.date_str,
)
@ -305,12 +308,6 @@ def generate_refined_answer(
f"Revision Question Factor: {refined_agent_stats.revision_question_efficiency}"
)
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------INITIAL AGENT ANSWER END---\n\n"
)
agent_refined_end_time = datetime.now()
if state.agent_refined_start_time:
agent_refined_duration = (
@ -325,12 +322,6 @@ def generate_refined_answer(
duration__s=agent_refined_duration,
)
now_end = datetime.now()
logger.info(
f"{now_start} -- MAIN - Generate refined answer, Time taken: {now_end - now_start}"
)
return RefinedAnswerUpdate(
refined_answer=answer,
refined_answer_quality=True, # TODO: replace this with the actual check value
@ -338,6 +329,10 @@ def generate_refined_answer(
agent_refined_end_time=agent_refined_end_time,
agent_refined_metrics=agent_refined_metrics,
log_messages=[
f"{now_start} -- MAIN - Generate refined answer, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="generate refined answer",
node_start_time=node_start_time,
)
],
)
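
One of the quieter "default fixes" sits in this hunk: after initial_answer = state.initial_answer or "", the value can be empty but never None, so the later "if initial_answer else None" effectively maps an empty initial answer to None before prompt formatting. A condensed sketch of that behavior; the function name is illustrative and the remove_document_citations call is elided:

def pick_initial_answer(state_initial_answer: str | None) -> str | None:
    # Mirrors the two changed lines in generate_refined_answer:
    initial_answer = state_initial_answer or ""  # None coerced to ""
    return initial_answer if initial_answer else None  # "" passed on as None


assert pick_initial_answer(None) is None
assert pick_initial_answer("") is None
assert pick_initial_answer("some answer") == "some answer"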

View File

@ -3,39 +3,37 @@ from datetime import datetime
from onyx.agents.agent_search.deep_search_a.initial.generate_individual_sub_answer.states import (
AnswerQuestionOutput,
)
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import (
DecompAnswersUpdate,
)
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_sections,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
def ingest_refined_answers(
state: AnswerQuestionOutput,
) -> DecompAnswersUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------INGEST FOLLOW UP ANSWERS---")
node_start_time = datetime.now()
documents = []
answer_results = state.answer_results if hasattr(state, "answer_results") else []
for answer_result in answer_results:
documents.extend(answer_result.documents)
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------INGEST FOLLOW UP ANSWERS END---"
)
return DecompAnswersUpdate(
# Deduping is done by the documents operator for the main graph
# so we might not need to dedup here
documents=dedup_inference_sections(documents, []),
sub_question_results=answer_results,
log_messages=[
f"{now_start} -- Main - Ingest refined answers, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="ingest refined answers",
node_start_time=node_start_time,
)
],
)

View File

@ -12,14 +12,15 @@ from onyx.agents.agent_search.deep_search_a.main.states import MainOutput
from onyx.agents.agent_search.deep_search_a.main.states import MainState
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.models import CombinedAgentMetrics
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.db.chat import log_agent_metrics
from onyx.db.chat import log_agent_sub_question_results
def persist_agent_results(state: MainState, config: RunnableConfig) -> MainOutput:
now_start = datetime.now()
logger.info(f"--------{now_start}--------LOGGING NODE---")
node_start_time = datetime.now()
agent_start_time = state.agent_start_time
agent_base_end_time = state.agent_base_end_time
@ -94,20 +95,19 @@ def persist_agent_results(state: MainState, config: RunnableConfig) -> MainOutpu
sub_question_answer_results=sub_question_answer_results,
)
now_end = datetime.now()
main_output = MainOutput(
log_messages=[
f"{now_start} -- Main - Logging, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="persist agent results",
node_start_time=node_start_time,
)
],
)
logger.info(f"--------{now_end}--{now_end - now_start}--------LOGGING NODE END---")
for log_message in state.log_messages:
logger.info(log_message)
logger.info("")
if state.agent_base_metrics:
logger.info(f"Initial loop: {state.agent_base_metrics.duration__s}")
if state.agent_refined_metrics:

View File

@ -3,7 +3,6 @@ from typing import cast
from langchain_core.runnables import RunnableConfig
from onyx.agents.agent_search.deep_search_a.main.operations import logger
from onyx.agents.agent_search.deep_search_a.main.states import (
ExploratorySearchUpdate,
)
@ -12,6 +11,9 @@ from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.agent_prompt_ops import (
build_history_prompt,
)
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import retrieve_search_docs
from onyx.configs.agent_configs import AGENT_EXPLORATORY_SEARCH_RESULTS
from onyx.context.search.models import InferenceSection
@ -20,9 +22,7 @@ from onyx.context.search.models import InferenceSection
def start_agent_search(
state: MainState, config: RunnableConfig
) -> ExploratorySearchUpdate:
now_start = datetime.now()
logger.info(f"--------{now_start}--------EXPLORATORY SEARCH START---")
node_start_time = datetime.now()
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
question = agent_a_config.search_request.query
@ -45,15 +45,15 @@ def start_agent_search(
retrieved_docs: list[InferenceSection] = retrieve_search_docs(search_tool, question)
exploratory_search_results = retrieved_docs[:AGENT_EXPLORATORY_SEARCH_RESULTS]
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------EXPLORATORY SEARCH END---"
)
return ExploratorySearchUpdate(
exploratory_search_results=exploratory_search_results,
previous_history_summary=history,
log_messages=[
f"{now_start} -- Main - Exploratory Search, Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="main",
node_name="start agent search",
node_start_time=node_start_time,
)
],
)

View File

@ -55,7 +55,7 @@ class RefinedAgentEndStats(BaseModel):
agent_refined_metrics: AgentRefinedMetrics = AgentRefinedMetrics()
class BaseDecompUpdate(RefinedAgentStartStats, RefinedAgentEndStats):
class BaseDecompUpdate(RefinedAgentStartStats, RefinedAgentEndStats, LoggerUpdate):
agent_start_time: datetime | None = None
previous_history: str | None = None
initial_decomp_questions: list[str] = []
@ -76,19 +76,19 @@ class RoutingDecision(LoggerUpdate):
# Not used in current graph
class InitialAnswerBASEUpdate(BaseModel):
initial_base_answer: str
initial_base_answer: str | None = None
class InitialAnswerUpdate(LoggerUpdate):
initial_answer: str
initial_answer: str | None = None
initial_agent_stats: InitialAgentResultStats | None = None
generated_sub_questions: list[str] = []
agent_base_end_time: datetime
agent_base_end_time: datetime | None = None
agent_base_metrics: AgentBaseMetrics | None = None
class RefinedAnswerUpdate(RefinedAgentEndStats, LoggerUpdate):
refined_answer: str
refined_answer: str | None = None
refined_agent_stats: RefinedAgentStats | None = None
refined_answer_quality: bool = False
@ -131,7 +131,7 @@ class EntityTermExtractionUpdate(LoggerUpdate):
)
class FollowUpSubQuestionsUpdate(RefinedAgentStartStats):
class FollowUpSubQuestionsUpdate(RefinedAgentStartStats, LoggerUpdate):
refined_sub_questions: dict[int, FollowUpSubQuestion] = {}
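
This states hunk carries the other half of the commit message, the "default fixes": required fields such as initial_answer: str become str | None = None. A small sketch of why that matters for Pydantic-backed graph states, with the models trimmed to the single field under discussion:

from pydantic import BaseModel, ValidationError


class InitialAnswerUpdateOld(BaseModel):
    initial_answer: str  # required: a partial update fails validation


class InitialAnswerUpdateNew(BaseModel):
    initial_answer: str | None = None  # optional: a partial update is fine


try:
    InitialAnswerUpdateOld()  # e.g. an early-exit path with no answer yet
except ValidationError:
    print("required field: partial update rejected")

print(InitialAnswerUpdateNew())  # initial_answer=None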

View File

@ -9,9 +9,6 @@ from langgraph.types import StreamWriter
from onyx.agents.agent_search.deep_search_a.shared.expanded_retrieval.operations import (
dispatch_subquery,
)
from onyx.agents.agent_search.deep_search_a.shared.expanded_retrieval.operations import (
logger,
)
from onyx.agents.agent_search.deep_search_a.shared.expanded_retrieval.states import (
ExpandedRetrievalInput,
)
@ -23,6 +20,9 @@ from onyx.agents.agent_search.shared_graph_utils.prompts import (
REWRITE_PROMPT_MULTI_ORIGINAL,
)
from onyx.agents.agent_search.shared_graph_utils.utils import dispatch_separated
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.agents.agent_search.shared_graph_utils.utils import parse_question_id
@ -35,7 +35,7 @@ def expand_queries(
# When we are running this node on the original question, no question is explicitly passed in.
# Instead, we use the original question from the search request.
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
now_start = datetime.now()
node_start_time = datetime.now()
question = state.question
llm = agent_a_config.fast_llm
@ -62,13 +62,15 @@ def expand_queries(
llm_response = merge_message_runs(llm_response_list, chunk_separator="")[0].content
rewritten_queries = llm_response.split("\n")
now_end = datetime.now()
logger.info(
f"{now_start} -- Expanded Retrieval - Query Expansion - Time taken: {now_end - now_start}"
)
return QueryExpansionUpdate(
expanded_queries=rewritten_queries,
log_messages=[
f"{now_start} -- Expanded Retrieval - Query Expansion - Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="shared - expanded retrieval",
node_name="expand queries",
node_start_time=node_start_time,
result=f"Number of expanded queries: {len(rewritten_queries)}",
)
],
)

View File

@ -15,6 +15,9 @@ from onyx.agents.agent_search.deep_search_a.shared.expanded_retrieval.states imp
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.calculations import get_fit_scores
from onyx.agents.agent_search.shared_graph_utils.models import RetrievalFitStats
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.configs.agent_configs import AGENT_RERANKING_MAX_QUERY_RETRIEVAL_RESULTS
from onyx.configs.agent_configs import AGENT_RERANKING_STATS
from onyx.context.search.models import InferenceSection
@ -27,7 +30,7 @@ from onyx.db.engine import get_session_context_manager
def rerank_documents(
state: ExpandedRetrievalState, config: RunnableConfig
) -> DocRerankingUpdate:
now_start = datetime.now()
node_start_time = datetime.now()
verified_documents = state.verified_documents
# Rerank post retrieval and verification. First, create a search query
@ -80,16 +83,16 @@ def rerank_documents(
else:
fit_scores = RetrievalFitStats(fit_score_lift=0, rerank_effect=0, fit_scores={})
now_end = datetime.now()
logger.info(
f"{now_start} -- Expanded Retrieval - Reranking - Time taken: {now_end - now_start}"
)
return DocRerankingUpdate(
reranked_documents=[
doc for doc in reranked_documents if type(doc) == InferenceSection
][:AGENT_RERANKING_MAX_QUERY_RETRIEVAL_RESULTS],
sub_question_retrieval_stats=fit_scores,
log_messages=[
f"{now_start} -- Expanded Retrieval - Reranking - Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="shared - expanded retrieval",
node_name="rerank documents",
node_start_time=node_start_time,
)
],
)

View File

@ -15,6 +15,9 @@ from onyx.agents.agent_search.deep_search_a.shared.expanded_retrieval.states imp
from onyx.agents.agent_search.models import AgentSearchConfig
from onyx.agents.agent_search.shared_graph_utils.calculations import get_fit_scores
from onyx.agents.agent_search.shared_graph_utils.models import QueryResult
from onyx.agents.agent_search.shared_graph_utils.utils import (
get_langgraph_node_log_string,
)
from onyx.configs.agent_configs import AGENT_MAX_QUERY_RETRIEVAL_RESULTS
from onyx.configs.agent_configs import AGENT_RETRIEVAL_STATS
from onyx.context.search.models import InferenceSection
@ -40,7 +43,7 @@ def retrieve_documents(
expanded_retrieval_results: list[ExpandedRetrievalResult]
retrieved_documents: list[InferenceSection]
"""
now_start = datetime.now()
node_start_time = datetime.now()
query_to_retrieve = state.query_to_retrieve
agent_a_config = cast(AgentSearchConfig, config["metadata"]["config"])
search_tool = agent_a_config.search_tool
@ -48,12 +51,17 @@ def retrieve_documents(
retrieved_docs: list[InferenceSection] = []
if not query_to_retrieve.strip():
logger.warning("Empty query, skipping retrieval")
now_end = datetime.now()
return DocRetrievalUpdate(
expanded_retrieval_results=[],
retrieved_documents=[],
log_messages=[
f"{now_start} -- Expanded Retrieval - Retrieval - Empty Query - Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="shared - expanded retrieval",
node_name="retrieve documents",
node_start_time=node_start_time,
result="Empty query, skipping retrieval",
)
],
)
@ -99,14 +107,15 @@ def retrieve_documents(
stats=fit_scores,
query_info=query_info,
)
now_end = datetime.now()
logger.info(
f"{now_start} -- Expanded Retrieval - Retrieval - Time taken: {now_end - now_start}"
)
return DocRetrievalUpdate(
expanded_retrieval_results=[expanded_retrieval_result],
retrieved_documents=retrieved_docs,
log_messages=[
f"{now_start} -- Expanded Retrieval - Retrieval - Time taken: {now_end - now_start}"
get_langgraph_node_log_string(
graph_component="shared - expanded retrieval",
node_name="retrieve documents",
node_start_time=node_start_time,
)
],
)

View File

@ -4,6 +4,7 @@ from typing import Annotated
from pydantic import BaseModel
from onyx.agents.agent_search.core_state import SubgraphCoreState
from onyx.agents.agent_search.deep_search_a.main.states import LoggerUpdate
from onyx.agents.agent_search.deep_search_a.shared.expanded_retrieval.models import (
ExpandedRetrievalResult,
)
@ -14,7 +15,6 @@ from onyx.agents.agent_search.shared_graph_utils.operators import (
)
from onyx.context.search.models import InferenceSection
### States ###
## Graph Input State
@ -29,7 +29,7 @@ class ExpandedRetrievalInput(SubgraphCoreState):
## Update/Return States
class QueryExpansionUpdate(BaseModel):
class QueryExpansionUpdate(LoggerUpdate, BaseModel):
expanded_queries: list[str] = []
log_messages: list[str] = []
@ -38,31 +38,28 @@ class DocVerificationUpdate(BaseModel):
verified_documents: Annotated[list[InferenceSection], dedup_inference_sections] = []
class DocRetrievalUpdate(BaseModel):
class DocRetrievalUpdate(LoggerUpdate, BaseModel):
expanded_retrieval_results: Annotated[list[QueryResult], add] = []
retrieved_documents: Annotated[
list[InferenceSection], dedup_inference_sections
] = []
log_messages: list[str] = []
class DocRerankingUpdate(BaseModel):
class DocRerankingUpdate(LoggerUpdate, BaseModel):
reranked_documents: Annotated[list[InferenceSection], dedup_inference_sections] = []
sub_question_retrieval_stats: RetrievalFitStats | None = None
log_messages: list[str] = []
class ExpandedRetrievalUpdate(BaseModel):
class ExpandedRetrievalUpdate(LoggerUpdate, BaseModel):
expanded_retrieval_result: ExpandedRetrievalResult
## Graph Output State
class ExpandedRetrievalOutput(BaseModel):
class ExpandedRetrievalOutput(LoggerUpdate, BaseModel):
expanded_retrieval_result: ExpandedRetrievalResult = ExpandedRetrievalResult()
base_expanded_retrieval_result: ExpandedRetrievalResult = ExpandedRetrievalResult()
log_messages: list[str] = []
## Graph State

View File

@ -202,13 +202,13 @@ def run_basic_graph(
if __name__ == "__main__":
from onyx.llm.factory import get_default_llms
for _ in range(2):
now_start = datetime.now()
logger.debug(f"Start at {now_start}")
for _ in range(1):
query_start_time = datetime.now()
logger.debug(f"Start at {query_start_time}")
graph = main_graph_builder_a()
compiled_graph = graph.compile()
now_end = datetime.now()
logger.debug(f"Graph compiled in {now_end - now_start} seconds")
query_end_time = datetime.now()
logger.debug(f"Graph compiled in {query_end_time - query_start_time} seconds")
primary_llm, fast_llm = get_default_llms()
search_request = SearchRequest(
# query="what can you do with gitlab?",

View File

@ -160,23 +160,6 @@ def _format_time_delta(time: timedelta) -> str:
return f"{seconds_from_start}.{microseconds_from_start}"
def generate_log_message(
message: str,
node_start_time: datetime,
graph_start_time: datetime | None = None,
) -> str:
current_time = datetime.now()
if graph_start_time is not None:
graph_time_str = _format_time_delta(current_time - graph_start_time)
else:
graph_time_str = "N/A"
node_time_str = _format_time_delta(current_time - node_start_time)
return f"{graph_time_str} ({node_time_str} s): {message}"
def get_test_config(
db_session: Session,
primary_llm: LLM,
@ -405,3 +388,16 @@ def relevance_from_docs(
)
for doc in relevant_docs
]
def get_langgraph_node_log_string(
graph_component: str,
node_name: str,
node_start_time: datetime,
result: str | None = None,
) -> str:
duration = datetime.now() - node_start_time
if result is None:
return f"{node_start_time} -- {graph_component} - {node_name} -- Time taken: {duration}"
else:
return f"{node_start_time} -- {graph_component} - {node_name} -- Time taken: {duration} -- Result: {result}"