Improved eval logging and stability (#1843)

hagen-danswer 2024-07-15 14:58:45 -07:00 committed by GitHub
parent bd1e0c5969
commit a1e638a73d
4 changed files with 221 additions and 96 deletions

View File

@@ -11,16 +11,15 @@ from danswer.search.models import IndexFilters
 from danswer.search.models import OptionalSearchSetting
 from danswer.search.models import RetrievalDetails
 from danswer.server.documents.models import ConnectorBase
-from tests.regression.answer_quality.cli_utils import (
-    get_api_server_host_port,
-)
+from tests.regression.answer_quality.cli_utils import get_api_server_host_port
+from tests.regression.answer_quality.cli_utils import restart_vespa_container
 
 
 def _api_url_builder(run_suffix: str, api_path: str) -> str:
     return f"http://localhost:{get_api_server_host_port(run_suffix)}" + api_path
 
 
-@retry(tries=5, delay=2, backoff=2)
+@retry(tries=15, delay=10, jitter=1)
 def get_answer_from_query(query: str, run_suffix: str) -> tuple[list[str], str]:
     filters = IndexFilters(
         source_type=None,
@@ -58,8 +57,10 @@ def get_answer_from_query(query: str, run_suffix: str) -> tuple[list[str], str]:
         context_data_list = response_json.get("contexts", {}).get("contexts", [])
         answer = response_json.get("answer", "")
     except Exception as e:
-        print("Failed to answer the questions, trying again")
-        print(f"error: {str(e)}")
+        print("Failed to answer the questions:")
+        print(f"\t {str(e)}")
+        print("Restarting vespa container and trying again")
+        restart_vespa_container(run_suffix)
         raise e
 
     return context_data_list, answer
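Note on the decorator change above: assuming the pypi `retry` package's documented semantics (the delay grows multiplicatively with `backoff` and additively with `jitter`), the old and new policies pace attempts roughly as sketched below. The `waits()` helper is purely illustrative and not part of the commit; the behavioral point is that each failed attempt now also restarts the Vespa container before the exception propagates back to the retry decorator.

    # Illustrative only: approximate wait schedule produced by the `retry` decorator.
    def waits(tries: int, delay: float, backoff: float = 1, jitter: float = 0) -> list[float]:
        schedule = []
        for _ in range(tries - 1):  # no wait after the final attempt
            schedule.append(delay)
            delay = delay * backoff + jitter
        return schedule

    print(waits(tries=5, delay=2, backoff=2))       # old policy: [2, 4, 8, 16] seconds
    print(waits(tries=15, delay=10, jitter=1)[:5])  # new policy: [10, 11, 12, 13, 14] ... (~4 minutes in total)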

View File

@@ -3,9 +3,12 @@ import os
 import socket
 import subprocess
 import sys
+import time
+from datetime import datetime
 from threading import Thread
 from typing import IO
 
+import yaml
 from retry import retry
@@ -65,7 +68,36 @@ def switch_to_branch(branch: str) -> None:
     print("Repository updated successfully.")
 
 
+def get_docker_container_env_vars(suffix: str) -> dict:
+    """
+    Retrieves environment variables from "background" and "api_server" Docker containers.
+    """
+    print(f"Getting environment variables for containers with suffix: {suffix}")
+
+    combined_env_vars = {}
+    for container_type in ["background", "api_server"]:
+        container_name = _run_command(
+            f"docker ps -a --format '{{{{.Names}}}}' | grep '{container_type}' | grep '{suffix}'"
+        )[0].strip()
+        if not container_name:
+            raise RuntimeError(
+                f"No {container_type} container found with suffix: {suffix}"
+            )
+
+        env_vars_json = _run_command(
+            f"docker inspect --format='{{{{json .Config.Env}}}}' {container_name}"
+        )[0]
+        env_vars_list = json.loads(env_vars_json.strip())
+
+        for env_var in env_vars_list:
+            key, value = env_var.split("=", 1)
+            combined_env_vars[key] = value
+
+    print(f"Combined env variables: {combined_env_vars}")
+    return combined_env_vars
+
+
-def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> str:
+def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> None:
     # Use the user's home directory as the base path
     target_path = os.path.join(os.path.expanduser(base_path), f"test{suffix}")
     directories = {
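A hypothetical usage sketch of the new helper (the suffix value below is illustrative, not taken from the commit):

    env_vars = get_docker_container_env_vars("-20240715-145845")
    # Flat {name: value} dict merged from the background and api_server containers,
    # including keys such as "GPG_KEY" and "ENV_SEED_CONFIGURATION" that the QA
    # runner later strips before writing metadata.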
@@ -87,7 +119,6 @@ def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> str:
         print(f"Set {env_var} to: {directory}")
 
     relari_output_path = os.path.join(target_path, "relari_output/")
     os.makedirs(relari_output_path, exist_ok=True)
-    return relari_output_path
 
 
 def set_env_variables(
@@ -150,7 +181,8 @@ def cleanup_docker(run_suffix: str) -> None:
     stdout, _ = _run_command("docker ps -a --format '{{json .}}'")
     containers = [json.loads(line) for line in stdout.splitlines()]
 
+    if not run_suffix:
+        run_suffix = datetime.now().strftime("-%Y")
     project_name = f"danswer-stack{run_suffix}"
     containers_to_delete = [
         c for c in containers if c["Names"].startswith(project_name)
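Why the "-%Y" fallback works (a sketch under the assumption that compose containers are named with the project name as a prefix): run suffixes now begin with "-YYYYMMDD", so an empty suffix expands to the current year and prefix-matches every stack started that year.

    # Illustrative names only; the container naming scheme is assumed.
    project_name = "danswer-stack" + datetime.now().strftime("-%Y")   # e.g. "danswer-stack-2024"
    container_name = "danswer-stack-20240715-145845-api_server-1"
    assert container_name.startswith(project_name)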
@@ -247,3 +279,71 @@ def get_api_server_host_port(suffix: str) -> str:
         f"No port found containing: {client_port} for container: {container_name} and suffix: {suffix}"
     )
     return matching_ports[0]
+
+
+# Added function to check Vespa container health status
+def is_vespa_container_healthy(suffix: str) -> bool:
+    print(f"Checking health status of Vespa container for suffix: {suffix}")
+
+    # Find the Vespa container
+    stdout, _ = _run_command(
+        f"docker ps -a --format '{{{{.Names}}}}' | grep vespa | grep {suffix}"
+    )
+    container_name = stdout.strip()
+
+    if not container_name:
+        print(f"No Vespa container found with suffix: {suffix}")
+        return False
+
+    # Get the health status
+    stdout, _ = _run_command(
+        f"docker inspect --format='{{{{.State.Health.Status}}}}' {container_name}"
+    )
+    health_status = stdout.strip()
+
+    is_healthy = health_status.lower() == "healthy"
+    print(f"Vespa container '{container_name}' health status: {health_status}")
+
+    return is_healthy
+
+
+# Added function to restart Vespa container
+def restart_vespa_container(suffix: str) -> None:
+    print(f"Restarting Vespa container for suffix: {suffix}")
+
+    # Find the Vespa container
+    stdout, _ = _run_command(
+        f"docker ps -a --format '{{{{.Names}}}}' | grep vespa | grep {suffix}"
+    )
+    container_name = stdout.strip()
+
+    if not container_name:
+        raise RuntimeError(f"No Vespa container found with suffix: {suffix}")
+
+    # Restart the container
+    _run_command(f"docker restart {container_name}")
+
+    print(f"Vespa container '{container_name}' has begun restarting")
+
+    time_to_wait = 5
+    while not is_vespa_container_healthy(suffix):
+        print(f"Waiting {time_to_wait} seconds for vespa container to restart")
+        time.sleep(5)
+
+    print(f"Vespa container '{container_name}' has been restarted")
+
+
+if __name__ == "__main__":
+    """
+    Running this just cleans up the Docker environment for the containers indicated by existing_test_suffix.
+    If no existing_test_suffix is given, it cleans up all danswer Docker containers/volumes/networks.
+    Note: Vespa/Postgres mounts are not deleted.
+    """
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    config_path = os.path.join(current_dir, "search_test_config.yaml")
+    with open(config_path, "r") as file:
+        config = yaml.safe_load(file)
+    if not isinstance(config, dict):
+        raise TypeError("config must be a dictionary")
+    cleanup_docker(config["existing_test_suffix"])
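For the cleanup entry point above, a rough sketch of the relevant piece of search_test_config.yaml, shown as the dict that yaml.safe_load would return (the value is illustrative; only existing_test_suffix is read by this entry point):

    config = {
        "existing_test_suffix": "-20240715-145845",  # "" falls back to the current-year prefix
    }
    cleanup_docker(config["existing_test_suffix"])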

View File

@@ -26,7 +26,7 @@ def main() -> None:
         run_suffix = config.existing_test_suffix
         print("launching danswer with existing data suffix:", run_suffix)
     else:
-        run_suffix = datetime.now().strftime("_%Y%m%d_%H%M%S")
+        run_suffix = datetime.now().strftime("-%Y%m%d-%H%M%S")
     print("run_suffix:", run_suffix)
 
     set_env_variables(
@@ -35,9 +35,7 @@ def main() -> None:
         config.use_cloud_gpu,
         config.llm,
     )
-    relari_output_folder_path = manage_data_directories(
-        run_suffix, config.output_folder, config.use_cloud_gpu
-    )
+    manage_data_directories(run_suffix, config.output_folder, config.use_cloud_gpu)
 
     if config.branch:
         switch_to_branch(config.branch)
@@ -46,9 +44,7 @@ def main() -> None:
     if not config.existing_test_suffix:
         upload_test_files(config.zipped_documents_file, run_suffix)
 
-    run_qa_test_and_save_results(
-        config.questions_file, relari_output_folder_path, run_suffix, config.limit
-    )
+    run_qa_test_and_save_results(run_suffix)
 
     if config.clean_up_docker_containers:
         cleanup_docker(run_suffix)

View File

@@ -1,55 +1,32 @@
 import json
 import os
 import time
-from types import SimpleNamespace
 
 import yaml
 
 from tests.regression.answer_quality.api_utils import check_if_query_ready
 from tests.regression.answer_quality.api_utils import get_answer_from_query
 from tests.regression.answer_quality.cli_utils import get_current_commit_sha
+from tests.regression.answer_quality.cli_utils import get_docker_container_env_vars
+
+RESULTS_FILENAME = "results.jsonl"
+METADATA_FILENAME = "metadata.yaml"
 
 
-def _process_and_write_query_results(
-    samples: list[dict], run_suffix: str, output_file_path: str
-) -> None:
-    while not check_if_query_ready(run_suffix):
-        time.sleep(5)
-
-    count = 0
-    with open(output_file_path, "w", encoding="utf-8") as file:
-        for sample in samples:
-            print(f"On question number {count}")
-            query = sample["question"]
-            print(f"query: {query}")
-
-            context_data_list, answer = get_answer_from_query(
-                query=query,
-                run_suffix=run_suffix,
-            )
-
-            print(f"answer: {answer[:50]}...")
-            if not context_data_list:
-                print("No context found")
-            else:
-                print(f"{len(context_data_list)} context docs found")
-            print("\n")
-
-            output = {
-                "question_data": sample,
-                "answer": answer,
-                "context_data_list": context_data_list,
-            }
-
-            file.write(json.dumps(output) + "\n")
-            file.flush()
-            count += 1
+def _update_results_file(output_folder_path: str, qa_output: dict) -> None:
+    output_file_path = os.path.join(output_folder_path, RESULTS_FILENAME)
+    with open(output_file_path, "w", encoding="utf-8") as file:
+        file.write(json.dumps(qa_output) + "\n")
+        file.flush()
 
 
-def _write_metadata_file(run_suffix: str, metadata_file_path: str) -> None:
-    metadata = {"commit_sha": get_current_commit_sha(), "run_suffix": run_suffix}
-
-    print("saving metadata to:", metadata_file_path)
-    with open(metadata_file_path, "w", encoding="utf-8") as yaml_file:
-        yaml.dump(metadata, yaml_file)
+def _update_metadata_file(test_output_folder: str, count: int) -> None:
+    metadata_path = os.path.join(test_output_folder, METADATA_FILENAME)
+    with open(metadata_path, "r", encoding="utf-8") as file:
+        metadata = yaml.safe_load(file)
+
+    metadata["number_of_questions_asked"] = count
+
+    with open(metadata_path, "w", encoding="utf-8") as yaml_file:
+        yaml.dump(metadata, yaml_file)
@@ -62,60 +39,111 @@ def _read_questions_jsonl(questions_file_path: str) -> list[dict]:
     return questions
 
 
-def run_qa_test_and_save_results(
-    questions_file_path: str,
-    results_folder_path: str,
-    run_suffix: str,
-    limit: int | None = None,
-) -> None:
-    results_file = "run_results.jsonl"
-    metadata_file = "run_metadata.yaml"
-    samples = _read_questions_jsonl(questions_file_path)
-
-    if limit is not None:
-        samples = samples[:limit]
-
-    counter = 1
-    output_file_path = os.path.join(results_folder_path, results_file)
-    metadata_file_path = os.path.join(results_folder_path, metadata_file)
-    while os.path.exists(output_file_path):
-        output_file_path = os.path.join(
-            results_folder_path,
-            results_file.replace("run_results", f"run_results_{counter}"),
-        )
-        metadata_file_path = os.path.join(
-            results_folder_path,
-            metadata_file.replace("run_metadata", f"run_metadata_{counter}"),
-        )
-        counter += 1
-
-    print("saving question results to:", output_file_path)
-    _write_metadata_file(run_suffix, metadata_file_path)
-    _process_and_write_query_results(
-        samples=samples, run_suffix=run_suffix, output_file_path=output_file_path
-    )
-
-
-def main() -> None:
-    current_dir = os.path.dirname(os.path.abspath(__file__))
-    config_path = os.path.join(current_dir, "search_test_config.yaml")
-
-    with open(config_path, "r") as file:
-        config = SimpleNamespace(**yaml.safe_load(file))
-
-    current_output_folder = os.path.expanduser(config.output_folder)
-    if config.existing_test_suffix:
-        current_output_folder = os.path.join(
-            current_output_folder, "test" + config.existing_test_suffix, "relari_output"
-        )
-    else:
-        current_output_folder = os.path.join(current_output_folder, "no_defined_suffix")
-
-    run_qa_test_and_save_results(
-        config.questions_file,
-        current_output_folder,
-        config.existing_test_suffix,
-        config.limit,
-    )
+def _get_test_output_folder(config: dict) -> str:
+    base_output_folder = os.path.expanduser(config["output_folder"])
+    if config["run_suffix"]:
+        base_output_folder = os.path.join(
+            base_output_folder, ("test" + config["run_suffix"]), "relari_output"
+        )
+    else:
+        base_output_folder = os.path.join(base_output_folder, "no_defined_suffix")
+
+    counter = 1
+    run_suffix = config["run_suffix"][1:]
+    output_folder_path = os.path.join(base_output_folder, f"{run_suffix}_run_1")
+    while os.path.exists(output_folder_path):
+        output_folder_path = os.path.join(
+            output_folder_path.replace(
+                f"{run_suffix}_run_{counter-1}", f"{run_suffix}_run_{counter}"
+            ),
+        )
+        counter += 1
+
+    os.makedirs(output_folder_path, exist_ok=True)
+
+    return output_folder_path
+
+
+def _initialize_files(config: dict) -> tuple[str, list[dict]]:
+    test_output_folder = _get_test_output_folder(config)
+    questions = _read_questions_jsonl(config["questions_file"])
+
+    metadata = {
+        "commit_sha": get_current_commit_sha(),
+        "run_suffix": config["run_suffix"],
+        "test_config": config,
+        "number_of_questions_in_dataset": len(questions),
+    }
+
+    env_vars = get_docker_container_env_vars(config["run_suffix"])
+    if env_vars["ENV_SEED_CONFIGURATION"]:
+        del env_vars["ENV_SEED_CONFIGURATION"]
+    if env_vars["GPG_KEY"]:
+        del env_vars["GPG_KEY"]
+    if metadata["test_config"]["llm"]["api_key"]:
+        del metadata["test_config"]["llm"]["api_key"]
+    metadata.update(env_vars)
+
+    metadata_path = os.path.join(test_output_folder, METADATA_FILENAME)
+    print("saving metadata to:", metadata_path)
+    with open(metadata_path, "w", encoding="utf-8") as yaml_file:
+        yaml.dump(metadata, yaml_file)
+
+    return test_output_folder, questions
+
+
+def _process_and_write_query_results(config: dict) -> None:
+    test_output_folder, questions = _initialize_files(config)
+    print("saving test results to folder:", test_output_folder)
+
+    while not check_if_query_ready(config["run_suffix"]):
+        time.sleep(5)
+
+    if config["limit"] is not None:
+        questions = questions[: config["limit"]]
+
+    count = 1
+    for question_data in questions:
+        print(f"On question number {count}")
+
+        query = question_data["question"]
+        print(f"query: {query}")
+
+        context_data_list, answer = get_answer_from_query(
+            query=query,
+            run_suffix=config["run_suffix"],
+        )
+
+        if not context_data_list:
+            print("No answer or context found")
+        else:
+            print(f"answer: {answer[:50]}...")
+            print(f"{len(context_data_list)} context docs found")
+        print("\n")
+
+        output = {
+            "question_data": question_data,
+            "answer": answer,
+            "context_data_list": context_data_list,
+        }
+
+        _update_results_file(test_output_folder, output)
+        _update_metadata_file(test_output_folder, count)
+        count += 1
+
+
+def run_qa_test_and_save_results(run_suffix: str = "") -> None:
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    config_path = os.path.join(current_dir, "search_test_config.yaml")
+
+    with open(config_path, "r") as file:
+        config = yaml.safe_load(file)
+
+    if not isinstance(config, dict):
+        raise TypeError("config must be a dictionary")
+
+    if not run_suffix:
+        run_suffix = config["existing_test_suffix"]
+
+    config["run_suffix"] = run_suffix
+    _process_and_write_query_results(config)
 
 
 if __name__ == "__main__":
@@ -123,4 +151,4 @@ if __name__ == "__main__":
     To run a different set of questions, update the questions_file in search_test_config.yaml
     If there is more than one instance of Danswer running, specify the suffix in search_test_config.yaml
     """
-    main()
+    run_qa_test_and_save_results()
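For orientation, a hedged sketch of where one run's artifacts land under the new layout; the output_folder value and run suffix below are examples, not taken from the commit:

    # <output_folder>/test<run_suffix>/relari_output/<suffix without leading dash>_run_<N>/
    run_dir = os.path.join(
        os.path.expanduser("~/danswer_test_results"),   # config["output_folder"] (illustrative)
        "test-20240715-145845",
        "relari_output",
        "20240715-145845_run_1",
    )
    # run_dir holds:
    #   results.jsonl  - {"question_data", "answer", "context_data_list"} JSON records
    #   metadata.yaml  - commit_sha, run_suffix, test_config, number_of_questions_in_dataset,
    #                    number_of_questions_asked, plus the filtered container env vars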