diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py
index a33a2ac0e77..62ec561e968 100644
--- a/backend/onyx/configs/constants.py
+++ b/backend/onyx/configs/constants.py
@@ -35,7 +35,7 @@ SECTION_SEPARATOR = "\n\n"
 INDEX_SEPARATOR = "==="

 # For File Connector Metadata override file
-DANSWER_METADATA_FILENAME = ".onyx_metadata.json"
+ONYX_METADATA_FILENAME = ".onyx_metadata.json"

 # Messages
 DISABLED_GEN_AI_MSG = (
diff --git a/backend/onyx/connectors/file/connector.py b/backend/onyx/connectors/file/connector.py
index 1caaa0f1d3d..13ff1fc25c7 100644
--- a/backend/onyx/connectors/file/connector.py
+++ b/backend/onyx/connectors/file/connector.py
@@ -1,5 +1,4 @@
 import os
-from collections.abc import Iterator
 from datetime import datetime
 from datetime import timezone
 from pathlib import Path
@@ -23,7 +22,6 @@ from onyx.db.pg_file_store import get_pgfilestore_by_file_name
 from onyx.file_processing.extract_file_text import extract_text_and_images
 from onyx.file_processing.extract_file_text import get_file_ext
 from onyx.file_processing.extract_file_text import is_accepted_file_ext
-from onyx.file_processing.extract_file_text import load_files_from_zip
 from onyx.file_processing.extract_file_text import OnyxExtensionType
 from onyx.file_processing.image_utils import store_image_and_create_section
 from onyx.file_store.file_store import get_default_file_store
@@ -32,30 +30,22 @@ from onyx.utils.logger import setup_logger
 logger = setup_logger()


-def _read_files_and_metadata(
+def _read_file_from_postgres(
     file_name: str,
     db_session: Session,
-) -> Iterator[tuple[str, IO, dict[str, Any]]]:
+) -> IO | None:
     """
-    Reads the file from Postgres. If the file is a .zip, yields subfiles.
+    Gets the content of a file from Postgres.
     """
     extension = get_file_ext(file_name)
-    metadata: dict[str, Any] = {}
-    directory_path = os.path.dirname(file_name)

     # Read file from Postgres store
     file_content = get_default_file_store(db_session).read_file(file_name, mode="b")

-    # If it's a zip, expand it
-    if extension == ".zip":
-        for file_info, subfile, metadata in load_files_from_zip(
-            file_content, ignore_dirs=True
-        ):
-            yield os.path.join(directory_path, file_info.filename), subfile, metadata
-    elif is_accepted_file_ext(extension, OnyxExtensionType.All):
-        yield file_name, file_content, metadata
-    else:
-        logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
+    if is_accepted_file_ext(extension, OnyxExtensionType.All):
+        return file_content
+
+    logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
+    return None


 def _create_image_section(
@@ -288,17 +278,24 @@ class LocalFileConnector(LoadConnector):
     def __init__(
         self,
         file_locations: list[Path | str],
+        zip_metadata: dict[str, Any],
         batch_size: int = INDEX_BATCH_SIZE,
     ) -> None:
         self.file_locations = [str(loc) for loc in file_locations]
         self.batch_size = batch_size
         self.pdf_pass: str | None = None
+        self.zip_metadata = zip_metadata

     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         self.pdf_pass = credentials.get("pdf_password")
         return None

+    def _get_file_metadata(self, file_name: str) -> dict[str, Any]:
+        return self.zip_metadata.get(file_name, {}) or self.zip_metadata.get(
+            os.path.basename(file_name), {}
+        )
+
     def load_from_state(self) -> GenerateDocumentsOutput:
         """
         Iterates over each file path, fetches from Postgres, tries to parse text
@@ -310,35 +307,40 @@ class LocalFileConnector(LoadConnector):
             for file_path in self.file_locations:
                 current_datetime = datetime.now(timezone.utc)

-                files_iter = _read_files_and_metadata(
+                file_io = _read_file_from_postgres(
                     file_name=file_path,
                     db_session=db_session,
                 )
+                if not file_io:
+                    # typically an unsupported extension
+                    continue

-                for actual_file_name, file, metadata in files_iter:
-                    metadata["time_updated"] = metadata.get(
-                        "time_updated", current_datetime
-                    )
-                    new_docs = _process_file(
-                        file_name=actual_file_name,
-                        file=file,
-                        metadata=metadata,
-                        pdf_pass=self.pdf_pass,
-                        db_session=db_session,
-                    )
-                    documents.extend(new_docs)
+                metadata = self._get_file_metadata(file_path)
+                metadata["time_updated"] = metadata.get(
+                    "time_updated", current_datetime
+                )
+                new_docs = _process_file(
+                    file_name=file_path,
+                    file=file_io,
+                    metadata=metadata,
+                    pdf_pass=self.pdf_pass,
+                    db_session=db_session,
+                )
+                documents.extend(new_docs)

-                    if len(documents) >= self.batch_size:
-                        yield documents
+                if len(documents) >= self.batch_size:
+                    yield documents

-                        documents = []
+                    documents = []

         if documents:
             yield documents


 if __name__ == "__main__":
-    connector = LocalFileConnector(file_locations=[os.environ["TEST_FILE"]])
+    connector = LocalFileConnector(
+        file_locations=[os.environ["TEST_FILE"]], zip_metadata={}
+    )
     connector.load_credentials({"pdf_password": os.environ.get("PDF_PASSWORD")})
     doc_batches = connector.load_from_state()
     for batch in doc_batches:
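Note on the connector change above: with zip expansion gone from the load path, per-file metadata now comes from the `zip_metadata` mapping handed to the connector, resolved by `_get_file_metadata` with the exact path inside the zip taking precedence over the basename. A minimal standalone sketch of that lookup order; the sample entries are invented for illustration:

```python
import os
from typing import Any


def get_file_metadata(zip_metadata: dict[str, Any], file_name: str) -> dict[str, Any]:
    # Exact match on the full path inside the zip wins; fall back to the
    # basename, since .onyx_metadata.json entries may omit directory prefixes.
    return zip_metadata.get(file_name, {}) or zip_metadata.get(
        os.path.basename(file_name), {}
    )


zip_metadata = {"docs/a.txt": {"link": "https://example.com/a"}}
assert get_file_metadata(zip_metadata, "docs/a.txt") == {"link": "https://example.com/a"}
assert get_file_metadata(zip_metadata, "other/a.txt") == {}  # keyed as "docs/a.txt", not "a.txt"
```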
diff --git a/backend/onyx/connectors/google_site/connector.py b/backend/onyx/connectors/google_site/connector.py
index 7f0af1d551a..cf02ef90daa 100644
--- a/backend/onyx/connectors/google_site/connector.py
+++ b/backend/onyx/connectors/google_site/connector.py
@@ -77,7 +77,7 @@ class GoogleSitesConnector(LoadConnector):
         # load the HTML files
         files = load_files_from_zip(file_content_io)
         count = 0
-        for file_info, file_io, _metadata in files:
+        for file_info, file_io in files:
             # skip non-published files
             if "/PUBLISHED/" not in file_info.filename:
                 continue
diff --git a/backend/onyx/db/user_documents.py b/backend/onyx/db/user_documents.py
index 9d98bd3840d..03a6b3c7265 100644
--- a/backend/onyx/db/user_documents.py
+++ b/backend/onyx/db/user_documents.py
@@ -40,6 +40,8 @@ def create_user_files(
     db_session: Session,
     link_url: str | None = None,
 ) -> list[UserFile]:
+    # NOTE: At the moment, zip metadata is not used for user files.
+    # Should revisit to decide whether this should be a feature.
     upload_response = upload_files(files, db_session)
     user_files = []

@@ -105,6 +107,7 @@ def create_file_connector_credential(
         input_type=InputType.LOAD_STATE,
         connector_specific_config={
             "file_locations": [user_file.file_id],
+            "zip_metadata": {},
         },
         refresh_freq=None,
         prune_freq=None,
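Every caller that builds a file-connector config now supplies the `zip_metadata` key explicitly, passing `{}` when there is nothing to override (user files, chat uploads, link-based files). A sketch of constructing the connector directly under the new signature; the import path is taken from the diff above, and the file id is a placeholder, not a real store entry:

```python
from onyx.connectors.file.connector import LocalFileConnector

# "FILE_ID" stands in for a real Postgres file-store id.
connector = LocalFileConnector(
    file_locations=["FILE_ID"],
    zip_metadata={},  # now a required argument; empty means "no overrides"
)
connector.load_credentials({"pdf_password": None})
```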
diff --git a/backend/onyx/file_processing/extract_file_text.py b/backend/onyx/file_processing/extract_file_text.py
index f617ee1d5c0..153c61962a4 100644
--- a/backend/onyx/file_processing/extract_file_text.py
+++ b/backend/onyx/file_processing/extract_file_text.py
@@ -26,8 +26,8 @@ from PIL import Image
 from pypdf import PdfReader
 from pypdf.errors import PdfStreamError

-from onyx.configs.constants import DANSWER_METADATA_FILENAME
 from onyx.configs.constants import FileOrigin
+from onyx.configs.constants import ONYX_METADATA_FILENAME
 from onyx.file_processing.html_utils import parse_html_page_basic
 from onyx.file_processing.unstructured import get_unstructured_api_key
 from onyx.file_processing.unstructured import unstructured_to_text
@@ -146,27 +146,11 @@ def load_files_from_zip(
     zip_file_io: IO,
     ignore_macos_resource_fork_files: bool = True,
     ignore_dirs: bool = True,
-) -> Iterator[tuple[zipfile.ZipInfo, IO[Any], dict[str, Any]]]:
+) -> Iterator[tuple[zipfile.ZipInfo, IO[Any]]]:
     """
     If there's a .onyx_metadata.json in the zip, attach those metadata to each subfile.
     """
     with zipfile.ZipFile(zip_file_io, "r") as zip_file:
-        zip_metadata = {}
-        try:
-            metadata_file_info = zip_file.getinfo(DANSWER_METADATA_FILENAME)
-            with zip_file.open(metadata_file_info, "r") as metadata_file:
-                try:
-                    zip_metadata = json.load(metadata_file)
-                    if isinstance(zip_metadata, list):
-                        # convert list of dicts to dict of dicts
-                        # Use just the basename for matching since metadata may not include
-                        # the full path within the ZIP file
-                        zip_metadata = {d["filename"]: d for d in zip_metadata}
-                except json.JSONDecodeError:
-                    logger.warning(f"Unable to load {DANSWER_METADATA_FILENAME}")
-        except KeyError:
-            logger.info(f"No {DANSWER_METADATA_FILENAME} file")
-
         for file_info in zip_file.infolist():
             if ignore_dirs and file_info.is_dir():
                 continue
@@ -174,28 +158,23 @@ def load_files_from_zip(
         if (
             ignore_macos_resource_fork_files
             and is_macos_resource_fork_file(file_info.filename)
-        ) or file_info.filename == DANSWER_METADATA_FILENAME:
+        ) or file_info.filename == ONYX_METADATA_FILENAME:
             continue

         with zip_file.open(file_info.filename, "r") as subfile:
             # Try to match by exact filename first
-            if file_info.filename in zip_metadata:
-                yield file_info, subfile, zip_metadata.get(file_info.filename, {})
-            else:
-                # Then try matching by just the basename
-                basename = os.path.basename(file_info.filename)
-                yield file_info, subfile, zip_metadata.get(basename, {})
+            yield file_info, subfile


 def _extract_onyx_metadata(line: str) -> dict | None:
     """
     Example: first line has:
-        <!-- DANSWER_METADATA={"title": "..."} -->
+        <!-- ONYX_METADATA={"title": "..."} -->
     or
-        #DANSWER_METADATA={"title":"..."}
+        #ONYX_METADATA={"title":"..."}
     """
-    html_comment_pattern = r"<!--\s*DANSWER_METADATA=\{(.*?)\}\s*-->"
-    hashtag_pattern = r"#DANSWER_METADATA=\{(.*?)\}"
+    html_comment_pattern = r"<!--\s*ONYX_METADATA=\{(.*?)\}\s*-->"
+    hashtag_pattern = r"#ONYX_METADATA=\{(.*?)\}"

     html_comment_match = re.search(html_comment_pattern, line)
     hashtag_match = re.search(hashtag_pattern, line)
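The inline-metadata markers are renamed along with the constant, but the matching logic is unchanged. A quick self-contained check of the hashtag pattern from the hunk above; the sample line is invented:

```python
import json
import re

hashtag_pattern = r"#ONYX_METADATA=\{(.*?)\}"

line = '#ONYX_METADATA={"title": "Quarterly Report"}'
match = re.search(hashtag_pattern, line)
assert match is not None
# The capture group holds the brace contents, so wrap it back in braces to parse.
metadata = json.loads("{" + match.group(1) + "}")
assert metadata == {"title": "Quarterly Report"}
```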
diff --git a/backend/onyx/server/documents/connector.py b/backend/onyx/server/documents/connector.py
index 14376599bb8..3973c896b95 100644
--- a/backend/onyx/server/documents/connector.py
+++ b/backend/onyx/server/documents/connector.py
@@ -1,8 +1,10 @@
+import json
 import mimetypes
 import os
 import uuid
 import zipfile
 from io import BytesIO
+from typing import Any
 from typing import cast

 from fastapi import APIRouter
@@ -26,6 +28,7 @@ from onyx.configs.app_configs import MOCK_CONNECTOR_FILE_PATH
 from onyx.configs.constants import DocumentSource
 from onyx.configs.constants import FileOrigin
 from onyx.configs.constants import MilestoneRecordType
+from onyx.configs.constants import ONYX_METADATA_FILENAME
 from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.connectors.exceptions import ConnectorValidationError
@@ -128,12 +131,13 @@ from onyx.utils.telemetry import create_milestone_and_report
 from onyx.utils.threadpool_concurrency import run_functions_tuples_in_parallel
 from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop

-
 logger = setup_logger()

 _GMAIL_CREDENTIAL_ID_COOKIE_NAME = "gmail_credential_id"
 _GOOGLE_DRIVE_CREDENTIAL_ID_COOKIE_NAME = "google_drive_credential_id"

+SEEN_ZIP_DETAIL = "Only one zip file is allowed per file connector, \
+use the ingestion APIs for multiple files"

 router = APIRouter(prefix="/manage")
@@ -390,6 +394,31 @@ def check_drive_tokens(
     return AuthStatus(authenticated=True)


+def extract_zip_metadata(zf: zipfile.ZipFile) -> dict[str, Any]:
+    zip_metadata = {}
+    try:
+        metadata_file_info = zf.getinfo(ONYX_METADATA_FILENAME)
+        with zf.open(metadata_file_info, "r") as metadata_file:
+            try:
+                zip_metadata = json.load(metadata_file)
+                if isinstance(zip_metadata, list):
+                    # convert list of dicts to dict of dicts
+                    # Use just the basename for matching since metadata may not include
+                    # the full path within the ZIP file
+                    zip_metadata = {d["filename"]: d for d in zip_metadata}
+            except json.JSONDecodeError as e:
+                logger.warning(f"Unable to load {ONYX_METADATA_FILENAME}: {e}")
+                # should fail loudly here to let users know that their metadata
+                # file is not valid JSON
+                raise HTTPException(
+                    status_code=400,
+                    detail=f"Unable to load {ONYX_METADATA_FILENAME}: {e}",
+                )
+    except KeyError:
+        logger.info(f"No {ONYX_METADATA_FILENAME} file")
+    return zip_metadata
+
+
 def upload_files(files: list[UploadFile], db_session: Session) -> FileUploadResponse:
     for file in files:
         if not file.filename:
@@ -400,13 +429,18 @@ def upload_files(files: list[UploadFile], db_session: Session) -> FileUploadResp
         normalized_path = os.path.normpath(file_path)
         return not any(part.startswith(".") for part in normalized_path.split(os.sep))

+    deduped_file_paths = []
+    zip_metadata = {}
     try:
         file_store = get_default_file_store(db_session)
-        deduped_file_paths = []
-
+        seen_zip = False
         for file in files:
             if file.content_type and file.content_type.startswith("application/zip"):
+                if seen_zip:
+                    raise HTTPException(status_code=400, detail=SEEN_ZIP_DETAIL)
+                seen_zip = True
                 with zipfile.ZipFile(file.file, "r") as zf:
+                    zip_metadata = extract_zip_metadata(zf)
                     for file_info in zf.namelist():
                         if zf.getinfo(file_info).is_dir():
                             continue
@@ -452,7 +486,7 @@ def upload_files(files: list[UploadFile], db_session: Session) -> FileUploadResp
     except ValueError as e:
         raise HTTPException(status_code=400, detail=str(e))

-    return FileUploadResponse(file_paths=deduped_file_paths)
+    return FileUploadResponse(file_paths=deduped_file_paths, zip_metadata=zip_metadata)


 @router.post("/admin/connector/file/upload")
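`extract_zip_metadata` accepts either a JSON object keyed by filename or a bare list of entries, and malformed JSON now fails loudly with a 400 instead of a silent warning. A sketch of the list-to-dict normalization it applies, with invented sample data:

```python
import json

raw = json.loads(
    '[{"filename": "a.txt", "link": "https://example.com/a"},'
    ' {"filename": "b.txt", "link": "https://example.com/b"}]'
)
if isinstance(raw, list):
    # Key entries by filename; lookups later fall back to basenames, since
    # entries may not carry the full path within the zip.
    raw = {d["filename"]: d for d in raw}

assert raw["b.txt"]["link"] == "https://example.com/b"
```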
diff --git a/backend/onyx/server/documents/models.py b/backend/onyx/server/documents/models.py
index 8e1072b7eb7..653c077d3af 100644
--- a/backend/onyx/server/documents/models.py
+++ b/backend/onyx/server/documents/models.py
@@ -422,6 +422,7 @@ class GoogleServiceAccountCredentialRequest(BaseModel):

 class FileUploadResponse(BaseModel):
     file_paths: list[str]
+    zip_metadata: dict[str, Any]


 class ObjectCreationIdResponse(BaseModel):
diff --git a/backend/onyx/server/query_and_chat/chat_backend.py b/backend/onyx/server/query_and_chat/chat_backend.py
index b8112cf317d..70ba54dfef7 100644
--- a/backend/onyx/server/query_and_chat/chat_backend.py
+++ b/backend/onyx/server/query_and_chat/chat_backend.py
@@ -785,6 +785,7 @@ def upload_files_for_chat(
             input_type=InputType.LOAD_STATE,
             connector_specific_config={
                 "file_locations": [user_file.file_id],
+                "zip_metadata": {},
             },
             refresh_freq=None,
             prune_freq=None,
diff --git a/backend/onyx/server/user_documents/api.py b/backend/onyx/server/user_documents/api.py
index 9e01198eff5..fe6e3f8edab 100644
--- a/backend/onyx/server/user_documents/api.py
+++ b/backend/onyx/server/user_documents/api.py
@@ -381,6 +381,7 @@ def create_file_from_link(
         input_type=InputType.LOAD_STATE,
         connector_specific_config={
             "file_locations": [user_file.file_id],
+            "zip_metadata": {},
         },
         refresh_freq=None,
         prune_freq=None,
diff --git a/backend/tests/integration/common_utils/managers/connector.py b/backend/tests/integration/common_utils/managers/connector.py
index 2e0f9e517b9..ea7ddd145bc 100644
--- a/backend/tests/integration/common_utils/managers/connector.py
+++ b/backend/tests/integration/common_utils/managers/connector.py
@@ -33,7 +33,11 @@ class ConnectorManager:
             input_type=input_type,
             connector_specific_config=(
                 connector_specific_config
-                or ({"file_locations": []} if source == DocumentSource.FILE else {})
+                or (
+                    {"file_locations": [], "zip_metadata": {}}
+                    if source == DocumentSource.FILE
+                    else {}
+                )
             ),
             access_type=access_type,
             groups=groups or [],
diff --git a/backend/tests/integration/tests/image_indexing/test_indexing_images.py b/backend/tests/integration/tests/image_indexing/test_indexing_images.py
index eac492a33a3..37fc51b7795 100644
--- a/backend/tests/integration/tests/image_indexing/test_indexing_images.py
+++ b/backend/tests/integration/tests/image_indexing/test_indexing_images.py
@@ -72,7 +72,7 @@ def test_image_indexing(
         name=connector_name,
         source=DocumentSource.FILE,
         input_type=InputType.LOAD_STATE,
-        connector_specific_config={"file_locations": file_paths},
+        connector_specific_config={"file_locations": file_paths, "zip_metadata": {}},
         access_type=AccessType.PUBLIC,
         groups=[],
         user_performing_action=admin_user,
diff --git a/backend/tests/regression/answer_quality/api_utils.py b/backend/tests/regression/answer_quality/api_utils.py
index 1a281a1bdd1..7390b24059b 100644
--- a/backend/tests/regression/answer_quality/api_utils.py
+++ b/backend/tests/regression/answer_quality/api_utils.py
@@ -160,7 +160,7 @@ def create_connector(env_name: str, file_paths: list[str]) -> int:
         name=connector_name,
         source=DocumentSource.FILE,
         input_type=InputType.LOAD_STATE,
-        connector_specific_config={"file_locations": file_paths},
+        connector_specific_config={"file_locations": file_paths, "zip_metadata": {}},
         refresh_freq=None,
         prune_freq=None,
         indexing_start=None,
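On the wire, `FileUploadResponse` now carries `zip_metadata` alongside `file_paths`, which the web changes below thread into `connector_specific_config`. A hypothetical client-side sketch, assuming the `/manage/admin/connector/file/upload` route composed from the router prefix and decorator above, a local dev host, and an already-authenticated session; none of those details are shown in this diff:

```python
import requests

# Hypothetical host/port and no auth; adjust for a real deployment.
with open("docs.zip", "rb") as f:
    resp = requests.post(
        "http://localhost:8080/manage/admin/connector/file/upload",
        files=[("files", ("docs.zip", f, "application/zip"))],
    )
resp.raise_for_status()
payload = resp.json()
file_paths = payload["file_paths"]      # deduped paths extracted from the zip
zip_metadata = payload["zip_metadata"]  # parsed .onyx_metadata.json, or {}
```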
diff --git a/web/src/app/admin/connectors/[connector]/pages/utils/files.ts b/web/src/app/admin/connectors/[connector]/pages/utils/files.ts
index feb2810178f..12fbb4c38a6 100644
--- a/web/src/app/admin/connectors/[connector]/pages/utils/files.ts
+++ b/web/src/app/admin/connectors/[connector]/pages/utils/files.ts
@@ -31,6 +31,7 @@ export const submitFiles = async (
   }

   const filePaths = responseJson.file_paths as string[];
+  const zipMetadata = responseJson.zip_metadata as Record<string, any>;

   const [connectorErrorMsg, connector] = await createConnector({
     name: "FileConnector-" + Date.now(),
@@ -38,6 +39,7 @@
     input_type: "load_state",
     connector_specific_config: {
       file_locations: filePaths,
+      zip_metadata: zipMetadata,
     },
     refresh_freq: null,
     prune_freq: null,
diff --git a/web/src/app/admin/indexing/status/CCPairIndexingStatusTable.tsx b/web/src/app/admin/indexing/status/CCPairIndexingStatusTable.tsx
index d537b6846aa..4043f67a328 100644
--- a/web/src/app/admin/indexing/status/CCPairIndexingStatusTable.tsx
+++ b/web/src/app/admin/indexing/status/CCPairIndexingStatusTable.tsx
@@ -484,6 +484,7 @@ export function CCPairIndexingStatusTable({
             input_type: "poll",
             connector_specific_config: {
               file_locations: ["/path/to/sample/file.txt"],
+              zip_metadata: {},
             },
             refresh_freq: 86400,
             prune_freq: null,
diff --git a/web/src/lib/connectors/connectors.tsx b/web/src/lib/connectors/connectors.tsx
index 78caaefd5c0..2c8d104f8b0 100644
--- a/web/src/lib/connectors/connectors.tsx
+++ b/web/src/lib/connectors/connectors.tsx
@@ -1500,6 +1500,7 @@ export interface LoopioConfig {

 export interface FileConfig {
   file_locations: string[];
+  zip_metadata: Record<string, any>;
 }

 export interface ZulipConfig {
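End to end, the override file travels inside the zip itself: `.onyx_metadata.json` is parsed once at upload time, surfaced in the upload response, and stored on the connector config rather than re-read at index time. A sketch of assembling such a zip; the filenames and metadata values are invented:

```python
import io
import json
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("docs/a.txt", "hello world")
    # One entry per file; "filename" may be a full path or just a basename.
    zf.writestr(
        ".onyx_metadata.json",
        json.dumps([{"filename": "a.txt", "link": "https://example.com/a"}]),
    )
buf.seek(0)
# Uploading this zip yields zip_metadata == {"a.txt": {...}} in the response, and
# the indexed document for docs/a.txt picks it up via the basename fallback.
```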