Improve eval pipeline qol (#1908)

This commit is contained in:
hagen-danswer 2024-07-23 17:16:34 -07:00 committed by GitHub
parent 2470c68506
commit 6ff8e6c0ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 105 additions and 115 deletions

View File

@ -9,7 +9,7 @@ This Python script automates the process of running search quality tests for a b
- Manages environment variables
- Switches to specified Git branch
- Uploads test documents
- Runs search quality tests using Relari
- Runs search quality tests
- Cleans up Docker containers (optional)
## Usage
@ -29,9 +29,17 @@ export PYTHONPATH=$PYTHONPATH:$PWD/backend
```
cd backend/tests/regression/answer_quality
```
7. Run the script:
7. To launch the evaluation environment, run the launch_eval_env.py script (this step can be skipped if you are running the env outside of docker, just leave "environment_name" blank):
```
python run_eval_pipeline.py
python launch_eval_env.py
```
8. Run the file_uploader.py script to upload the zip files located at the path "zipped_documents_file"
```
python file_uploader.py
```
9. Run the run_qa.py script to ask questions from the jsonl located at the path "questions_file". This will hit the "query/answer-with-quote" API endpoint.
```
python run_qa.py
```
Note: All data will be saved even after the containers are shut down. There are instructions below for re-launching docker containers using this data.
@ -75,12 +83,10 @@ Edit `search_test_config.yaml` to set:
- model_server_port
- This is the port of the remote model server
- Only need to set this if use_cloud_gpu is true
- existing_test_suffix (THIS IS NOT A SUFFIX ANYMORE, TODO UPDATE THE DOCS HERE)
- environment_name
- Use this if you would like to relaunch a previous test instance
- Input the suffix of the test you'd like to re-launch
- (E.g. to use the data from folder "test-1234-5678" put "-1234-5678")
- No new files will automatically be uploaded
- Leave empty to run a new test
- Input the env_name of the test you'd like to re-launch
- Leave empty to launch referencing local default network locations
- limit
- Max number of questions you'd like to ask against the dataset
- Set to null for no limit
@ -90,7 +96,7 @@ Edit `search_test_config.yaml` to set:
## Relaunching From Existing Data
To launch an existing set of containers that has already completed indexing, set the existing_test_suffix variable. This will launch the docker containers mounted on the volumes of the indicated suffix and will not automatically index any documents or run any QA.
To launch an existing set of containers that has already completed indexing, set the environment_name variable. This will launch the docker containers mounted on the volumes of the indicated env_name and will not automatically index any documents or run any QA.
Once these containers are launched you can run file_uploader.py or run_qa.py (assuming you have run the steps in the Usage section above).
- file_uploader.py will upload and index additional zipped files located at the zipped_documents_file path.

View File

@ -16,13 +16,16 @@ from tests.regression.answer_quality.cli_utils import get_api_server_host_port
GENERAL_HEADERS = {"Content-Type": "application/json"}
def _api_url_builder(run_suffix: str, api_path: str) -> str:
return f"http://localhost:{get_api_server_host_port(run_suffix)}" + api_path
def _api_url_builder(env_name: str, api_path: str) -> str:
if env_name:
return f"http://localhost:{get_api_server_host_port(env_name)}" + api_path
else:
return "http://localhost:8080" + api_path
@retry(tries=5, delay=5)
def get_answer_from_query(
query: str, only_retrieve_docs: bool, run_suffix: str
query: str, only_retrieve_docs: bool, env_name: str
) -> tuple[list[str], str]:
filters = IndexFilters(
source_type=None,
@ -49,7 +52,7 @@ def get_answer_from_query(
skip_gen_ai_answer_generation=only_retrieve_docs,
)
url = _api_url_builder(run_suffix, "/query/answer-with-quote/")
url = _api_url_builder(env_name, "/query/answer-with-quote/")
headers = {
"Content-Type": "application/json",
}
@ -70,8 +73,8 @@ def get_answer_from_query(
@retry(tries=10, delay=10)
def check_indexing_status(run_suffix: str) -> tuple[int, bool]:
url = _api_url_builder(run_suffix, "/manage/admin/connector/indexing-status/")
def check_indexing_status(env_name: str) -> tuple[int, bool]:
url = _api_url_builder(env_name, "/manage/admin/connector/indexing-status/")
try:
indexing_status_dict = requests.get(url, headers=GENERAL_HEADERS).json()
except Exception as e:
@ -99,8 +102,8 @@ def check_indexing_status(run_suffix: str) -> tuple[int, bool]:
return doc_count, ongoing_index_attempts
def run_cc_once(run_suffix: str, connector_id: int, credential_id: int) -> None:
url = _api_url_builder(run_suffix, "/manage/admin/connector/run-once/")
def run_cc_once(env_name: str, connector_id: int, credential_id: int) -> None:
url = _api_url_builder(env_name, "/manage/admin/connector/run-once/")
body = {
"connector_id": connector_id,
"credential_ids": [credential_id],
@ -115,9 +118,9 @@ def run_cc_once(run_suffix: str, connector_id: int, credential_id: int) -> None:
print("Failed text:", response.text)
def create_cc_pair(run_suffix: str, connector_id: int, credential_id: int) -> None:
def create_cc_pair(env_name: str, connector_id: int, credential_id: int) -> None:
url = _api_url_builder(
run_suffix, f"/manage/connector/{connector_id}/credential/{credential_id}"
env_name, f"/manage/connector/{connector_id}/credential/{credential_id}"
)
body = {"name": "zip_folder_contents", "is_public": True}
@ -130,8 +133,8 @@ def create_cc_pair(run_suffix: str, connector_id: int, credential_id: int) -> No
print("Failed text:", response.text)
def _get_existing_connector_names(run_suffix: str) -> list[str]:
url = _api_url_builder(run_suffix, "/manage/connector")
def _get_existing_connector_names(env_name: str) -> list[str]:
url = _api_url_builder(env_name, "/manage/connector")
body = {
"credential_json": {},
@ -145,10 +148,10 @@ def _get_existing_connector_names(run_suffix: str) -> list[str]:
raise RuntimeError(response.__dict__)
def create_connector(run_suffix: str, file_paths: list[str]) -> int:
url = _api_url_builder(run_suffix, "/manage/admin/connector")
def create_connector(env_name: str, file_paths: list[str]) -> int:
url = _api_url_builder(env_name, "/manage/admin/connector")
connector_name = base_connector_name = "search_eval_connector"
existing_connector_names = _get_existing_connector_names(run_suffix)
existing_connector_names = _get_existing_connector_names(env_name)
count = 1
while connector_name in existing_connector_names:
@ -175,8 +178,8 @@ def create_connector(run_suffix: str, file_paths: list[str]) -> int:
raise RuntimeError(response.__dict__)
def create_credential(run_suffix: str) -> int:
url = _api_url_builder(run_suffix, "/manage/credential")
def create_credential(env_name: str) -> int:
url = _api_url_builder(env_name, "/manage/credential")
body = {
"credential_json": {},
"admin_public": True,
@ -190,12 +193,12 @@ def create_credential(run_suffix: str) -> int:
@retry(tries=10, delay=2, backoff=2)
def upload_file(run_suffix: str, zip_file_path: str) -> list[str]:
def upload_file(env_name: str, zip_file_path: str) -> list[str]:
files = [
("files", open(zip_file_path, "rb")),
]
api_path = _api_url_builder(run_suffix, "/manage/admin/connector/file/upload")
api_path = _api_url_builder(env_name, "/manage/admin/connector/file/upload")
try:
response = requests.post(api_path, files=files)
response.raise_for_status() # Raises an HTTPError for bad responses

View File

@ -67,20 +67,20 @@ def switch_to_commit(commit_sha: str) -> None:
print("Repository updated successfully.")
def get_docker_container_env_vars(suffix: str) -> dict:
def get_docker_container_env_vars(env_name: str) -> dict:
"""
Retrieves environment variables from "background" and "api_server" Docker containers.
"""
print(f"Getting environment variables for containers with suffix: {suffix}")
print(f"Getting environment variables for containers with env_name: {env_name}")
combined_env_vars = {}
for container_type in ["background", "api_server"]:
container_name = _run_command(
f"docker ps -a --format '{{{{.Names}}}}' | awk '/{container_type}/ && /{suffix}/'"
f"docker ps -a --format '{{{{.Names}}}}' | awk '/{container_type}/ && /{env_name}/'"
)[0].strip()
if not container_name:
raise RuntimeError(
f"No {container_type} container found with suffix: {suffix}"
f"No {container_type} container found with env_name: {env_name}"
)
env_vars_json = _run_command(
@ -95,9 +95,9 @@ def get_docker_container_env_vars(suffix: str) -> dict:
return combined_env_vars
def manage_data_directories(suffix: str, base_path: str, use_cloud_gpu: bool) -> None:
def manage_data_directories(env_name: str, base_path: str, use_cloud_gpu: bool) -> None:
# Use the user's home directory as the base path
target_path = os.path.join(os.path.expanduser(base_path), suffix)
target_path = os.path.join(os.path.expanduser(base_path), env_name)
directories = {
"DANSWER_POSTGRES_DATA_DIR": os.path.join(target_path, "postgres/"),
"DANSWER_VESPA_DATA_DIR": os.path.join(target_path, "vespa/"),
@ -144,12 +144,12 @@ def _is_port_in_use(port: int) -> bool:
def start_docker_compose(
run_suffix: str, launch_web_ui: bool, use_cloud_gpu: bool, only_state: bool = False
env_name: str, launch_web_ui: bool, use_cloud_gpu: bool, only_state: bool = False
) -> None:
print("Starting Docker Compose...")
os.chdir(os.path.dirname(__file__))
os.chdir("../../../../deployment/docker_compose/")
command = f"docker compose -f docker-compose.search-testing.yml -p danswer-stack-{run_suffix} up -d"
command = f"docker compose -f docker-compose.search-testing.yml -p danswer-stack-{env_name} up -d"
command += " --build"
command += " --force-recreate"
@ -175,17 +175,17 @@ def start_docker_compose(
print("Containers have been launched")
def cleanup_docker(run_suffix: str) -> None:
def cleanup_docker(env_name: str) -> None:
print(
f"Deleting Docker containers, volumes, and networks for project suffix: {run_suffix}"
f"Deleting Docker containers, volumes, and networks for project env_name: {env_name}"
)
stdout, _ = _run_command("docker ps -a --format '{{json .}}'")
containers = [json.loads(line) for line in stdout.splitlines()]
if not run_suffix:
run_suffix = datetime.now().strftime("-%Y")
project_name = f"danswer-stack{run_suffix}"
if not env_name:
env_name = datetime.now().strftime("-%Y")
project_name = f"danswer-stack{env_name}"
containers_to_delete = [
c for c in containers if c["Names"].startswith(project_name)
]
@ -221,23 +221,23 @@ def cleanup_docker(run_suffix: str) -> None:
networks = stdout.splitlines()
networks_to_delete = [n for n in networks if run_suffix in n]
networks_to_delete = [n for n in networks if env_name in n]
if not networks_to_delete:
print(f"No networks found containing suffix: {run_suffix}")
print(f"No networks found containing env_name: {env_name}")
else:
network_names = " ".join(networks_to_delete)
_run_command(f"docker network rm {network_names}")
print(
f"Successfully deleted {len(networks_to_delete)} networks containing suffix: {run_suffix}"
f"Successfully deleted {len(networks_to_delete)} networks containing env_name: {env_name}"
)
@retry(tries=5, delay=5, backoff=2)
def get_api_server_host_port(suffix: str) -> str:
def get_api_server_host_port(env_name: str) -> str:
"""
This pulls all containers with the provided suffix
This pulls all containers with the provided env_name
It then grabs the JSON specific container with a name containing "api_server"
It then grabs the port info from the JSON and strips out the relevant data
"""
@ -248,16 +248,16 @@ def get_api_server_host_port(suffix: str) -> str:
server_jsons = []
for container in containers:
if container_name in container["Names"] and suffix in container["Names"]:
if container_name in container["Names"] and env_name in container["Names"]:
server_jsons.append(container)
if not server_jsons:
raise RuntimeError(
f"No container found containing: {container_name} and {suffix}"
f"No container found containing: {container_name} and {env_name}"
)
elif len(server_jsons) > 1:
raise RuntimeError(
f"Too many containers matching {container_name} found, please indicate a suffix"
f"Too many containers matching {container_name} found, please indicate a env_name"
)
server_json = server_jsons[0]
@ -278,23 +278,23 @@ def get_api_server_host_port(suffix: str) -> str:
raise RuntimeError(f"Too many ports matching {client_port} found")
if not matching_ports:
raise RuntimeError(
f"No port found containing: {client_port} for container: {container_name} and suffix: {suffix}"
f"No port found containing: {client_port} for container: {container_name} and env_name: {env_name}"
)
return matching_ports[0]
# Added function to restart Vespa container
def restart_vespa_container(suffix: str) -> None:
print(f"Restarting Vespa container for suffix: {suffix}")
def restart_vespa_container(env_name: str) -> None:
print(f"Restarting Vespa container for env_name: {env_name}")
# Find the Vespa container
stdout, _ = _run_command(
f"docker ps -a --format '{{{{.Names}}}}' | awk '/index-1/ && /{suffix}/'"
f"docker ps -a --format '{{{{.Names}}}}' | awk '/index-1/ && /{env_name}/'"
)
container_name = stdout.strip()
if not container_name:
raise RuntimeError(f"No Vespa container found with suffix: {suffix}")
raise RuntimeError(f"No Vespa container found with env_name: {env_name}")
# Restart the container
_run_command(f"docker restart {container_name}")
@ -307,8 +307,8 @@ def restart_vespa_container(suffix: str) -> None:
if __name__ == "__main__":
"""
Running this just cleans up the docker environment for the container indicated by existing_test_suffix
If no existing_test_suffix is indicated, will just clean up all danswer docker containers/volumes/networks
Running this just cleans up the docker environment for the container indicated by environment_name
If no environment_name is indicated, will just clean up all danswer docker containers/volumes/networks
Note: vespa/postgres mounts are not deleted
"""
current_dir = os.path.dirname(os.path.abspath(__file__))
@ -318,4 +318,4 @@ if __name__ == "__main__":
if not isinstance(config, dict):
raise TypeError("config must be a dictionary")
cleanup_docker(config["existing_test_suffix"])
cleanup_docker(config["environment_name"])

View File

@ -13,7 +13,6 @@ from tests.regression.answer_quality.api_utils import create_connector
from tests.regression.answer_quality.api_utils import create_credential
from tests.regression.answer_quality.api_utils import run_cc_once
from tests.regression.answer_quality.api_utils import upload_file
from tests.regression.answer_quality.cli_utils import restart_vespa_container
def unzip_and_get_file_paths(zip_file_path: str) -> list[str]:
@ -35,40 +34,37 @@ def create_temp_zip_from_files(file_paths: list[str]) -> str:
return zip_file_path
def upload_test_files(zip_file_path: str, run_suffix: str) -> None:
def upload_test_files(zip_file_path: str, env_name: str) -> None:
print("zip:", zip_file_path)
file_paths = upload_file(run_suffix, zip_file_path)
file_paths = upload_file(env_name, zip_file_path)
conn_id = create_connector(run_suffix, file_paths)
cred_id = create_credential(run_suffix)
conn_id = create_connector(env_name, file_paths)
cred_id = create_credential(env_name)
create_cc_pair(run_suffix, conn_id, cred_id)
run_cc_once(run_suffix, conn_id, cred_id)
create_cc_pair(env_name, conn_id, cred_id)
run_cc_once(env_name, conn_id, cred_id)
def manage_file_upload(zip_file_path: str, run_suffix: str) -> None:
def manage_file_upload(zip_file_path: str, env_name: str) -> None:
unzipped_file_paths = unzip_and_get_file_paths(zip_file_path)
total_file_count = len(unzipped_file_paths)
while True:
doc_count, ongoing_index_attempts = check_indexing_status(run_suffix)
doc_count, ongoing_index_attempts = check_indexing_status(env_name)
if not doc_count:
print("No docs indexed, waiting for indexing to start")
upload_test_files(zip_file_path, run_suffix)
elif ongoing_index_attempts:
if ongoing_index_attempts:
print(
f"{doc_count} docs indexed but waiting for ongoing indexing jobs to finish..."
)
elif not doc_count:
print("No docs indexed, waiting for indexing to start")
upload_test_files(zip_file_path, env_name)
elif doc_count < total_file_count:
print(f"No ongooing indexing attempts but only {doc_count} docs indexed")
print("Restarting vespa...")
restart_vespa_container(run_suffix)
print(f"Rerunning with {total_file_count - doc_count} missing docs")
remaining_files = unzipped_file_paths[doc_count:]
print(f"Grabbed last {len(remaining_files)} docs to try agian")
temp_zip_file_path = create_temp_zip_from_files(remaining_files)
upload_test_files(temp_zip_file_path, run_suffix)
upload_test_files(temp_zip_file_path, env_name)
os.unlink(temp_zip_file_path)
else:
print(f"Successfully uploaded {doc_count} docs!")
@ -86,5 +82,5 @@ if __name__ == "__main__":
with open(config_path, "r") as file:
config = SimpleNamespace(**yaml.safe_load(file))
file_location = config.zipped_documents_file
run_suffix = config.existing_test_suffix
manage_file_upload(file_location, run_suffix)
env_name = config.environment_name
manage_file_upload(file_location, env_name)

View File

@ -1,16 +1,12 @@
import os
from datetime import datetime
from types import SimpleNamespace
import yaml
from tests.regression.answer_quality.cli_utils import cleanup_docker
from tests.regression.answer_quality.cli_utils import manage_data_directories
from tests.regression.answer_quality.cli_utils import set_env_variables
from tests.regression.answer_quality.cli_utils import start_docker_compose
from tests.regression.answer_quality.cli_utils import switch_to_commit
from tests.regression.answer_quality.file_uploader import upload_test_files
from tests.regression.answer_quality.run_qa import run_qa_test_and_save_results
def load_config(config_filename: str) -> SimpleNamespace:
@ -22,12 +18,16 @@ def load_config(config_filename: str) -> SimpleNamespace:
def main() -> None:
config = load_config("search_test_config.yaml")
if config.existing_test_suffix:
run_suffix = config.existing_test_suffix
print("launching danswer with existing data suffix:", run_suffix)
if config.environment_name:
env_name = config.environment_name
print("launching danswer with environment name:", env_name)
else:
run_suffix = datetime.now().strftime("-%Y%m%d-%H%M%S")
print("run_suffix:", run_suffix)
print("No env name defined. Not launching docker.")
print(
"Please define a name in the config yaml to start a new env "
"or use an existing env"
)
return
set_env_variables(
config.model_server_ip,
@ -35,22 +35,14 @@ def main() -> None:
config.use_cloud_gpu,
config.llm,
)
manage_data_directories(run_suffix, config.output_folder, config.use_cloud_gpu)
manage_data_directories(env_name, config.output_folder, config.use_cloud_gpu)
if config.commit_sha:
switch_to_commit(config.commit_sha)
start_docker_compose(
run_suffix, config.launch_web_ui, config.use_cloud_gpu, config.only_state
env_name, config.launch_web_ui, config.use_cloud_gpu, config.only_state
)
if not config.existing_test_suffix and not config.only_state:
upload_test_files(config.zipped_documents_file, run_suffix)
run_qa_test_and_save_results(run_suffix)
if config.clean_up_docker_containers:
cleanup_docker(run_suffix)
if __name__ == "__main__":
main()

View File

@ -43,12 +43,12 @@ def _read_questions_jsonl(questions_file_path: str) -> list[dict]:
def _get_test_output_folder(config: dict) -> str:
base_output_folder = os.path.expanduser(config["output_folder"])
if config["run_suffix"]:
if config["env_name"]:
base_output_folder = os.path.join(
base_output_folder, config["run_suffix"], "evaluations_output"
base_output_folder, config["env_name"], "evaluations_output"
)
else:
base_output_folder = os.path.join(base_output_folder, "no_defined_suffix")
base_output_folder = os.path.join(base_output_folder, "no_defined_env_name")
counter = 1
output_folder_path = os.path.join(base_output_folder, "run_1")
@ -72,12 +72,12 @@ def _initialize_files(config: dict) -> tuple[str, list[dict]]:
metadata = {
"commit_sha": get_current_commit_sha(),
"run_suffix": config["run_suffix"],
"env_name": config["env_name"],
"test_config": config,
"number_of_questions_in_dataset": len(questions),
}
env_vars = get_docker_container_env_vars(config["run_suffix"])
env_vars = get_docker_container_env_vars(config["env_name"])
if env_vars["ENV_SEED_CONFIGURATION"]:
del env_vars["ENV_SEED_CONFIGURATION"]
if env_vars["GPG_KEY"]:
@ -118,7 +118,7 @@ def _process_question(question_data: dict, config: dict, question_number: int) -
context_data_list, answer = get_answer_from_query(
query=query,
only_retrieve_docs=config["only_retrieve_docs"],
run_suffix=config["run_suffix"],
env_name=config["env_name"],
)
if not context_data_list:
@ -173,7 +173,7 @@ def _process_and_write_query_results(config: dict) -> None:
print("saved test results to folder:", test_output_folder)
def run_qa_test_and_save_results(run_suffix: str = "") -> None:
def run_qa_test_and_save_results(env_name: str = "") -> None:
current_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(current_dir, "search_test_config.yaml")
with open(config_path, "r") as file:
@ -182,16 +182,16 @@ def run_qa_test_and_save_results(run_suffix: str = "") -> None:
if not isinstance(config, dict):
raise TypeError("config must be a dictionary")
if not run_suffix:
run_suffix = config["existing_test_suffix"]
if not env_name:
env_name = config["environment_name"]
config["run_suffix"] = run_suffix
config["env_name"] = env_name
_process_and_write_query_results(config)
if __name__ == "__main__":
"""
To run a different set of questions, update the questions_file in search_test_config.yaml
If there is more than one instance of Danswer running, specify the suffix in search_test_config.yaml
If there is more than one instance of Danswer running, specify the env_name in search_test_config.yaml
"""
run_qa_test_and_save_results()

View File

@ -13,15 +13,9 @@ questions_file: "~/sample_questions.yaml"
# Git commit SHA to use (null means use current code as is)
commit_sha: null
# Whether to remove Docker containers after the test
clean_up_docker_containers: true
# Whether to launch a web UI for the test
launch_web_ui: false
# Whether to only run Vespa and Postgres
only_state: false
# Only retrieve documents, not LLM response
only_retrieve_docs: false
@ -34,9 +28,8 @@ model_server_ip: "PUT_PUBLIC_CLOUD_IP_HERE"
# Port of the model server (placeholder)
model_server_port: "PUT_PUBLIC_CLOUD_PORT_HERE"
# Suffix for existing test results (E.g. -1234-5678)
# empty string means no suffix
existing_test_suffix: ""
# Name for existing testing env (empty string uses default ports)
environment_name: ""
# Limit on number of tests to run (null means no limit)
limit: null