Files
danswer/backend/onyx/connectors/dropbox/connector.py

183 lines
7.1 KiB
Python

from datetime import timezone
from io import BytesIO
from typing import Any
from dropbox import Dropbox # type: ignore
from dropbox.exceptions import ApiError # type:ignore
from dropbox.exceptions import AuthError # type:ignore
from dropbox.files import FileMetadata # type:ignore
from dropbox.files import FolderMetadata # type:ignore
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialInvalidError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.utils.logger import setup_logger
# Module-level logger; the connector below uses it to report per-file and
# validation failures via logger.exception().
logger = setup_logger()
class DropboxConnector(LoadConnector, PollConnector):
    """Connector that turns the files of a Dropbox account into ``Document``s.

    The folder tree is walked starting at the root (``""``). Every file that
    passes the optional modification-time window is downloaded, run through
    ``extract_file_text`` and emitted in batches.
    """

    def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
        """Remember the batch size; the client is created in ``load_credentials``.

        Args:
            batch_size: Used as the ``limit`` of each folder listing request,
                which in turn bounds the size of every yielded batch.
        """
        self.batch_size = batch_size
        self.dropbox_client: Dropbox | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Build the Dropbox client from ``credentials["dropbox_access_token"]``.

        Returns:
            Always ``None``.

        Raises:
            KeyError: If ``"dropbox_access_token"`` is missing from
                ``credentials``.
        """
        self.dropbox_client = Dropbox(credentials["dropbox_access_token"])
        return None

    def _download_file(self, path: str) -> bytes:
        """Download a single file from Dropbox and return its raw bytes.

        Raises:
            ConnectorMissingCredentialError: If ``load_credentials`` has not
                been called yet.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        _, resp = self.dropbox_client.files_download(path)
        try:
            return resp.content
        finally:
            # The SDK hands back the underlying HTTP response; close it so the
            # connection is released even if reading the body fails.
            resp.close()

    def _get_shared_link(self, path: str) -> str:
        """Return a shared link for ``path``, creating one if none exists.

        Returns:
            The URL of the first existing shared link, otherwise the URL of a
            newly created one. An empty string is returned when Dropbox
            answers with an ``ApiError`` (the error is logged, not raised).

        Raises:
            ConnectorMissingCredentialError: If ``load_credentials`` has not
                been called yet.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        try:
            # Reuse an existing shared link when there is one.
            shared_links = self.dropbox_client.sharing_list_shared_links(path=path)
            if shared_links.links:
                return shared_links.links[0].url

            link_metadata = (
                self.dropbox_client.sharing_create_shared_link_with_settings(path)
            )
            return link_metadata.url
        except ApiError as err:
            # Best effort: a missing link must not prevent indexing the file.
            logger.exception(f"Failed to create a shared link for {path}: {err}")
            return ""

    def _yield_files_recursive(
        self,
        path: str,
        start: SecondsSinceUnixEpoch | None,
        end: SecondsSinceUnixEpoch | None,
    ) -> GenerateDocumentsOutput:
        """Yield batches of documents from ``path`` and all of its subfolders.

        Args:
            path: Dropbox folder path to list; ``""`` is the root.
            start: Inclusive lower bound on the file's client-modified time in
                epoch seconds, or ``None`` for no lower bound.
            end: Inclusive upper bound on the file's client-modified time in
                epoch seconds, or ``None`` for no upper bound.

        Yields:
            Non-empty lists of ``Document``. Batches produced by a subfolder
            are yielded as soon as that folder entry is reached, i.e. before
            the batch of the listing page that contains it.

        Raises:
            ConnectorMissingCredentialError: If ``load_credentials`` has not
                been called yet.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        result = self.dropbox_client.files_list_folder(
            path,
            limit=self.batch_size,
            recursive=False,
            include_non_downloadable_files=False,
        )

        while True:
            batch: list[Document] = []
            for entry in result.entries:
                if isinstance(entry, FileMetadata):
                    modified_time = entry.client_modified
                    if modified_time.tzinfo is None:
                        # If no timezone info, assume it is UTC
                        modified_time = modified_time.replace(tzinfo=timezone.utc)
                    else:
                        # If not in UTC, translate it
                        modified_time = modified_time.astimezone(timezone.utc)

                    time_as_seconds = int(modified_time.timestamp())
                    # Compare against None explicitly: a bound of 0 is a real
                    # bound, not "no bound".
                    if start is not None and time_as_seconds < start:
                        continue
                    if end is not None and time_as_seconds > end:
                        continue

                    try:
                        downloaded_file = self._download_file(entry.path_display)
                    except ApiError:
                        # A file-specific API failure must not abort the whole
                        # run; other error types still propagate.
                        logger.exception(
                            f"Error downloading file {entry.path_display}"
                        )
                        continue

                    link = self._get_shared_link(entry.path_display)

                    try:
                        text = extract_file_text(
                            BytesIO(downloaded_file),
                            file_name=entry.name,
                            break_on_unprocessable=False,
                        )
                        batch.append(
                            Document(
                                id=f"doc:{entry.id}",
                                sections=[Section(link=link, text=text)],
                                source=DocumentSource.DROPBOX,
                                semantic_identifier=entry.name,
                                doc_updated_at=modified_time,
                                metadata={"type": "article"},
                            )
                        )
                    except Exception as e:
                        # Best effort: skip files that cannot be turned into a
                        # document and keep indexing the rest.
                        logger.exception(
                            f"Error extracting text from file {entry.path_display}: {e}"
                        )

                elif isinstance(entry, FolderMetadata):
                    yield from self._yield_files_recursive(entry.path_lower, start, end)

            if batch:
                yield batch

            if not result.has_more:
                break

            result = self.dropbox_client.files_list_folder_continue(result.cursor)

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Yield every document in the account (no time window)."""
        return self.poll_source(None, None)

    def poll_source(
        self, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
    ) -> GenerateDocumentsOutput:
        """Yield documents whose modification time lies within ``[start, end]``.

        This is a generator, so the credential check below only runs once
        iteration begins.

        Raises:
            ConnectorMissingCredentialError: If ``load_credentials`` has not
                been called yet.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox")

        yield from self._yield_files_recursive("", start, end)

    def validate_connector_settings(self) -> None:
        """Check the credentials by listing one entry of the root folder.

        Raises:
            ConnectorMissingCredentialError: If ``load_credentials`` has not
                been called yet.
            CredentialInvalidError: If Dropbox raises ``AuthError``.
            InsufficientPermissionsError: If Dropbox raises an ``ApiError``
                whose error text contains ``insufficient_permissions``.
            ConnectorValidationError: For any other ``ApiError``.
            Exception: For any other failure; the original error is chained
                as ``__cause__``.
        """
        if self.dropbox_client is None:
            raise ConnectorMissingCredentialError("Dropbox credentials not loaded.")

        try:
            self.dropbox_client.files_list_folder(path="", limit=1)
        except AuthError as e:
            logger.exception("Failed to validate Dropbox credentials")
            raise CredentialInvalidError(
                f"Dropbox credential is invalid: {e.error}"
            ) from e
        except ApiError as e:
            if (
                e.error is not None
                and "insufficient_permissions" in str(e.error).lower()
            ):
                raise InsufficientPermissionsError(
                    "Your Dropbox token does not have sufficient permissions."
                ) from e
            raise ConnectorValidationError(
                f"Unexpected Dropbox error during validation: {e.user_message_text or e}"
            ) from e
        except Exception as e:
            raise Exception(
                f"Unexpected error during Dropbox settings validation: {e}"
            ) from e
if __name__ == "__main__":
    # Manual smoke test: load the token from the environment and print the
    # first batch of documents the connector produces.
    import os

    dropbox_connector = DropboxConnector()
    env_credentials = {"dropbox_access_token": os.environ["DROPBOX_ACCESS_TOKEN"]}
    dropbox_connector.load_credentials(env_credentials)

    batch_iterator = dropbox_connector.load_from_state()
    first_batch = next(batch_iterator)
    print(first_batch)