# danswer/backend/onyx/connectors/google_site/connector.py
import os
import re
from typing import Any
from typing import cast

from bs4 import BeautifulSoup
from bs4 import Tag
from sqlalchemy.orm import Session

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.db.engine import get_sqlalchemy_engine
from onyx.file_processing.extract_file_text import load_files_from_zip
from onyx.file_processing.extract_file_text import read_text_file
from onyx.file_processing.html_utils import web_html_cleanup
from onyx.file_store.file_store import get_default_file_store
from onyx.utils.logger import setup_logger

logger = setup_logger()


def a_tag_text_to_path(atag: Tag) -> str:
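    """Slugify a navbar link's visible text into a URL path segment.

    Docstring and doctest added for illustration (not in the original file);
    the expected slug assumes Google Sites' lowercase-hyphenated page URLs.

    >>> a_tag_text_to_path(BeautifulSoup("<a>Team Q&amp;A!</a>", "html.parser").a)
    'team-qa'
    """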
    page_path = atag.text.strip().lower()
    page_path = re.sub(r"[^a-zA-Z0-9\s]", "", page_path)
    page_path = "-".join(page_path.split())
    return page_path


def find_google_sites_page_path_from_navbar(
    element: BeautifulSoup | Tag, path: str, depth: int
) -> str | None:
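    """Recursively walk the exported navbar to recover the current page's path.

    Docstring added for illustration, inferred from the code below: each navbar
    level is a set of <li data-nav-level=N> items; the <a> marked
    aria-selected="true" is the current page, and aria-expanded="true" marks a
    subtree to descend into at depth + 1.
    """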
    lis = cast(
        list[Tag],
        element.find_all("li", attrs={"data-nav-level": f"{depth}"}),
    )
    for li in lis:
        a = cast(Tag, li.find("a"))
        if a.get("aria-selected") == "true":
            return f"{path}/{a_tag_text_to_path(a)}"
        elif a.get("aria-expanded") == "true":
            sub_path = find_google_sites_page_path_from_navbar(
                element, f"{path}/{a_tag_text_to_path(a)}", depth + 1
            )
            if sub_path:
                return sub_path
    return None


class GoogleSitesConnector(LoadConnector):
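    """LoadConnector for a zipped HTML export of a Google Site.

    Docstring added for illustration: the zip (e.g. as produced by Google
    Takeout) is read out of Onyx's file store under `zip_path`, and `base_url`
    is the live site's URL, used to rebuild per-page links.
    """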
    def __init__(
        self,
        zip_path: str,
        base_url: str,
        batch_size: int = INDEX_BATCH_SIZE,
    ):
        self.zip_path = zip_path
        self.base_url = base_url
        self.batch_size = batch_size

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        # no credentials needed: the export zip is uploaded directly
        pass

    def load_from_state(self) -> GenerateDocumentsOutput:
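        """Yield Documents parsed from the exported HTML, in batches.

        Docstring added for illustration; the flow below: read the zip from
        the file store, keep published .html files, derive each page's path
        from its navbar, strip boilerplate, and emit batch_size-sized lists.
        """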
        documents: list[Document] = []

        with Session(get_sqlalchemy_engine()) as db_session:
            file_content_io = get_default_file_store(db_session).read_file(
                self.zip_path, mode="b"
            )

        # load the HTML files
        files = load_files_from_zip(file_content_io)
        count = 0
        for file_info, file_io, _metadata in files:
            # skip non-published files
            if "/PUBLISHED/" not in file_info.filename:
                continue

            file_path, extension = os.path.splitext(file_info.filename)
            if extension != ".html":
                continue

            file_content, _ = read_text_file(file_io)
            soup = BeautifulSoup(file_content, "html.parser")

            # get the link out of the navbar
            header = cast(Tag, soup.find("header"))
            nav = cast(Tag, header.find("nav"))
            path = find_google_sites_page_path_from_navbar(nav, "", 1)
            if not path:
                count += 1
                logger.error(
                    f"Could not find path for '{file_info.filename}'. "
                    + "This page will not have a working link.\n\n"
                    + f"# of broken links so far - {count}"
                )
            logger.info(f"Path to page: {path}")

            # clean up the hidden `Skip to main content` and `Skip to navigation`
            # links that appear at the top of every page
            for div in soup.find_all("div", attrs={"data-is-touch-wrapper": "true"}):
                div.extract()

            # get the body of the page
            parsed_html = web_html_cleanup(
                soup, additional_element_types_to_discard=["header", "nav"]
            )

            title = parsed_html.title or file_path.split("/")[-1]
            documents.append(
                Document(
                    id=f"{DocumentSource.GOOGLE_SITES.value}:{path}",
                    source=DocumentSource.GOOGLE_SITES,
                    semantic_identifier=title,
                    sections=[
                        Section(
                            link=(
                                (self.base_url.rstrip("/") + "/" + path.lstrip("/"))
                                if path
                                else ""
                            ),
                            text=parsed_html.cleaned_text,
                        )
                    ],
                    metadata={},
                )
            )
            if len(documents) >= self.batch_size:
                yield documents
                documents = []

        if documents:
            yield documents


if __name__ == "__main__":
    connector = GoogleSitesConnector(
        os.environ["GOOGLE_SITES_ZIP_PATH"],
        os.environ.get("GOOGLE_SITES_BASE_URL", ""),
    )

    for doc_batch in connector.load_from_state():
        for doc in doc_batch:
            print(doc)
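
# Example invocation (hypothetical values; note the zip path is a file-store
# path rather than necessarily a filesystem path, since load_from_state reads
# it via get_default_file_store):
#   GOOGLE_SITES_ZIP_PATH="google_sites/export.zip" \
#   GOOGLE_SITES_BASE_URL="https://sites.google.com/view/my-site" \
#   python -m onyx.connectors.google_site.connector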