From a202e2bf9dfc8c556d498c0c4ebf5cc1cbceba23 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 6 Feb 2025 13:30:06 -0800 Subject: [PATCH] Improvements to Redis + Vespa debugging --- backend/Dockerfile | 3 +- .../server/query_and_chat/chat_backend.py | 1 + backend/scripts/{ => debugging}/onyx_redis.py | 33 ++++- backend/scripts/{ => debugging}/onyx_vespa.py | 121 +++++++++++++++++- 4 files changed, 154 insertions(+), 4 deletions(-) rename backend/scripts/{ => debugging}/onyx_redis.py (89%) rename backend/scripts/{ => debugging}/onyx_vespa.py (81%) diff --git a/backend/Dockerfile b/backend/Dockerfile index 7301649091..98280814a1 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -101,7 +101,8 @@ COPY ./alembic_tenants /app/alembic_tenants COPY ./alembic.ini /app/alembic.ini COPY supervisord.conf /usr/etc/supervisord.conf -# Escape hatch +# Escape hatch scripts +COPY ./scripts/debugging /app/scripts/debugging COPY ./scripts/force_delete_connector_by_id.py /app/scripts/force_delete_connector_by_id.py # Put logo in assets diff --git a/backend/onyx/server/query_and_chat/chat_backend.py b/backend/onyx/server/query_and_chat/chat_backend.py index c80a696818..6ee783ec89 100644 --- a/backend/onyx/server/query_and_chat/chat_backend.py +++ b/backend/onyx/server/query_and_chat/chat_backend.py @@ -749,6 +749,7 @@ def upload_files_for_chat( file_name=file.filename or "", ) text_file_id = str(uuid.uuid4()) + file_store.save_file( file_name=text_file_id, content=io.BytesIO(extracted_text.encode()), diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/debugging/onyx_redis.py similarity index 89% rename from backend/scripts/onyx_redis.py rename to backend/scripts/debugging/onyx_redis.py index 10eab4086c..bae0ead621 100644 --- a/backend/scripts/onyx_redis.py +++ b/backend/scripts/debugging/onyx_redis.py @@ -10,6 +10,8 @@ from uuid import UUID from redis import Redis from ee.onyx.server.tenants.user_mapping import get_tenant_id_for_email +from onyx.auth.invited_users import get_invited_users +from onyx.auth.invited_users import write_invited_users from onyx.configs.app_configs import REDIS_AUTH_KEY_PREFIX from onyx.configs.app_configs import REDIS_DB_NUMBER from onyx.configs.app_configs import REDIS_HOST @@ -21,6 +23,7 @@ from onyx.db.users import get_user_by_email from onyx.redis.redis_pool import RedisPool from shared_configs.configs import MULTI_TENANT from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA +from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR # Tool to run helpful operations on Redis in production # This is targeted for internal usage and may not have all the necessary parameters @@ -310,6 +313,13 @@ if __name__ == "__main__": required=False, ) + parser.add_argument( + "--tenant-id", + type=str, + help="Tenant ID for get, delete user token, or add to invited users", + required=False, + ) + parser.add_argument( "--batch", type=int, @@ -328,11 +338,32 @@ if __name__ == "__main__": parser.add_argument( "--user-email", type=str, - help="User email for get or delete user token", + help="User email for get, delete user token, or add to invited users", required=False, ) args = parser.parse_args() + + if args.tenant_id: + CURRENT_TENANT_ID_CONTEXTVAR.set(args.tenant_id) + + if args.command == "add_invited_user": + if not args.user_email: + print("Error: --user-email is required for add_invited_user command") + sys.exit(1) + + current_invited_users = get_invited_users() + if args.user_email not in current_invited_users: + 
current_invited_users.append(args.user_email) + if args.dry_run: + print(f"(DRY-RUN) Would add {args.user_email} to invited users") + else: + write_invited_users(current_invited_users) + print(f"Added {args.user_email} to invited users") + else: + print(f"{args.user_email} is already in the invited users list") + sys.exit(0) + exitcode = onyx_redis( command=args.command, batch=args.batch, diff --git a/backend/scripts/onyx_vespa.py b/backend/scripts/debugging/onyx_vespa.py similarity index 81% rename from backend/scripts/onyx_vespa.py rename to backend/scripts/debugging/onyx_vespa.py index bb1f334929..39c1b8b0d9 100644 --- a/backend/scripts/onyx_vespa.py +++ b/backend/scripts/debugging/onyx_vespa.py @@ -255,6 +255,24 @@ def get_documents_for_tenant_connector( print_documents(documents) +def search_for_document( + index_name: str, document_id: str, max_hits: int | None = 10 +) -> List[Dict[str, Any]]: + yql_query = ( + f'select * from sources {index_name} where document_id contains "{document_id}"' + ) + params: dict[str, Any] = {"yql": yql_query} + if max_hits is not None: + params["hits"] = max_hits + with get_vespa_http_client() as client: + response = client.get(f"{SEARCH_ENDPOINT}/search/", params=params) + response.raise_for_status() + result = response.json() + documents = result.get("root", {}).get("children", []) + logger.info(f"Found {len(documents)} documents from query.") + return documents + + def search_documents( tenant_id: str, connector_id: int, query: str, n: int = 10 ) -> None: @@ -440,10 +458,98 @@ def get_document_acls( print("-" * 80) +def get_current_chunk_count( + document_id: str, index_name: str, tenant_id: str +) -> int | None: + with get_session_with_tenant(tenant_id=tenant_id) as session: + return ( + session.query(Document.chunk_count) + .filter(Document.id == document_id) + .scalar() + ) + + +def get_number_of_chunks_we_think_exist( + document_id: str, index_name: str, tenant_id: str +) -> int: + current_chunk_count = get_current_chunk_count(document_id, index_name, tenant_id) + print(f"Current chunk count: {current_chunk_count}") + + doc_info = VespaIndex.enrich_basic_chunk_info( + index_name=index_name, + http_client=get_vespa_http_client(), + document_id=document_id, + previous_chunk_count=current_chunk_count, + new_chunk_count=0, + ) + + chunk_ids = get_document_chunk_ids( + enriched_document_info_list=[doc_info], + tenant_id=tenant_id, + large_chunks_enabled=False, + ) + return len(chunk_ids) + + class VespaDebugging: # Class for managing Vespa debugging actions. 
def __init__(self, tenant_id: str | None = None): self.tenant_id = POSTGRES_DEFAULT_SCHEMA if not tenant_id else tenant_id + self.index_name = get_index_name(self.tenant_id) + + def sample_document_counts(self) -> None: + # Sample random documents and compare chunk counts + mismatches = [] + no_chunks = [] + with get_session_with_tenant(tenant_id=self.tenant_id) as session: + # Get a sample of random documents + from sqlalchemy import func + + sample_docs = ( + session.query(Document.id, Document.link, Document.semantic_id) + .order_by(func.random()) + .limit(1000) + .all() + ) + + for doc in sample_docs: + document_id, link, semantic_id = doc + ( + number_of_chunks_in_vespa, + number_of_chunks_we_think_exist, + ) = self.compare_chunk_count(document_id) + if number_of_chunks_in_vespa != number_of_chunks_we_think_exist: + mismatches.append( + ( + document_id, + link, + semantic_id, + number_of_chunks_in_vespa, + number_of_chunks_we_think_exist, + ) + ) + elif number_of_chunks_in_vespa == 0: + no_chunks.append((document_id, link, semantic_id)) + + # Print results + print("\nDocuments with mismatched chunk counts:") + for doc_id, link, semantic_id, vespa_count, expected_count in mismatches: + print(f"Document ID: {doc_id}") + print(f"Link: {link}") + print(f"Semantic ID: {semantic_id}") + print(f"Chunks in Vespa: {vespa_count}") + print(f"Expected chunks: {expected_count}") + print("-" * 80) + + print("\nDocuments with no chunks in Vespa:") + for doc_id, link, semantic_id in no_chunks: + print(f"Document ID: {doc_id}") + print(f"Link: {link}") + print(f"Semantic ID: {semantic_id}") + print("-" * 80) + + print(f"\nTotal mismatches: {len(mismatches)}") + print(f"Total documents with no chunks: {len(no_chunks)}") def print_config(self) -> None: # Print Vespa config. @@ -457,6 +563,16 @@ class VespaDebugging: # List documents for a tenant. list_documents(n, self.tenant_id) + def compare_chunk_count(self, document_id: str) -> tuple[int, int]: + docs = search_for_document(self.index_name, document_id, max_hits=None) + number_of_chunks_we_think_exist = get_number_of_chunks_we_think_exist( + document_id, self.index_name, self.tenant_id + ) + print( + f"Number of chunks in Vespa: {len(docs)}, Number of chunks we think exist: {number_of_chunks_we_think_exist}" + ) + return len(docs), number_of_chunks_we_think_exist + def search_documents(self, connector_id: int, query: str, n: int = 10) -> None: # Search documents for a tenant and connector. search_documents(self.tenant_id, connector_id, query, n) @@ -464,9 +580,11 @@ class VespaDebugging: def update_document( self, connector_id: int, doc_id: str, fields: Dict[str, Any] ) -> None: - # Update a document. update_document(self.tenant_id, connector_id, doc_id, fields) + def search_for_document(self, document_id: str) -> List[Dict[str, Any]]: + return search_for_document(self.index_name, document_id) + def delete_document(self, connector_id: int, doc_id: str) -> None: # Delete a document. delete_document(self.tenant_id, connector_id, doc_id) @@ -483,7 +601,6 @@ class VespaDebugging: def main() -> None: - # Main CLI entry point. parser = argparse.ArgumentParser(description="Vespa debugging tool") parser.add_argument( "--action",
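# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch above): what the new
# add_invited_user command in scripts/debugging/onyx_redis.py does, reduced to
# its core steps. Assumptions: this runs in the backend's Python environment
# (the Dockerfile change copies these scripts to /app/scripts/debugging inside
# the image), and TENANT_ID / EMAIL are replaced with real values. In
# multi-tenant deployments the tenant contextvar must be set first, exactly as
# the script does when --tenant-id is passed.
from onyx.auth.invited_users import get_invited_users, write_invited_users
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR

TENANT_ID = "TENANT_ID_HERE"  # placeholder
EMAIL = "user@example.com"    # placeholder

CURRENT_TENANT_ID_CONTEXTVAR.set(TENANT_ID)

invited_users = get_invited_users()
if EMAIL not in invited_users:
    invited_users.append(EMAIL)
    write_invited_users(invited_users)
    print(f"Added {EMAIL} to invited users")
else:
    print(f"{EMAIL} is already in the invited users list")
# ---------------------------------------------------------------------------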
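# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch above): how the new VespaDebugging
# helpers might be driven from a one-off debugging session. Assumptions: the
# module is importable as onyx_vespa (it lives at
# backend/scripts/debugging/onyx_vespa.py after this patch) and a real document
# ID is substituted for the placeholder. Passing tenant_id=None falls back to
# the default Postgres schema, as in __init__ above.
from onyx_vespa import VespaDebugging  # assumed import path

debugger = VespaDebugging(tenant_id=None)

# Compare the chunk count Vespa actually holds for one document against the
# count we expect based on Postgres (Document.chunk_count).
vespa_chunks, expected_chunks = debugger.compare_chunk_count("DOCUMENT_ID_HERE")
if vespa_chunks != expected_chunks:
    print(f"Mismatch: Vespa has {vespa_chunks}, expected {expected_chunks}")

# Or sample up to 1000 random documents and report mismatches and documents
# with no chunks in Vespa in one pass.
debugger.sample_document_counts()
# ---------------------------------------------------------------------------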
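# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch above): the Vespa query that the
# new search_for_document() helper issues, written as a standalone call for
# when the script itself is unavailable. Assumptions: Vespa's query API is
# reachable at the URL below (substitute the SEARCH_ENDPOINT your deployment
# uses) and the index name matches what get_index_name() returns for the
# tenant; "danswer_chunk" is only a guess at a common default.
import httpx

VESPA_SEARCH_URL = "http://localhost:8080/search/"  # assumed endpoint
INDEX_NAME = "danswer_chunk"                        # assumed index name
DOCUMENT_ID = "DOCUMENT_ID_HERE"                    # placeholder

yql = f'select * from sources {INDEX_NAME} where document_id contains "{DOCUMENT_ID}"'
response = httpx.get(VESPA_SEARCH_URL, params={"yql": yql, "hits": 10})
response.raise_for_status()
chunks = response.json().get("root", {}).get("children", [])
print(f"Found {len(chunks)} chunks for document {DOCUMENT_ID}")
# ---------------------------------------------------------------------------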