Add delete all for tenants in Vespa (#3970)

This commit is contained in:
pablonyx 2025-02-13 14:33:49 -08:00 committed by GitHub
parent 667b9e04c5
commit c6434db7eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -256,16 +256,28 @@ def get_documents_for_tenant_connector(
def search_for_document(
index_name: str, document_id: str, max_hits: int | None = 10
index_name: str,
document_id: str | None = None,
tenant_id: str | None = None,
max_hits: int | None = 10,
) -> List[Dict[str, Any]]:
yql_query = (
f'select * from sources {index_name} where document_id contains "{document_id}"'
)
yql_query = f"select * from sources {index_name}"
conditions = []
if document_id is not None:
conditions.append(f'document_id contains "{document_id}"')
if tenant_id is not None:
conditions.append(f'tenant_id contains "{tenant_id}"')
if conditions:
yql_query += " where " + " and ".join(conditions)
params: dict[str, Any] = {"yql": yql_query}
if max_hits is not None:
params["hits"] = max_hits
with get_vespa_http_client() as client:
response = client.get(f"{SEARCH_ENDPOINT}/search/", params=params)
response = client.get(f"{SEARCH_ENDPOINT}search/", params=params)
response.raise_for_status()
result = response.json()
documents = result.get("root", {}).get("children", [])
@ -582,8 +594,15 @@ class VespaDebugging:
) -> None:
update_document(self.tenant_id, connector_id, doc_id, fields)
def search_for_document(self, document_id: str) -> List[Dict[str, Any]]:
return search_for_document(self.index_name, document_id)
def delete_documents_for_tenant(self, count: int | None = None) -> None:
if not self.tenant_id:
raise Exception("Tenant ID is not set")
delete_documents_for_tenant(self.index_name, self.tenant_id, count=count)
def search_for_document(
self, document_id: str | None = None, tenant_id: str | None = None
) -> List[Dict[str, Any]]:
return search_for_document(self.index_name, document_id, tenant_id)
def delete_document(self, connector_id: int, doc_id: str) -> None:
# Delete a document.
@ -600,6 +619,147 @@ class VespaDebugging:
get_document_acls(self.tenant_id, cc_pair_id, n)
def delete_where(
index_name: str,
selection: str,
cluster: str = "default",
bucket_space: str | None = None,
continuation: str | None = None,
time_chunk: str | None = None,
timeout: str | None = None,
tracelevel: int | None = None,
) -> None:
"""
Removes visited documents in `cluster` where the given selection
is true, using Vespa's 'delete where' endpoint.
:param index_name: Typically <namespace>/<document-type> from your schema
:param selection: The selection string, e.g., "true" or "foo contains 'bar'"
:param cluster: The name of the cluster where documents reside
:param bucket_space: e.g. 'global' or 'default'
:param continuation: For chunked visits
:param time_chunk: If you want to chunk the visit by time
:param timeout: e.g. '10s'
:param tracelevel: Increase for verbose logs
"""
# Using index_name of form <namespace>/<document-type>, e.g. "nomic_ai_nomic_embed_text_v1"
# This route ends with "/docid/" since the actual ID is not specified — we rely on "selection".
path = f"/document/v1/{index_name}/docid/"
params = {
"cluster": cluster,
"selection": selection,
}
# Optional parameters
if bucket_space is not None:
params["bucketSpace"] = bucket_space
if continuation is not None:
params["continuation"] = continuation
if time_chunk is not None:
params["timeChunk"] = time_chunk
if timeout is not None:
params["timeout"] = timeout
if tracelevel is not None:
params["tracelevel"] = tracelevel # type: ignore
with get_vespa_http_client() as client:
url = f"{VESPA_APPLICATION_ENDPOINT}{path}"
logger.info(f"Performing 'delete where' on {url} with selection={selection}...")
response = client.delete(url, params=params)
# (Optionally, you can keep fetching `continuation` from the JSON response
# if you have more documents to delete in chunks.)
response.raise_for_status() # will raise HTTPError if not 2xx
logger.info(f"Delete where completed with status: {response.status_code}")
print(f"Delete where completed with status: {response.status_code}")
def delete_documents_for_tenant(
index_name: str,
tenant_id: str,
route: str | None = None,
condition: str | None = None,
timeout: str | None = None,
tracelevel: int | None = None,
count: int | None = None,
) -> None:
"""
For the given tenant_id and index_name (often in the form <namespace>/<document-type>),
find documents via search_for_document, then delete them one at a time using Vespa's
/document/v1/<namespace>/<document-type>/docid/<document-id> endpoint.
:param index_name: Typically <namespace>/<document-type> from your schema
:param tenant_id: The tenant to match in your Vespa search
:param route: Optional route parameter for delete
:param condition: Optional conditional remove
:param timeout: e.g. '10s'
:param tracelevel: Increase for verbose logs
"""
deleted_count = 0
while True:
# Search for documents with the given tenant_id
docs = search_for_document(
index_name=index_name,
document_id=None,
tenant_id=tenant_id,
max_hits=100, # Fetch in batches of 100
)
if not docs:
logger.info("No more documents found to delete.")
break
with get_vespa_http_client() as client:
for doc in docs:
if count is not None and deleted_count >= count:
logger.info(f"Reached maximum delete limit of {count} documents.")
return
fields = doc.get("fields", {})
doc_id_value = fields.get("document_id") or fields.get("documentid")
tenant_id = fields.get("tenant_id")
if tenant_id != tenant_id:
raise Exception("Tenant ID mismatch")
if not doc_id_value:
logger.warning(
"Skipping a document that has no document_id in 'fields'."
)
continue
url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_id_value}"
params = {}
if condition:
params["condition"] = condition
if route:
params["route"] = route
if timeout:
params["timeout"] = timeout
if tracelevel is not None:
params["tracelevel"] = str(tracelevel)
response = client.delete(url, params=params)
if response.status_code == 200:
logger.info(f"Successfully deleted doc_id={doc_id_value}")
deleted_count += 1
else:
logger.error(
f"Failed to delete doc_id={doc_id_value}, "
f"status={response.status_code}, response={response.text}"
)
print(
f"Could not delete doc_id={doc_id_value}. "
f"Status={response.status_code}, response={response.text}"
)
raise Exception(
f"Could not delete doc_id={doc_id_value}. "
f"Status={response.status_code}, response={response.text}"
)
logger.info(f"Deleted {deleted_count} documents in total.")
def main() -> None:
parser = argparse.ArgumentParser(description="Vespa debugging tool")
parser.add_argument(
@ -630,7 +790,9 @@ def main() -> None:
args = parser.parse_args()
vespa_debug = VespaDebugging(args.tenant_id)
if args.action == "config":
if args.action == "delete-all-documents":
vespa_debug.delete_documents_for_tenant(args.count)
elif args.action == "config":
vespa_debug.print_config()
elif args.action == "connect":
vespa_debug.check_connectivity()