Add metadata file loader to ZIP file connector (#920)
commit a4d5ac816e (parent 2a139fd529)
@@ -74,12 +74,29 @@ def is_macos_resource_fork_file(file_name: str) -> bool:
     )


+# To include additional metadata in the search index, add a .danswer_metadata.json file
+# to the zip file. This file should contain a list of objects with the following format:
+# [{ "filename": "file1.txt", "link": "https://example.com/file1.txt" }]
 def load_files_from_zip(
     zip_location: str | Path,
     ignore_macos_resource_fork_files: bool = True,
     ignore_dirs: bool = True,
-) -> Generator[tuple[zipfile.ZipInfo, IO[Any]], None, None]:
+) -> Generator[tuple[zipfile.ZipInfo, IO[Any], dict[str, Any]], None, None]:
     with zipfile.ZipFile(zip_location, "r") as zip_file:
+        zip_metadata = {}
+        try:
+            metadata_file_info = zip_file.getinfo(".danswer_metadata.json")
+            with zip_file.open(metadata_file_info, "r") as metadata_file:
+                try:
+                    zip_metadata = json.load(metadata_file)
+                    if isinstance(zip_metadata, list):
+                        # convert list of dicts to dict of dicts
+                        zip_metadata = {d["filename"]: d for d in zip_metadata}
+                except json.JSONDecodeError:
+                    logger.warn("Unable to load .danswer_metadata.json")
+        except KeyError:
+            logger.info("No .danswer_metadata.json file")
+
         for file_info in zip_file.infolist():
             with zip_file.open(file_info.filename, "r") as file:
                 if ignore_dirs and file_info.is_dir():

@@ -89,7 +106,7 @@ def load_files_from_zip(
                     file_info.filename
                 ):
                     continue
-                yield file_info, file
+                yield file_info, file, zip_metadata.get(file_info.filename, {})


 def detect_encoding(file_path: str | Path) -> str:
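With this change, a zip uploaded to the file connector can carry per-file metadata. As a minimal sketch of how such an archive could be produced (the archive name, member file names, and links below are made-up examples; only the .danswer_metadata.json name and the list-of-objects format come from the comment added above):

import json
import zipfile

# Made-up example entries following the documented format:
# one object per file, keyed by "filename", with optional extra fields such as "link".
entries = [
    {"filename": "file1.txt", "link": "https://example.com/file1.txt"},
    {"filename": "notes/file2.md", "link": "https://example.com/file2"},
]

# Place .danswer_metadata.json at the root of the zip so that
# zip_file.getinfo(".danswer_metadata.json") in load_files_from_zip can find it.
with zipfile.ZipFile("docs.zip", "w") as zf:
    zf.writestr(".danswer_metadata.json", json.dumps(entries))
    zf.writestr("file1.txt", "contents of file1")
    zf.writestr("notes/file2.md", "# contents of file2")

load_files_from_zip then yields each member together with its matching entry, or an empty dict for members that have no entry.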
@@ -27,19 +27,22 @@ logger = setup_logger()

 def _open_files_at_location(
     file_path: str | Path,
-) -> Generator[tuple[str, IO[Any]], Any, None]:
+) -> Generator[tuple[str, IO[Any], dict[str, Any]], Any, None]:
     extension = get_file_ext(file_path)
+    metadata: dict[str, Any] = {}

     if extension == ".zip":
-        for file_info, file in load_files_from_zip(file_path, ignore_dirs=True):
-            yield file_info.filename, file
+        for file_info, file, metadata in load_files_from_zip(
+            file_path, ignore_dirs=True
+        ):
+            yield file_info.filename, file, metadata
     elif extension in [".txt", ".md", ".mdx"]:
         encoding = detect_encoding(file_path)
         with open(file_path, "r", encoding=encoding, errors="replace") as file:
-            yield os.path.basename(file_path), file
+            yield os.path.basename(file_path), file, metadata
     elif extension == ".pdf":
         with open(file_path, "rb") as file:
-            yield os.path.basename(file_path), file
+            yield os.path.basename(file_path), file, metadata
     else:
         logger.warning(f"Skipping file '{file_path}' with extension '{extension}'")

@@ -47,7 +50,7 @@ def _open_files_at_location(
 def _process_file(
     file_name: str,
     file: IO[Any],
-    time_updated: datetime,
+    metadata: dict[str, Any] = {},
     pdf_pass: str | None = None,
 ) -> list[Document]:
     extension = get_file_ext(file_name)
@@ -55,14 +58,19 @@ def _process_file(
         logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
         return []

-    metadata: dict[str, Any] = {}
+    file_metadata: dict[str, Any] = {}

     if extension == ".pdf":
         file_content_raw = read_pdf_file(
             file=file, file_name=file_name, pdf_pass=pdf_pass
         )
     else:
-        file_content_raw, metadata = read_file(file)
+        file_content_raw, file_metadata = read_file(file)
+    file_metadata = {**metadata, **file_metadata}
+
+    time_updated = file_metadata.get("time_updated", datetime.now(timezone.utc))
+    if isinstance(time_updated, str):
+        time_updated = time_str_to_utc(time_updated)

     dt_str = metadata.get("doc_updated_at")
     final_time_updated = time_str_to_utc(dt_str) if dt_str else time_updated
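To make the merge order above concrete: metadata parsed from the file itself (file_metadata from read_file) overrides the zip-level metadata passed in, and a string time_updated is parsed into a datetime. A small sketch with invented values; the connector uses time_str_to_utc for parsing, and datetime.fromisoformat stands in here only to keep the sketch self-contained:

from datetime import datetime, timezone

# Hypothetical inputs: zip-level metadata vs. metadata parsed from the file itself.
metadata = {"link": "https://example.com/file1.txt", "time_updated": "2023-12-01T10:00:00+00:00"}
file_metadata = {"time_updated": "2023-12-15T08:30:00+00:00"}

# Same merge as above: keys from the file win over the zip-level entries.
merged = {**metadata, **file_metadata}

time_updated = merged.get("time_updated", datetime.now(timezone.utc))
if isinstance(time_updated, str):
    time_updated = datetime.fromisoformat(time_updated)  # stand-in for time_str_to_utc

print(merged["link"], time_updated)  # the link survives the merge; the newer timestamp wins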
@@ -103,9 +111,12 @@ class LocalFileConnector(LoadConnector):
         current_datetime = datetime.now(timezone.utc)
         files = _open_files_at_location(file_location)

-        for file_name, file in files:
+        for file_name, file, metadata in files:
+            metadata["time_updated"] = metadata.get(
+                "time_updated", current_datetime
+            )
             documents.extend(
-                _process_file(file_name, file, current_datetime, self.pdf_pass)
+                _process_file(file_name, file, metadata, self.pdf_pass)
             )

             if len(documents) >= self.batch_size:
@@ -69,7 +69,7 @@ class GoogleSitesConnector(LoadConnector):
         # load the HTML files
         files = load_files_from_zip(self.zip_path)
         count = 0
-        for file_info, file_io in files:
+        for file_info, file_io, _metadata in files:
             # skip non-published files
             if "/PUBLISHED/" not in file_info.filename:
                 continue
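Because load_files_from_zip now yields three-element tuples, every caller has to unpack (and may simply discard) the metadata, as the Google Sites connector does above. A hedged sketch of what another caller in the same module as load_files_from_zip could look like; the archive path is made up:

from pathlib import Path

# Hypothetical caller: iterate a zip and ignore the per-file metadata.
for file_info, file_io, _metadata in load_files_from_zip(Path("site_export.zip")):
    print(file_info.filename, len(file_io.read()))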