mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-18 19:43:26 +02:00
183 lines
7.1 KiB
Python
183 lines
7.1 KiB
Python
from datetime import timezone
|
|
from io import BytesIO
|
|
from typing import Any
|
|
|
|
from dropbox import Dropbox # type: ignore
|
|
from dropbox.exceptions import ApiError # type:ignore
|
|
from dropbox.exceptions import AuthError # type:ignore
|
|
from dropbox.files import FileMetadata # type:ignore
|
|
from dropbox.files import FolderMetadata # type:ignore
|
|
|
|
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
|
from onyx.configs.constants import DocumentSource
|
|
from onyx.connectors.exceptions import ConnectorValidationError
|
|
from onyx.connectors.exceptions import CredentialInvalidError
|
|
from onyx.connectors.exceptions import InsufficientPermissionsError
|
|
from onyx.connectors.interfaces import GenerateDocumentsOutput
|
|
from onyx.connectors.interfaces import LoadConnector
|
|
from onyx.connectors.interfaces import PollConnector
|
|
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
|
from onyx.connectors.models import ConnectorMissingCredentialError
|
|
from onyx.connectors.models import Document
|
|
from onyx.connectors.models import Section
|
|
from onyx.file_processing.extract_file_text import extract_file_text
|
|
from onyx.utils.logger import setup_logger
|
|
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
class DropboxConnector(LoadConnector, PollConnector):
    """Connector that turns the files of a Dropbox account into ``Document`` batches.

    The folder tree is walked starting from the path ``""`` (the account root
    in the Dropbox API). Every downloadable file is fetched, run through
    ``extract_file_text`` and emitted as a single-section ``Document``.
    """

    def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
        """Create a connector without credentials.

        Args:
            batch_size: Page size requested from the Dropbox folder listing.
                Each yielded batch is built from one listing page.
        """
        self.batch_size = batch_size
        # Stays None until load_credentials() has been called.
        self.dropbox_client: Dropbox | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Build the Dropbox client from ``credentials["dropbox_access_token"]``.

        Returns:
            Always ``None``; this connector does not hand back updated credentials.

        Raises:
            KeyError: If ``dropbox_access_token`` is missing from ``credentials``.
        """
        self.dropbox_client = Dropbox(credentials["dropbox_access_token"])
        return None

    def _download_file(self, path: str) -> bytes:
        """Download a single file from Dropbox and return its raw content.

        Raises:
            ConnectorMissingCredentialError: If no client has been configured.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")
        # files_download returns (metadata, response); only the payload is needed.
        _, resp = self.dropbox_client.files_download(path)
        return resp.content

    def _get_shared_link(self, path: str) -> str:
        """Return a shared link for a file in Dropbox, creating one if needed.

        An already existing shared link is reused; otherwise a new one is
        created. This is best effort: when the Dropbox API rejects the request
        the error is logged and an empty string is returned.

        Raises:
            ConnectorMissingCredentialError: If no client has been configured.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        try:
            # Check if a shared link already exists
            shared_links = self.dropbox_client.sharing_list_shared_links(path=path)
            if shared_links.links:
                return shared_links.links[0].url

            link_metadata = (
                self.dropbox_client.sharing_create_shared_link_with_settings(path)
            )
            return link_metadata.url
        except ApiError as err:
            logger.exception(f"Failed to create a shared link for {path}: {err}")
            return ""

    def _yield_files_recursive(
        self,
        path: str,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
    ) -> GenerateDocumentsOutput:
        """Yield files in batches from a specified Dropbox folder, including subfolders.

        Args:
            path: Dropbox folder path to list.
            start: Inclusive lower bound on the file's client-modified time, in
                seconds since the Unix epoch. ``None`` disables the bound.
            end: Inclusive upper bound on the file's client-modified time, in
                seconds since the Unix epoch. ``None`` disables the bound.

        Yields:
            Non-empty lists of ``Document``. Batches produced by a subfolder are
            yielded as soon as that subfolder is encountered, i.e. before the
            batch of the listing page that contains it.

        Raises:
            ConnectorMissingCredentialError: If no client has been configured.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        # Recursion is done by this method itself, so only the direct children
        # of `path` are requested from the API.
        result = self.dropbox_client.files_list_folder(
            path,
            limit=self.batch_size,
            recursive=False,
            include_non_downloadable_files=False,
        )

        while True:
            batch: list[Document] = []
            for entry in result.entries:
                if isinstance(entry, FolderMetadata):
                    yield from self._yield_files_recursive(entry.path_lower, start, end)
                    continue
                if not isinstance(entry, FileMetadata):
                    # Neither file nor folder: nothing to index.
                    continue

                modified_time = entry.client_modified
                if modified_time.tzinfo is None:
                    # If no timezone info, assume it is UTC
                    modified_time = modified_time.replace(tzinfo=timezone.utc)
                else:
                    # If not in UTC, translate it
                    modified_time = modified_time.astimezone(timezone.utc)

                time_as_seconds = int(modified_time.timestamp())
                # Compare against None explicitly so that a bound of 0 is still
                # honoured instead of being treated as "no bound".
                if start is not None and time_as_seconds < start:
                    continue
                if end is not None and time_as_seconds > end:
                    continue

                try:
                    downloaded_file = self._download_file(entry.path_display)
                except ApiError:
                    # A single file that cannot be fetched must not abort the
                    # whole run. Only per-request API errors are skipped;
                    # authentication and transport errors still propagate.
                    logger.exception(f"Error downloading file {entry.path_display}")
                    continue

                link = self._get_shared_link(entry.path_display)
                try:
                    text = extract_file_text(
                        BytesIO(downloaded_file),
                        file_name=entry.name,
                        break_on_unprocessable=False,
                    )
                    batch.append(
                        Document(
                            id=f"doc:{entry.id}",
                            sections=[Section(link=link, text=text)],
                            source=DocumentSource.DROPBOX,
                            semantic_identifier=entry.name,
                            doc_updated_at=modified_time,
                            metadata={"type": "article"},
                        )
                    )
                except Exception as e:
                    # Best effort: an unreadable file is logged and skipped.
                    logger.exception(
                        f"Error decoding file {entry.path_display} as utf-8 error occurred: {e}"
                    )

            if batch:
                yield batch

            if not result.has_more:
                break

            result = self.dropbox_client.files_list_folder_continue(result.cursor)

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Yield every file of the account, without any time filtering."""
        return self.poll_source(None, None)

    def poll_source(
        self, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
    ) -> GenerateDocumentsOutput:
        """Yield batches of files whose client-modified time lies in ``[start, end]``.

        Either bound may be ``None`` to leave that side of the window open.

        Raises:
            ConnectorMissingCredentialError: If no client has been configured.
                As this is a generator, the error surfaces on first iteration.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        yield from self._yield_files_recursive("", start, end)

    def validate_connector_settings(self) -> None:
        """Check that the configured credentials can list the root folder.

        A one-entry listing of ``""`` is requested and Dropbox failures are
        translated into connector exceptions.

        Raises:
            ConnectorMissingCredentialError: If no client has been configured.
            CredentialInvalidError: If Dropbox rejects the credentials.
            InsufficientPermissionsError: If the API error mentions
                ``insufficient_permissions``.
            ConnectorValidationError: For any other Dropbox API error.
            Exception: For any other unexpected failure.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox credentials not loaded.")

        try:
            self.dropbox_client.files_list_folder(path="", limit=1)
        except AuthError as e:
            logger.exception("Failed to validate Dropbox credentials")
            raise CredentialInvalidError(
                f"Dropbox credential is invalid: {e.error}"
            ) from e
        except ApiError as e:
            if (
                e.error is not None
                and "insufficient_permissions" in str(e.error).lower()
            ):
                raise InsufficientPermissionsError(
                    "Your Dropbox token does not have sufficient permissions."
                ) from e
            raise ConnectorValidationError(
                f"Unexpected Dropbox error during validation: {e.user_message_text or e}"
            ) from e
        except Exception as e:
            raise Exception(
                f"Unexpected error during Dropbox settings validation: {e}"
            ) from e
|
|
|
|
|
|
if __name__ == "__main__":
    import os

    # Manual smoke test: index the account behind DROPBOX_ACCESS_TOKEN and
    # print the first batch of documents that comes back.
    connector = DropboxConnector()
    credentials = {"dropbox_access_token": os.environ["DROPBOX_ACCESS_TOKEN"]}
    connector.load_credentials(credentials)

    first_batch = next(connector.load_from_state())
    print(first_batch)
|