Sharepoint fixes (#3826)

* Sharepoint connector fixes

* Refactor sharepoint to be better

* Improve env variable naming

* Fix

* Add new secrets

* Fix unstructured failure
This commit is contained in:
Chris Weaver
2025-01-28 20:06:09 -08:00
committed by GitHub
parent 2701f83634
commit 028e877342
5 changed files with 345 additions and 87 deletions

View File

@@ -1,17 +1,14 @@
import io
import os
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Optional
from urllib.parse import unquote
import msal # type: ignore
from office365.graph_client import GraphClient # type: ignore
from office365.onedrive.driveitems.driveItem import DriveItem # type: ignore
from office365.onedrive.sites.site import Site # type: ignore
from pydantic import BaseModel
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
@@ -30,16 +27,25 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
@dataclass
class SiteData:
url: str | None
folder: Optional[str]
sites: list = field(default_factory=list)
driveitems: list = field(default_factory=list)
class SiteDescriptor(BaseModel):
"""Data class for storing SharePoint site information.
Args:
url: The base site URL (e.g. https://danswerai.sharepoint.com/sites/sharepoint-tests)
drive_name: The name of the drive to access (e.g. "Shared Documents", "Other Library")
If None, all drives will be accessed.
folder_path: The folder path within the drive to access (e.g. "test/nested with spaces")
If None, all folders will be accessed.
"""
url: str
drive_name: str | None
folder_path: str | None
def _convert_driveitem_to_document(
driveitem: DriveItem,
drive_name: str,
) -> Document:
file_text = extract_file_text(
file=io.BytesIO(driveitem.get_content().execute_query().value),
@@ -59,7 +65,7 @@ def _convert_driveitem_to_document(
email=driveitem.last_modified_by.user.email,
)
],
metadata={},
metadata={"drive": drive_name},
)
return doc
@@ -71,106 +77,171 @@ class SharepointConnector(LoadConnector, PollConnector):
sites: list[str] = [],
) -> None:
self.batch_size = batch_size
self.graph_client: GraphClient | None = None
self.site_data: list[SiteData] = self._extract_site_and_folder(sites)
self._graph_client: GraphClient | None = None
self.site_descriptors: list[SiteDescriptor] = self._extract_site_and_drive_info(
sites
)
@property
def graph_client(self) -> GraphClient:
if self._graph_client is None:
raise ConnectorMissingCredentialError("Sharepoint")
return self._graph_client
@staticmethod
def _extract_site_and_folder(site_urls: list[str]) -> list[SiteData]:
def _extract_site_and_drive_info(site_urls: list[str]) -> list[SiteDescriptor]:
site_data_list = []
for url in site_urls:
parts = url.strip().split("/")
if "sites" in parts:
sites_index = parts.index("sites")
site_url = "/".join(parts[: sites_index + 2])
folder = (
"/".join(unquote(part) for part in parts[sites_index + 2 :])
if len(parts) > sites_index + 2
else None
)
# Handling for new URL structure
if folder and folder.startswith("Shared Documents/"):
folder = folder[len("Shared Documents/") :]
remaining_parts = parts[sites_index + 2 :]
# Extract drive name and folder path
if remaining_parts:
drive_name = unquote(remaining_parts[0])
folder_path = (
"/".join(unquote(part) for part in remaining_parts[1:])
if len(remaining_parts) > 1
else None
)
else:
drive_name = None
folder_path = None
site_data_list.append(
SiteData(url=site_url, folder=folder, sites=[], driveitems=[])
SiteDescriptor(
url=site_url,
drive_name=drive_name,
folder_path=folder_path,
)
)
return site_data_list
def _populate_sitedata_driveitems(
def _fetch_driveitems(
self,
site_descriptor: SiteDescriptor,
start: datetime | None = None,
end: datetime | None = None,
) -> None:
) -> list[tuple[DriveItem, str]]:
filter_str = ""
if start is not None and end is not None:
filter_str = f"last_modified_datetime ge {start.isoformat()} and last_modified_datetime le {end.isoformat()}"
filter_str = (
f"last_modified_datetime ge {start.isoformat()} and "
f"last_modified_datetime le {end.isoformat()}"
)
for element in self.site_data:
sites: list[Site] = []
for site in element.sites:
site_sublist = site.lists.get().execute_query()
sites.extend(site_sublist)
final_driveitems: list[tuple[DriveItem, str]] = []
try:
site = self.graph_client.sites.get_by_url(site_descriptor.url)
for site in sites:
# Get all drives in the site
drives = site.drives.get().execute_query()
logger.debug(f"Found drives: {[drive.name for drive in drives]}")
# Filter drives based on the requested drive name
if site_descriptor.drive_name:
drives = [
drive
for drive in drives
if drive.name == site_descriptor.drive_name
or (
drive.name == "Documents"
and site_descriptor.drive_name == "Shared Documents"
)
]
if not drives:
logger.warning(f"Drive '{site_descriptor.drive_name}' not found")
return []
# Process each matching drive
for drive in drives:
try:
query = site.drive.root.get_files(True, 1000)
root_folder = drive.root
if site_descriptor.folder_path:
# If a specific folder is requested, navigate to it
for folder_part in site_descriptor.folder_path.split("/"):
root_folder = root_folder.get_by_path(folder_part)
# Get all items recursively
query = root_folder.get_files(True, 1000)
if filter_str:
query = query.filter(filter_str)
driveitems = query.execute_query()
if element.folder:
expected_path = f"/root:/{element.folder}"
logger.debug(
f"Found {len(driveitems)} items in drive '{drive.name}'"
)
# Use "Shared Documents" as the library name for the default "Documents" drive
drive_name = (
"Shared Documents" if drive.name == "Documents" else drive.name
)
if site_descriptor.folder_path:
# Filter items to ensure they're in the specified folder or its subfolders
# The path will be in format: /drives/{drive_id}/root:/folder/path
filtered_driveitems = [
item
(item, drive_name)
for item in driveitems
if item.parent_reference.path.endswith(expected_path)
if any(
path_part == site_descriptor.folder_path
or path_part.startswith(
site_descriptor.folder_path + "/"
)
for path_part in item.parent_reference.path.split(
"root:/"
)[1].split("/")
)
]
if len(filtered_driveitems) == 0:
all_paths = [
item.parent_reference.path for item in driveitems
]
logger.warning(
f"Nothing found for folder '{expected_path}' in any of valid paths: {all_paths}"
f"Nothing found for folder '{site_descriptor.folder_path}' "
f"in; any of valid paths: {all_paths}"
)
element.driveitems.extend(filtered_driveitems)
final_driveitems.extend(filtered_driveitems)
else:
element.driveitems.extend(driveitems)
final_driveitems.extend(
[(item, drive_name) for item in driveitems]
)
except Exception as e:
# Some drives might not be accessible
logger.warning(f"Failed to process drive: {str(e)}")
except Exception:
# Sites include things that do not contain .drive.root so this fails
# but this is fine, as there are no actually documents in those
pass
except Exception as e:
# Sites include things that do not contain drives so this fails
# but this is fine, as there are no actual documents in those
logger.warning(f"Failed to process site: {str(e)}")
def _populate_sitedata_sites(self) -> None:
if self.graph_client is None:
raise ConnectorMissingCredentialError("Sharepoint")
return final_driveitems
if self.site_data:
for element in self.site_data:
element.sites = [
self.graph_client.sites.get_by_url(element.url)
.get()
.execute_query()
]
else:
sites = self.graph_client.sites.get_all().execute_query()
self.site_data = [
SiteData(url=None, folder=None, sites=sites, driveitems=[])
]
def _fetch_sites(self) -> list[SiteDescriptor]:
sites = self.graph_client.sites.get_all().execute_query()
site_descriptors = [
SiteDescriptor(
url=sites.resource_url,
drive_name=None,
folder_path=None,
)
]
return site_descriptors
def _fetch_from_sharepoint(
self, start: datetime | None = None, end: datetime | None = None
) -> GenerateDocumentsOutput:
if self.graph_client is None:
raise ConnectorMissingCredentialError("Sharepoint")
self._populate_sitedata_sites()
self._populate_sitedata_driveitems(start=start, end=end)
site_descriptors = self.site_descriptors or self._fetch_sites()
# goes over all urls, converts them into Document objects and then yields them in batches
doc_batch: list[Document] = []
for element in self.site_data:
for driveitem in element.driveitems:
for site_descriptor in site_descriptors:
driveitems = self._fetch_driveitems(site_descriptor, start=start, end=end)
for driveitem, drive_name in driveitems:
logger.debug(f"Processing: {driveitem.web_url}")
doc_batch.append(_convert_driveitem_to_document(driveitem))
doc_batch.append(_convert_driveitem_to_document(driveitem, drive_name))
if len(doc_batch) >= self.batch_size:
yield doc_batch
@@ -197,7 +268,7 @@ class SharepointConnector(LoadConnector, PollConnector):
)
return token
self.graph_client = GraphClient(_acquire_token_func)
self._graph_client = GraphClient(_acquire_token_func)
return None
def load_from_state(self) -> GenerateDocumentsOutput:
@@ -206,19 +277,19 @@ class SharepointConnector(LoadConnector, PollConnector):
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
start_datetime = datetime.utcfromtimestamp(start)
end_datetime = datetime.utcfromtimestamp(end)
start_datetime = datetime.fromtimestamp(start, timezone.utc)
end_datetime = datetime.fromtimestamp(end, timezone.utc)
return self._fetch_from_sharepoint(start=start_datetime, end=end_datetime)
if __name__ == "__main__":
connector = SharepointConnector(sites=os.environ["SITES"].split(","))
connector = SharepointConnector(sites=os.environ["SHAREPOINT_SITES"].split(","))
connector.load_credentials(
{
"sp_client_id": os.environ["SP_CLIENT_ID"],
"sp_client_secret": os.environ["SP_CLIENT_SECRET"],
"sp_directory_id": os.environ["SP_CLIENT_DIRECTORY_ID"],
"sp_client_id": os.environ["SHAREPOINT_CLIENT_ID"],
"sp_client_secret": os.environ["SHAREPOINT_CLIENT_SECRET"],
"sp_directory_id": os.environ["SHAREPOINT_CLIENT_DIRECTORY_ID"],
}
)
document_batches = connector.load_from_state()