From ca988f5c5f8fea5dc14b5a00e1f885fa85cf3bdc Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Tue, 10 Dec 2024 16:06:47 -0800 Subject: [PATCH] Max File Size (#3422) * k * k * k --- backend/danswer/configs/app_configs.py | 15 ++- .../connectors/google_drive/connector.py | 15 ++- .../connectors/google_drive/file_retrieval.py | 2 +- backend/danswer/indexing/indexing_pipeline.py | 85 ++++++++----- backend/danswer/seeding/load_docs.py | 2 +- .../indexing/test_indexing_pipeline.py | 120 ++++++++++++++++++ .../docker_compose/docker-compose.dev.yml | 2 + deployment/kubernetes/env-configmap.yaml | 2 + 8 files changed, 206 insertions(+), 37 deletions(-) create mode 100644 backend/tests/unit/danswer/indexing/test_indexing_pipeline.py diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index c7e1c45e20..09d0c6162c 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -417,21 +417,28 @@ LARGE_CHUNK_RATIO = 4 # We don't want the metadata to overwhelm the actual contents of the chunk SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true" # Timeout to wait for job's last update before killing it, in hours -CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT", 3)) +CLEANUP_INDEXING_JOBS_TIMEOUT = int( + os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT") or 3 +) # The indexer will warn in the logs whenver a document exceeds this threshold (in bytes) INDEXING_SIZE_WARNING_THRESHOLD = int( - os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD", 100 * 1024 * 1024) + os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD") or 100 * 1024 * 1024 ) # during indexing, will log verbose memory diff stats every x batches and at the end. # 0 disables this behavior and is the default. -INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0)) +INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL") or 0) # During an indexing attempt, specifies the number of batches which are allowed to # exception without aborting the attempt. 
-INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0)) +INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT") or 0) +# Maximum file size in a document to be indexed +MAX_DOCUMENT_CHARS = int(os.environ.get("MAX_DOCUMENT_CHARS") or 5_000_000) +MAX_FILE_SIZE_BYTES = int( + os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024 +) # 2GB in bytes ##### # Miscellaneous diff --git a/backend/danswer/connectors/google_drive/connector.py b/backend/danswer/connectors/google_drive/connector.py index ad929eb090..1b03c703db 100644 --- a/backend/danswer/connectors/google_drive/connector.py +++ b/backend/danswer/connectors/google_drive/connector.py @@ -4,11 +4,13 @@ from concurrent.futures import as_completed from concurrent.futures import ThreadPoolExecutor from functools import partial from typing import Any +from typing import cast from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore from danswer.configs.app_configs import INDEX_BATCH_SIZE +from danswer.configs.app_configs import MAX_FILE_SIZE_BYTES from danswer.configs.constants import DocumentSource from danswer.connectors.google_drive.doc_conversion import build_slim_document from danswer.connectors.google_drive.doc_conversion import ( @@ -452,12 +454,14 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector): if isinstance(self.creds, ServiceAccountCredentials) else self._manage_oauth_retrieval ) - return retrieval_method( + drive_files = retrieval_method( is_slim=is_slim, start=start, end=end, ) + return drive_files + def _extract_docs_from_google_drive( self, start: SecondsSinceUnixEpoch | None = None, @@ -473,6 +477,15 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector): files_to_process = [] # Gather the files into batches to be processed in parallel for file in self._fetch_drive_items(is_slim=False, start=start, end=end): + if ( + file.get("size") + and int(cast(str, file.get("size"))) > MAX_FILE_SIZE_BYTES + ): + logger.warning( + f"Skipping file {file.get('name', 'Unknown')} as it is too large: {file.get('size')} bytes" + ) + continue + files_to_process.append(file) if len(files_to_process) >= LARGE_BATCH_SIZE: yield from _process_files_batch( diff --git a/backend/danswer/connectors/google_drive/file_retrieval.py b/backend/danswer/connectors/google_drive/file_retrieval.py index 962d531b07..9b9b17a8c2 100644 --- a/backend/danswer/connectors/google_drive/file_retrieval.py +++ b/backend/danswer/connectors/google_drive/file_retrieval.py @@ -16,7 +16,7 @@ logger = setup_logger() FILE_FIELDS = ( "nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, " - "shortcutDetails, owners(emailAddress))" + "shortcutDetails, owners(emailAddress), size)" ) SLIM_FILE_FIELDS = ( "nextPageToken, files(mimeType, id, name, permissions(emailAddress, type), " diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py index b1ee8f4d94..bace61cec8 100644 --- a/backend/danswer/indexing/indexing_pipeline.py +++ b/backend/danswer/indexing/indexing_pipeline.py @@ -1,4 +1,5 @@ import traceback +from collections.abc import Callable from functools import partial from http import HTTPStatus from typing import Protocol @@ -12,6 +13,7 @@ from danswer.access.access import get_access_for_documents from danswer.access.models import DocumentAccess from danswer.configs.app_configs import 
ENABLE_MULTIPASS_INDEXING from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT +from danswer.configs.app_configs import MAX_DOCUMENT_CHARS from danswer.configs.constants import DEFAULT_BOOST from danswer.connectors.cross_connector_utils.miscellaneous_utils import ( get_experts_stores_representations, @@ -202,40 +204,13 @@ def index_doc_batch_with_handler( def index_doc_batch_prepare( - document_batch: list[Document], + documents: list[Document], index_attempt_metadata: IndexAttemptMetadata, db_session: Session, ignore_time_skip: bool = False, ) -> DocumentBatchPrepareContext | None: """Sets up the documents in the relational DB (source of truth) for permissions, metadata, etc. This preceeds indexing it into the actual document index.""" - documents: list[Document] = [] - for document in document_batch: - empty_contents = not any(section.text.strip() for section in document.sections) - if ( - (not document.title or not document.title.strip()) - and not document.semantic_identifier.strip() - and empty_contents - ): - # Skip documents that have neither title nor content - # If the document doesn't have either, then there is no useful information in it - # This is again verified later in the pipeline after chunking but at that point there should - # already be no documents that are empty. - logger.warning( - f"Skipping document with ID {document.id} as it has neither title nor content." - ) - continue - - if document.title is not None and not document.title.strip() and empty_contents: - # The title is explicitly empty ("" and not None) and the document is empty - # so when building the chunk text representation, it will be empty and unuseable - logger.warning( - f"Skipping document with ID {document.id} as the chunks will be empty." - ) - continue - - documents.append(document) - # Create a trimmed list of docs that don't have a newer updated at # Shortcuts the time-consuming flow on connector index retries document_ids: list[str] = [document.id for document in documents] @@ -282,17 +257,64 @@ def index_doc_batch_prepare( ) +def filter_documents(document_batch: list[Document]) -> list[Document]: + documents: list[Document] = [] + for document in document_batch: + empty_contents = not any(section.text.strip() for section in document.sections) + if ( + (not document.title or not document.title.strip()) + and not document.semantic_identifier.strip() + and empty_contents + ): + # Skip documents that have neither title nor content + # If the document doesn't have either, then there is no useful information in it + # This is again verified later in the pipeline after chunking but at that point there should + # already be no documents that are empty. + logger.warning( + f"Skipping document with ID {document.id} as it has neither title nor content." + ) + continue + + if document.title is not None and not document.title.strip() and empty_contents: + # The title is explicitly empty ("" and not None) and the document is empty + # so when building the chunk text representation, it will be empty and unuseable + logger.warning( + f"Skipping document with ID {document.id} as the chunks will be empty." + ) + continue + + section_chars = sum(len(section.text) for section in document.sections) + if ( + MAX_DOCUMENT_CHARS + and len(document.title or document.semantic_identifier) + section_chars + > MAX_DOCUMENT_CHARS + ): + # Skip documents that are too long, later on there are more memory intensive steps done on the text + # and the container will run out of memory and crash. 
Several other checks are included upstream but + # those are at the connector level so a catchall is still needed. + # Assumption here is that files that are that long, are generated files and not the type users + # generally care for. + logger.warning( + f"Skipping document with ID {document.id} as it is too long." + ) + continue + + documents.append(document) + return documents + + @log_function_time(debug_only=True) def index_doc_batch( *, + document_batch: list[Document], chunker: Chunker, embedder: IndexingEmbedder, document_index: DocumentIndex, - document_batch: list[Document], index_attempt_metadata: IndexAttemptMetadata, db_session: Session, ignore_time_skip: bool = False, tenant_id: str | None = None, + filter_fnc: Callable[[list[Document]], list[Document]] = filter_documents, ) -> tuple[int, int]: """Takes different pieces of the indexing pipeline and applies it to a batch of documents Note that the documents should already be batched at this point so that it does not inflate the @@ -309,8 +331,11 @@ def index_doc_batch( is_public=False, ) + logger.debug("Filtering Documents") + filtered_documents = filter_fnc(document_batch) + ctx = index_doc_batch_prepare( - document_batch=document_batch, + documents=filtered_documents, index_attempt_metadata=index_attempt_metadata, ignore_time_skip=ignore_time_skip, db_session=db_session, diff --git a/backend/danswer/seeding/load_docs.py b/backend/danswer/seeding/load_docs.py index 1567f7f6bb..5fe591423f 100644 --- a/backend/danswer/seeding/load_docs.py +++ b/backend/danswer/seeding/load_docs.py @@ -196,7 +196,7 @@ def seed_initial_documents( docs, chunks = _create_indexable_chunks(processed_docs, tenant_id) index_doc_batch_prepare( - document_batch=docs, + documents=docs, index_attempt_metadata=IndexAttemptMetadata( connector_id=connector_id, credential_id=PUBLIC_CREDENTIAL_ID, diff --git a/backend/tests/unit/danswer/indexing/test_indexing_pipeline.py b/backend/tests/unit/danswer/indexing/test_indexing_pipeline.py new file mode 100644 index 0000000000..612535f67e --- /dev/null +++ b/backend/tests/unit/danswer/indexing/test_indexing_pipeline.py @@ -0,0 +1,120 @@ +from typing import List + +from danswer.configs.app_configs import MAX_DOCUMENT_CHARS +from danswer.connectors.models import Document +from danswer.connectors.models import DocumentSource +from danswer.connectors.models import Section +from danswer.indexing.indexing_pipeline import filter_documents + + +def create_test_document( + doc_id: str = "test_id", + title: str | None = "Test Title", + semantic_id: str = "test_semantic_id", + sections: List[Section] | None = None, +) -> Document: + if sections is None: + sections = [Section(text="Test content", link="test_link")] + return Document( + id=doc_id, + title=title, + semantic_identifier=semantic_id, + sections=sections, + source=DocumentSource.FILE, + metadata={}, + ) + + +def test_filter_documents_empty_title_and_content() -> None: + doc = create_test_document( + title="", semantic_id="", sections=[Section(text="", link="test_link")] + ) + result = filter_documents([doc]) + assert len(result) == 0 + + +def test_filter_documents_empty_title_with_content() -> None: + doc = create_test_document( + title="", sections=[Section(text="Valid content", link="test_link")] + ) + result = filter_documents([doc]) + assert len(result) == 1 + assert result[0].id == "test_id" + + +def test_filter_documents_empty_content_with_title() -> None: + doc = create_test_document( + title="Valid Title", sections=[Section(text="", link="test_link")] + ) + 
result = filter_documents([doc]) + assert len(result) == 1 + assert result[0].id == "test_id" + + +def test_filter_documents_exceeding_max_chars() -> None: + if not MAX_DOCUMENT_CHARS: # Skip if no max chars configured + return + long_text = "a" * (MAX_DOCUMENT_CHARS + 1) + doc = create_test_document(sections=[Section(text=long_text, link="test_link")]) + result = filter_documents([doc]) + assert len(result) == 0 + + +def test_filter_documents_valid_document() -> None: + doc = create_test_document( + title="Valid Title", sections=[Section(text="Valid content", link="test_link")] + ) + result = filter_documents([doc]) + assert len(result) == 1 + assert result[0].id == "test_id" + assert result[0].title == "Valid Title" + + +def test_filter_documents_whitespace_only() -> None: + doc = create_test_document( + title=" ", semantic_id=" ", sections=[Section(text=" ", link="test_link")] + ) + result = filter_documents([doc]) + assert len(result) == 0 + + +def test_filter_documents_semantic_id_no_title() -> None: + doc = create_test_document( + title=None, + semantic_id="Valid Semantic ID", + sections=[Section(text="Valid content", link="test_link")], + ) + result = filter_documents([doc]) + assert len(result) == 1 + assert result[0].semantic_identifier == "Valid Semantic ID" + + +def test_filter_documents_multiple_sections() -> None: + doc = create_test_document( + sections=[ + Section(text="Content 1", link="test_link"), + Section(text="Content 2", link="test_link"), + Section(text="Content 3", link="test_link"), + ] + ) + result = filter_documents([doc]) + assert len(result) == 1 + assert len(result[0].sections) == 3 + + +def test_filter_documents_multiple_documents() -> None: + docs = [ + create_test_document(doc_id="1", title="Title 1"), + create_test_document( + doc_id="2", title="", sections=[Section(text="", link="test_link")] + ), # Should be filtered + create_test_document(doc_id="3", title="Title 3"), + ] + result = filter_documents(docs) + assert len(result) == 2 + assert {doc.id for doc in result} == {"1", "3"} + + +def test_filter_documents_empty_batch() -> None: + result = filter_documents([]) + assert len(result) == 0 diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index bcd73b729c..55466f086d 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -183,6 +183,8 @@ services: - GONG_CONNECTOR_START_TIME=${GONG_CONNECTOR_START_TIME:-} - NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-} - GITHUB_CONNECTOR_BASE_URL=${GITHUB_CONNECTOR_BASE_URL:-} + - MAX_DOCUMENT_CHARS=${MAX_DOCUMENT_CHARS:-} + - MAX_FILE_SIZE_BYTES=${MAX_FILE_SIZE_BYTES:-} # Celery Configs (defaults are set in the supervisord.conf file. # prefer doing that to have one source of defaults) - CELERY_WORKER_INDEXING_CONCURRENCY=${CELERY_WORKER_INDEXING_CONCURRENCY:-} diff --git a/deployment/kubernetes/env-configmap.yaml b/deployment/kubernetes/env-configmap.yaml index 176e468c11..84bd674797 100644 --- a/deployment/kubernetes/env-configmap.yaml +++ b/deployment/kubernetes/env-configmap.yaml @@ -61,6 +61,8 @@ data: WEB_CONNECTOR_VALIDATE_URLS: "" GONG_CONNECTOR_START_TIME: "" NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP: "" + MAX_DOCUMENT_CHARS: "" + MAX_FILE_SIZE_BYTES: "" # DanswerBot SlackBot Configs DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER: "" DANSWER_BOT_DISPLAY_ERROR_MSGS: ""
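-- 
Reviewer note (not part of the patch): below is a minimal standalone sketch of how the new Drive size guard behaves, assuming the 2 GB default this patch gives MAX_FILE_SIZE_BYTES. The should_skip helper and the sample file payloads are hypothetical illustrations; only the comparison itself mirrors the check added in _extract_docs_from_google_drive.

    # Hypothetical sketch -- mirrors the size check added in connector.py, not the connector itself.
    MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024  # 2 GB default from app_configs.py

    def should_skip(file: dict) -> bool:
        # Google-native docs typically report no "size" field, so they are never
        # skipped; uploaded binaries above the cap are skipped with a warning.
        size = file.get("size")
        return bool(size) and int(size) > MAX_FILE_SIZE_BYTES

    print(should_skip({"name": "native Google Doc"}))                   # False: no size field
    print(should_skip({"name": "small.pdf", "size": "1048576"}))        # False: 1 MiB
    print(should_skip({"name": "huge.iso", "size": str(3 * 1024**3)}))  # True: over the 2 GB cap

Documents that pass the connector-level size guard are still subject to the catch-all MAX_DOCUMENT_CHARS check in filter_documents, as exercised by the new unit tests above.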