Mirror of https://github.com/danswer-ai/danswer.git

commit ca988f5c5f (parent 4e4214b82c)
@@ -417,21 +417,28 @@ LARGE_CHUNK_RATIO = 4
# We don't want the metadata to overwhelm the actual contents of the chunk
SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true"
# Timeout to wait for job's last update before killing it, in hours
-CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT", 3))
+CLEANUP_INDEXING_JOBS_TIMEOUT = int(
+    os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT") or 3
+)

# The indexer will warn in the logs whenver a document exceeds this threshold (in bytes)
INDEXING_SIZE_WARNING_THRESHOLD = int(
-    os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD", 100 * 1024 * 1024)
+    os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD") or 100 * 1024 * 1024
)

# during indexing, will log verbose memory diff stats every x batches and at the end.
# 0 disables this behavior and is the default.
-INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0))
+INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL") or 0)

# During an indexing attempt, specifies the number of batches which are allowed to
# exception without aborting the attempt.
-INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0))
+INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT") or 0)

+# Maximum file size in a document to be indexed
+MAX_DOCUMENT_CHARS = int(os.environ.get("MAX_DOCUMENT_CHARS") or 5_000_000)
+MAX_FILE_SIZE_BYTES = int(
+    os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024
+)  # 2GB in bytes

#####
# Miscellaneous
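The switch from os.environ.get(NAME, default) to os.environ.get(NAME) or default matters when a variable is present but empty, which is exactly what the compose file's ${VAR:-} defaults produce: int("") raises, while the "or" form falls back to the default. A standalone sketch of the pattern (values inlined for illustration only, not part of the commit):

import os

# How an unset `${MAX_FILE_SIZE_BYTES:-}` arrives inside the container: set, but empty.
os.environ["MAX_FILE_SIZE_BYTES"] = ""

# int(os.environ.get("MAX_FILE_SIZE_BYTES", 2 * 1024 * 1024 * 1024))  # would raise ValueError: invalid literal
value = int(os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024)
print(value)  # 2147483648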
@@ -4,11 +4,13 @@ from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any
+from typing import cast

from google.oauth2.credentials import Credentials as OAuthCredentials  # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials  # type: ignore

from danswer.configs.app_configs import INDEX_BATCH_SIZE
+from danswer.configs.app_configs import MAX_FILE_SIZE_BYTES
from danswer.configs.constants import DocumentSource
from danswer.connectors.google_drive.doc_conversion import build_slim_document
from danswer.connectors.google_drive.doc_conversion import (
@@ -452,12 +454,14 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
            if isinstance(self.creds, ServiceAccountCredentials)
            else self._manage_oauth_retrieval
        )
-        return retrieval_method(
+        drive_files = retrieval_method(
            is_slim=is_slim,
            start=start,
            end=end,
        )

+        return drive_files
+
    def _extract_docs_from_google_drive(
        self,
        start: SecondsSinceUnixEpoch | None = None,
@@ -473,6 +477,15 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
        files_to_process = []
        # Gather the files into batches to be processed in parallel
        for file in self._fetch_drive_items(is_slim=False, start=start, end=end):
+            if (
+                file.get("size")
+                and int(cast(str, file.get("size"))) > MAX_FILE_SIZE_BYTES
+            ):
+                logger.warning(
+                    f"Skipping file {file.get('name', 'Unknown')} as it is too large: {file.get('size')} bytes"
+                )
+                continue
+
            files_to_process.append(file)
            if len(files_to_process) >= LARGE_BATCH_SIZE:
                yield from _process_files_batch(
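The new guard relies on two Drive API details: size is reported as a string of bytes, and it can be absent (for example on Google Workspace-native files and folders), hence the presence check before the int() conversion. A standalone sketch of the same predicate, with the limit inlined for illustration rather than imported from app_configs:

from typing import Any

MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024  # illustration only; the real value comes from app_configs


def file_too_large(file: dict[str, Any]) -> bool:
    # Drive reports `size` as a string; a missing size means "not checkable", so the file is kept
    size = file.get("size")
    return bool(size) and int(size) > MAX_FILE_SIZE_BYTES


print(file_too_large({"name": "backup.tar", "size": str(3 * 1024 * 1024 * 1024)}))  # True
print(file_too_large({"name": "native Google Doc"}))  # False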
@@ -16,7 +16,7 @@ logger = setup_logger()

FILE_FIELDS = (
    "nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
-    "shortcutDetails, owners(emailAddress))"
+    "shortcutDetails, owners(emailAddress), size)"
)
SLIM_FILE_FIELDS = (
    "nextPageToken, files(mimeType, id, name, permissions(emailAddress, type), "
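Adding size to FILE_FIELDS is what makes the connector-side check possible at all: with the Drive API's partial responses, only attributes named in the fields parameter are returned. A hypothetical listing sketch; the drive_service construction and creds are assumptions for illustration and are not part of the diff:

from googleapiclient.discovery import build

# Fields string mirroring the updated FILE_FIELDS above (copied inline for the sketch)
fields = (
    "nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
    "shortcutDetails, owners(emailAddress), size)"
)

drive_service = build("drive", "v3", credentials=creds)  # `creds` obtained elsewhere (assumed)
resp = drive_service.files().list(pageSize=100, fields=fields).execute()
for f in resp.get("files", []):
    print(f.get("name"), f.get("size"))  # `size` comes back only because it is named in `fields`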
@@ -1,4 +1,5 @@
import traceback
+from collections.abc import Callable
from functools import partial
from http import HTTPStatus
from typing import Protocol
@@ -12,6 +13,7 @@ from danswer.access.access import get_access_for_documents
from danswer.access.models import DocumentAccess
from danswer.configs.app_configs import ENABLE_MULTIPASS_INDEXING
from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT
+from danswer.configs.app_configs import MAX_DOCUMENT_CHARS
from danswer.configs.constants import DEFAULT_BOOST
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
    get_experts_stores_representations,
@@ -202,40 +204,13 @@ def index_doc_batch_with_handler(


def index_doc_batch_prepare(
-    document_batch: list[Document],
+    documents: list[Document],
    index_attempt_metadata: IndexAttemptMetadata,
    db_session: Session,
    ignore_time_skip: bool = False,
) -> DocumentBatchPrepareContext | None:
    """Sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
    This preceeds indexing it into the actual document index."""
-    documents: list[Document] = []
-    for document in document_batch:
-        empty_contents = not any(section.text.strip() for section in document.sections)
-        if (
-            (not document.title or not document.title.strip())
-            and not document.semantic_identifier.strip()
-            and empty_contents
-        ):
-            # Skip documents that have neither title nor content
-            # If the document doesn't have either, then there is no useful information in it
-            # This is again verified later in the pipeline after chunking but at that point there should
-            # already be no documents that are empty.
-            logger.warning(
-                f"Skipping document with ID {document.id} as it has neither title nor content."
-            )
-            continue
-
-        if document.title is not None and not document.title.strip() and empty_contents:
-            # The title is explicitly empty ("" and not None) and the document is empty
-            # so when building the chunk text representation, it will be empty and unuseable
-            logger.warning(
-                f"Skipping document with ID {document.id} as the chunks will be empty."
-            )
-            continue
-
-        documents.append(document)
-
    # Create a trimmed list of docs that don't have a newer updated at
    # Shortcuts the time-consuming flow on connector index retries
    document_ids: list[str] = [document.id for document in documents]
@@ -282,17 +257,64 @@
    )


+def filter_documents(document_batch: list[Document]) -> list[Document]:
+    documents: list[Document] = []
+    for document in document_batch:
+        empty_contents = not any(section.text.strip() for section in document.sections)
+        if (
+            (not document.title or not document.title.strip())
+            and not document.semantic_identifier.strip()
+            and empty_contents
+        ):
+            # Skip documents that have neither title nor content
+            # If the document doesn't have either, then there is no useful information in it
+            # This is again verified later in the pipeline after chunking but at that point there should
+            # already be no documents that are empty.
+            logger.warning(
+                f"Skipping document with ID {document.id} as it has neither title nor content."
+            )
+            continue
+
+        if document.title is not None and not document.title.strip() and empty_contents:
+            # The title is explicitly empty ("" and not None) and the document is empty
+            # so when building the chunk text representation, it will be empty and unuseable
+            logger.warning(
+                f"Skipping document with ID {document.id} as the chunks will be empty."
+            )
+            continue
+
+        section_chars = sum(len(section.text) for section in document.sections)
+        if (
+            MAX_DOCUMENT_CHARS
+            and len(document.title or document.semantic_identifier) + section_chars
+            > MAX_DOCUMENT_CHARS
+        ):
+            # Skip documents that are too long, later on there are more memory intensive steps done on the text
+            # and the container will run out of memory and crash. Several other checks are included upstream but
+            # those are at the connector level so a catchall is still needed.
+            # Assumption here is that files that are that long, are generated files and not the type users
+            # generally care for.
+            logger.warning(
+                f"Skipping document with ID {document.id} as it is too long."
+            )
+            continue
+
+        documents.append(document)
+    return documents


@log_function_time(debug_only=True)
def index_doc_batch(
    *,
+    document_batch: list[Document],
    chunker: Chunker,
    embedder: IndexingEmbedder,
    document_index: DocumentIndex,
-    document_batch: list[Document],
    index_attempt_metadata: IndexAttemptMetadata,
    db_session: Session,
    ignore_time_skip: bool = False,
    tenant_id: str | None = None,
+    filter_fnc: Callable[[list[Document]], list[Document]] = filter_documents,
) -> tuple[int, int]:
    """Takes different pieces of the indexing pipeline and applies it to a batch of documents
    Note that the documents should already be batched at this point so that it does not inflate the
@@ -309,8 +331,11 @@ def index_doc_batch(
        is_public=False,
    )

+    logger.debug("Filtering Documents")
+    filtered_documents = filter_fnc(document_batch)
+
    ctx = index_doc_batch_prepare(
-        document_batch=document_batch,
+        documents=filtered_documents,
        index_attempt_metadata=index_attempt_metadata,
        ignore_time_skip=ignore_time_skip,
        db_session=db_session,
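The new filter_fnc hook defaults to filter_documents, so existing callers are unchanged, while a caller or a test can swap in its own pre-filter as long as it keeps the Callable[[list[Document]], list[Document]] shape. A hypothetical sketch of layering an extra predicate on top of the default; the draft-skipping rule is made up purely for illustration:

from danswer.connectors.models import Document
from danswer.indexing.indexing_pipeline import filter_documents


def filter_out_drafts_then_default(document_batch: list[Document]) -> list[Document]:
    # Hypothetical extra rule, applied before the stock filtering
    non_drafts = [
        doc for doc in document_batch if not doc.semantic_identifier.startswith("DRAFT:")
    ]
    return filter_documents(non_drafts)


# Would be wired in via index_doc_batch(..., filter_fnc=filter_out_drafts_then_default)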
@@ -196,7 +196,7 @@ def seed_initial_documents(
    docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)

    index_doc_batch_prepare(
-        document_batch=docs,
+        documents=docs,
        index_attempt_metadata=IndexAttemptMetadata(
            connector_id=connector_id,
            credential_id=PUBLIC_CREDENTIAL_ID,
backend/tests/unit/danswer/indexing/test_indexing_pipeline.py (new file, 120 additions)
@@ -0,0 +1,120 @@
from typing import List

from danswer.configs.app_configs import MAX_DOCUMENT_CHARS
from danswer.connectors.models import Document
from danswer.connectors.models import DocumentSource
from danswer.connectors.models import Section
from danswer.indexing.indexing_pipeline import filter_documents


def create_test_document(
    doc_id: str = "test_id",
    title: str | None = "Test Title",
    semantic_id: str = "test_semantic_id",
    sections: List[Section] | None = None,
) -> Document:
    if sections is None:
        sections = [Section(text="Test content", link="test_link")]
    return Document(
        id=doc_id,
        title=title,
        semantic_identifier=semantic_id,
        sections=sections,
        source=DocumentSource.FILE,
        metadata={},
    )


def test_filter_documents_empty_title_and_content() -> None:
    doc = create_test_document(
        title="", semantic_id="", sections=[Section(text="", link="test_link")]
    )
    result = filter_documents([doc])
    assert len(result) == 0


def test_filter_documents_empty_title_with_content() -> None:
    doc = create_test_document(
        title="", sections=[Section(text="Valid content", link="test_link")]
    )
    result = filter_documents([doc])
    assert len(result) == 1
    assert result[0].id == "test_id"


def test_filter_documents_empty_content_with_title() -> None:
    doc = create_test_document(
        title="Valid Title", sections=[Section(text="", link="test_link")]
    )
    result = filter_documents([doc])
    assert len(result) == 1
    assert result[0].id == "test_id"


def test_filter_documents_exceeding_max_chars() -> None:
    if not MAX_DOCUMENT_CHARS:  # Skip if no max chars configured
        return
    long_text = "a" * (MAX_DOCUMENT_CHARS + 1)
    doc = create_test_document(sections=[Section(text=long_text, link="test_link")])
    result = filter_documents([doc])
    assert len(result) == 0


def test_filter_documents_valid_document() -> None:
    doc = create_test_document(
        title="Valid Title", sections=[Section(text="Valid content", link="test_link")]
    )
    result = filter_documents([doc])
    assert len(result) == 1
    assert result[0].id == "test_id"
    assert result[0].title == "Valid Title"


def test_filter_documents_whitespace_only() -> None:
    doc = create_test_document(
        title=" ", semantic_id=" ", sections=[Section(text=" ", link="test_link")]
    )
    result = filter_documents([doc])
    assert len(result) == 0


def test_filter_documents_semantic_id_no_title() -> None:
    doc = create_test_document(
        title=None,
        semantic_id="Valid Semantic ID",
        sections=[Section(text="Valid content", link="test_link")],
    )
    result = filter_documents([doc])
    assert len(result) == 1
    assert result[0].semantic_identifier == "Valid Semantic ID"


def test_filter_documents_multiple_sections() -> None:
    doc = create_test_document(
        sections=[
            Section(text="Content 1", link="test_link"),
            Section(text="Content 2", link="test_link"),
            Section(text="Content 3", link="test_link"),
        ]
    )
    result = filter_documents([doc])
    assert len(result) == 1
    assert len(result[0].sections) == 3


def test_filter_documents_multiple_documents() -> None:
    docs = [
        create_test_document(doc_id="1", title="Title 1"),
        create_test_document(
            doc_id="2", title="", sections=[Section(text="", link="test_link")]
        ),  # Should be filtered
        create_test_document(doc_id="3", title="Title 3"),
    ]
    result = filter_documents(docs)
    assert len(result) == 2
    assert {doc.id for doc in result} == {"1", "3"}


def test_filter_documents_empty_batch() -> None:
    result = filter_documents([])
    assert len(result) == 0
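One detail worth noting about test_filter_documents_exceeding_max_chars: the length condition in filter_documents adds the title (or semantic identifier) length to the total section character count, so a section of MAX_DOCUMENT_CHARS + 1 characters trips the check regardless of the default title. A tiny standalone check of that arithmetic, with the value inlined for illustration rather than imported:

MAX_DOCUMENT_CHARS = 5_000_000  # illustration; the test imports the real value from app_configs
title = "Test Title"
section_chars = MAX_DOCUMENT_CHARS + 1

# Mirrors the comparison in filter_documents: title length + section chars vs. the cap
assert len(title) + section_chars > MAX_DOCUMENT_CHARS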
@@ -183,6 +183,8 @@ services:
      - GONG_CONNECTOR_START_TIME=${GONG_CONNECTOR_START_TIME:-}
      - NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-}
      - GITHUB_CONNECTOR_BASE_URL=${GITHUB_CONNECTOR_BASE_URL:-}
+      - MAX_DOCUMENT_CHARS=${MAX_DOCUMENT_CHARS:-}
+      - MAX_FILE_SIZE_BYTES=${MAX_FILE_SIZE_BYTES:-}
      # Celery Configs (defaults are set in the supervisord.conf file.
      # prefer doing that to have one source of defaults)
      - CELERY_WORKER_INDEXING_CONCURRENCY=${CELERY_WORKER_INDEXING_CONCURRENCY:-}
@@ -61,6 +61,8 @@ data:
  WEB_CONNECTOR_VALIDATE_URLS: ""
  GONG_CONNECTOR_START_TIME: ""
  NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP: ""
+  MAX_DOCUMENT_CHARS: ""
+  MAX_FILE_SIZE_BYTES: ""
  # DanswerBot SlackBot Configs
  DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER: ""
  DANSWER_BOT_DISPLAY_ERROR_MSGS: ""