Allow specification of specific google drive folders to index (#197)
@@ -1,6 +1,7 @@
 import datetime
 import io
 from collections.abc import Generator
+from itertools import chain
 from typing import Any

 from danswer.configs.app_configs import GOOGLE_DRIVE_INCLUDE_SHARED
@@ -23,25 +24,49 @@ from PyPDF2 import PdfReader

 logger = setup_logger()

-SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]
+SCOPES = [
+    "https://www.googleapis.com/auth/drive.readonly",
+    "https://www.googleapis.com/auth/drive.metadata.readonly",
+]
 SUPPORTED_DRIVE_DOC_TYPES = [
     "application/vnd.google-apps.document",
     "application/pdf",
     "application/vnd.google-apps.spreadsheet",
 ]
+DRIVE_FOLDER_TYPE = "application/vnd.google-apps.folder"
 ID_KEY = "id"
 LINK_KEY = "link"
 TYPE_KEY = "type"


+def get_folder_id(
+    service: discovery.Resource, parent_id: str, folder_name: str
+) -> str | None:
+    """
+    Get the ID of a folder given its name and the ID of its parent folder.
+    """
+    query = f"'{parent_id}' in parents and name='{folder_name}' and mimeType='{DRIVE_FOLDER_TYPE}'"
+    results = (
+        service.files()
+        .list(q=query, spaces="drive", fields="nextPageToken, files(id, name)")
+        .execute()
+    )
+    items = results.get("files", [])
+    return items[0]["id"] if items else None
+
+
 def get_file_batches(
     service: discovery.Resource,
     include_shared: bool = GOOGLE_DRIVE_INCLUDE_SHARED,
     batch_size: int = INDEX_BATCH_SIZE,
     time_range_start: SecondsSinceUnixEpoch | None = None,
     time_range_end: SecondsSinceUnixEpoch | None = None,
+    folder_id: str | None = None,  # if specified, only fetches files within this folder
+    # if True, will fetch files in sub-folders of the specified folder ID. Only applies if folder_id is specified.
+    traverse_subfolders: bool = True,
 ) -> Generator[list[dict[str, str]], None, None]:
     next_page_token = ""
+    subfolders: list[dict[str, str]] = []
     while next_page_token is not None:
         query = ""
         if time_range_start is not None:
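
As a quick illustration of the new get_folder_id helper (not part of the commit), the sketch below resolves a top-level folder to its ID. The folder name is invented, and `creds` is assumed to be an authorized google.oauth2 Credentials object carrying the scopes listed above.

# Illustrative sketch only; "My Folder" and `creds` are assumptions, not values from the commit.
from googleapiclient import discovery

service = discovery.build("drive", "v3", credentials=creds)
folder_id = get_folder_id(service=service, parent_id="root", folder_name="My Folder")
print(folder_id)  # the folder's Drive ID, or None if no folder with that name exists under root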
@@ -53,7 +78,10 @@ def get_file_batches(
             time_stop = (
                 datetime.datetime.utcfromtimestamp(time_range_end).isoformat() + "Z"
             )
-            query += f"and modifiedTime <= '{time_stop}'"
+            query += f"and modifiedTime <= '{time_stop}' "
+        if folder_id:
+            query += f"and '{folder_id}' in parents "
+        query = query.rstrip()  # remove the trailing space(s)

         results = (
             service.files()
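
To make the string building above concrete, here is a rough sketch of the query that results when an end time and a folder ID are both set. The timestamp and ID are invented, and the shape of the time_range_start clause (which sits outside this hunk) is an assumption.

# Hypothetical values for illustration only.
time_stop = "2023-06-01T00:00:00Z"
folder_id = "1AbCdEfGhIjKlMnOpQrStUv"

query = "modifiedTime >= '2023-01-01T00:00:00Z' "  # assumed shape of the earlier start-time clause
query += f"and modifiedTime <= '{time_stop}' "
query += f"and '{folder_id}' in parents "
query = query.rstrip()  # strip the trailing space left by the concatenation above
# query == "modifiedTime >= '2023-01-01T00:00:00Z' and modifiedTime <= '2023-06-01T00:00:00Z'
#           and '1AbCdEfGhIjKlMnOpQrStUv' in parents"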
@@ -69,14 +97,30 @@ def get_file_batches(
         next_page_token = results.get("nextPageToken")
         files = results["files"]
         valid_files: list[dict[str, str]] = []

         for file in files:
             if file["mimeType"] in SUPPORTED_DRIVE_DOC_TYPES:
                 valid_files.append(file)
+            elif file["mimeType"] == DRIVE_FOLDER_TYPE:
+                subfolders.append(file)
         logger.info(
             f"Parseable Documents in batch: {[file['name'] for file in valid_files]}"
         )
         yield valid_files

+    if traverse_subfolders:
+        for subfolder in subfolders:
+            logger.info("Fetching all files in subfolder: " + subfolder["name"])
+            yield from get_file_batches(
+                service=service,
+                include_shared=include_shared,
+                batch_size=batch_size,
+                time_range_start=time_range_start,
+                time_range_end=time_range_end,
+                folder_id=subfolder["id"],
+                traverse_subfolders=traverse_subfolders,
+            )
+
+
 def extract_text(file: dict[str, str], service: discovery.Resource) -> str:
     mime_type = file["mimeType"]
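
A minimal usage sketch for the extended get_file_batches (not from the commit): `service` is assumed to be an authorized Drive v3 resource and `folder_id` an ID obtained from get_folder_id; with traverse_subfolders=True the generator recurses into every sub-folder as shown above.

# Illustrative only; `service` and `folder_id` are assumed to exist.
for batch in get_file_batches(
    service=service,
    batch_size=100,
    folder_id=folder_id,
    traverse_subfolders=True,  # also yield batches from sub-folders, depth-first
):
    for file in batch:
        print(file["name"], file["mimeType"])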
@@ -105,13 +149,36 @@ def extract_text(file: dict[str, str], service: discovery.Resource) -> str:
 class GoogleDriveConnector(LoadConnector, PollConnector):
     def __init__(
         self,
+        # optional list of folder paths e.g. "[My Folder/My Subfolder]"
+        # if specified, will only index files in these folders
+        folder_paths: list[str] | None = None,
         batch_size: int = INDEX_BATCH_SIZE,
         include_shared: bool = GOOGLE_DRIVE_INCLUDE_SHARED,
     ) -> None:
+        self.folder_paths = folder_paths or []
         self.batch_size = batch_size
         self.include_shared = include_shared
         self.creds: Credentials | None = None

+    @staticmethod
+    def _process_folder_paths(
+        service: discovery.Resource, folder_paths: list[str]
+    ) -> list[str]:
+        """['Folder/Sub Folder'] -> ['<FOLDER_ID>']"""
+        folder_ids: list[str] = []
+        for path in folder_paths:
+            folder_names = path.split("/")
+            parent_id = "root"
+            for folder_name in folder_names:
+                parent_id = get_folder_id(
+                    service=service, parent_id=parent_id, folder_name=folder_name
+                )
+                if parent_id is None:
+                    raise ValueError(f"Folder path '{path}' not found in Google Drive")
+            folder_ids.append(parent_id)
+
+        return folder_ids
+
     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         access_token_json_str = credentials[DB_CREDENTIALS_DICT_KEY]
         creds = get_drive_tokens(token_json_str=access_token_json_str)
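
For reference, a hedged sketch of how _process_folder_paths maps human-readable paths to folder IDs; the folder names are invented and `service` is assumed to be an authorized Drive resource.

# Illustrative only; the folder names below are hypothetical.
folder_ids = GoogleDriveConnector._process_folder_paths(
    service, ["Engineering/Design Docs", "Shared Notes"]
)
# Each path segment is resolved with get_folder_id, starting from "root";
# a ValueError is raised if any segment cannot be found.
# folder_ids might look like ["1AbC...", "0Zyx..."]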
@@ -132,13 +199,25 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
             raise PermissionError("Not logged into Google Drive")

         service = discovery.build("drive", "v3", credentials=self.creds)
-        for files_batch in get_file_batches(
-            service,
-            self.include_shared,
-            self.batch_size,
-            time_range_start=start,
-            time_range_end=end,
-        ):
+        folder_ids = self._process_folder_paths(service, self.folder_paths)
+        if not folder_ids:
+            folder_ids = [None]
+
+        file_batches = chain(
+            *[
+                get_file_batches(
+                    service=service,
+                    include_shared=self.include_shared,
+                    batch_size=self.batch_size,
+                    time_range_start=start,
+                    time_range_end=end,
+                    folder_id=folder_id,
+                    traverse_subfolders=True,
+                )
+                for folder_id in folder_ids
+            ]
+        )
+        for files_batch in file_batches:
             doc_batch = []
             for file in files_batch:
                 text_contents = extract_text(file, service)
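
Finally, a sketch of how the new folder_paths option is meant to be used from the connector's public interface; the path is invented, and `credentials` is assumed to be the dict produced by Danswer's Google Drive OAuth flow (it must contain the token JSON under DB_CREDENTIALS_DICT_KEY).

# Illustrative only; not part of the commit.
connector = GoogleDriveConnector(
    folder_paths=["Engineering/Design Docs"],  # hypothetical folder path
    batch_size=INDEX_BATCH_SIZE,
    include_shared=False,
)
connector.load_credentials(credentials)  # assumed credentials dict
# Fetching is now restricted to files under the listed folders (and their
# sub-folders), with paths resolved to folder IDs via _process_folder_paths.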