cleaned up sharepoint connector (#1599)

* cleaned up sharepoint connector

* additional cleanup

* fixed dropbox connector string
hagen-danswer 2024-06-09 15:15:52 -04:00 committed by GitHub
parent fa3a3d348c
commit 64a042b94d
4 changed files with 61 additions and 110 deletions
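The substance of the cleanup, visible in the hunks below: the SharePoint connector's five per-extension helpers (docx, pdf, xlsx, pptx, plain text) are replaced by calls to the shared extract_file_text helper, which gains a break_on_unprocessable flag so connectors can skip unsupported files instead of failing outright. A condensed sketch of that dispatch shape (the extension registry and decoding here are illustrative stand-ins, not the real implementation):

import io


def extract_text_sketch(
    file_name: str,
    file: io.BytesIO,
    break_on_unprocessable: bool = True,
) -> str:
    # Stand-in registry; the real helper dispatches to pdf_to_text,
    # docx_to_text, etc. for many more extensions.
    valid_extensions = {".txt", ".md"}
    extension = "." + file_name.rsplit(".", 1)[-1].lower()
    if extension not in valid_extensions:
        if break_on_unprocessable:
            raise RuntimeError(f"Unprocessable file type: {file_name}")
        return ""  # unsupported types degrade to empty text
    return file.read().decode("utf-8", errors="replace")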


@@ -96,7 +96,11 @@ class DropboxConnector(LoadConnector, PollConnector):
             downloaded_file = self._download_file(entry.path_display)
             link = self._get_shared_link(entry.path_display)
             try:
-                text = extract_file_text(entry.name, BytesIO(downloaded_file))
+                text = extract_file_text(
+                    entry.name,
+                    BytesIO(downloaded_file),
+                    break_on_unprocessable=False,
+                )
                 batch.append(
                     Document(
                         id=f"doc:{entry.id}",


@@ -22,54 +22,47 @@ from danswer.connectors.models import BasicExpertInfo
 from danswer.connectors.models import ConnectorMissingCredentialError
 from danswer.connectors.models import Document
 from danswer.connectors.models import Section
-from danswer.file_processing.extract_file_text import docx_to_text
-from danswer.file_processing.extract_file_text import file_io_to_text
-from danswer.file_processing.extract_file_text import is_text_file_extension
-from danswer.file_processing.extract_file_text import pdf_to_text
-from danswer.file_processing.extract_file_text import pptx_to_text
-from danswer.file_processing.extract_file_text import xlsx_to_text
+from danswer.file_processing.extract_file_text import extract_file_text
 from danswer.utils.logger import setup_logger
 
-UNSUPPORTED_FILE_TYPE_CONTENT = ""  # idea copied from the google drive side of things
-
 logger = setup_logger()
 
 
-def get_text_from_xlsx_driveitem(driveitem_object: DriveItem) -> str:
-    file_content = driveitem_object.get_content().execute_query().value
-    return xlsx_to_text(file=io.BytesIO(file_content))
-
-
-def get_text_from_docx_driveitem(driveitem_object: DriveItem) -> str:
-    file_content = driveitem_object.get_content().execute_query().value
-    return docx_to_text(file=io.BytesIO(file_content))
-
-
-def get_text_from_pdf_driveitem(driveitem_object: DriveItem) -> str:
-    file_content = driveitem_object.get_content().execute_query().value
-    file_text = pdf_to_text(file=io.BytesIO(file_content))
-    return file_text
-
-
-def get_text_from_txt_driveitem(driveitem_object: DriveItem) -> str:
-    file_content: bytes = driveitem_object.get_content().execute_query().value
-    return file_io_to_text(file=io.BytesIO(file_content))
-
-
-def get_text_from_pptx_driveitem(driveitem_object: DriveItem) -> str:
-    file_content = driveitem_object.get_content().execute_query().value
-    return pptx_to_text(file=io.BytesIO(file_content))
-
-
 @dataclass
 class SiteData:
     url: str | None
     folder: Optional[str]
-    siteobjects: list = field(default_factory=list)
+    sites: list = field(default_factory=list)
     driveitems: list = field(default_factory=list)
 
 
+def _convert_driveitem_to_document(
+    driveitem: DriveItem,
+) -> Document:
+    file_text = extract_file_text(
+        file_name=driveitem.name,
+        file=io.BytesIO(driveitem.get_content().execute_query().value),
+        break_on_unprocessable=False,
+    )
+    doc = Document(
+        id=driveitem.id,
+        sections=[Section(link=driveitem.web_url, text=file_text)],
+        source=DocumentSource.SHAREPOINT,
+        semantic_identifier=driveitem.name,
+        doc_updated_at=driveitem.last_modified_datetime.replace(tzinfo=timezone.utc),
+        primary_owners=[
+            BasicExpertInfo(
+                display_name=driveitem.last_modified_by.user.displayName,
+                email=driveitem.last_modified_by.user.email,
+            )
+        ],
+        metadata={},
+    )
+    return doc
+
+
 class SharepointConnector(LoadConnector, PollConnector):
     def __init__(
         self,
@@ -78,7 +71,7 @@ class SharepointConnector(LoadConnector, PollConnector):
     ) -> None:
         self.batch_size = batch_size
         self.graph_client: GraphClient | None = None
-        self.site_data = self._extract_site_and_folder(sites)
+        self.site_data: list[SiteData] = self._extract_site_and_folder(sites)
 
     @staticmethod
     def _extract_site_and_folder(site_urls: list[str]) -> list[SiteData]:
@@ -92,29 +85,28 @@ class SharepointConnector(LoadConnector, PollConnector):
                 parts[sites_index + 2] if len(parts) > sites_index + 2 else None
             )
             site_data_list.append(
-                SiteData(url=site_url, folder=folder, siteobjects=[], driveitems=[])
+                SiteData(url=site_url, folder=folder, sites=[], driveitems=[])
             )
         return site_data_list
 
-    def _get_all_driveitem_objects(
+    def _populate_sitedata_driveitems(
         self,
         start: datetime | None = None,
        end: datetime | None = None,
-    ) -> list[DriveItem]:
+    ) -> None:
         filter_str = ""
         if start is not None and end is not None:
             filter_str = f"last_modified_datetime ge {start.isoformat()} and last_modified_datetime le {end.isoformat()}"
 
-        driveitem_list: list[DriveItem] = []
         for element in self.site_data:
-            site_objects_list: list[Site] = []
-            for site_object in element.siteobjects:
-                site_objects_sublist = site_object.lists.get().execute_query()
-                site_objects_list.extend(site_objects_sublist)
+            sites: list[Site] = []
+            for site in element.sites:
+                site_sublist = site.lists.get().execute_query()
+                sites.extend(site_sublist)
 
-            for site_object in site_objects_list:
+            for site in sites:
                 try:
-                    query = site_object.drive.root.get_files(True, 1000)
+                    query = site.drive.root.get_files(True, 1000)
                     if filter_str:
                         query = query.filter(filter_str)
                     driveitems = query.execute_query()
@@ -133,91 +125,41 @@
                     # but this is fine, as there are no actually documents in those
                     pass
-        return driveitem_list
 
-    def _get_all_site_objects(self) -> list[SiteData]:
+    def _populate_sitedata_sites(self) -> None:
         if self.graph_client is None:
             raise ConnectorMissingCredentialError("Sharepoint")
 
         if self.site_data:
             for element in self.site_data:
-                element.siteobjects = [
+                element.sites = [
                     self.graph_client.sites.get_by_url(element.url)
                     .get()
                     .execute_query()
                 ]
-            return self.site_data
         else:
-            site_objects = self.graph_client.sites.get().execute_query()
-            return [
-                SiteData(url=None, folder=None, siteobjects=site_objects, driveitems=[])
+            sites = self.graph_client.sites.get().execute_query()
+            self.site_data = [
+                SiteData(url=None, folder=None, sites=sites, driveitems=[])
             ]
 
-    def _extract_driveitem_text(self, driveitem_object: DriveItem) -> str:
-        driveitem_name = driveitem_object.name
-        driveitem_text = UNSUPPORTED_FILE_TYPE_CONTENT
-
-        if driveitem_name.endswith(".docx"):
-            driveitem_text = get_text_from_docx_driveitem(driveitem_object)
-        elif driveitem_name.endswith(".pdf"):
-            driveitem_text = get_text_from_pdf_driveitem(driveitem_object)
-        elif driveitem_name.endswith(".xlsx"):
-            driveitem_text = get_text_from_xlsx_driveitem(driveitem_object)
-        elif driveitem_name.endswith(".pptx"):
-            driveitem_text = get_text_from_pptx_driveitem(driveitem_object)
-        elif is_text_file_extension(driveitem_name):
-            driveitem_text = get_text_from_txt_driveitem(driveitem_object)
-
-        return driveitem_text
-
-    def _convert_driveitem_object_to_document(
-        self,
-        driveitem_object: DriveItem,
-    ) -> Document:
-        file_text = self._extract_driveitem_text(driveitem_object)
-        doc = Document(
-            id=driveitem_object.id,
-            sections=[Section(link=driveitem_object.web_url, text=file_text)],
-            source=DocumentSource.SHAREPOINT,
-            semantic_identifier=driveitem_object.name,
-            doc_updated_at=driveitem_object.last_modified_datetime.replace(
-                tzinfo=timezone.utc
-            ),
-            primary_owners=[
-                BasicExpertInfo(
-                    display_name=driveitem_object.last_modified_by.user.displayName,
-                    email=driveitem_object.last_modified_by.user.email,
-                )
-            ],
-            metadata={},
-        )
-        return doc
-
     def _fetch_from_sharepoint(
         self, start: datetime | None = None, end: datetime | None = None
     ) -> GenerateDocumentsOutput:
         if self.graph_client is None:
             raise ConnectorMissingCredentialError("Sharepoint")
 
-        self.site_data = self._get_all_site_objects()
-        self.driveitems = self._get_all_driveitem_objects(start=start, end=end)
+        self._populate_sitedata_sites()
+        self._populate_sitedata_driveitems(start=start, end=end)
 
         # goes over all urls, converts them into Document objects and then yields them in batches
         doc_batch: list[Document] = []
-        batch_count = 0
         for element in self.site_data:
-            for driveitem_object in element.driveitems:
-                logger.debug(f"Processing: {driveitem_object.web_url}")
-                doc_batch.append(
-                    self._convert_driveitem_object_to_document(driveitem_object)
-                )
+            for driveitem in element.driveitems:
+                logger.debug(f"Processing: {driveitem.web_url}")
+                doc_batch.append(_convert_driveitem_to_document(driveitem))
 
-                batch_count += 1
-                if batch_count >= self.batch_size:
+                if len(doc_batch) >= self.batch_size:
                     yield doc_batch
-                    batch_count = 0
                     doc_batch = []
         yield doc_batch
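A side note on the rewritten batching loop above: the separate batch_count counter is gone and the flush condition now reads len(doc_batch) directly, so the guard can never drift out of sync with the list it protects. A self-contained sketch of the same pattern, using plain strings in place of Document objects:

from collections.abc import Iterable, Iterator


def yield_in_batches(items: Iterable[str], batch_size: int) -> Iterator[list[str]]:
    batch: list[str] = []
    for item in items:
        batch.append(item)
        # The list's own length decides when to flush; there is no
        # separate counter to increment and reset alongside it.
        if len(batch) >= batch_size:
            yield batch
            batch = []
    # Emit the remainder (the connector yields it unconditionally;
    # this sketch skips an empty trailing batch).
    if batch:
        yield batch


# Yields ["a", "b"], then ["c", "d"], then ["e"].
print(list(yield_in_batches(["a", "b", "c", "d", "e"], batch_size=2)))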


@@ -256,13 +256,18 @@ def file_io_to_text(file: IO[Any]) -> str:
 def extract_file_text(
     file_name: str | None,
     file: IO[Any],
+    break_on_unprocessable: bool = True,
 ) -> str:
     if not file_name:
         return file_io_to_text(file)
 
     extension = get_file_ext(file_name)
     if not check_file_ext_is_valid(extension):
-        raise RuntimeError("Unprocessable file type")
+        if break_on_unprocessable:
+            raise RuntimeError(f"Unprocessable file type: {file_name}")
+        else:
+            logger.warning(f"Unprocessable file type: {file_name}")
+            return ""
 
     if extension == ".pdf":
         return pdf_to_text(file=file)
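Callers that should survive odd files, like the Dropbox and SharePoint connectors above, now pass break_on_unprocessable=False so an unsupported extension logs a warning and contributes empty text instead of raising. A usage sketch, assuming ".bin" is not among the registered valid extensions:

from io import BytesIO

from danswer.file_processing.extract_file_text import extract_file_text

payload = BytesIO(b"\x00\x01\x02")

# Default behavior is unchanged: unknown extensions still raise.
try:
    extract_file_text("firmware.bin", payload)
except RuntimeError as err:
    print(err)  # Unprocessable file type: firmware.bin

# Connector mode: warn, return "", and let iteration continue.
payload.seek(0)
text = extract_file_text("firmware.bin", payload, break_on_unprocessable=False)
assert text == ""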


@@ -161,7 +161,7 @@ const Main = () => {
           Dropbox indexing status
         </Title>
         <Text className="mb-2">
-          Due to Dropbox's access key design, the Dropbox connector will only
+          Due to Dropbox access key design, the Dropbox connector will only
           re-index files after a new access key is provided and the indexing
           process is re-run manually. Check the docs for more information.
         </Text>