mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-03-17 21:32:36 +01:00
740 lines
26 KiB
Python
740 lines
26 KiB
Python
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from os import listdir
|
|
from os.path import isfile
|
|
from os.path import join
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
from onyx.configs.constants import FASTAPI_USERS_AUTH_COOKIE_NAME
|
|
|
|
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.append(parent_dir)
|
|
|
|
from onyx.configs.app_configs import DOCUMENT_INDEX_NAME # noqa: E402
|
|
from onyx.configs.constants import SOURCE_TYPE # noqa: E402
|
|
|
|
ANALYSIS_FOLDER = f"{parent_dir}/scripts/.analysisfiles/"
|
|
|
|
|
|
def color_output(
|
|
text: str,
|
|
model: Optional[str] = None,
|
|
text_color: str = "white",
|
|
bg_color: str = "black",
|
|
text_style: str = "normal",
|
|
text_prefix: str = "",
|
|
) -> None:
|
|
"""Color and print a text
|
|
|
|
Args:
|
|
text (str): The text to display
|
|
model (str, optional): A pre-defined output model. Defaults to None.
|
|
text_color (str, optional): Define the text color. Defaults to "white".
|
|
bg_color (str, optional): Define the background color. Defaults to "black".
|
|
text_style (str, optional): Define the text style. Defaults to "normal".
|
|
text_prefix (str, optional): Set a text prefix. Defaults to "".
|
|
"""
|
|
if model:
|
|
if model == "alert":
|
|
text_color = "black"
|
|
bg_color = "red"
|
|
text_style = "bold"
|
|
elif model == "critical":
|
|
text_prefix = "CRITICAL: "
|
|
text_color = "white"
|
|
bg_color = "red"
|
|
text_style = "bold"
|
|
elif model == "note":
|
|
text_color = "yellow"
|
|
bg_color = "transparent"
|
|
text_style = "normal"
|
|
elif model == "info":
|
|
text_prefix = "INFO: "
|
|
text_color = "black"
|
|
bg_color = "yellow"
|
|
text_style = "bold"
|
|
elif model == "info2":
|
|
text_prefix = "INFO: "
|
|
text_color = "black"
|
|
bg_color = "white"
|
|
text_style = "bold"
|
|
elif model == "valid":
|
|
text_prefix = "INFO: "
|
|
text_color = "white"
|
|
bg_color = "green"
|
|
text_style = "bold"
|
|
elif model == "debug":
|
|
text_prefix = "DEBUG: "
|
|
text_color = "blue"
|
|
bg_color = "transparent"
|
|
text_style = "bold"
|
|
|
|
text_colors = {
|
|
"black": 30,
|
|
"red": 31,
|
|
"green": 32,
|
|
"yellow": 33,
|
|
"blue": 34,
|
|
"purple": 35,
|
|
"cian": 36,
|
|
"white": 37,
|
|
}
|
|
bg_colors = {
|
|
"black": 40,
|
|
"red": 41,
|
|
"green": 42,
|
|
"yellow": 43,
|
|
"blue": 44,
|
|
"purple": 45,
|
|
"cian": 46,
|
|
"white": 47,
|
|
"transparent": 49,
|
|
}
|
|
text_styles = {
|
|
"normal": 0,
|
|
"bold": 1,
|
|
"light": 2,
|
|
"italicized": 3,
|
|
"underlined": 4,
|
|
"blink": 5,
|
|
}
|
|
print(
|
|
f"\033[{text_styles[text_style]};{text_colors[text_color]};{bg_colors[bg_color]}m {text_prefix} {text} \033[0;0m"
|
|
)
|
|
|
|
|
|
class CompareAnalysis:
|
|
def __init__(
|
|
self, query: str, previous_content: dict, new_content: dict, threshold: float
|
|
) -> None:
|
|
"""Make the comparison between 2 analysis for a specific query
|
|
|
|
Args:
|
|
query (str): The analysed query
|
|
previous_content (dict): The previous analysis content for the selected query
|
|
new_content (dict): The new analysis content for the selected query
|
|
threshold (float): The minimum difference (percentage) between scores to raise an anomaly
|
|
"""
|
|
self._query = query
|
|
self._previous_content = previous_content
|
|
self._new_content = new_content
|
|
self._threshold = threshold
|
|
|
|
def _identify_diff(self, content_key: str) -> list[dict]:
|
|
"""Try to identify differences between the two analysis based
|
|
on the selected analysis key.
|
|
|
|
Args:
|
|
content_key (str): The analysis item's key to compare the versions.
|
|
Examples: score / document_id
|
|
|
|
Returns:
|
|
list[dict]: List of dict representing the information regarding the difference
|
|
Format: {
|
|
"previous_rank": XX,
|
|
"new_rank": XX,
|
|
"document_id": XXXX,
|
|
"previous_score": XX,
|
|
"new_score": XX,
|
|
"score_change_pct": XX
|
|
}
|
|
"""
|
|
changes = []
|
|
|
|
previous_content = {
|
|
k: v[content_key] for k, v in self._previous_content.items()
|
|
}
|
|
new_content = {k: v[content_key] for k, v in self._new_content.items()}
|
|
|
|
if previous_content != new_content:
|
|
for pos, data in previous_content.items():
|
|
if data != new_content[pos]:
|
|
try:
|
|
score_change_pct = round(
|
|
(
|
|
abs(
|
|
self._new_content[pos]["score"]
|
|
- self._previous_content[pos]["score"]
|
|
)
|
|
/ self._new_content[pos]["score"]
|
|
)
|
|
* 100.0,
|
|
2,
|
|
)
|
|
except ZeroDivisionError:
|
|
score_change_pct = 0
|
|
|
|
changes.append(
|
|
{
|
|
"previous_rank": pos,
|
|
"new_rank": pos
|
|
if content_key == "score"
|
|
else {
|
|
"x": k for k, v in new_content.items() if v == data
|
|
}.get("x", "not_ranked"),
|
|
"document_id": self._previous_content[pos]["document_id"],
|
|
"previous_score": self._previous_content[pos]["score"],
|
|
"new_score": self._new_content[pos]["score"],
|
|
"score_change_pct": score_change_pct,
|
|
}
|
|
)
|
|
return changes
|
|
|
|
def check_config_changes(self, previous_doc_rank: int, new_doc_rank: int) -> None:
|
|
"""Try to identify possible reasons why a change has been detected by
|
|
checking the latest document update date or the boost value.
|
|
|
|
Args:
|
|
previous_doc_rank (int): The document rank for the previous analysis
|
|
new_doc_rank (int): The document rank for the new analysis
|
|
"""
|
|
if new_doc_rank == "not_ranked":
|
|
color_output(
|
|
(
|
|
"NOTE: The document is missing in the 'current' analysis file. "
|
|
"Unable to identify more details about the reason for the change."
|
|
),
|
|
model="note",
|
|
)
|
|
return None
|
|
|
|
if (
|
|
self._previous_content[previous_doc_rank]["boost"]
|
|
!= self._new_content[new_doc_rank]["boost"]
|
|
):
|
|
color_output(
|
|
"NOTE: The 'boost' value has been changed which (maybe) explains the change.",
|
|
model="note",
|
|
)
|
|
color_output(
|
|
(
|
|
f"Previously it was '{self._previous_content[previous_doc_rank]['boost']}' "
|
|
f"and now is set to '{self._new_content[new_doc_rank]['boost']}'"
|
|
),
|
|
model="note",
|
|
)
|
|
if (
|
|
self._previous_content[previous_doc_rank]["updated_at"]
|
|
!= self._new_content[new_doc_rank]["updated_at"]
|
|
):
|
|
color_output("NOTE: The document seems to have been updated.", model="note")
|
|
color_output(
|
|
(
|
|
f"Previously the updated date was '{self._previous_content[previous_doc_rank]['updated_at']}' "
|
|
f"and now is '{self._new_content[new_doc_rank]['updated_at']}'"
|
|
),
|
|
model="note",
|
|
)
|
|
|
|
def check_documents_score(self) -> bool:
|
|
"""Check if the scores have changed between analysis.
|
|
|
|
Returns:
|
|
bool: True if at least one change has been detected. False otherwise.
|
|
"""
|
|
color_output("Checking documents Score....", model="info")
|
|
color_output(
|
|
f"Differences under '{self._threshold}%' are ignored (based on the '--threshold' argument)",
|
|
model="info",
|
|
)
|
|
|
|
if diff := [
|
|
x
|
|
for x in self._identify_diff("score")
|
|
if x["score_change_pct"] > self._threshold
|
|
]:
|
|
color_output("<<<<< Changes detected >>>>>", model="alert")
|
|
for change in diff:
|
|
color_output("-" * 100)
|
|
color_output(
|
|
(
|
|
f"The document '{change['document_id']}' (rank: {change['previous_rank']}) "
|
|
f"score has a changed of {change['score_change_pct']}%"
|
|
)
|
|
)
|
|
color_output(f"previous score: {change['previous_score']}")
|
|
color_output(f"current score: {change['new_score']}")
|
|
self.check_config_changes(change["previous_rank"], change["new_rank"])
|
|
|
|
color_output("<<<<< End of changes >>>>>", model="alert")
|
|
color_output(f"Number of changes detected {len(diff)}", model="info")
|
|
else:
|
|
color_output("No change detected", model="valid")
|
|
color_output("Documents Score check completed.", model="info")
|
|
|
|
return False if diff else True
|
|
|
|
def check_documents_order(self) -> bool:
|
|
"""Check if the selected documents are the same and in the same order.
|
|
|
|
Returns:
|
|
bool: True if at least one change has been detected. False otherwise.
|
|
"""
|
|
color_output("Checking documents Order....", model="info")
|
|
|
|
if diff := self._identify_diff("document_id"):
|
|
color_output("<<<<< Changes detected >>>>>", model="alert")
|
|
for change in diff:
|
|
color_output("-" * 100)
|
|
color_output(
|
|
(
|
|
f"The document '{change['document_id']}' was at a rank "
|
|
f"'{change['previous_rank']}' but now is at rank '{change['new_rank']}'"
|
|
)
|
|
)
|
|
color_output(f"previous score: {change['previous_score']}")
|
|
color_output(f"current score: {change['new_score']}")
|
|
self.check_config_changes(change["previous_rank"], change["new_rank"])
|
|
color_output("<<<<< End of changes >>>>>", model="alert")
|
|
color_output(f"Number of changes detected {len(diff)}", model="info")
|
|
|
|
else:
|
|
color_output("No change detected", model="valid")
|
|
color_output("Documents order check completed.", model="info")
|
|
|
|
return False if diff else True
|
|
|
|
def __call__(self) -> None:
|
|
"""Manage the analysis process"""
|
|
if not self.check_documents_order():
|
|
color_output(
|
|
"Skipping other checks as the documents order has changed", model="info"
|
|
)
|
|
return None
|
|
|
|
self.check_documents_score()
|
|
|
|
|
|
class SelectionAnalysis:
|
|
def __init__(
|
|
self,
|
|
exectype: str,
|
|
analysisfiles: list = [],
|
|
queries: list = [],
|
|
threshold: float = 0.0,
|
|
web_port: int = 3000,
|
|
auth_cookie: str = "",
|
|
wait: int = 10,
|
|
) -> None:
|
|
"""
|
|
|
|
Args:
|
|
exectype (str): The execution mode (new or compare)
|
|
analysisfiles (list, optional): List of analysis files to compare or if only one, to use as the base. Defaults to [].
|
|
Requiered only by the 'compare' mode
|
|
queries (list, optional): The queries to analysed. Defaults to [].
|
|
Required only by the 'new' mode
|
|
threshold (float, optional): The minimum difference (percentage) between scores to raise an anomaly
|
|
web_port (int, optional): The port of the UI. Defaults to 3000 (local exec port)
|
|
auth_cookie (str, optional): The Auth cookie value (fastapiusersauth). Defaults to None.
|
|
wait (int, optional): The waiting time (in seconds) to respect between queries.
|
|
It is helpful to avoid hitting the Generative AI rate limiting.
|
|
"""
|
|
self._exectype = exectype
|
|
self._analysisfiles = analysisfiles
|
|
self._queries = queries
|
|
self._threshold = threshold
|
|
self._web_port = web_port
|
|
self._auth_cookie = auth_cookie
|
|
self._wait = wait
|
|
|
|
def _wait_between_queries(self, query: str) -> None:
|
|
"""If there are remaining queries, waits for the defined time.
|
|
|
|
Args:
|
|
query (str): The latest executed query
|
|
"""
|
|
if query != self._queries[-1]:
|
|
color_output(f"Next query in {self._wait} seconds", model="debug")
|
|
time.sleep(self._wait)
|
|
|
|
def prepare(self) -> bool:
|
|
"""Create the requirements to execute this script
|
|
|
|
Returns:
|
|
bool: True if all the requirements are setup. False otherwise
|
|
"""
|
|
try:
|
|
os.makedirs(ANALYSIS_FOLDER, exist_ok=True)
|
|
return True
|
|
except Exception as e:
|
|
color_output(f"Unable to setup the requirements: {e}", model="critical")
|
|
return False
|
|
|
|
def do_request(self, query: str) -> dict:
|
|
"""Request the Onyx API
|
|
|
|
Args:
|
|
query (str): A query
|
|
|
|
Returns:
|
|
dict: The Onyx API response content
|
|
"""
|
|
cookies = (
|
|
{FASTAPI_USERS_AUTH_COOKIE_NAME: self._auth_cookie}
|
|
if self._auth_cookie
|
|
else {}
|
|
)
|
|
|
|
endpoint = f"http://127.0.0.1:{self._web_port}/api/direct-qa"
|
|
query_json = {
|
|
"query": query,
|
|
"collection": DOCUMENT_INDEX_NAME,
|
|
"filters": {SOURCE_TYPE: None},
|
|
"enable_auto_detect_filters": True,
|
|
"search_type": "hybrid",
|
|
"offset": 0,
|
|
"favor_recent": True,
|
|
}
|
|
try:
|
|
response = requests.post(endpoint, json=query_json, cookies=cookies)
|
|
if response.status_code != 200:
|
|
color_output(
|
|
(
|
|
"something goes wrong while requesting the Onyx API "
|
|
f"for the query '{query}': {response.text}"
|
|
),
|
|
model="critical",
|
|
)
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
color_output(
|
|
f"Unable to request the Onyx API for the query '{query}': {e}",
|
|
model="critical",
|
|
)
|
|
sys.exit(1)
|
|
|
|
return json.loads(response.content)
|
|
|
|
def get_analysis_files(self) -> list[str]:
|
|
"""Returns the list of existing analysis files.
|
|
|
|
Returns:
|
|
list[str]: List of filename
|
|
"""
|
|
return [f for f in listdir(ANALYSIS_FOLDER) if isfile(join(ANALYSIS_FOLDER, f))]
|
|
|
|
def get_analysis_file_content(self, filename: str) -> list[dict]:
|
|
"""Returns the content of an analysis file
|
|
|
|
Args:
|
|
filename (str): The analysis filename
|
|
|
|
Returns:
|
|
list[dict]: Content of the selected file
|
|
"""
|
|
with open(f"{ANALYSIS_FOLDER}{filename}", "r") as f:
|
|
return json.load(f)
|
|
|
|
def extract_content(self, contents: dict) -> dict:
|
|
"""Extract the content returns by the Onyx API
|
|
|
|
Args:
|
|
contents (dict): The onyx response content
|
|
|
|
Returns:
|
|
dict: Data regarding the selected sources document
|
|
"""
|
|
return {
|
|
pos: doc
|
|
for pos, doc in enumerate(
|
|
sorted(
|
|
contents["top_ranked_docs"], key=lambda d: d["score"], reverse=True
|
|
)[:5]
|
|
)
|
|
}
|
|
|
|
def save_analysisfile(self, content: list[dict]) -> Optional[str]:
|
|
"""Save the extracted content
|
|
|
|
Args:
|
|
content (list[dict]): The content to save
|
|
|
|
Returns:
|
|
str: The filname
|
|
"""
|
|
filename = datetime.now().strftime("%Y_%m_%d-%I_%M_%S")
|
|
analysis_file = f"{ANALYSIS_FOLDER}{filename}.json"
|
|
|
|
try:
|
|
with open(analysis_file, "w") as f:
|
|
json.dump(content, f, indent=4)
|
|
except Exception as e:
|
|
color_output(f"Unable to create the analysis file: {e}", model="critical")
|
|
return None
|
|
|
|
color_output(f"Analysis file created: {analysis_file}", model="debug")
|
|
return analysis_file
|
|
|
|
def new(self) -> Optional[str]:
|
|
"""Manage the process to create a new analysis file
|
|
based on the submitted queries
|
|
|
|
Returns:
|
|
str: The new filename with the analysis content
|
|
"""
|
|
if not self._queries:
|
|
color_output("Missing queries", model="critical")
|
|
sys.exit(1)
|
|
|
|
color_output("Generating a new analysis file...", model="debug")
|
|
analysisfile = []
|
|
|
|
for query in self._queries:
|
|
color_output(f"Gathering data of the query: '{query}'", model="info2")
|
|
contents = self.do_request(query)
|
|
|
|
analysisfile.append(
|
|
{"query": query, "selected_documents": self.extract_content(contents)}
|
|
)
|
|
color_output("Data gathered", model="info2")
|
|
self._wait_between_queries(query)
|
|
|
|
return self.save_analysisfile(analysisfile)
|
|
|
|
def compare(
|
|
self,
|
|
previous_analysisfile_content: list[dict],
|
|
new_analysisfile_content: list[dict],
|
|
) -> None:
|
|
"""Manage the process to compare two analysis
|
|
|
|
Args:
|
|
previous_analysisfile_content (list): Previous content analysis
|
|
new_analysisfile_content (list): New content analysis
|
|
"""
|
|
for query in self._queries:
|
|
# Extract data regarding the selected source documents
|
|
prev_querie_content = [
|
|
x for x in previous_analysisfile_content if x["query"] == query
|
|
][0]["selected_documents"]
|
|
new_querie_content = [
|
|
x for x in new_analysisfile_content if x["query"] == query
|
|
][0]["selected_documents"]
|
|
|
|
color_output(f"Analysing the query: '{query}'", model="info2")
|
|
CompareAnalysis(
|
|
query, prev_querie_content, new_querie_content, self._threshold
|
|
)()
|
|
color_output(f"Analyse completed for the query: '{query}'", model="info2")
|
|
self._wait_between_queries(query)
|
|
|
|
color_output("All the defined queries have been evaluated.", model="info2")
|
|
|
|
def validate_analysisfiles(self) -> bool:
|
|
"""Validate that the selected analysis files exist
|
|
|
|
Returns:
|
|
bool: True if all of them exist. False otherwise
|
|
"""
|
|
existing_analysisfiles = self.get_analysis_files()
|
|
|
|
if missing_analysisfiles := [
|
|
x for x in self._analysisfiles if x not in existing_analysisfiles
|
|
]:
|
|
color_output(
|
|
f"Missing analysis file(s) '{', '.join(missing_analysisfiles)}' - NOT FOUND",
|
|
model="critical",
|
|
)
|
|
analysisfiles = "\n ".join(existing_analysisfiles)
|
|
color_output("Available analysis files:", model="info2")
|
|
color_output(analysisfiles)
|
|
return False
|
|
|
|
return True
|
|
|
|
def __call__(self) -> None:
|
|
if not self.prepare():
|
|
sys.exit(1)
|
|
|
|
if self._exectype == "new":
|
|
self.new()
|
|
|
|
elif self._exectype == "compare":
|
|
self._analysisfiles = [
|
|
x.replace(".json", "") + ".json" for x in self._analysisfiles
|
|
]
|
|
|
|
if not self.validate_analysisfiles():
|
|
sys.exit(1)
|
|
|
|
color_output(
|
|
"Extracting queries from the existing analysis file...", model="debug"
|
|
)
|
|
previous_analysisfile_content = self.get_analysis_file_content(
|
|
self._analysisfiles[0]
|
|
)
|
|
|
|
# Extract the queries
|
|
self._queries = sorted([x["query"] for x in previous_analysisfile_content])
|
|
color_output(
|
|
f"Extracted queries: {', '.join(self._queries)}", model="debug"
|
|
)
|
|
|
|
if len(self._analysisfiles) == 1:
|
|
if new_file := self.new():
|
|
new_analysisfile_content = self.get_analysis_file_content(
|
|
new_file.split("/")[-1:][0]
|
|
)
|
|
return self.compare(
|
|
previous_analysisfile_content, new_analysisfile_content
|
|
)
|
|
else:
|
|
color_output(
|
|
"Unable to generate a new analysis file", model="critical"
|
|
)
|
|
sys.exit(1)
|
|
else:
|
|
color_output(
|
|
(
|
|
f"For the rest of this execution, the analysis file '{self._analysisfiles[0]}' "
|
|
f"is identified as 'previous' and '{self._analysisfiles[1]}' as 'current'"
|
|
),
|
|
model="info2",
|
|
)
|
|
new_analysisfile_content = self.get_analysis_file_content(
|
|
self._analysisfiles[1]
|
|
)
|
|
new_queries = sorted([x["query"] for x in new_analysisfile_content])
|
|
if new_queries != self._queries:
|
|
color_output(
|
|
"Unable to compare analysis files as the queries are differents",
|
|
model="critical",
|
|
)
|
|
sys.exit(1)
|
|
self.compare(previous_analysisfile_content, new_analysisfile_content)
|
|
|
|
|
|
def validate_cmd_args(args: argparse.Namespace) -> bool:
|
|
"""Validate the CMD arguments
|
|
|
|
Args:
|
|
args (argparse.Namespace): The argparse data input
|
|
|
|
Returns:
|
|
bool: True if the CMD arguments are valid. False otherwise
|
|
"""
|
|
if not args.execution:
|
|
color_output(
|
|
"Missing argument. The execution mode ('--execution') must be defined ('new' or 'compare')",
|
|
model="critical",
|
|
)
|
|
return False
|
|
if args.execution == "new" and not args.q__queries:
|
|
color_output(
|
|
"Missing argument. When the execution type is set to 'new' the '--queries' argument must be defined",
|
|
model="critical",
|
|
)
|
|
return False
|
|
elif args.execution == "compare":
|
|
if not args.files:
|
|
color_output(
|
|
"Missing argument. When the execution type is set to 'compare' the '--files' argument must be defined",
|
|
model="critical",
|
|
)
|
|
return False
|
|
elif len(args.files) > 2:
|
|
color_output(
|
|
"Too many arguments. The '--files' argument cannot be repeated more than 2 times.",
|
|
model="critical",
|
|
)
|
|
return False
|
|
return True
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"-a",
|
|
"--auth",
|
|
type=str,
|
|
default=None,
|
|
help=(
|
|
"Currently, to get this script working when the Onyx Auth is "
|
|
"enabled, you must extract from the UI your cookie 'fastapiusersauth' "
|
|
"and then set it using this argument"
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"-e",
|
|
"--execution",
|
|
type=str,
|
|
choices=["new", "compare"],
|
|
default=None,
|
|
help=(
|
|
"The execution type. Must be 'new' to generate a new analysis file "
|
|
"or 'compare' to compare a previous execution with a new one based on the same queries"
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"-f",
|
|
"--files",
|
|
action="extend",
|
|
default=[],
|
|
nargs=1,
|
|
help=(
|
|
"Analysis file(s) to use for the comparison. Required if the execution arg is set "
|
|
"to 'compare'. NOTE: By repeating this argument, you can make a comparison between "
|
|
"two specific executions. If not repeated, a new execution will be performed and "
|
|
"compared with the selected one."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--port",
|
|
type=int,
|
|
default=3000,
|
|
help=(
|
|
"The Onyx Web (not the API) port. We use the UI to forward the requests to the API. "
|
|
"It should be '3000' for local dev and '80' if Onyx runs using docker compose."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"-q" "--queries",
|
|
type=str,
|
|
action="extend",
|
|
default=[],
|
|
nargs=1,
|
|
help=(
|
|
"The query to evaluate. Required if the execution arg is set to 'new'. "
|
|
"NOTE: This argument can be repeated multiple times"
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"-t",
|
|
"--threshold",
|
|
type=float,
|
|
default=0.0,
|
|
help="The minimum score change (percentage) to detect an issue.",
|
|
)
|
|
parser.add_argument(
|
|
"-w",
|
|
"--wait",
|
|
type=int,
|
|
default=10,
|
|
help=(
|
|
"The waiting time (in seconds) to respect between queries. "
|
|
"It is helpful to avoid hitting the Generative AI rate limiting."
|
|
),
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
if not validate_cmd_args(args):
|
|
sys.exit(1)
|
|
|
|
SelectionAnalysis(
|
|
args.execution,
|
|
args.files,
|
|
args.q__queries,
|
|
args.threshold,
|
|
args.port,
|
|
args.auth,
|
|
args.wait,
|
|
)()
|