LangGraph comments

This commit is contained in:
joachim-danswer 2025-02-03 14:35:55 -08:00 committed by Evan Lohn
parent 4834ee6223
commit 3cd057d7a2
40 changed files with 230 additions and 115 deletions

View File

@ -1,45 +0,0 @@
from datetime import datetime
from onyx.agents.agent_search.deep_search.initial.generate_individual_sub_answer.states import (
AnswerQuestionOutput,
)
from onyx.agents.agent_search.deep_search.main.operations import logger
from onyx.agents.agent_search.deep_search.main.states import (
SubQuestionResultsUpdate,
)
from onyx.agents.agent_search.shared_graph_utils.operators import (
dedup_inference_sections,
)
def format_initial_sub_answers(
state: AnswerQuestionOutput,
) -> SubQuestionResultsUpdate:
now_start = datetime.now()
logger.debug(f"--------{now_start}--------INGEST ANSWERS---")
documents = []
context_documents = []
cited_documents = []
answer_results = state.answer_results
for answer_result in answer_results:
documents.extend(answer_result.verified_reranked_documents)
context_documents.extend(answer_result.context_documents)
cited_documents.extend(answer_result.cited_documents)
now_end = datetime.now()
logger.debug(
f"--------{now_end}--{now_end - now_start}--------INGEST ANSWERS END---"
)
return SubQuestionResultsUpdate(
# Deduping is done by the documents operator for the main graph
# so we might not need to dedup here
verified_reranked_documents=dedup_inference_sections(documents, []),
context_documents=dedup_inference_sections(context_documents, []),
cited_documents=dedup_inference_sections(cited_documents, []),
sub_question_results=answer_results,
log_messages=[
f"{now_start} -- Main - Ingest initial processed sub questions, Time taken: {now_end - now_start}"
],
)

View File

@ -15,6 +15,9 @@ logger = setup_logger()
def send_to_expanded_retrieval(state: SubQuestionAnsweringInput) -> Send | Hashable:
"""
LangGraph edge to send a sub-question to the expanded retrieval.
"""
edge_start_time = datetime.now()
return Send(

View File

@ -36,6 +36,9 @@ logger = setup_logger()
def answer_query_graph_builder() -> StateGraph:
"""
LangGraph sub-graph builder for the initial individual sub-answer generation.
"""
graph = StateGraph(
state_schema=AnswerQuestionState,
input=SubQuestionAnsweringInput,
@ -44,27 +47,37 @@ def answer_query_graph_builder() -> StateGraph:
### Add nodes ###
# The sub-graph that executes the expanded retrieval process for a sub-question
expanded_retrieval = expanded_retrieval_graph_builder().compile()
graph.add_node(
node="initial_sub_question_expanded_retrieval",
action=expanded_retrieval,
)
# The node that ingests the retrieved documents and puts them into the proper
# state keys.
graph.add_node(
node="answer_check",
action=check_sub_answer,
node="ingest_retrieval",
action=ingest_retrieved_documents,
)
# The node that generates the sub-answer
graph.add_node(
node="generate_sub_answer",
action=generate_sub_answer,
)
# The node that checks the sub-answer
graph.add_node(
node="answer_check",
action=check_sub_answer,
)
# The node that formats the sub-answer for the following initial answer generation
graph.add_node(
node="format_answer",
action=format_sub_answer,
)
graph.add_node(
node="ingest_retrieval",
action=ingest_retrieved_documents,
)
### Add edges ###

View File

@ -23,6 +23,10 @@ from onyx.prompts.agent_search import UNKNOWN_ANSWER
def check_sub_answer(
state: AnswerQuestionState, config: RunnableConfig
) -> SubQuestionAnswerCheckUpdate:
"""
LangGraph node to check the quality of the sub-answer. The answer
is represented as a boolean value.
"""
node_start_time = datetime.now()
level, question_num = parse_question_id(state.question_id)

View File

@ -10,6 +10,9 @@ from onyx.agents.agent_search.shared_graph_utils.models import (
def format_sub_answer(state: AnswerQuestionState) -> AnswerQuestionOutput:
"""
LangGraph node to generate the sub-answer format.
"""
return AnswerQuestionOutput(
answer_results=[
SubQuestionAnswerResults(

View File

@ -41,6 +41,9 @@ def generate_sub_answer(
config: RunnableConfig,
writer: StreamWriter = lambda _: None,
) -> SubQuestionAnswerGenerationUpdate:
"""
LangGraph node to generate a sub-answer.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -10,6 +10,9 @@ from onyx.agents.agent_search.shared_graph_utils.models import AgentChunkRetriev
def ingest_retrieved_documents(
state: ExpandedRetrievalOutput,
) -> SubQuestionRetrievalIngestionUpdate:
"""
LangGraph node to ingest the retrieved documents to format it for the sub-answer.
"""
sub_question_retrieval_stats = state.expanded_retrieval_result.retrieval_stats
if sub_question_retrieval_stats is None:
sub_question_retrieval_stats = [AgentChunkRetrievalStats()]

View File

@ -18,6 +18,11 @@ from onyx.agents.agent_search.shared_graph_utils.utils import make_question_id
def parallelize_initial_sub_question_answering(
state: SubQuestionRetrievalState,
) -> list[Send | Hashable]:
"""
LangGraph edge to parallelize the initial sub-question answering. If there are no sub-questions,
we send empty answers to the initial answer generation, and that answer would be generated
solely based on the documents retrieved for the original question.
"""
edge_start_time = datetime.now()
if len(state.initial_sub_questions) > 0:
# sub_question_record_ids = [subq_record.id for subq_record in state["sub_question_records"]]

View File

@ -26,33 +26,37 @@ logger = setup_logger()
def generate_initial_answer_graph_builder(test_mode: bool = False) -> StateGraph:
"""
LangGraph graph builder for the initial answer generation.
"""
graph = StateGraph(
state_schema=SubQuestionRetrievalState,
input=SubQuestionRetrievalInput,
)
# The sub-graph that generates the initial sub-answers
generate_sub_answers = generate_sub_answers_graph_builder().compile()
graph.add_node(
node="generate_sub_answers_subgraph",
action=generate_sub_answers,
)
# The sub-graph that retrieves the original question documents. This is run
# in parallel with the sub-answer generation process.
retrieve_orig_question_docs = retrieve_orig_question_docs_graph_builder().compile()
graph.add_node(
node="retrieve_orig_question_docs_subgraph_wrapper",
action=retrieve_orig_question_docs,
)
# graph.add_node(
# node="retrieval_consolidation",
# action=consolidate_retrieved_documents,
# )
# Node that generates the initial answer using the results of the previous
# two sub-.
graph.add_node(
node="generate_initial_answer",
action=generate_initial_answer,
)
# Node that validates the initial answer
graph.add_node(
node="validate_initial_answer",
action=validate_initial_answer,
@ -70,6 +74,7 @@ def generate_initial_answer_graph_builder(test_mode: bool = False) -> StateGraph
end_key="generate_sub_answers_subgraph",
)
# Wait for both, the original question docs and the sub-answers to be generated before proceeding
graph.add_edge(
start_key=[
"retrieve_orig_question_docs_subgraph_wrapper",

View File

@ -1,14 +0,0 @@
from datetime import datetime
from onyx.agents.agent_search.deep_search.initial.generate_initial_answer.states import (
SubQuestionRetrievalState,
)
from onyx.agents.agent_search.deep_search.main.states import LoggerUpdate
def consolidate_retrieved_documents(
state: SubQuestionRetrievalState,
) -> LoggerUpdate:
node_start_time = datetime.now()
return LoggerUpdate(log_messages=[f"{node_start_time} -- Retrieval consolidation"])

View File

@ -63,6 +63,10 @@ def generate_initial_answer(
config: RunnableConfig,
writer: StreamWriter = lambda _: None,
) -> InitialAnswerUpdate:
"""
LangGraph node to generate the initial answer, using the initial sub-questions/sub-answers and the
documents retrieved for the original question.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])
@ -147,6 +151,9 @@ def generate_initial_answer(
else:
sub_question_answer_results = state.sub_question_results
# Collect the sub-questions and sub-answers and construct an appropriate
# prompt string.
# Consider replacing by a function.
answered_sub_questions: list[str] = []
all_sub_questions: list[str] = [] # Separate list for tracking all questions
@ -170,12 +177,13 @@ def generate_initial_answer(
)
)
# Use list comprehension for joining answers and determine prompt type
sub_question_answer_str = (
"\n\n------\n\n".join(answered_sub_questions)
if answered_sub_questions
else ""
)
# Use the appropriate prompt based on whether there are sub-questions.
base_prompt = (
INITIAL_ANSWER_PROMPT_W_SUB_QUESTIONS
if answered_sub_questions

View File

@ -16,13 +16,7 @@ def validate_initial_answer(
state: SubQuestionRetrievalState,
) -> InitialAnswerQualityUpdate:
"""
Check whether the final output satisfies the original user question
Args:
state (messages): The current state
Returns:
InitialAnswerQualityUpdate
Check whether the initial answer sufficiently addresses the original user question.
"""
node_start_time = datetime.now()

View File

@ -18,6 +18,9 @@ from onyx.agents.agent_search.shared_graph_utils.utils import make_question_id
def parallelize_initial_sub_question_answering(
state: SubQuestionRetrievalState,
) -> list[Send | Hashable]:
"""
LangGraph edge to parallelize the initial sub-question answering.
"""
edge_start_time = datetime.now()
if len(state.initial_sub_questions) > 0:
# sub_question_record_ids = [subq_record.id for subq_record in state["sub_question_records"]]

View File

@ -28,21 +28,31 @@ test_mode = False
def generate_sub_answers_graph_builder() -> StateGraph:
"""
LangGraph graph builder for the initial sub-answer generation process.
It generates the initial sub-questions and produces the answers.
"""
graph = StateGraph(
state_schema=SubQuestionAnsweringState,
input=SubQuestionAnsweringInput,
)
# Decompose the original question into sub-questions
graph.add_node(
node="decompose_orig_question",
action=decompose_orig_question,
)
# The sub-graph that executes the initial sub-question answering for
# each of the sub-questions.
answer_sub_question_subgraphs = answer_query_graph_builder().compile()
graph.add_node(
node="answer_sub_question_subgraphs",
action=answer_sub_question_subgraphs,
)
# Node that collects and formats the initial sub-question answers
graph.add_node(
node="format_initial_sub_question_answers",
action=format_initial_sub_answers,

View File

@ -48,6 +48,9 @@ def decompose_orig_question(
config: RunnableConfig,
writer: StreamWriter = lambda _: None,
) -> InitialQuestionDecompositionUpdate:
"""
LangGraph node to decompose the original question into sub-questions.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -17,6 +17,10 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
def format_initial_sub_answers(
state: AnswerQuestionOutput,
) -> SubQuestionResultsUpdate:
"""
LangGraph node to format the answers to the initial sub-questions, including
deduping verified documents and context documents.
"""
node_start_time = datetime.now()
documents = []

View File

@ -23,6 +23,12 @@ from onyx.agents.agent_search.deep_search.shared.expanded_retrieval.graph_builde
def retrieve_orig_question_docs_graph_builder() -> StateGraph:
"""
LangGraph graph builder for the retrieval of documents
that are relevant to the original question. This is
largely a wrapper around the expanded retrieval process to
ensure parallelism with the sub-question answer process.
"""
graph = StateGraph(
state_schema=BaseRawSearchState,
input=BaseRawSearchInput,
@ -31,19 +37,23 @@ def retrieve_orig_question_docs_graph_builder() -> StateGraph:
### Add nodes ###
# Format the original question search output
graph.add_node(
node="format_orig_question_search_input",
action=format_orig_question_search_input,
node="format_orig_question_search_output",
action=format_orig_question_search_output,
)
# The sub-graph that executes the expanded retrieval process
expanded_retrieval = expanded_retrieval_graph_builder().compile()
graph.add_node(
node="retrieve_orig_question_docs_subgraph",
action=expanded_retrieval,
)
# Format the original question search input
graph.add_node(
node="format_orig_question_search_output",
action=format_orig_question_search_output,
node="format_orig_question_search_input",
action=format_orig_question_search_input,
)
### Add edges ###

View File

@ -15,6 +15,9 @@ logger = setup_logger()
def format_orig_question_search_input(
state: CoreState, config: RunnableConfig
) -> ExpandedRetrievalInput:
"""
LangGraph node to format the search input for the original question.
"""
logger.debug("generate_raw_search_data")
graph_config = cast(GraphConfig, config["metadata"]["config"])
return ExpandedRetrievalInput(

View File

@ -11,12 +11,10 @@ logger = setup_logger()
def format_orig_question_search_output(
state: ExpandedRetrievalOutput,
) -> OrigQuestionRetrievalUpdate:
# return BaseRawSearchOutput(
# base_expanded_retrieval_result=state.expanded_retrieval_result,
# # base_retrieval_results=[state.expanded_retrieval_result],
# # base_search_documents=[],
# )
"""
LangGraph node to format the search result for the original question into the
proper format.
"""
sub_question_retrieval_stats = state.expanded_retrieval_result.retrieval_stats
if sub_question_retrieval_stats is None:
sub_question_retrieval_stats = AgentChunkRetrievalStats()

View File

@ -26,6 +26,9 @@ logger = setup_logger()
def route_initial_tool_choice(
state: MainState, config: RunnableConfig
) -> Literal["tool_call", "start_agent_search", "logging_node"]:
"""
LangGraph edge to route to agent search.
"""
agent_config = cast(GraphConfig, config["metadata"]["config"])
if state.tool_choice is not None:
if (

View File

@ -29,8 +29,8 @@ from onyx.agents.agent_search.deep_search.main.nodes.extract_entities_terms impo
from onyx.agents.agent_search.deep_search.main.nodes.generate_refined_answer import (
generate_refined_answer,
)
from onyx.agents.agent_search.deep_search.main.nodes.ingest_refined_answers import (
ingest_refined_answers,
from onyx.agents.agent_search.deep_search.main.nodes.ingest_refined_sub_answers import (
ingest_refined_sub_answers,
)
from onyx.agents.agent_search.deep_search.main.nodes.persist_agent_results import (
persist_agent_results,
@ -60,72 +60,97 @@ test_mode = False
def main_graph_builder(test_mode: bool = False) -> StateGraph:
"""
LangGraph graph builder for the main agent search process.
"""
graph = StateGraph(
state_schema=MainState,
input=MainInput,
)
# Prepare the tool input
graph.add_node(
node="prepare_tool_input",
action=prepare_tool_input,
)
# Choose the initial tool
graph.add_node(
node="initial_tool_choice",
action=llm_tool_choice,
)
# Call the tool, if required
graph.add_node(
node="tool_call",
action=tool_call,
)
# Use the tool response
graph.add_node(
node="basic_use_tool_response",
action=basic_use_tool_response,
)
# Start the agent search process
graph.add_node(
node="start_agent_search",
action=start_agent_search,
)
# The sub-graph for the initial answer generation
generate_initial_answer_subgraph = generate_initial_answer_graph_builder().compile()
graph.add_node(
node="generate_initial_answer_subgraph",
action=generate_initial_answer_subgraph,
)
# Create the refined sub-questions
graph.add_node(
node="create_refined_sub_questions",
action=create_refined_sub_questions,
)
# Subgraph for the refined sub-answer generation
answer_refined_question = answer_refined_query_graph_builder().compile()
graph.add_node(
node="answer_refined_question_subgraphs",
action=answer_refined_question,
)
# Ingest the refined sub-answers
graph.add_node(
node="ingest_refined_answers",
action=ingest_refined_answers,
node="ingest_refined_sub_answers",
action=ingest_refined_sub_answers,
)
# Node to generate the refined answer
graph.add_node(
node="generate_refined_answer",
action=generate_refined_answer,
)
# Early node to extract the entities and terms from the initial answer,
# This information is used to inform the creation the refined sub-questions
graph.add_node(
node="extract_entity_term",
action=extract_entities_terms,
)
# Decide if the answer needs to be refined (current;ly always true)
graph.add_node(
node="decide_refinement_need",
action=decide_refinement_need,
)
# Compare the initial and refined answers, and determine whether
# the refined answer is sufficiently better
graph.add_node(
node="compare_answers",
action=compare_answers,
)
# Log the results. This will log the stats as well as the answers
graph.add_node(
node="logging_node",
action=persist_agent_results,
@ -165,6 +190,8 @@ def main_graph_builder(test_mode: bool = False) -> StateGraph:
end_key="extract_entity_term",
)
# Wait for the initial answer generation and the entity/term extraction to be complete
# before deciding if a refinement is needed.
graph.add_edge(
start_key=["generate_initial_answer_subgraph", "extract_entity_term"],
end_key="decide_refinement_need",
@ -183,11 +210,11 @@ def main_graph_builder(test_mode: bool = False) -> StateGraph:
)
graph.add_edge(
start_key="answer_refined_question_subgraphs", # HERE
end_key="ingest_refined_answers",
end_key="ingest_refined_sub_answers",
)
graph.add_edge(
start_key="ingest_refined_answers",
start_key="ingest_refined_sub_answers",
end_key="generate_refined_answer",
)

View File

@ -23,6 +23,10 @@ from onyx.prompts.agent_search import (
def compare_answers(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> InitialRefinedAnswerComparisonUpdate:
"""
LangGraph node to compare the initial answer and the refined answer and determine if the
refined answer is sufficiently better than the initial answer.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -38,7 +38,10 @@ from onyx.tools.models import ToolCallKickoff
def create_refined_sub_questions(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> RefinedQuestionDecompositionUpdate:
""" """
"""
LangGraph node to create refined sub-questions based on the initial answer, the history,
the entity term extraction results found earlier, and the sub-questions that were answered and failed.
"""
graph_config = cast(GraphConfig, config["metadata"]["config"])
write_custom_event(
"start_refined_answer_creation",

View File

@ -16,6 +16,10 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
def decide_refinement_need(
state: MainState, config: RunnableConfig
) -> RequireRefinemenEvalUpdate:
"""
LangGraph node to decide if refinement is needed based on the initial answer and the question.
At present, we always refine.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -29,6 +29,10 @@ from onyx.prompts.agent_search import ENTITY_TERM_EXTRACTION_PROMPT_JSON_EXAMPLE
def extract_entities_terms(
state: MainState, config: RunnableConfig
) -> EntityTermExtractionUpdate:
"""
LangGraph node to extract entities, relationships, and terms from the initial search results.
This data is used to inform particularly the sub-questions that are created for the refined answer.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -61,6 +61,10 @@ from onyx.tools.tool_implementations.search.search_tool import yield_search_resp
def generate_refined_answer(
state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> RefinedAnswerUpdate:
"""
LangGraph node to generate the refined answer.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -14,9 +14,12 @@ from onyx.agents.agent_search.shared_graph_utils.utils import (
)
def ingest_refined_answers(
def ingest_refined_sub_answers(
state: AnswerQuestionOutput,
) -> SubQuestionResultsUpdate:
"""
LangGraph node to ingest and format the refined sub-answers and retrieved documents.
"""
node_start_time = datetime.now()
documents = []

View File

@ -20,6 +20,9 @@ from onyx.db.chat import log_agent_sub_question_results
def persist_agent_results(state: MainState, config: RunnableConfig) -> MainOutput:
"""
LangGraph node to persist the agent results, including agent logging data.
"""
node_start_time = datetime.now()
agent_start_time = state.agent_start_time

View File

@ -22,6 +22,9 @@ from onyx.context.search.models import InferenceSection
def start_agent_search(
state: MainState, config: RunnableConfig
) -> ExploratorySearchUpdate:
"""
LangGraph node to start the agentic search process.
"""
node_start_time = datetime.now()
graph_config = cast(GraphConfig, config["metadata"]["config"])

View File

@ -17,6 +17,9 @@ logger = setup_logger()
def send_to_expanded_refined_retrieval(
state: SubQuestionAnsweringInput,
) -> Send | Hashable:
"""
LangGraph edge to sends a refined sub-question extended retrieval.
"""
logger.debug("sending to expanded retrieval for follow up question via edge")
datetime.now()
return Send(

View File

@ -35,6 +35,9 @@ logger = setup_logger()
def answer_refined_query_graph_builder() -> StateGraph:
"""
LangGraph graph builder for the refined sub-answer generation process.
"""
graph = StateGraph(
state_schema=AnswerQuestionState,
input=SubQuestionAnsweringInput,
@ -43,27 +46,36 @@ def answer_refined_query_graph_builder() -> StateGraph:
### Add nodes ###
# Subgraph for the expanded retrieval process
expanded_retrieval = expanded_retrieval_graph_builder().compile()
graph.add_node(
node="refined_sub_question_expanded_retrieval",
action=expanded_retrieval,
)
# Ingest the retrieved documents
graph.add_node(
node="refined_sub_answer_check",
action=check_sub_answer,
node="ingest_refined_retrieval",
action=ingest_retrieved_documents,
)
# Generate the refined sub-answer
graph.add_node(
node="generate_refined_sub_answer",
action=generate_sub_answer,
)
# Check if the refined sub-answer is correct
graph.add_node(
node="refined_sub_answer_check",
action=check_sub_answer,
)
# Format the refined sub-answer
graph.add_node(
node="format_refined_sub_answer",
action=format_sub_answer,
)
graph.add_node(
node="ingest_refined_retrieval",
action=ingest_retrieved_documents,
)
### Add edges ###

View File

@ -16,6 +16,10 @@ from onyx.agents.agent_search.models import GraphConfig
def parallel_retrieval_edge(
state: ExpandedRetrievalState, config: RunnableConfig
) -> list[Send | Hashable]:
"""
LangGraph edge to parallelize the retrieval process for each of the
generated sub-queries and the original question.
"""
graph_config = cast(GraphConfig, config["metadata"]["config"])
question = (
state.question if state.question else graph_config.inputs.search_request.query

View File

@ -42,6 +42,9 @@ logger = setup_logger()
def expanded_retrieval_graph_builder() -> StateGraph:
"""
LangGraph graph builder for the expanded retrieval process.
"""
graph = StateGraph(
state_schema=ExpandedRetrievalState,
input=ExpandedRetrievalInput,
@ -50,32 +53,43 @@ def expanded_retrieval_graph_builder() -> StateGraph:
### Add nodes ###
# Convert the question into multiple sub-queries
graph.add_node(
node="expand_queries",
action=expand_queries,
)
# Format the sub-queries into a list of strings
graph.add_node(
node="dummy",
node="format_queries",
action=format_queries,
)
# Retrieve the documents for each sub-query
graph.add_node(
node="retrieve_documents",
action=retrieve_documents,
)
# Start verification process that the documents are relevant to the question (not the query)
graph.add_node(
node="kickoff_verification",
action=kickoff_verification,
)
# Verify that a given document is relevant to the question (not the query)
graph.add_node(
node="verify_documents",
action=verify_documents,
)
# Rerank the documents that have been verified
graph.add_node(
node="rerank_documents",
action=rerank_documents,
)
# Format the results into a list of strings
graph.add_node(
node="format_results",
action=format_results,
@ -88,11 +102,11 @@ def expanded_retrieval_graph_builder() -> StateGraph:
)
graph.add_edge(
start_key="expand_queries",
end_key="dummy",
end_key="format_queries",
)
graph.add_conditional_edges(
source="dummy",
source="format_queries",
path=parallel_retrieval_edge,
path_map=["retrieve_documents"],
)

View File

@ -31,6 +31,9 @@ def expand_queries(
config: RunnableConfig,
writer: StreamWriter = lambda _: None,
) -> QueryExpansionUpdate:
"""
LangGraph node to expand a question into multiple search queries.
"""
# Sometimes we want to expand the original question, sometimes we want to expand a sub-question.
# When we are running this node on the original question, no question is explictly passed in.
# Instead, we use the original question from the search request.

View File

@ -11,6 +11,9 @@ from onyx.agents.agent_search.deep_search.shared.expanded_retrieval.states impor
def format_queries(
state: ExpandedRetrievalState, config: RunnableConfig
) -> QueryExpansionUpdate:
"""
LangGraph node to format the expanded queries into a list of strings.
"""
return QueryExpansionUpdate(
expanded_queries=state.expanded_queries,
)

View File

@ -30,6 +30,9 @@ def format_results(
config: RunnableConfig,
writer: StreamWriter = lambda _: None,
) -> ExpandedRetrievalUpdate:
"""
LangGraph node that constructs the proper expanded retrieval format.
"""
level, question_num = parse_question_id(state.sub_question_id or "0_0")
query_info = get_query_info(state.query_retrieval_results)

View File

@ -16,6 +16,12 @@ def kickoff_verification(
state: ExpandedRetrievalState,
config: RunnableConfig,
) -> Command[Literal["verify_documents"]]:
"""
LangGraph node (Command node!) that kicks off the verification process for the retrieved documents.
Note that this is a Command node and does the routing as well. (At present, no state updates
are done here, so this could be replaced with an edge. But we may choose to make state
updates later.)
"""
retrieved_documents = state.retrieved_documents
verification_question = state.question

View File

@ -30,6 +30,10 @@ from onyx.db.engine import get_session_context_manager
def rerank_documents(
state: ExpandedRetrievalState, config: RunnableConfig
) -> DocRerankingUpdate:
"""
LangGraph node to rerank the retrieved and verified documents. A part of the
pre-existing pipeline is used here.
"""
node_start_time = datetime.now()
verified_documents = state.verified_documents

View File

@ -33,15 +33,7 @@ def retrieve_documents(
state: RetrievalInput, config: RunnableConfig
) -> DocRetrievalUpdate:
"""
Retrieve documents
Args:
state (RetrievalInput): Primary state + the query to retrieve
config (RunnableConfig): Configuration containing ProSearchConfig
Updates:
expanded_retrieval_results: list[ExpandedRetrievalResult]
retrieved_documents: list[InferenceSection]
LangGraph node to retrieve documents from the search tool.
"""
node_start_time = datetime.now()
query_to_retrieve = state.query_to_retrieve

View File

@ -22,7 +22,7 @@ def verify_documents(
state: DocVerificationInput, config: RunnableConfig
) -> DocVerificationUpdate:
"""
Check whether the document is relevant for the original user question
LangGraph node to check whether the document is relevant for the original user question
Args:
state (DocVerificationInput): The current state