From c4865d57b15ff43eb4fa1cbae79e6f359d885df4 Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Wed, 2 Apr 2025 17:01:05 -0700 Subject: [PATCH] Fix tons of users w/o drive access causing timeouts (#4437) --- .../onyx/connectors/google_drive/connector.py | 28 ++++++++++++++++++- backend/onyx/utils/threadpool_concurrency.py | 7 +++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/backend/onyx/connectors/google_drive/connector.py b/backend/onyx/connectors/google_drive/connector.py index 66003670be..e4d4e86b78 100644 --- a/backend/onyx/connectors/google_drive/connector.py +++ b/backend/onyx/connectors/google_drive/connector.py @@ -445,6 +445,9 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo logger.warning( f"User '{user_email}' does not have access to the drive APIs." ) + # mark this user as done so we don't try to retrieve anything for them + # again + curr_stage.stage = DriveRetrievalStage.DONE return raise @@ -581,6 +584,25 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo drive_ids_to_retrieve, checkpoint ) + # only process emails that we haven't already completed retrieval for + non_completed_org_emails = [ + user_email + for user_email, stage in checkpoint.completion_map.items() + if stage != DriveRetrievalStage.DONE + ] + + # don't process too many emails before returning a checkpoint. This is + # to resolve the case where there are a ton of emails that don't have access + # to the drive APIs. Without this, we could loop through these emails for + # more than 3 hours, causing a timeout and stalling progress. + email_batch_takes_us_to_completion = True + MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = 50 + if len(non_completed_org_emails) > MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING: + non_completed_org_emails = non_completed_org_emails[ + :MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING + ] + email_batch_takes_us_to_completion = False + user_retrieval_gens = [ self._impersonate_user_for_retrieval( email, @@ -591,10 +613,14 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo start, end, ) - for email in all_org_emails + for email in non_completed_org_emails ] yield from parallel_yield(user_retrieval_gens, max_workers=MAX_DRIVE_WORKERS) + # if there are more emails to process, don't mark as complete + if not email_batch_takes_us_to_completion: + return + remaining_folders = ( drive_ids_to_retrieve | folder_ids_to_retrieve ) - self._retrieved_ids diff --git a/backend/onyx/utils/threadpool_concurrency.py b/backend/onyx/utils/threadpool_concurrency.py index fd39fa04df..d0824d75b8 100644 --- a/backend/onyx/utils/threadpool_concurrency.py +++ b/backend/onyx/utils/threadpool_concurrency.py @@ -332,14 +332,15 @@ def wait_on_background(task: TimeoutThread[R]) -> R: return task.result -def _next_or_none(ind: int, g: Iterator[R]) -> tuple[int, R | None]: - return ind, next(g, None) +def _next_or_none(ind: int, gen: Iterator[R]) -> tuple[int, R | None]: + return ind, next(gen, None) def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R]: with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_index: dict[Future[tuple[int, R | None]], int] = { - executor.submit(_next_or_none, i, g): i for i, g in enumerate(gens) + executor.submit(_next_or_none, ind, gen): ind + for ind, gen in enumerate(gens) } next_ind = len(gens)