diff --git a/backend/danswer/search/pipeline.py b/backend/danswer/search/pipeline.py index 4f9a2880d..7be81983b 100644 --- a/backend/danswer/search/pipeline.py +++ b/backend/danswer/search/pipeline.py @@ -34,6 +34,7 @@ from danswer.utils.logger import setup_logger from danswer.utils.threadpool_concurrency import FunctionCall from danswer.utils.threadpool_concurrency import run_functions_in_parallel from danswer.utils.threadpool_concurrency import run_functions_tuples_in_parallel +from danswer.utils.timing import log_function_time logger = setup_logger() @@ -154,6 +155,7 @@ class SearchPipeline: return cast(list[InferenceChunk], self._retrieved_chunks) + @log_function_time(print_only=True) def _get_sections(self) -> list[InferenceSection]: """Returns an expanded section from each of the chunks. If whole docs (instead of above/below context) is specified then it will give back all of the whole docs @@ -173,9 +175,11 @@ class SearchPipeline: expanded_inference_sections = [] # Full doc setting takes priority + if self.search_query.full_doc: seen_document_ids = set() unique_chunks = [] + # This preserves the ordering since the chunks are retrieved in score order for chunk in retrieved_chunks: if chunk.document_id not in seen_document_ids: @@ -195,7 +199,6 @@ class SearchPipeline: ), ) ) - list_inference_chunks = run_functions_tuples_in_parallel( functions_with_args, allow_failures=False ) @@ -240,32 +243,35 @@ class SearchPipeline: merged_ranges = [ merge_chunk_intervals(ranges) for ranges in doc_chunk_ranges_map.values() ] - flat_ranges = [r for ranges in merged_ranges for r in ranges] + + flat_ranges: list[ChunkRange] = [r for ranges in merged_ranges for r in ranges] + flattened_inference_chunks: list[InferenceChunk] = [] + parallel_functions_with_args = [] for chunk_range in flat_ranges: - functions_with_args.append( - ( - # If Large Chunks are introduced, additional filters need to be added here - self.document_index.id_based_retrieval, - ( - # Only need the document_id here, just use any chunk in the range is fine - chunk_range.chunks[0].document_id, - chunk_range.start, - chunk_range.end, - # There is no chunk level permissioning, this expansion around chunks - # can be assumed to be safe - IndexFilters(access_control_list=None), - ), - ) - ) + # Don't need to fetch chunks within range for merging if chunk_above / below are 0. + if above == below == 0: + flattened_inference_chunks.extend(chunk_range.chunks) - # list of list of inference chunks where the inner list needs to be combined for content - list_inference_chunks = run_functions_tuples_in_parallel( - functions_with_args, allow_failures=False - ) - flattened_inference_chunks = [ - chunk for sublist in list_inference_chunks for chunk in sublist - ] + else: + parallel_functions_with_args.append( + ( + self.document_index.id_based_retrieval, + ( + chunk_range.chunks[0].document_id, + chunk_range.start, + chunk_range.end, + IndexFilters(access_control_list=None), + ), + ) + ) + + if parallel_functions_with_args: + list_inference_chunks = run_functions_tuples_in_parallel( + parallel_functions_with_args, allow_failures=False + ) + for inference_chunks in list_inference_chunks: + flattened_inference_chunks.extend(inference_chunks) doc_chunk_ind_to_chunk = { (chunk.document_id, chunk.chunk_id): chunk