Compare commits

...

3 Commits

Author            SHA1         Message                      Date
joachim-danswer   625936306f   final examples and logging   2025-03-16 13:06:19 -07:00
joachim-danswer   ab11bf6552   writing data                 2025-03-16 12:40:09 -07:00
joachim-danswer   83d5b3b503   question loop                2025-03-16 12:06:10 -07:00
2 changed files with 742 additions and 606 deletions

File diff suppressed because it is too large.


@@ -1,9 +1,12 @@
import csv
import json
import os
import string
from collections.abc import Callable
from collections.abc import Mapping
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import Any
from typing import cast
@@ -285,67 +288,147 @@ def parallel_visit_api_retrieval(
    return inference_chunks
@retry(tries=3, delay=1, backoff=2)
def _append_ranking_stats_to_csv(
    ranking_stats: list[tuple[str, float, str, str, str, float]],
    csv_path: str = "/tmp/ranking_stats.csv",
) -> None:
    """
    Append ranking statistics to a CSV file.

    Args:
        ranking_stats: List of tuples containing
            (category, query_alpha, query, hit_position, document_id, relevance)
        csv_path: Path to the CSV file to append to
    """
    file_exists = os.path.isfile(csv_path)

    # Create directory if it doesn't exist
    csv_dir = os.path.dirname(csv_path)
    if csv_dir and not os.path.exists(csv_dir):
        Path(csv_dir).mkdir(parents=True, exist_ok=True)

    with open(csv_path, mode="a", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        # Write header if file is new (one column per tuple element)
        if not file_exists:
            writer.writerow(
                ["category", "query_alpha", "query", "hit_position", "document_id", "relevance"]
            )
        # Write the ranking stats
        for cat, query_alpha, query, hit_pos, doc_chunk_id, relevance in ranking_stats:
            writer.writerow([cat, query_alpha, query, hit_pos, doc_chunk_id, relevance])

    logger.debug(f"Appended {len(ranking_stats)} ranking stats to {csv_path}")
@retry(tries=1, delay=1, backoff=2)
def query_vespa(
    query_params: Mapping[str, str | int | float]
) -> list[InferenceChunkUncleaned]:
    if "query" in query_params and not cast(str, query_params["query"]).strip():
        raise ValueError("No/empty query received")

    ranking_stats: list[tuple[str, float, str, str, str, float]] = []
    search_time = 0.0

    alphas: list[float] = [0.4, 0.7, 1.0]

    for query_alpha in alphas:
        date_time_start = datetime.now()

        # Create a mutable copy of the query_params
        mutable_params = dict(query_params)
        # Now we can modify it without mypy errors
        mutable_params["input.query(alpha)"] = query_alpha

        params = dict(
            **mutable_params,
            **{
                "presentation.timing": True,
            }
            if LOG_VESPA_TIMING_INFORMATION
            else {},
        )
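        # Each pass re-issues the same request with a different alpha value;
        # "input.query(alpha)" presumably weights vector vs. keyword scoring in the
        # hybrid rank profile (assumption), so rankings can be compared across
        # alphas in the CSV written below.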
        try:
            with get_vespa_http_client() as http_client:
                response = http_client.post(SEARCH_ENDPOINT, json=params)
                response.raise_for_status()
        except httpx.HTTPError as e:
            error_base = "Failed to query Vespa"
            logger.error(
                f"{error_base}:\n"
                f"Request URL: {e.request.url}\n"
                f"Request Headers: {e.request.headers}\n"
                f"Request Payload: {params}\n"
                f"Exception: {str(e)}"
                + (
                    f"\nResponse: {e.response.text}"
                    if isinstance(e, httpx.HTTPStatusError)
                    else ""
                )
            )
            raise httpx.HTTPError(error_base) from e

        response_json: dict[str, Any] = response.json()

        if LOG_VESPA_TIMING_INFORMATION:
            logger.debug("Vespa timing info: %s", response_json.get("timing"))
        hits = response_json["root"].get("children", [])

        if not hits:
            logger.warning(
                f"No hits found for YQL Query: {query_params.get('yql', 'No YQL Query')}"
            )
            logger.debug(f"Vespa Response: {response.text}")

        for hit in hits:
            if hit["fields"].get(CONTENT) is None:
                identifier = hit["fields"].get("documentid") or hit["id"]
                logger.error(
                    f"Vespa Index with Vespa ID {identifier} has no contents. "
                    f"This is invalid because the vector is not meaningful and keyword search cannot "
                    f"fetch this document"
                )
        for hit_pos, hit in enumerate(hits):
            ranking_stats.append(
                (
                    "Retrieval",
                    query_alpha,
                    cast(str, mutable_params["query"]),
                    str(hit_pos),
                    hit["fields"].get("document_id", "")
                    + "__"
                    + str(hit["fields"].get("chunk_id", "")),
                    hit.get("relevance", 0),
                )
            )
        date_time_end = datetime.now()
        # total_seconds() counts the full elapsed time, not just the sub-second part
        search_time += (date_time_end - date_time_start).total_seconds()
    avg_search_time = search_time / len(alphas)

    ranking_stats.append(
        (
            "Timing",
            query_alpha,
            cast(str, query_params["query"]).strip(),
            "",
            "Avg:",
            avg_search_time,
        )
    )
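    # Note: this summary row reuses query_alpha from the final loop iteration and
    # stores the average search time in the relevance column.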
    if ranking_stats:
        _append_ranking_stats_to_csv(ranking_stats)

    filtered_hits = [hit for hit in hits if hit["fields"].get(CONTENT) is not None]

    inference_chunks = [_vespa_hit_to_inference_chunk(hit) for hit in filtered_hits]
    # Good Debugging Spot
    logger.info(f"Search done for all alphas - avg timing: {avg_search_time}")
    return inference_chunks
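
The CSV that _append_ranking_stats_to_csv writes (by default to /tmp/ranking_stats.csv) can then be aggregated offline to compare the three alpha values. Below is a minimal, hypothetical analysis sketch, not part of the change above; it assumes the corrected six-column header shown earlier and uses only the standard library:

import csv
from collections import defaultdict

relevance_by_alpha: dict[str, list[float]] = defaultdict(list)
timings: list[float] = []

with open("/tmp/ranking_stats.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        if row["category"] == "Retrieval":
            relevance_by_alpha[row["query_alpha"]].append(float(row["relevance"]))
        elif row["category"] == "Timing":
            # the average search time is stored in the relevance column
            timings.append(float(row["relevance"]))

for alpha, scores in sorted(relevance_by_alpha.items()):
    print(f"alpha={alpha}: mean relevance {sum(scores) / len(scores):.4f} over {len(scores)} hits")

if timings:
    print(f"mean per-query search time: {sum(timings) / len(timings):.3f}s")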