diff --git a/backend/tests/regression/answer_quality/api_utils.py b/backend/tests/regression/answer_quality/api_utils.py index cdb77c2b0..8d02d2ca9 100644 --- a/backend/tests/regression/answer_quality/api_utils.py +++ b/backend/tests/regression/answer_quality/api_utils.py @@ -11,16 +11,15 @@ from danswer.search.models import IndexFilters from danswer.search.models import OptionalSearchSetting from danswer.search.models import RetrievalDetails from danswer.server.documents.models import ConnectorBase -from tests.regression.answer_quality.cli_utils import ( - get_api_server_host_port, -) +from tests.regression.answer_quality.cli_utils import get_api_server_host_port +from tests.regression.answer_quality.cli_utils import restart_vespa_container def _api_url_builder(run_suffix: str, api_path: str) -> str: return f"http://localhost:{get_api_server_host_port(run_suffix)}" + api_path -@retry(tries=5, delay=2, backoff=2) +@retry(tries=15, delay=10, jitter=1) def get_answer_from_query(query: str, run_suffix: str) -> tuple[list[str], str]: filters = IndexFilters( source_type=None, @@ -58,8 +57,10 @@ def get_answer_from_query(query: str, run_suffix: str) -> tuple[list[str], str]: context_data_list = response_json.get("contexts", {}).get("contexts", []) answer = response_json.get("answer", "") except Exception as e: - print("Failed to answer the questions, trying again") - print(f"error: {str(e)}") + print("Failed to answer the questions:") + print(f"\t {str(e)}") + print("Restarting vespa container and trying again") + restart_vespa_container(run_suffix) raise e return context_data_list, answer diff --git a/backend/tests/regression/answer_quality/cli_utils.py b/backend/tests/regression/answer_quality/cli_utils.py index eace27da4..47efaf76e 100644 --- a/backend/tests/regression/answer_quality/cli_utils.py +++ b/backend/tests/regression/answer_quality/cli_utils.py @@ -3,9 +3,12 @@ import os import socket import subprocess import sys +import time +from datetime import datetime 
from threading import Thread from typing import IO +import yaml from retry import retry @@ -65,7 +68,36 @@ def switch_to_branch(branch: str) -> None: print("Repository updated successfully.") -def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> str: +def get_docker_container_env_vars(suffix: str) -> dict: + """ + Retrieves environment variables from "background" and "api_server" Docker containers. + """ + print(f"Getting environment variables for containers with suffix: {suffix}") + + combined_env_vars = {} + for container_type in ["background", "api_server"]: + container_name = _run_command( + f"docker ps -a --format '{{{{.Names}}}}' | grep '{container_type}' | grep '{suffix}'" + )[0].strip() + if not container_name: + raise RuntimeError( + f"No {container_type} container found with suffix: {suffix}" + ) + + env_vars_json = _run_command( + f"docker inspect --format='{{{{json .Config.Env}}}}' {container_name}" + )[0] + env_vars_list = json.loads(env_vars_json.strip()) + + for env_var in env_vars_list: + key, value = env_var.split("=", 1) + combined_env_vars[key] = value + + print(f"Combined env variables: {combined_env_vars}") + return combined_env_vars + + +def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> None: # Use the user's home directory as the base path target_path = os.path.join(os.path.expanduser(base_path), f"test{suffix}") directories = { @@ -87,7 +119,6 @@ def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> print(f"Set {env_var} to: {directory}") relari_output_path = os.path.join(target_path, "relari_output/") os.makedirs(relari_output_path, exist_ok=True) - return relari_output_path def set_env_variables( @@ -150,7 +181,8 @@ def cleanup_docker(run_suffix: str) -> None: stdout, _ = _run_command("docker ps -a --format '{{json .}}'") containers = [json.loads(line) for line in stdout.splitlines()] - + if not run_suffix: + run_suffix = datetime.now().strftime("-%Y") 
project_name = f"danswer-stack{run_suffix}" containers_to_delete = [ c for c in containers if c["Names"].startswith(project_name) @@ -247,3 +279,71 @@ def get_api_server_host_port(suffix: str) -> str: f"No port found containing: {client_port} for container: {container_name} and suffix: {suffix}" ) return matching_ports[0] + + +# Added function to check Vespa container health status +def is_vespa_container_healthy(suffix: str) -> bool: + print(f"Checking health status of Vespa container for suffix: {suffix}") + + # Find the Vespa container + stdout, _ = _run_command( + f"docker ps -a --format '{{{{.Names}}}}' | grep vespa | grep {suffix}" + ) + container_name = stdout.strip() + + if not container_name: + print(f"No Vespa container found with suffix: {suffix}") + return False + + # Get the health status + stdout, _ = _run_command( + f"docker inspect --format='{{{{.State.Health.Status}}}}' {container_name}" + ) + health_status = stdout.strip() + + is_healthy = health_status.lower() == "healthy" + print(f"Vespa container '{container_name}' health status: {health_status}") + + return is_healthy + + +# Added function to restart Vespa container +def restart_vespa_container(suffix: str) -> None: + print(f"Restarting Vespa container for suffix: {suffix}") + + # Find the Vespa container + stdout, _ = _run_command( + f"docker ps -a --format '{{{{.Names}}}}' | grep vespa | grep {suffix}" + ) + container_name = stdout.strip() + + if not container_name: + raise RuntimeError(f"No Vespa container found with suffix: {suffix}") + + # Restart the container + _run_command(f"docker restart {container_name}") + + print(f"Vespa container '{container_name}' has begun restarting") + + time_to_wait = 5 + while not is_vespa_container_healthy(suffix): + print(f"Waiting {time_to_wait} seconds for vespa container to restart") + time.sleep(5) + + print(f"Vespa container '{container_name}' has been restarted") + + +if __name__ == "__main__": + """ + Running this just cleans up the docker 
environment for the container indicated by existing_test_suffix + If no existing_test_suffix is indicated, will just clean up all danswer docker containers/volumes/networks + Note: vespa/postgres mounts are not deleted + """ + current_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(current_dir, "search_test_config.yaml") + with open(config_path, "r") as file: + config = yaml.safe_load(file) + + if not isinstance(config, dict): + raise TypeError("config must be a dictionary") + cleanup_docker(config["existing_test_suffix"]) diff --git a/backend/tests/regression/answer_quality/run_eval_pipeline.py b/backend/tests/regression/answer_quality/run_eval_pipeline.py index 0ddd10502..b9c2cadbd 100644 --- a/backend/tests/regression/answer_quality/run_eval_pipeline.py +++ b/backend/tests/regression/answer_quality/run_eval_pipeline.py @@ -26,7 +26,7 @@ def main() -> None: run_suffix = config.existing_test_suffix print("launching danswer with existing data suffix:", run_suffix) else: - run_suffix = datetime.now().strftime("_%Y%m%d_%H%M%S") + run_suffix = datetime.now().strftime("-%Y%m%d-%H%M%S") print("run_suffix:", run_suffix) set_env_variables( @@ -35,9 +35,7 @@ def main() -> None: config.use_cloud_gpu, config.llm, ) - relari_output_folder_path = manage_data_directories( - run_suffix, config.output_folder, config.use_cloud_gpu - ) + manage_data_directories(run_suffix, config.output_folder, config.use_cloud_gpu) if config.branch: switch_to_branch(config.branch) @@ -46,9 +44,7 @@ def main() -> None: if not config.existing_test_suffix: upload_test_files(config.zipped_documents_file, run_suffix) - run_qa_test_and_save_results( - config.questions_file, relari_output_folder_path, run_suffix, config.limit - ) + run_qa_test_and_save_results(run_suffix) if config.clean_up_docker_containers: cleanup_docker(run_suffix) diff --git a/backend/tests/regression/answer_quality/run_qa.py b/backend/tests/regression/answer_quality/run_qa.py index fcb8c49db..c8ec09466 
100644 --- a/backend/tests/regression/answer_quality/run_qa.py +++ b/backend/tests/regression/answer_quality/run_qa.py @@ -1,55 +1,32 @@ import json import os import time -from types import SimpleNamespace import yaml from tests.regression.answer_quality.api_utils import check_if_query_ready from tests.regression.answer_quality.api_utils import get_answer_from_query from tests.regression.answer_quality.cli_utils import get_current_commit_sha +from tests.regression.answer_quality.cli_utils import get_docker_container_env_vars + +RESULTS_FILENAME = "results.jsonl" +METADATA_FILENAME = "metadata.yaml" -def _process_and_write_query_results( - samples: list[dict], run_suffix: str, output_file_path: str -) -> None: - while not check_if_query_ready(run_suffix): - time.sleep(5) - - count = 0 +def _update_results_file(output_folder_path: str, qa_output: dict) -> None: + output_file_path = os.path.join(output_folder_path, RESULTS_FILENAME) with open(output_file_path, "w", encoding="utf-8") as file: - for sample in samples: - print(f"On question number {count}") - query = sample["question"] - print(f"query: {query}") - context_data_list, answer = get_answer_from_query( - query=query, - run_suffix=run_suffix, - ) - - print(f"answer: {answer[:50]}...") - if not context_data_list: - print("No context found") - else: - print(f"{len(context_data_list)} context docs found") - print("\n") - - output = { - "question_data": sample, - "answer": answer, - "context_data_list": context_data_list, - } - - file.write(json.dumps(output) + "\n") - file.flush() - count += 1 + file.write(json.dumps(qa_output) + "\n") + file.flush() -def _write_metadata_file(run_suffix: str, metadata_file_path: str) -> None: - metadata = {"commit_sha": get_current_commit_sha(), "run_suffix": run_suffix} +def _update_metadata_file(test_output_folder: str, count: int) -> None: + metadata_path = os.path.join(test_output_folder, METADATA_FILENAME) + with open(metadata_path, "r", encoding="utf-8") as file: + metadata 
= yaml.safe_load(file) - print("saving metadata to:", metadata_file_path) - with open(metadata_file_path, "w", encoding="utf-8") as yaml_file: + metadata["number_of_questions_asked"] = count + with open(metadata_path, "w", encoding="utf-8") as yaml_file: yaml.dump(metadata, yaml_file) @@ -62,60 +39,111 @@ def _read_questions_jsonl(questions_file_path: str) -> list[dict]: return questions -def run_qa_test_and_save_results( - questions_file_path: str, - results_folder_path: str, - run_suffix: str, - limit: int | None = None, -) -> None: - results_file = "run_results.jsonl" - metadata_file = "run_metadata.yaml" - samples = _read_questions_jsonl(questions_file_path) - - if limit is not None: - samples = samples[:limit] +def _get_test_output_folder(config: dict) -> str: + base_output_folder = os.path.expanduser(config["output_folder"]) + if config["run_suffix"]: + base_output_folder = os.path.join( + base_output_folder, ("test" + config["run_suffix"]), "relari_output" + ) + else: + base_output_folder = os.path.join(base_output_folder, "no_defined_suffix") counter = 1 - output_file_path = os.path.join(results_folder_path, results_file) - metadata_file_path = os.path.join(results_folder_path, metadata_file) - while os.path.exists(output_file_path): - output_file_path = os.path.join( - results_folder_path, - results_file.replace("run_results", f"run_results_{counter}"), - ) - metadata_file_path = os.path.join( - results_folder_path, - metadata_file.replace("run_metadata", f"run_metadata_{counter}"), + run_suffix = config["run_suffix"][1:] + output_folder_path = os.path.join(base_output_folder, f"{run_suffix}_run_1") + while os.path.exists(output_folder_path): + output_folder_path = os.path.join( + output_folder_path.replace( + f"{run_suffix}_run_{counter-1}", f"{run_suffix}_run_{counter}" + ), ) counter += 1 - print("saving question results to:", output_file_path) - _write_metadata_file(run_suffix, metadata_file_path) - _process_and_write_query_results( - samples=samples, 
run_suffix=run_suffix, output_file_path=output_file_path - ) + os.makedirs(output_folder_path, exist_ok=True) + + return output_folder_path -def main() -> None: +def _initialize_files(config: dict) -> tuple[str, list[dict]]: + test_output_folder = _get_test_output_folder(config) + + questions = _read_questions_jsonl(config["questions_file"]) + + metadata = { + "commit_sha": get_current_commit_sha(), + "run_suffix": config["run_suffix"], + "test_config": config, + "number_of_questions_in_dataset": len(questions), + } + + env_vars = get_docker_container_env_vars(config["run_suffix"]) + if env_vars["ENV_SEED_CONFIGURATION"]: + del env_vars["ENV_SEED_CONFIGURATION"] + if env_vars["GPG_KEY"]: + del env_vars["GPG_KEY"] + if metadata["test_config"]["llm"]["api_key"]: + del metadata["test_config"]["llm"]["api_key"] + metadata.update(env_vars) + metadata_path = os.path.join(test_output_folder, METADATA_FILENAME) + print("saving metadata to:", metadata_path) + with open(metadata_path, "w", encoding="utf-8") as yaml_file: + yaml.dump(metadata, yaml_file) + + return test_output_folder, questions + + +def _process_and_write_query_results(config: dict) -> None: + test_output_folder, questions = _initialize_files(config) + print("saving test results to folder:", test_output_folder) + + while not check_if_query_ready(config["run_suffix"]): + time.sleep(5) + + if config["limit"] is not None: + questions = questions[: config["limit"]] + count = 1 + for question_data in questions: + print(f"On question number {count}") + + query = question_data["question"] + print(f"query: {query}") + context_data_list, answer = get_answer_from_query( + query=query, + run_suffix=config["run_suffix"], + ) + + if not context_data_list: + print("No answer or context found") + else: + print(f"answer: {answer[:50]}...") + print(f"{len(context_data_list)} context docs found") + print("\n") + + output = { + "question_data": question_data, + "answer": answer, + "context_data_list": context_data_list, + } + + 
_update_results_file(test_output_folder, output) + _update_metadata_file(test_output_folder, count) + count += 1 + + +def run_qa_test_and_save_results(run_suffix: str = "") -> None: current_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(current_dir, "search_test_config.yaml") with open(config_path, "r") as file: - config = SimpleNamespace(**yaml.safe_load(file)) + config = yaml.safe_load(file) - current_output_folder = os.path.expanduser(config.output_folder) - if config.existing_test_suffix: - current_output_folder = os.path.join( - current_output_folder, "test" + config.existing_test_suffix, "relari_output" - ) - else: - current_output_folder = os.path.join(current_output_folder, "no_defined_suffix") + if not isinstance(config, dict): + raise TypeError("config must be a dictionary") - run_qa_test_and_save_results( - config.questions_file, - current_output_folder, - config.existing_test_suffix, - config.limit, - ) + if not run_suffix: + run_suffix = config["existing_test_suffix"] + + config["run_suffix"] = run_suffix + _process_and_write_query_results(config) if __name__ == "__main__": @@ -123,4 +151,4 @@ if __name__ == "__main__": To run a different set of questions, update the questions_file in search_test_config.yaml If there is more than one instance of Danswer running, specify the suffix in search_test_config.yaml """ - main() + run_qa_test_and_save_results()