Add more retries in Google Drive connector

This commit is contained in:
Weves 2023-10-24 19:53:53 -07:00 committed by Chris Weaver
parent ef2b445201
commit fbb05e630d
2 changed files with 77 additions and 28 deletions

View File

@@ -0,0 +1,42 @@
from collections.abc import Callable
from logging import Logger
from typing import Any
from typing import cast
from typing import TypeVar
from retry import retry
from danswer.utils.logger import setup_logger
logger = setup_logger()
F = TypeVar("F", bound=Callable[..., Any])
def retry_builder(
    tries: int = 10,
    delay: float = 0.1,
    max_delay: float | None = None,
    backoff: float = 2,
    jitter: tuple[float, float] | float = 1,
) -> Callable[[F], F]:
    """Build a generic retry decorator for calls to external APIs.

    Such calls may fail due to rate limiting, transient flakes, or other
    reasons; the returned decorator re-runs them with exponential backoff
    plus jitter.

    Args:
        tries: maximum number of attempts before the last exception is
            re-raised.
        delay: initial delay in seconds between attempts.
        max_delay: upper bound on the delay; ``None`` means unbounded.
        backoff: multiplier applied to the delay after each failed attempt.
        jitter: extra seconds added to the delay between attempts — either
            a fixed value or a ``(min, max)`` range to draw from.

    Returns:
        A decorator that wraps a callable with the retry policy above.
    """

    def retry_with_default(func: F) -> F:
        # The @retry decorator must wrap the *inner* function that performs
        # the actual call. If it decorated `retry_with_default` itself, only
        # the (trivial, never-failing) wrapping step would be retried and
        # the real API call would execute with no retries at all.
        @retry(
            tries=tries,
            delay=delay,
            max_delay=max_delay,
            backoff=backoff,
            jitter=jitter,
            # `retry` expects a stdlib logging.Logger; the project logger
            # wraps one, so cast to satisfy the type checker.
            logger=cast(Logger, logger),
        )
        def wrapped_func(*args: list, **kwargs: dict[str, Any]) -> Any:
            return func(*args, **kwargs)

        return cast(F, wrapped_func)

    return retry_with_default

View File

@@ -21,6 +21,7 @@ from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import IGNORE_FOR_QA
from danswer.connectors.cross_connector_utils.file_utils import read_pdf_file
from danswer.connectors.cross_connector_utils.retry_wrapper import retry_builder
from danswer.connectors.google_drive.connector_auth import (
get_google_drive_creds_for_authorized_user,
)
@@ -61,6 +62,8 @@ class GDriveMimeType(str, Enum):
GoogleDriveFileType = dict[str, Any]
add_retries = retry_builder()
def _run_drive_file_query(
service: discovery.Resource,
@@ -73,24 +76,26 @@ def _run_drive_file_query(
next_page_token = ""
while next_page_token is not None:
logger.debug(f"Running Google Drive fetch with query: {query}")
results = (
service.files()
.list(
corpora="allDrives"
if include_shared
else "user", # needed to search through shared drives
pageSize=batch_size,
supportsAllDrives=include_shared,
includeItemsFromAllDrives=include_shared,
fields=(
"nextPageToken, files(mimeType, id, name, "
"modifiedTime, webViewLink, shortcutDetails)"
),
pageToken=next_page_token,
q=query,
results = add_retries(
lambda: (
service.files()
.list(
corpora="allDrives"
if include_shared
else "user", # needed to search through shared drives
pageSize=batch_size,
supportsAllDrives=include_shared,
includeItemsFromAllDrives=include_shared,
fields=(
"nextPageToken, files(mimeType, id, name, "
"modifiedTime, webViewLink, shortcutDetails)"
),
pageToken=next_page_token,
q=query,
)
.execute()
)
.execute()
)
)()
next_page_token = results.get("nextPageToken")
files = results["files"]
for file in files:
@@ -101,7 +106,7 @@ def _run_drive_file_query(
supportsAllDrives=include_shared,
fields="mimeType, id, name, modifiedTime, webViewLink, shortcutDetails",
)
file = file.execute()
file = add_retries(lambda: file.execute())()
except HttpError:
logger.error(
f"Failed to follow shortcut with details: {file['shortcutDetails']}"
@@ -129,17 +134,19 @@ def _get_folder_id(
query += f"mimeType='{DRIVE_FOLDER_TYPE}'"
# TODO: support specifying folder path in shared drive rather than just `My Drive`
results = (
service.files()
.list(
q=query,
spaces="drive",
fields="nextPageToken, files(id, name, shortcutDetails)",
supportsAllDrives=include_shared,
includeItemsFromAllDrives=include_shared,
results = add_retries(
lambda: (
service.files()
.list(
q=query,
spaces="drive",
fields="nextPageToken, files(id, name, shortcutDetails)",
supportsAllDrives=include_shared,
includeItemsFromAllDrives=include_shared,
)
.execute()
)
.execute()
)
)()
items = results.get("files", [])
folder_id = None