From 63692a6bd3ddc13820dc25cdf87269c7fbbed5f4 Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Thu, 13 Mar 2025 19:26:22 -0700 Subject: [PATCH] Fix perm sync memory usage (#4282) * Fix slack perm sync memory usage * Make perm syncing run in batches rather than fetching everything * Update backend/ee/onyx/external_permissions/slack/doc_sync.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * Update backend/ee/onyx/external_permissions/slack/doc_sync.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * Loud error on slack doc sync missing permissions --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- .../confluence/doc_sync.py | 26 +++--- .../external_permissions/gmail/doc_sync.py | 15 ++-- .../google_drive/doc_sync.py | 13 ++- .../external_permissions/slack/doc_sync.py | 89 +++++++++---------- .../onyx/external_permissions/sync_params.py | 3 +- backend/onyx/access/models.py | 2 +- .../tasks/doc_permission_syncing/tasks.py | 26 +++--- backend/onyx/connectors/slack/connector.py | 19 ++-- 8 files changed, 87 insertions(+), 106 deletions(-) diff --git a/backend/ee/onyx/external_permissions/confluence/doc_sync.py b/backend/ee/onyx/external_permissions/confluence/doc_sync.py index 8ed076a3c..981aa3acb 100644 --- a/backend/ee/onyx/external_permissions/confluence/doc_sync.py +++ b/backend/ee/onyx/external_permissions/confluence/doc_sync.py @@ -2,6 +2,7 @@ Rules defined here: https://confluence.atlassian.com/conf85/check-who-can-view-a-page-1283360557.html """ +from collections.abc import Generator from typing import Any from ee.onyx.configs.app_configs import CONFLUENCE_ANONYMOUS_ACCESS_IS_PUBLIC @@ -263,13 +264,11 @@ def _fetch_all_page_restrictions( space_permissions_by_space_key: dict[str, ExternalAccess], is_cloud: bool, callback: IndexingHeartbeatInterface | None, -) -> list[DocExternalAccess]: +) -> Generator[DocExternalAccess, None, None]: """ For all pages, if a page has restrictions, then use those restrictions. Otherwise, use the space's restrictions. """ - document_restrictions: list[DocExternalAccess] = [] - for slim_doc in slim_docs: if callback: if callback.should_stop(): @@ -286,11 +285,9 @@ def _fetch_all_page_restrictions( confluence_client=confluence_client, perm_sync_data=slim_doc.perm_sync_data, ): - document_restrictions.append( - DocExternalAccess( - doc_id=slim_doc.id, - external_access=restrictions, - ) + yield DocExternalAccess( + doc_id=slim_doc.id, + external_access=restrictions, ) # If there are restrictions, then we don't need to use the space's restrictions continue @@ -324,11 +321,9 @@ def _fetch_all_page_restrictions( continue # If there are no restrictions, then use the space's restrictions - document_restrictions.append( - DocExternalAccess( - doc_id=slim_doc.id, - external_access=space_permissions, - ) + yield DocExternalAccess( + doc_id=slim_doc.id, + external_access=space_permissions, ) if ( not space_permissions.is_public @@ -342,13 +337,12 @@ def _fetch_all_page_restrictions( ) logger.debug("Finished fetching all page restrictions for space") - return document_restrictions def confluence_doc_sync( cc_pair: ConnectorCredentialPair, callback: IndexingHeartbeatInterface | None, -) -> list[DocExternalAccess]: +) -> Generator[DocExternalAccess, None, None]: """ Adds the external permissions to the documents in postgres if the document doesn't already exists in postgres, we create @@ -387,7 +381,7 @@ def confluence_doc_sync( slim_docs.extend(doc_batch) logger.debug("Fetching all page restrictions for space") - return _fetch_all_page_restrictions( + yield from _fetch_all_page_restrictions( confluence_client=confluence_connector.confluence_client, slim_docs=slim_docs, space_permissions_by_space_key=space_permissions_by_space_key, diff --git a/backend/ee/onyx/external_permissions/gmail/doc_sync.py b/backend/ee/onyx/external_permissions/gmail/doc_sync.py index 6f1bae674..3a58e5aca 100644 --- a/backend/ee/onyx/external_permissions/gmail/doc_sync.py +++ b/backend/ee/onyx/external_permissions/gmail/doc_sync.py @@ -1,3 +1,4 @@ +from collections.abc import Generator from datetime import datetime from datetime import timezone @@ -34,7 +35,7 @@ def _get_slim_doc_generator( def gmail_doc_sync( cc_pair: ConnectorCredentialPair, callback: IndexingHeartbeatInterface | None, -) -> list[DocExternalAccess]: +) -> Generator[DocExternalAccess, None, None]: """ Adds the external permissions to the documents in postgres if the document doesn't already exists in postgres, we create @@ -48,7 +49,6 @@ def gmail_doc_sync( cc_pair, gmail_connector, callback=callback ) - document_external_access: list[DocExternalAccess] = [] for slim_doc_batch in slim_doc_generator: for slim_doc in slim_doc_batch: if callback: @@ -60,17 +60,14 @@ def gmail_doc_sync( if slim_doc.perm_sync_data is None: logger.warning(f"No permissions found for document {slim_doc.id}") continue + if user_email := slim_doc.perm_sync_data.get("user_email"): ext_access = ExternalAccess( external_user_emails=set([user_email]), external_user_group_ids=set(), is_public=False, ) - document_external_access.append( - DocExternalAccess( - doc_id=slim_doc.id, - external_access=ext_access, - ) + yield DocExternalAccess( + doc_id=slim_doc.id, + external_access=ext_access, ) - - return document_external_access diff --git a/backend/ee/onyx/external_permissions/google_drive/doc_sync.py b/backend/ee/onyx/external_permissions/google_drive/doc_sync.py index 8d3df7fa8..6b7aed3ec 100644 --- a/backend/ee/onyx/external_permissions/google_drive/doc_sync.py +++ b/backend/ee/onyx/external_permissions/google_drive/doc_sync.py @@ -1,3 +1,4 @@ +from collections.abc import Generator from datetime import datetime from datetime import timezone from typing import Any @@ -147,7 +148,7 @@ def _get_permissions_from_slim_doc( def gdrive_doc_sync( cc_pair: ConnectorCredentialPair, callback: IndexingHeartbeatInterface | None, -) -> list[DocExternalAccess]: +) -> Generator[DocExternalAccess, None, None]: """ Adds the external permissions to the documents in postgres if the document doesn't already exists in postgres, we create @@ -161,7 +162,6 @@ def gdrive_doc_sync( slim_doc_generator = _get_slim_doc_generator(cc_pair, google_drive_connector) - document_external_accesses = [] for slim_doc_batch in slim_doc_generator: for slim_doc in slim_doc_batch: if callback: @@ -174,10 +174,7 @@ def gdrive_doc_sync( google_drive_connector=google_drive_connector, slim_doc=slim_doc, ) - document_external_accesses.append( - DocExternalAccess( - external_access=ext_access, - doc_id=slim_doc.id, - ) + yield DocExternalAccess( + external_access=ext_access, + doc_id=slim_doc.id, ) - return document_external_accesses diff --git a/backend/ee/onyx/external_permissions/slack/doc_sync.py b/backend/ee/onyx/external_permissions/slack/doc_sync.py index 0ae9b58cc..ce8b883a2 100644 --- a/backend/ee/onyx/external_permissions/slack/doc_sync.py +++ b/backend/ee/onyx/external_permissions/slack/doc_sync.py @@ -1,3 +1,5 @@ +from collections.abc import Generator + from slack_sdk import WebClient from ee.onyx.external_permissions.slack.utils import fetch_user_id_to_email_map @@ -14,35 +16,6 @@ from onyx.utils.logger import setup_logger logger = setup_logger() -def _get_slack_document_ids_and_channels( - cc_pair: ConnectorCredentialPair, callback: IndexingHeartbeatInterface | None -) -> dict[str, list[str]]: - slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config) - slack_connector.load_credentials(cc_pair.credential.credential_json) - - slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback) - - channel_doc_map: dict[str, list[str]] = {} - for doc_metadata_batch in slim_doc_generator: - for doc_metadata in doc_metadata_batch: - if doc_metadata.perm_sync_data is None: - continue - channel_id = doc_metadata.perm_sync_data["channel_id"] - if channel_id not in channel_doc_map: - channel_doc_map[channel_id] = [] - channel_doc_map[channel_id].append(doc_metadata.id) - - if callback: - if callback.should_stop(): - raise RuntimeError( - "_get_slack_document_ids_and_channels: Stop signal detected" - ) - - callback.progress("_get_slack_document_ids_and_channels", 1) - - return channel_doc_map - - def _fetch_workspace_permissions( user_id_to_email_map: dict[str, str], ) -> ExternalAccess: @@ -122,10 +95,37 @@ def _fetch_channel_permissions( return channel_permissions +def _get_slack_document_access( + cc_pair: ConnectorCredentialPair, + channel_permissions: dict[str, ExternalAccess], + callback: IndexingHeartbeatInterface | None, +) -> Generator[DocExternalAccess, None, None]: + slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config) + slack_connector.load_credentials(cc_pair.credential.credential_json) + + slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback) + + for doc_metadata_batch in slim_doc_generator: + for doc_metadata in doc_metadata_batch: + if doc_metadata.perm_sync_data is None: + continue + channel_id = doc_metadata.perm_sync_data["channel_id"] + yield DocExternalAccess( + external_access=channel_permissions[channel_id], + doc_id=doc_metadata.id, + ) + + if callback: + if callback.should_stop(): + raise RuntimeError("_get_slack_document_access: Stop signal detected") + + callback.progress("_get_slack_document_access", 1) + + def slack_doc_sync( cc_pair: ConnectorCredentialPair, callback: IndexingHeartbeatInterface | None, -) -> list[DocExternalAccess]: +) -> Generator[DocExternalAccess, None, None]: """ Adds the external permissions to the documents in postgres if the document doesn't already exists in postgres, we create @@ -136,9 +136,12 @@ def slack_doc_sync( token=cc_pair.credential.credential_json["slack_bot_token"] ) user_id_to_email_map = fetch_user_id_to_email_map(slack_client) - channel_doc_map = _get_slack_document_ids_and_channels( - cc_pair=cc_pair, callback=callback - ) + if not user_id_to_email_map: + raise ValueError( + "No user id to email map found. Please check to make sure that " + "your Slack bot token has the `users:read.email` scope" + ) + workspace_permissions = _fetch_workspace_permissions( user_id_to_email_map=user_id_to_email_map, ) @@ -148,18 +151,8 @@ def slack_doc_sync( user_id_to_email_map=user_id_to_email_map, ) - document_external_accesses = [] - for channel_id, ext_access in channel_permissions.items(): - doc_ids = channel_doc_map.get(channel_id) - if not doc_ids: - # No documents found for channel the channel_id - continue - - for doc_id in doc_ids: - document_external_accesses.append( - DocExternalAccess( - external_access=ext_access, - doc_id=doc_id, - ) - ) - return document_external_accesses + yield from _get_slack_document_access( + cc_pair=cc_pair, + channel_permissions=channel_permissions, + callback=callback, + ) diff --git a/backend/ee/onyx/external_permissions/sync_params.py b/backend/ee/onyx/external_permissions/sync_params.py index 9f8ed9681..28e27652c 100644 --- a/backend/ee/onyx/external_permissions/sync_params.py +++ b/backend/ee/onyx/external_permissions/sync_params.py @@ -1,4 +1,5 @@ from collections.abc import Callable +from collections.abc import Generator from ee.onyx.configs.app_configs import CONFLUENCE_PERMISSION_DOC_SYNC_FREQUENCY from ee.onyx.configs.app_configs import CONFLUENCE_PERMISSION_GROUP_SYNC_FREQUENCY @@ -23,7 +24,7 @@ DocSyncFuncType = Callable[ ConnectorCredentialPair, IndexingHeartbeatInterface | None, ], - list[DocExternalAccess], + Generator[DocExternalAccess, None, None], ] GroupSyncFuncType = Callable[ diff --git a/backend/onyx/access/models.py b/backend/onyx/access/models.py index e2364fcf7..411e53a03 100644 --- a/backend/onyx/access/models.py +++ b/backend/onyx/access/models.py @@ -20,7 +20,7 @@ class ExternalAccess: class DocExternalAccess: """ This is just a class to wrap the external access and the document ID - together. It's used for syncing document permissions to Redis. + together. It's used for syncing document permissions to Vespa. """ external_access: ExternalAccess diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index b308e5a18..ba2b68aa1 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -453,23 +453,23 @@ def connector_permission_sync_generator_task( redis_connector.permissions.set_fence(new_payload) callback = PermissionSyncCallback(redis_connector, lock, r) - document_external_accesses: list[DocExternalAccess] = doc_sync_func( - cc_pair, callback - ) + document_external_accesses = doc_sync_func(cc_pair, callback) task_logger.info( f"RedisConnector.permissions.generate_tasks starting. cc_pair={cc_pair_id}" ) - tasks_generated = redis_connector.permissions.generate_tasks( - celery_app=self.app, - lock=lock, - new_permissions=document_external_accesses, - source_string=source_type, - connector_id=cc_pair.connector.id, - credential_id=cc_pair.credential.id, - ) - if tasks_generated is None: - return None + + tasks_generated = 0 + for doc_external_access in document_external_accesses: + redis_connector.permissions.generate_tasks( + celery_app=self.app, + lock=lock, + new_permissions=[doc_external_access], + source_string=source_type, + connector_id=cc_pair.connector.id, + credential_id=cc_pair.credential.id, + ) + tasks_generated += 1 task_logger.info( f"RedisConnector.permissions.generate_tasks finished. " diff --git a/backend/onyx/connectors/slack/connector.py b/backend/onyx/connectors/slack/connector.py index 9c29ed437..138c3b422 100644 --- a/backend/onyx/connectors/slack/connector.py +++ b/backend/onyx/connectors/slack/connector.py @@ -412,8 +412,8 @@ def _get_all_doc_ids( callback=callback, ) - message_ts_set: set[str] = set() for message_batch in channel_message_batches: + slim_doc_batch: list[SlimDocument] = [] for message in message_batch: if msg_filter_func(message): continue @@ -421,18 +421,17 @@ def _get_all_doc_ids( # The document id is the channel id and the ts of the first message in the thread # Since we already have the first message of the thread, we dont have to # fetch the thread for id retrieval, saving time and API calls - message_ts_set.add(message["ts"]) - channel_metadata_list: list[SlimDocument] = [] - for message_ts in message_ts_set: - channel_metadata_list.append( - SlimDocument( - id=_build_doc_id(channel_id=channel_id, thread_ts=message_ts), - perm_sync_data={"channel_id": channel_id}, + slim_doc_batch.append( + SlimDocument( + id=_build_doc_id( + channel_id=channel_id, thread_ts=message["ts"] + ), + perm_sync_data={"channel_id": channel_id}, + ) ) - ) - yield channel_metadata_list + yield slim_doc_batch def _process_message(