mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-04-09 20:39:29 +02:00
Better vespa interface (#3781)
* k * much cleaner vespa util class * log * typing * improvement * improve
This commit is contained in:
parent
f2d4024783
commit
7f10494bbe
@ -42,24 +42,22 @@ def _fetch_permissions_for_permission_ids(
|
||||
if not permission_info or not doc_id:
|
||||
return []
|
||||
|
||||
# Check cache first for all permission IDs
|
||||
permissions = [
|
||||
_PERMISSION_ID_PERMISSION_MAP[pid]
|
||||
for pid in permission_ids
|
||||
if pid in _PERMISSION_ID_PERMISSION_MAP
|
||||
]
|
||||
|
||||
# If we found all permissions in cache, return them
|
||||
if len(permissions) == len(permission_ids):
|
||||
return permissions
|
||||
|
||||
owner_email = permission_info.get("owner_email")
|
||||
|
||||
drive_service = get_drive_service(
|
||||
creds=google_drive_connector.creds,
|
||||
user_email=(owner_email or google_drive_connector.primary_admin_email),
|
||||
)
|
||||
|
||||
# Otherwise, fetch all permissions and update cache
|
||||
fetched_permissions = execute_paginated_retrieval(
|
||||
retrieval_function=drive_service.permissions().list,
|
||||
list_key="permissions",
|
||||
@ -69,7 +67,6 @@ def _fetch_permissions_for_permission_ids(
|
||||
)
|
||||
|
||||
permissions_for_doc_id = []
|
||||
# Update cache and return all permissions
|
||||
for permission in fetched_permissions:
|
||||
permissions_for_doc_id.append(permission)
|
||||
_PERMISSION_ID_PERMISSION_MAP[permission["id"]] = permission
|
||||
|
@ -21,35 +21,144 @@ Options:
|
||||
--doc-id : Document ID
|
||||
--fields : Fields to update (JSON)
|
||||
|
||||
Example: (gets docs for a given tenant id and connector id)
|
||||
Example:
|
||||
python vespa_debug_tool.py --action list_docs --tenant-id my_tenant --connector-id 1 --n 5
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from uuid import UUID
|
||||
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import and_
|
||||
|
||||
from onyx.configs.constants import INDEX_SEPARATOR
|
||||
from onyx.context.search.models import IndexFilters
|
||||
from onyx.context.search.models import SearchRequest
|
||||
from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.models import Document
|
||||
from onyx.db.models import DocumentByConnectorCredentialPair
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_document_chunk_ids
|
||||
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
|
||||
from onyx.document_index.vespa_constants import ACCESS_CONTROL_LIST
|
||||
from onyx.document_index.vespa_constants import DOC_UPDATED_AT
|
||||
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
|
||||
from onyx.document_index.vespa_constants import DOCUMENT_SETS
|
||||
from onyx.document_index.vespa_constants import HIDDEN
|
||||
from onyx.document_index.vespa_constants import METADATA_LIST
|
||||
from onyx.document_index.vespa_constants import SEARCH_ENDPOINT
|
||||
from onyx.document_index.vespa_constants import SOURCE_TYPE
|
||||
from onyx.document_index.vespa_constants import TENANT_ID
|
||||
from onyx.document_index.vespa_constants import VESPA_APP_CONTAINER_URL
|
||||
from onyx.document_index.vespa_constants import VESPA_APPLICATION_ENDPOINT
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class DocumentFilter(BaseModel):
|
||||
# Document filter for link matching.
|
||||
link: str | None = None
|
||||
|
||||
|
||||
def build_vespa_filters(
|
||||
filters: IndexFilters,
|
||||
*,
|
||||
include_hidden: bool = False,
|
||||
remove_trailing_and: bool = False,
|
||||
) -> str:
|
||||
# Build a combined Vespa filter string from the given IndexFilters.
|
||||
def _build_or_filters(key: str, vals: list[str] | None) -> str:
|
||||
if vals is None:
|
||||
return ""
|
||||
valid_vals = [val for val in vals if val]
|
||||
if not key or not valid_vals:
|
||||
return ""
|
||||
eq_elems = [f'{key} contains "{elem}"' for elem in valid_vals]
|
||||
or_clause = " or ".join(eq_elems)
|
||||
return f"({or_clause})"
|
||||
|
||||
def _build_time_filter(
|
||||
cutoff: datetime | None,
|
||||
untimed_doc_cutoff: timedelta = timedelta(days=92),
|
||||
) -> str:
|
||||
if not cutoff:
|
||||
return ""
|
||||
include_untimed = datetime.now(timezone.utc) - untimed_doc_cutoff > cutoff
|
||||
cutoff_secs = int(cutoff.timestamp())
|
||||
if include_untimed:
|
||||
return f"!({DOC_UPDATED_AT} < {cutoff_secs})"
|
||||
return f"({DOC_UPDATED_AT} >= {cutoff_secs})"
|
||||
|
||||
filter_str = ""
|
||||
if not include_hidden:
|
||||
filter_str += f"AND !({HIDDEN}=true) "
|
||||
|
||||
if filters.tenant_id and MULTI_TENANT:
|
||||
filter_str += f'AND ({TENANT_ID} contains "{filters.tenant_id}") '
|
||||
|
||||
if filters.access_control_list is not None:
|
||||
acl_str = _build_or_filters(ACCESS_CONTROL_LIST, filters.access_control_list)
|
||||
if acl_str:
|
||||
filter_str += f"AND {acl_str} "
|
||||
|
||||
source_strs = (
|
||||
[s.value for s in filters.source_type] if filters.source_type else None
|
||||
)
|
||||
source_str = _build_or_filters(SOURCE_TYPE, source_strs)
|
||||
if source_str:
|
||||
filter_str += f"AND {source_str} "
|
||||
|
||||
tags = filters.tags
|
||||
if tags:
|
||||
tag_attributes = [tag.tag_key + INDEX_SEPARATOR + tag.tag_value for tag in tags]
|
||||
else:
|
||||
tag_attributes = None
|
||||
tag_str = _build_or_filters(METADATA_LIST, tag_attributes)
|
||||
if tag_str:
|
||||
filter_str += f"AND {tag_str} "
|
||||
|
||||
doc_set_str = _build_or_filters(DOCUMENT_SETS, filters.document_set)
|
||||
if doc_set_str:
|
||||
filter_str += f"AND {doc_set_str} "
|
||||
|
||||
time_filter = _build_time_filter(filters.time_cutoff)
|
||||
if time_filter:
|
||||
filter_str += f"AND {time_filter} "
|
||||
|
||||
if remove_trailing_and:
|
||||
while filter_str.endswith(" and "):
|
||||
filter_str = filter_str[:-5]
|
||||
while filter_str.endswith("AND "):
|
||||
filter_str = filter_str[:-4]
|
||||
|
||||
return filter_str.strip()
|
||||
|
||||
|
||||
# Print Vespa configuration URLs
|
||||
def print_vespa_config() -> None:
|
||||
# Print Vespa configuration.
|
||||
logger.info("Printing Vespa configuration.")
|
||||
print(f"Vespa Application Endpoint: {VESPA_APPLICATION_ENDPOINT}")
|
||||
print(f"Vespa App Container URL: {VESPA_APP_CONTAINER_URL}")
|
||||
print(f"Vespa Search Endpoint: {SEARCH_ENDPOINT}")
|
||||
print(f"Vespa Document ID Endpoint: {DOCUMENT_ID_ENDPOINT}")
|
||||
|
||||
|
||||
# Check connectivity to Vespa endpoints
|
||||
def check_vespa_connectivity() -> None:
|
||||
# Check connectivity to Vespa endpoints.
|
||||
logger.info("Checking Vespa connectivity.")
|
||||
endpoints = [
|
||||
f"{VESPA_APPLICATION_ENDPOINT}/ApplicationStatus",
|
||||
f"{VESPA_APPLICATION_ENDPOINT}/tenant",
|
||||
@ -61,17 +170,21 @@ def check_vespa_connectivity() -> None:
|
||||
try:
|
||||
with get_vespa_http_client() as client:
|
||||
response = client.get(endpoint)
|
||||
logger.info(
|
||||
f"Connected to Vespa at {endpoint}, status code {response.status_code}"
|
||||
)
|
||||
print(f"Successfully connected to Vespa at {endpoint}")
|
||||
print(f"Status code: {response.status_code}")
|
||||
print(f"Response: {response.text[:200]}...")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Vespa at {endpoint}: {str(e)}")
|
||||
print(f"Failed to connect to Vespa at {endpoint}: {str(e)}")
|
||||
|
||||
print("Vespa connectivity check completed.")
|
||||
|
||||
|
||||
# Get info about the default Vespa application
|
||||
def get_vespa_info() -> Dict[str, Any]:
|
||||
# Get info about the default Vespa application.
|
||||
url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/application/default"
|
||||
with get_vespa_http_client() as client:
|
||||
response = client.get(url)
|
||||
@ -79,121 +192,298 @@ def get_vespa_info() -> Dict[str, Any]:
|
||||
return response.json()
|
||||
|
||||
|
||||
# Get index name for a tenant and connector pair
|
||||
def get_index_name(tenant_id: str, connector_id: int) -> str:
|
||||
def get_index_name(tenant_id: str) -> str:
|
||||
# Return the index name for a given tenant.
|
||||
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
|
||||
cc_pair = get_connector_credential_pair_from_id(db_session, connector_id)
|
||||
if not cc_pair:
|
||||
raise ValueError(f"No connector found for id {connector_id}")
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
return search_settings.index_name if search_settings else "public"
|
||||
if not search_settings:
|
||||
raise ValueError(f"No search settings found for tenant {tenant_id}")
|
||||
return search_settings.index_name
|
||||
|
||||
|
||||
# Perform a Vespa query using YQL syntax
|
||||
def query_vespa(yql: str) -> List[Dict[str, Any]]:
|
||||
params = {
|
||||
"yql": yql,
|
||||
"timeout": "10s",
|
||||
}
|
||||
def query_vespa(
|
||||
yql: str, tenant_id: Optional[str] = None, limit: int = 10
|
||||
) -> List[Dict[str, Any]]:
|
||||
# Perform a Vespa query using YQL syntax.
|
||||
filters = IndexFilters(tenant_id=tenant_id, access_control_list=[])
|
||||
filter_string = build_vespa_filters(filters, remove_trailing_and=True)
|
||||
full_yql = yql.strip()
|
||||
if filter_string:
|
||||
full_yql = f"{full_yql} {filter_string}"
|
||||
full_yql = f"{full_yql} limit {limit}"
|
||||
|
||||
params = {"yql": full_yql, "timeout": "10s"}
|
||||
search_request = SearchRequest(query="", limit=limit, offset=0)
|
||||
params.update(search_request.model_dump())
|
||||
|
||||
logger.info(f"Executing Vespa query: {full_yql}")
|
||||
with get_vespa_http_client() as client:
|
||||
response = client.get(SEARCH_ENDPOINT, params=params)
|
||||
response.raise_for_status()
|
||||
return response.json()["root"]["children"]
|
||||
result = response.json()
|
||||
documents = result.get("root", {}).get("children", [])
|
||||
logger.info(f"Found {len(documents)} documents from query.")
|
||||
return documents
|
||||
|
||||
|
||||
# Get first N documents
|
||||
def get_first_n_documents(n: int = 10) -> List[Dict[str, Any]]:
|
||||
yql = f"select * from sources * where true limit {n};"
|
||||
return query_vespa(yql)
|
||||
# Get the first n documents from any source.
|
||||
yql = "select * from sources * where true"
|
||||
return query_vespa(yql, limit=n)
|
||||
|
||||
|
||||
# Pretty-print a list of documents
|
||||
def print_documents(documents: List[Dict[str, Any]]) -> None:
|
||||
# Pretty-print a list of documents.
|
||||
for doc in documents:
|
||||
print(json.dumps(doc, indent=2))
|
||||
print("-" * 80)
|
||||
|
||||
|
||||
# Get and print documents for a specific tenant and connector
|
||||
def get_documents_for_tenant_connector(
|
||||
tenant_id: str, connector_id: int, n: int = 10
|
||||
) -> None:
|
||||
get_index_name(tenant_id, connector_id)
|
||||
documents = get_first_n_documents(n)
|
||||
print(f"First {n} documents for tenant {tenant_id}, connector {connector_id}:")
|
||||
# Get and print documents for a specific tenant and connector.
|
||||
index_name = get_index_name(tenant_id)
|
||||
logger.info(
|
||||
f"Fetching documents for tenant={tenant_id}, connector_id={connector_id}"
|
||||
)
|
||||
yql = f"select * from sources {index_name} where true"
|
||||
documents = query_vespa(yql, tenant_id, limit=n)
|
||||
print(
|
||||
f"First {len(documents)} documents for tenant {tenant_id}, connector {connector_id}:"
|
||||
)
|
||||
print_documents(documents)
|
||||
|
||||
|
||||
# Search documents for a specific tenant and connector
|
||||
def search_documents(
|
||||
tenant_id: str, connector_id: int, query: str, n: int = 10
|
||||
) -> None:
|
||||
index_name = get_index_name(tenant_id, connector_id)
|
||||
yql = f"select * from sources {index_name} where userInput(@query) limit {n};"
|
||||
documents = query_vespa(yql)
|
||||
print(f"Search results for query '{query}':")
|
||||
# Search documents for a specific tenant and connector.
|
||||
index_name = get_index_name(tenant_id)
|
||||
logger.info(
|
||||
f"Searching documents for tenant={tenant_id}, connector_id={connector_id}, query='{query}'"
|
||||
)
|
||||
yql = f"select * from sources {index_name} where userInput(@query)"
|
||||
documents = query_vespa(yql, tenant_id, limit=n)
|
||||
print(f"Search results for query '{query}' in tenant {tenant_id}:")
|
||||
print_documents(documents)
|
||||
|
||||
|
||||
# Update a specific document
|
||||
def update_document(
|
||||
tenant_id: str, connector_id: int, doc_id: str, fields: Dict[str, Any]
|
||||
) -> None:
|
||||
index_name = get_index_name(tenant_id, connector_id)
|
||||
# Update a specific document.
|
||||
index_name = get_index_name(tenant_id)
|
||||
logger.info(
|
||||
f"Updating document doc_id={doc_id} in tenant={tenant_id}, connector_id={connector_id}"
|
||||
)
|
||||
url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name) + f"/{doc_id}"
|
||||
update_request = {"fields": {k: {"assign": v} for k, v in fields.items()}}
|
||||
|
||||
with get_vespa_http_client() as client:
|
||||
response = client.put(url, json=update_request)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Document {doc_id} updated successfully.")
|
||||
print(f"Document {doc_id} updated successfully")
|
||||
|
||||
|
||||
# Delete a specific document
|
||||
def delete_document(tenant_id: str, connector_id: int, doc_id: str) -> None:
|
||||
index_name = get_index_name(tenant_id, connector_id)
|
||||
# Delete a specific document.
|
||||
index_name = get_index_name(tenant_id)
|
||||
logger.info(
|
||||
f"Deleting document doc_id={doc_id} in tenant={tenant_id}, connector_id={connector_id}"
|
||||
)
|
||||
url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name) + f"/{doc_id}"
|
||||
|
||||
with get_vespa_http_client() as client:
|
||||
response = client.delete(url)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Document {doc_id} deleted successfully.")
|
||||
print(f"Document {doc_id} deleted successfully")
|
||||
|
||||
|
||||
# List documents from any source
|
||||
def list_documents(n: int = 10) -> None:
|
||||
yql = f"select * from sources * where true limit {n};"
|
||||
url = f"{VESPA_APP_CONTAINER_URL}/search/"
|
||||
params = {
|
||||
"yql": yql,
|
||||
"timeout": "10s",
|
||||
}
|
||||
try:
|
||||
with get_vespa_http_client() as client:
|
||||
response = client.get(url, params=params)
|
||||
response.raise_for_status()
|
||||
documents = response.json()["root"]["children"]
|
||||
print(f"First {n} documents:")
|
||||
print_documents(documents)
|
||||
except Exception as e:
|
||||
print(f"Failed to list documents: {str(e)}")
|
||||
|
||||
|
||||
# Get and print ACLs for documents of a specific tenant and connector
|
||||
def get_document_acls(tenant_id: str, connector_id: int, n: int = 10) -> None:
|
||||
index_name = get_index_name(tenant_id, connector_id)
|
||||
yql = f"select documentid, access_control_list from sources {index_name} where true limit {n};"
|
||||
documents = query_vespa(yql)
|
||||
print(f"ACLs for {n} documents from tenant {tenant_id}, connector {connector_id}:")
|
||||
for doc in documents:
|
||||
print(f"Document ID: {doc['fields']['documentid']}")
|
||||
print(
|
||||
f"ACL: {json.dumps(doc['fields'].get('access_control_list', {}), indent=2)}"
|
||||
)
|
||||
def list_documents(n: int = 10, tenant_id: Optional[str] = None) -> None:
|
||||
# List documents from any source, filtered by tenant if provided.
|
||||
logger.info(f"Listing up to {n} documents for tenant={tenant_id or 'ALL'}")
|
||||
yql = "select * from sources * where true"
|
||||
if tenant_id:
|
||||
yql += f" and tenant_id contains '{tenant_id}'"
|
||||
documents = query_vespa(yql, tenant_id=tenant_id, limit=n)
|
||||
print(f"Total documents found: {len(documents)}")
|
||||
logger.info(f"Total documents found: {len(documents)}")
|
||||
print(f"First {min(n, len(documents))} documents:")
|
||||
for doc in documents[:n]:
|
||||
print(json.dumps(doc, indent=2))
|
||||
print("-" * 80)
|
||||
|
||||
|
||||
def get_document_and_chunk_counts(
|
||||
tenant_id: str, cc_pair_id: int, filter_doc: DocumentFilter | None = None
|
||||
) -> Dict[str, int]:
|
||||
# Return a dict mapping each document ID to its chunk count for a given connector.
|
||||
with get_session_with_tenant(tenant_id=tenant_id) as session:
|
||||
doc_ids_data = (
|
||||
session.query(DocumentByConnectorCredentialPair.id, Document.link)
|
||||
.join(
|
||||
ConnectorCredentialPair,
|
||||
and_(
|
||||
DocumentByConnectorCredentialPair.connector_id
|
||||
== ConnectorCredentialPair.connector_id,
|
||||
DocumentByConnectorCredentialPair.credential_id
|
||||
== ConnectorCredentialPair.credential_id,
|
||||
),
|
||||
)
|
||||
.join(Document, DocumentByConnectorCredentialPair.id == Document.id)
|
||||
.filter(ConnectorCredentialPair.id == cc_pair_id)
|
||||
.distinct()
|
||||
.all()
|
||||
)
|
||||
doc_ids = []
|
||||
for doc_id, link in doc_ids_data:
|
||||
if filter_doc and filter_doc.link:
|
||||
if link and filter_doc.link.lower() in link.lower():
|
||||
doc_ids.append(doc_id)
|
||||
else:
|
||||
doc_ids.append(doc_id)
|
||||
chunk_counts_data = (
|
||||
session.query(Document.id, Document.chunk_count)
|
||||
.filter(Document.id.in_(doc_ids))
|
||||
.all()
|
||||
)
|
||||
return {
|
||||
doc_id: chunk_count
|
||||
for doc_id, chunk_count in chunk_counts_data
|
||||
if chunk_count is not None
|
||||
}
|
||||
|
||||
|
||||
def get_chunk_ids_for_connector(
|
||||
tenant_id: str,
|
||||
cc_pair_id: int,
|
||||
index_name: str,
|
||||
filter_doc: DocumentFilter | None = None,
|
||||
) -> List[UUID]:
|
||||
# Return chunk IDs for a given connector.
|
||||
doc_id_to_new_chunk_cnt = get_document_and_chunk_counts(
|
||||
tenant_id, cc_pair_id, filter_doc
|
||||
)
|
||||
doc_infos: List[EnrichedDocumentIndexingInfo] = [
|
||||
VespaIndex.enrich_basic_chunk_info(
|
||||
index_name=index_name,
|
||||
http_client=get_vespa_http_client(),
|
||||
document_id=doc_id,
|
||||
previous_chunk_count=doc_id_to_new_chunk_cnt.get(doc_id, 0),
|
||||
new_chunk_count=0,
|
||||
)
|
||||
for doc_id in doc_id_to_new_chunk_cnt.keys()
|
||||
]
|
||||
chunk_ids = get_document_chunk_ids(
|
||||
enriched_document_info_list=doc_infos,
|
||||
tenant_id=tenant_id,
|
||||
large_chunks_enabled=False,
|
||||
)
|
||||
if not isinstance(chunk_ids, list):
|
||||
raise ValueError(f"Expected list of chunk IDs, got {type(chunk_ids)}")
|
||||
return chunk_ids
|
||||
|
||||
|
||||
def get_document_acls(
|
||||
tenant_id: str,
|
||||
cc_pair_id: int,
|
||||
n: int | None = 10,
|
||||
filter_doc: DocumentFilter | None = None,
|
||||
) -> None:
|
||||
# Fetch document ACLs for the given tenant and connector pair.
|
||||
index_name = get_index_name(tenant_id)
|
||||
logger.info(
|
||||
f"Fetching document ACLs for tenant={tenant_id}, cc_pair_id={cc_pair_id}"
|
||||
)
|
||||
chunk_ids: List[UUID] = get_chunk_ids_for_connector(
|
||||
tenant_id, cc_pair_id, index_name, filter_doc
|
||||
)
|
||||
vespa_client = get_vespa_http_client()
|
||||
|
||||
target_ids = chunk_ids if n is None else chunk_ids[:n]
|
||||
logger.info(
|
||||
f"Found {len(chunk_ids)} chunk IDs, showing ACLs for {len(target_ids)}."
|
||||
)
|
||||
for doc_chunk_id in target_ids:
|
||||
document_url = (
|
||||
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{str(doc_chunk_id)}"
|
||||
)
|
||||
response = vespa_client.get(document_url)
|
||||
if response.status_code == 200:
|
||||
fields = response.json().get("fields", {})
|
||||
document_id = fields.get("document_id") or fields.get(
|
||||
"documentid", "Unknown"
|
||||
)
|
||||
acls = fields.get("access_control_list", {})
|
||||
title = fields.get("title", "")
|
||||
source_type = fields.get("source_type", "")
|
||||
source_links_raw = fields.get("source_links", "{}")
|
||||
try:
|
||||
source_links = json.loads(source_links_raw)
|
||||
except json.JSONDecodeError:
|
||||
source_links = {}
|
||||
|
||||
print(f"Document Chunk ID: {doc_chunk_id}")
|
||||
print(f"Document ID: {document_id}")
|
||||
print(f"ACLs:\n{json.dumps(acls, indent=2)}")
|
||||
print(f"Source Links: {source_links}")
|
||||
print(f"Title: {title}")
|
||||
print(f"Source Type: {source_type}")
|
||||
if MULTI_TENANT:
|
||||
print(f"Tenant ID: {fields.get('tenant_id', 'N/A')}")
|
||||
print("-" * 80)
|
||||
else:
|
||||
logger.error(f"Failed to fetch document for chunk ID: {doc_chunk_id}")
|
||||
print(f"Failed to fetch document for chunk ID: {doc_chunk_id}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
print("-" * 80)
|
||||
|
||||
|
||||
class VespaDebugging:
|
||||
# Class for managing Vespa debugging actions.
|
||||
def __init__(self, tenant_id: str | None = None):
|
||||
self.tenant_id = POSTGRES_DEFAULT_SCHEMA if not tenant_id else tenant_id
|
||||
|
||||
def print_config(self) -> None:
|
||||
# Print Vespa config.
|
||||
print_vespa_config()
|
||||
|
||||
def check_connectivity(self) -> None:
|
||||
# Check Vespa connectivity.
|
||||
check_vespa_connectivity()
|
||||
|
||||
def list_documents(self, n: int = 10) -> None:
|
||||
# List documents for a tenant.
|
||||
list_documents(n, self.tenant_id)
|
||||
|
||||
def search_documents(self, connector_id: int, query: str, n: int = 10) -> None:
|
||||
# Search documents for a tenant and connector.
|
||||
search_documents(self.tenant_id, connector_id, query, n)
|
||||
|
||||
def update_document(
|
||||
self, connector_id: int, doc_id: str, fields: Dict[str, Any]
|
||||
) -> None:
|
||||
# Update a document.
|
||||
update_document(self.tenant_id, connector_id, doc_id, fields)
|
||||
|
||||
def delete_document(self, connector_id: int, doc_id: str) -> None:
|
||||
# Delete a document.
|
||||
delete_document(self.tenant_id, connector_id, doc_id)
|
||||
|
||||
def acls_by_link(self, cc_pair_id: int, link: str) -> None:
|
||||
# Get ACLs for a document matching a link.
|
||||
get_document_acls(
|
||||
self.tenant_id, cc_pair_id, n=None, filter_doc=DocumentFilter(link=link)
|
||||
)
|
||||
|
||||
def acls(self, cc_pair_id: int, n: int | None = 10) -> None:
|
||||
# Get ACLs for a connector.
|
||||
get_document_acls(self.tenant_id, cc_pair_id, n)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
# Main CLI entry point.
|
||||
parser = argparse.ArgumentParser(description="Vespa debugging tool")
|
||||
parser.add_argument(
|
||||
"--action",
|
||||
@ -209,60 +499,45 @@ def main() -> None:
|
||||
required=True,
|
||||
help="Action to perform",
|
||||
)
|
||||
parser.add_argument("--tenant-id", help="Tenant ID")
|
||||
parser.add_argument("--connector-id", type=int, help="Connector ID")
|
||||
parser.add_argument(
|
||||
"--tenant-id", help="Tenant ID (for update, delete, and get_acls actions)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--connector-id",
|
||||
type=int,
|
||||
help="Connector ID (for update, delete, and get_acls actions)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--n",
|
||||
type=int,
|
||||
default=10,
|
||||
help="Number of documents to retrieve (for list_docs, search, update, and get_acls actions)",
|
||||
"--n", type=int, default=10, help="Number of documents to retrieve"
|
||||
)
|
||||
parser.add_argument("--query", help="Search query (for search action)")
|
||||
parser.add_argument("--doc-id", help="Document ID (for update and delete actions)")
|
||||
parser.add_argument(
|
||||
"--fields", help="Fields to update, in JSON format (for update action)"
|
||||
"--fields", help="Fields to update, in JSON format (for update)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
vespa_debug = VespaDebugging(args.tenant_id)
|
||||
|
||||
if args.action == "config":
|
||||
print_vespa_config()
|
||||
vespa_debug.print_config()
|
||||
elif args.action == "connect":
|
||||
check_vespa_connectivity()
|
||||
vespa_debug.check_connectivity()
|
||||
elif args.action == "list_docs":
|
||||
# If tenant_id and connector_id are provided, list docs for that tenant/connector.
|
||||
# Otherwise, list documents from any source.
|
||||
if args.tenant_id and args.connector_id:
|
||||
get_documents_for_tenant_connector(
|
||||
args.tenant_id, args.connector_id, args.n
|
||||
)
|
||||
else:
|
||||
list_documents(args.n)
|
||||
vespa_debug.list_documents(args.n)
|
||||
elif args.action == "search":
|
||||
if not args.query:
|
||||
parser.error("--query is required for search action")
|
||||
search_documents(args.tenant_id, args.connector_id, args.query, args.n)
|
||||
if not args.query or args.connector_id is None:
|
||||
parser.error("--query and --connector-id are required for search action")
|
||||
vespa_debug.search_documents(args.connector_id, args.query, args.n)
|
||||
elif args.action == "update":
|
||||
if not args.doc_id or not args.fields:
|
||||
parser.error("--doc-id and --fields are required for update action")
|
||||
fields = json.loads(args.fields)
|
||||
update_document(args.tenant_id, args.connector_id, args.doc_id, fields)
|
||||
elif args.action == "delete":
|
||||
if not args.doc_id:
|
||||
parser.error("--doc-id is required for delete action")
|
||||
delete_document(args.tenant_id, args.connector_id, args.doc_id)
|
||||
elif args.action == "get_acls":
|
||||
if not args.tenant_id or args.connector_id is None:
|
||||
if not args.doc_id or not args.fields or args.connector_id is None:
|
||||
parser.error(
|
||||
"--tenant-id and --connector-id are required for get_acls action"
|
||||
"--doc-id, --fields, and --connector-id are required for update action"
|
||||
)
|
||||
get_document_acls(args.tenant_id, args.connector_id, args.n)
|
||||
fields = json.loads(args.fields)
|
||||
vespa_debug.update_document(args.connector_id, args.doc_id, fields)
|
||||
elif args.action == "delete":
|
||||
if not args.doc_id or args.connector_id is None:
|
||||
parser.error("--doc-id and --connector-id are required for delete action")
|
||||
vespa_debug.delete_document(args.connector_id, args.doc_id)
|
||||
elif args.action == "get_acls":
|
||||
if args.connector_id is None:
|
||||
parser.error("--connector-id is required for get_acls action")
|
||||
vespa_debug.acls(args.connector_id, args.n)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
x
Reference in New Issue
Block a user