writing data

This commit is contained in:
joachim-danswer 2025-03-16 12:40:09 -07:00
parent 83d5b3b503
commit ab11bf6552

View File

@ -1,9 +1,12 @@
import csv
import json
import os
import string
from collections.abc import Callable
from collections.abc import Mapping
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import Any
from typing import cast
@ -285,62 +288,139 @@ def parallel_visit_api_retrieval(
return inference_chunks
@retry(tries=3, delay=1, backoff=2)
def _append_ranking_stats_to_csv(
ranking_stats: list[tuple[str, float, str, str, str, float]],
csv_path: str = "/tmp/ranking_stats.csv",
) -> None:
"""
Append ranking statistics to a CSV file.
Args:
ranking_stats: List of tuples containing (query, hit_position, document_id)
csv_path: Path to the CSV file to append to
"""
file_exists = os.path.isfile(csv_path)
# Create directory if it doesn't exist
csv_dir = os.path.dirname(csv_path)
if csv_dir and not os.path.exists(csv_dir):
Path(csv_dir).mkdir(parents=True, exist_ok=True)
with open(csv_path, mode="a", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
# Write header if file is new
if not file_exists:
writer.writerow(
["query_alpha", "query", "hit_position", "document_id", "relevance"]
)
# Write the ranking stats
for cat, query_alpha, query, hit_pos, doc_chunk_id, relevance in ranking_stats:
writer.writerow([cat, query_alpha, query, hit_pos, doc_chunk_id, relevance])
logger.debug(f"Appended {len(ranking_stats)} ranking stats to {csv_path}")
@retry(tries=1, delay=1, backoff=2)
def query_vespa(
query_params: Mapping[str, str | int | float]
) -> list[InferenceChunkUncleaned]:
if "query" in query_params and not cast(str, query_params["query"]).strip():
raise ValueError("No/empty query received")
params = dict(
**query_params,
**{
"presentation.timing": True,
}
if LOG_VESPA_TIMING_INFORMATION
else {},
ranking_stats: list[tuple[str, float, str, str, str, float]] = []
search_time = 0.0
for query_alpha in [0.4, 0.7, 1.0]:
date_time_start = datetime.now()
# Create a mutable copy of the query_params
mutable_params = dict(query_params)
# Now we can modify it without mypy errors
mutable_params["input.query(alpha)"] = query_alpha
params = dict(
**mutable_params,
**{
"presentation.timing": True,
}
if LOG_VESPA_TIMING_INFORMATION
else {},
)
try:
with get_vespa_http_client() as http_client:
response = http_client.post(SEARCH_ENDPOINT, json=params)
response.raise_for_status()
except httpx.HTTPError as e:
error_base = "Failed to query Vespa"
logger.error(
f"{error_base}:\n"
f"Request URL: {e.request.url}\n"
f"Request Headers: {e.request.headers}\n"
f"Request Payload: {params}\n"
f"Exception: {str(e)}"
+ (
f"\nResponse: {e.response.text}"
if isinstance(e, httpx.HTTPStatusError)
else ""
)
)
raise httpx.HTTPError(error_base) from e
response_json: dict[str, Any] = response.json()
if LOG_VESPA_TIMING_INFORMATION:
logger.debug("Vespa timing info: %s", response_json.get("timing"))
hits = response_json["root"].get("children", [])
if not hits:
logger.warning(
f"No hits found for YQL Query: {query_params.get('yql', 'No YQL Query')}"
)
logger.debug(f"Vespa Response: {response.text}")
for hit in hits:
if hit["fields"].get(CONTENT) is None:
identifier = hit["fields"].get("documentid") or hit["id"]
logger.error(
f"Vespa Index with Vespa ID {identifier} has no contents. "
f"This is invalid because the vector is not meaningful and keywordsearch cannot "
f"fetch this document"
)
for hit_pos, hit in enumerate(hits):
ranking_stats.append(
(
"Retrieval",
query_alpha,
cast(str, mutable_params["query"]),
str(hit_pos),
hit["fields"].get("document_id", "")
+ "__"
+ str(hit["fields"].get("chunk_id", "")),
hit.get("relevance", 0),
)
)
date_time_end = datetime.now()
search_time += (date_time_end - date_time_start).microseconds / 1000000
ranking_stats.append(
(
"Timing",
query_alpha,
cast(str, query_params["query"]).strip(),
"",
"",
search_time,
)
)
try:
with get_vespa_http_client() as http_client:
response = http_client.post(SEARCH_ENDPOINT, json=params)
response.raise_for_status()
except httpx.HTTPError as e:
error_base = "Failed to query Vespa"
logger.error(
f"{error_base}:\n"
f"Request URL: {e.request.url}\n"
f"Request Headers: {e.request.headers}\n"
f"Request Payload: {params}\n"
f"Exception: {str(e)}"
+ (
f"\nResponse: {e.response.text}"
if isinstance(e, httpx.HTTPStatusError)
else ""
)
)
raise httpx.HTTPError(error_base) from e
response_json: dict[str, Any] = response.json()
if LOG_VESPA_TIMING_INFORMATION:
logger.debug("Vespa timing info: %s", response_json.get("timing"))
hits = response_json["root"].get("children", [])
if not hits:
logger.warning(
f"No hits found for YQL Query: {query_params.get('yql', 'No YQL Query')}"
)
logger.debug(f"Vespa Response: {response.text}")
for hit in hits:
if hit["fields"].get(CONTENT) is None:
identifier = hit["fields"].get("documentid") or hit["id"]
logger.error(
f"Vespa Index with Vespa ID {identifier} has no contents. "
f"This is invalid because the vector is not meaningful and keywordsearch cannot "
f"fetch this document"
)
if ranking_stats:
_append_ranking_stats_to_csv(ranking_stats)
filtered_hits = [hit for hit in hits if hit["fields"].get(CONTENT) is not None]