id not set in checkpoint FINAL (#4656)

* it will never happen again.

* fix perm sync issue

* fix perm sync issue2

* ensure member emails map is populated

* other fix for perm sync

* address CW comments

* nit
This commit is contained in:
Evan Lohn
2025-05-02 17:10:21 -07:00
committed by GitHub
parent 5c3820b39f
commit 113876b276
6 changed files with 89 additions and 34 deletions

View File

@@ -68,7 +68,7 @@ def _fetch_permissions_for_permission_ids(
retrieval_function=drive_service.permissions().list,
list_key="permissions",
fileId=doc_id,
fields="permissions(id, emailAddress, type, domain)",
fields="permissions(id, emailAddress, type, domain),nextPageToken",
supportsAllDrives=True,
continue_on_404_or_403=True,
)

View File

@@ -51,7 +51,7 @@ def _get_drive_members(
drive_service.permissions().list,
list_key="permissions",
fileId=drive_id,
fields="permissions(emailAddress, type)",
fields="permissions(emailAddress, type),nextPageToken",
supportsAllDrives=True,
# can only set `useDomainAdminAccess` to true if the user
# is an admin
@@ -107,7 +107,7 @@ def _map_group_email_to_member_emails(
admin_service.members().list,
list_key="members",
groupKey=group_email,
fields="members(email)",
fields="members(email),nextPageToken",
):
group_member_emails.add(member["email"])
@@ -127,6 +127,11 @@ def _build_onyx_groups(
for drive_id, (group_emails, user_emails) in drive_id_to_members_map.items():
all_member_emails: set[str] = user_emails
for group_email in group_emails:
if group_email not in group_email_to_member_emails_map:
logger.warning(
f"Group email {group_email} not found in group_email_to_member_emails_map"
)
continue
all_member_emails.update(group_email_to_member_emails_map[group_email])
onyx_groups.append(
ExternalUserGroup(

View File

@@ -12,6 +12,7 @@ from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.google_utils.google_auth import get_google_creds
from onyx.connectors.google_utils.google_utils import execute_paginated_retrieval
from onyx.connectors.google_utils.google_utils import execute_single_retrieval
from onyx.connectors.google_utils.resources import get_admin_service
from onyx.connectors.google_utils.resources import get_gmail_service
from onyx.connectors.google_utils.shared_constants import (
@@ -301,7 +302,7 @@ class GmailConnector(LoadConnector, PollConnector, SlimConnector):
q=query,
continue_on_404_or_403=True,
):
full_threads = execute_paginated_retrieval(
full_threads = execute_single_retrieval(
retrieval_function=gmail_service.users().threads().get,
list_key=None,
userId=user_email,

View File

@@ -377,7 +377,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
cv.notify_all()
# when entering the iterator with a previous id in the checkpoint, the user
# just finished that drive from a previous run.
# has just finished that drive from a previous run.
if (
completion.stage == DriveRetrievalStage.MY_DRIVE_FILES
and completion.current_folder_or_drive_id is not None
@@ -492,9 +492,14 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
if resuming:
drive_id = curr_stage.current_folder_or_drive_id
if drive_id is None:
raise ValueError("drive id not set in checkpoint")
resume_start = curr_stage.completed_until
yield from _yield_from_drive(drive_id, resume_start)
logger.warning(
f"drive id not set in checkpoint for user {user_email}. "
"This happens occasionally when the connector is interrupted "
"and resumed."
)
else:
resume_start = curr_stage.completed_until
yield from _yield_from_drive(drive_id, resume_start)
# Don't enter resuming case for folder retrieval
resuming = False
@@ -536,9 +541,14 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
if resuming:
folder_id = curr_stage.current_folder_or_drive_id
if folder_id is None:
raise ValueError("folder id not set in checkpoint")
resume_start = curr_stage.completed_until
yield from _yield_from_folder_crawl(folder_id, resume_start)
logger.warning(
f"folder id not set in checkpoint for user {user_email}. "
"This happens occasionally when the connector is interrupted "
"and resumed."
)
else:
resume_start = curr_stage.completed_until
yield from _yield_from_folder_crawl(folder_id, resume_start)
last_processed_folder = folder_id
skipping_seen_folders = last_processed_folder is not None
@@ -1040,6 +1050,9 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
if len(files_batch) < self.batch_size:
continue
logger.info(
f"Yielding batch of {len(files_batch)} files; num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
)
yield from _yield_batch(files_batch)
files_batch = []

View File

@@ -108,6 +108,54 @@ def get_file_owners(file: GoogleDriveFileType) -> list[str]:
]
def _execute_single_retrieval(
retrieval_function: Callable,
continue_on_404_or_403: bool = False,
**request_kwargs: Any,
) -> GoogleDriveFileType:
"""Execute a single retrieval from Google Drive API"""
try:
results = retrieval_function(**request_kwargs).execute()
except HttpError as e:
if e.resp.status >= 500:
results = add_retries(
lambda: retrieval_function(**request_kwargs).execute()
)()
elif e.resp.status == 404 or e.resp.status == 403:
if continue_on_404_or_403:
logger.debug(f"Error executing request: {e}")
results = {}
else:
raise e
elif e.resp.status == 429:
results = _execute_with_retry(
lambda: retrieval_function(**request_kwargs).execute()
)
else:
logger.exception("Error executing request:")
raise e
return results
def execute_single_retrieval(
retrieval_function: Callable,
list_key: str | None = None,
continue_on_404_or_403: bool = False,
**request_kwargs: Any,
) -> Iterator[GoogleDriveFileType]:
results = _execute_single_retrieval(
retrieval_function,
continue_on_404_or_403,
**request_kwargs,
)
if list_key:
for item in results.get(list_key, []):
yield item
else:
yield results
def execute_paginated_retrieval(
retrieval_function: Callable,
list_key: str | None = None,
@@ -119,32 +167,20 @@ def execute_paginated_retrieval(
retrieval_function: The specific list function to call (e.g., service.files().list)
**kwargs: Arguments to pass to the list function
"""
if "fields" not in kwargs or "nextPageToken" not in kwargs["fields"]:
raise ValueError(
"fields must contain nextPageToken for execute_paginated_retrieval"
)
next_page_token = kwargs.get(PAGE_TOKEN_KEY, "")
while next_page_token is not None:
request_kwargs = kwargs.copy()
if next_page_token:
request_kwargs[PAGE_TOKEN_KEY] = next_page_token
try:
results = retrieval_function(**request_kwargs).execute()
except HttpError as e:
if e.resp.status >= 500:
results = add_retries(
lambda: retrieval_function(**request_kwargs).execute()
)()
elif e.resp.status == 404 or e.resp.status == 403:
if continue_on_404_or_403:
logger.debug(f"Error executing request: {e}")
results = {}
else:
raise e
elif e.resp.status == 429:
results = _execute_with_retry(
lambda: retrieval_function(**request_kwargs).execute()
)
else:
logger.exception("Error executing request:")
raise e
results = _execute_single_retrieval(
retrieval_function,
continue_on_404_or_403,
**request_kwargs,
)
next_page_token = results.get(NEXT_PAGE_TOKEN_KEY)
if list_key:

View File

@@ -101,7 +101,7 @@ def get_group_map(google_drive_connector: GoogleDriveConnector) -> dict[str, lis
admin_service.groups().list,
list_key="groups",
domain=google_drive_connector.google_domain,
fields="groups(email)",
fields="groups(email),nextPageToken",
):
# The id is the group email
group_email = group["email"]
@@ -112,7 +112,7 @@ def get_group_map(google_drive_connector: GoogleDriveConnector) -> dict[str, lis
admin_service.members().list,
list_key="members",
groupKey=group_email,
fields="members(email)",
fields="members(email),nextPageToken",
):
group_member_emails.append(member["email"])
group_map[group_email] = group_member_emails