Fix perm sync memory usage (#4282)

* Fix slack perm sync memory usage

* Make perm syncing run in batches rather than fetching everything

* Update backend/ee/onyx/external_permissions/slack/doc_sync.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update backend/ee/onyx/external_permissions/slack/doc_sync.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Loud error on slack doc sync missing permissions

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
Chris Weaver 2025-03-13 19:26:22 -07:00 committed by GitHub
parent 934700b928
commit 63692a6bd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 87 additions and 106 deletions

View File

@ -2,6 +2,7 @@
Rules defined here:
https://confluence.atlassian.com/conf85/check-who-can-view-a-page-1283360557.html
"""
from collections.abc import Generator
from typing import Any
from ee.onyx.configs.app_configs import CONFLUENCE_ANONYMOUS_ACCESS_IS_PUBLIC
@ -263,13 +264,11 @@ def _fetch_all_page_restrictions(
space_permissions_by_space_key: dict[str, ExternalAccess],
is_cloud: bool,
callback: IndexingHeartbeatInterface | None,
) -> list[DocExternalAccess]:
) -> Generator[DocExternalAccess, None, None]:
"""
For all pages, if a page has restrictions, then use those restrictions.
Otherwise, use the space's restrictions.
"""
document_restrictions: list[DocExternalAccess] = []
for slim_doc in slim_docs:
if callback:
if callback.should_stop():
@ -286,11 +285,9 @@ def _fetch_all_page_restrictions(
confluence_client=confluence_client,
perm_sync_data=slim_doc.perm_sync_data,
):
document_restrictions.append(
DocExternalAccess(
doc_id=slim_doc.id,
external_access=restrictions,
)
yield DocExternalAccess(
doc_id=slim_doc.id,
external_access=restrictions,
)
# If there are restrictions, then we don't need to use the space's restrictions
continue
@ -324,11 +321,9 @@ def _fetch_all_page_restrictions(
continue
# If there are no restrictions, then use the space's restrictions
document_restrictions.append(
DocExternalAccess(
doc_id=slim_doc.id,
external_access=space_permissions,
)
yield DocExternalAccess(
doc_id=slim_doc.id,
external_access=space_permissions,
)
if (
not space_permissions.is_public
@ -342,13 +337,12 @@ def _fetch_all_page_restrictions(
)
logger.debug("Finished fetching all page restrictions for space")
return document_restrictions
def confluence_doc_sync(
cc_pair: ConnectorCredentialPair,
callback: IndexingHeartbeatInterface | None,
) -> list[DocExternalAccess]:
) -> Generator[DocExternalAccess, None, None]:
"""
Adds the external permissions to the documents in postgres
if the document doesn't already exists in postgres, we create
@ -387,7 +381,7 @@ def confluence_doc_sync(
slim_docs.extend(doc_batch)
logger.debug("Fetching all page restrictions for space")
return _fetch_all_page_restrictions(
yield from _fetch_all_page_restrictions(
confluence_client=confluence_connector.confluence_client,
slim_docs=slim_docs,
space_permissions_by_space_key=space_permissions_by_space_key,

View File

@ -1,3 +1,4 @@
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
@ -34,7 +35,7 @@ def _get_slim_doc_generator(
def gmail_doc_sync(
cc_pair: ConnectorCredentialPair,
callback: IndexingHeartbeatInterface | None,
) -> list[DocExternalAccess]:
) -> Generator[DocExternalAccess, None, None]:
"""
Adds the external permissions to the documents in postgres
if the document doesn't already exists in postgres, we create
@ -48,7 +49,6 @@ def gmail_doc_sync(
cc_pair, gmail_connector, callback=callback
)
document_external_access: list[DocExternalAccess] = []
for slim_doc_batch in slim_doc_generator:
for slim_doc in slim_doc_batch:
if callback:
@ -60,17 +60,14 @@ def gmail_doc_sync(
if slim_doc.perm_sync_data is None:
logger.warning(f"No permissions found for document {slim_doc.id}")
continue
if user_email := slim_doc.perm_sync_data.get("user_email"):
ext_access = ExternalAccess(
external_user_emails=set([user_email]),
external_user_group_ids=set(),
is_public=False,
)
document_external_access.append(
DocExternalAccess(
doc_id=slim_doc.id,
external_access=ext_access,
)
yield DocExternalAccess(
doc_id=slim_doc.id,
external_access=ext_access,
)
return document_external_access

View File

@ -1,3 +1,4 @@
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from typing import Any
@ -147,7 +148,7 @@ def _get_permissions_from_slim_doc(
def gdrive_doc_sync(
cc_pair: ConnectorCredentialPair,
callback: IndexingHeartbeatInterface | None,
) -> list[DocExternalAccess]:
) -> Generator[DocExternalAccess, None, None]:
"""
Adds the external permissions to the documents in postgres
if the document doesn't already exists in postgres, we create
@ -161,7 +162,6 @@ def gdrive_doc_sync(
slim_doc_generator = _get_slim_doc_generator(cc_pair, google_drive_connector)
document_external_accesses = []
for slim_doc_batch in slim_doc_generator:
for slim_doc in slim_doc_batch:
if callback:
@ -174,10 +174,7 @@ def gdrive_doc_sync(
google_drive_connector=google_drive_connector,
slim_doc=slim_doc,
)
document_external_accesses.append(
DocExternalAccess(
external_access=ext_access,
doc_id=slim_doc.id,
)
yield DocExternalAccess(
external_access=ext_access,
doc_id=slim_doc.id,
)
return document_external_accesses

View File

@ -1,3 +1,5 @@
from collections.abc import Generator
from slack_sdk import WebClient
from ee.onyx.external_permissions.slack.utils import fetch_user_id_to_email_map
@ -14,35 +16,6 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
def _get_slack_document_ids_and_channels(
cc_pair: ConnectorCredentialPair, callback: IndexingHeartbeatInterface | None
) -> dict[str, list[str]]:
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
slack_connector.load_credentials(cc_pair.credential.credential_json)
slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback)
channel_doc_map: dict[str, list[str]] = {}
for doc_metadata_batch in slim_doc_generator:
for doc_metadata in doc_metadata_batch:
if doc_metadata.perm_sync_data is None:
continue
channel_id = doc_metadata.perm_sync_data["channel_id"]
if channel_id not in channel_doc_map:
channel_doc_map[channel_id] = []
channel_doc_map[channel_id].append(doc_metadata.id)
if callback:
if callback.should_stop():
raise RuntimeError(
"_get_slack_document_ids_and_channels: Stop signal detected"
)
callback.progress("_get_slack_document_ids_and_channels", 1)
return channel_doc_map
def _fetch_workspace_permissions(
user_id_to_email_map: dict[str, str],
) -> ExternalAccess:
@ -122,10 +95,37 @@ def _fetch_channel_permissions(
return channel_permissions
def _get_slack_document_access(
cc_pair: ConnectorCredentialPair,
channel_permissions: dict[str, ExternalAccess],
callback: IndexingHeartbeatInterface | None,
) -> Generator[DocExternalAccess, None, None]:
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
slack_connector.load_credentials(cc_pair.credential.credential_json)
slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback)
for doc_metadata_batch in slim_doc_generator:
for doc_metadata in doc_metadata_batch:
if doc_metadata.perm_sync_data is None:
continue
channel_id = doc_metadata.perm_sync_data["channel_id"]
yield DocExternalAccess(
external_access=channel_permissions[channel_id],
doc_id=doc_metadata.id,
)
if callback:
if callback.should_stop():
raise RuntimeError("_get_slack_document_access: Stop signal detected")
callback.progress("_get_slack_document_access", 1)
def slack_doc_sync(
cc_pair: ConnectorCredentialPair,
callback: IndexingHeartbeatInterface | None,
) -> list[DocExternalAccess]:
) -> Generator[DocExternalAccess, None, None]:
"""
Adds the external permissions to the documents in postgres
if the document doesn't already exists in postgres, we create
@ -136,9 +136,12 @@ def slack_doc_sync(
token=cc_pair.credential.credential_json["slack_bot_token"]
)
user_id_to_email_map = fetch_user_id_to_email_map(slack_client)
channel_doc_map = _get_slack_document_ids_and_channels(
cc_pair=cc_pair, callback=callback
)
if not user_id_to_email_map:
raise ValueError(
"No user id to email map found. Please check to make sure that "
"your Slack bot token has the `users:read.email` scope"
)
workspace_permissions = _fetch_workspace_permissions(
user_id_to_email_map=user_id_to_email_map,
)
@ -148,18 +151,8 @@ def slack_doc_sync(
user_id_to_email_map=user_id_to_email_map,
)
document_external_accesses = []
for channel_id, ext_access in channel_permissions.items():
doc_ids = channel_doc_map.get(channel_id)
if not doc_ids:
# No documents found for channel the channel_id
continue
for doc_id in doc_ids:
document_external_accesses.append(
DocExternalAccess(
external_access=ext_access,
doc_id=doc_id,
)
)
return document_external_accesses
yield from _get_slack_document_access(
cc_pair=cc_pair,
channel_permissions=channel_permissions,
callback=callback,
)

View File

@ -1,4 +1,5 @@
from collections.abc import Callable
from collections.abc import Generator
from ee.onyx.configs.app_configs import CONFLUENCE_PERMISSION_DOC_SYNC_FREQUENCY
from ee.onyx.configs.app_configs import CONFLUENCE_PERMISSION_GROUP_SYNC_FREQUENCY
@ -23,7 +24,7 @@ DocSyncFuncType = Callable[
ConnectorCredentialPair,
IndexingHeartbeatInterface | None,
],
list[DocExternalAccess],
Generator[DocExternalAccess, None, None],
]
GroupSyncFuncType = Callable[

View File

@ -20,7 +20,7 @@ class ExternalAccess:
class DocExternalAccess:
"""
This is just a class to wrap the external access and the document ID
together. It's used for syncing document permissions to Redis.
together. It's used for syncing document permissions to Vespa.
"""
external_access: ExternalAccess

View File

@ -453,23 +453,23 @@ def connector_permission_sync_generator_task(
redis_connector.permissions.set_fence(new_payload)
callback = PermissionSyncCallback(redis_connector, lock, r)
document_external_accesses: list[DocExternalAccess] = doc_sync_func(
cc_pair, callback
)
document_external_accesses = doc_sync_func(cc_pair, callback)
task_logger.info(
f"RedisConnector.permissions.generate_tasks starting. cc_pair={cc_pair_id}"
)
tasks_generated = redis_connector.permissions.generate_tasks(
celery_app=self.app,
lock=lock,
new_permissions=document_external_accesses,
source_string=source_type,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
)
if tasks_generated is None:
return None
tasks_generated = 0
for doc_external_access in document_external_accesses:
redis_connector.permissions.generate_tasks(
celery_app=self.app,
lock=lock,
new_permissions=[doc_external_access],
source_string=source_type,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
)
tasks_generated += 1
task_logger.info(
f"RedisConnector.permissions.generate_tasks finished. "

View File

@ -412,8 +412,8 @@ def _get_all_doc_ids(
callback=callback,
)
message_ts_set: set[str] = set()
for message_batch in channel_message_batches:
slim_doc_batch: list[SlimDocument] = []
for message in message_batch:
if msg_filter_func(message):
continue
@ -421,18 +421,17 @@ def _get_all_doc_ids(
# The document id is the channel id and the ts of the first message in the thread
# Since we already have the first message of the thread, we dont have to
# fetch the thread for id retrieval, saving time and API calls
message_ts_set.add(message["ts"])
channel_metadata_list: list[SlimDocument] = []
for message_ts in message_ts_set:
channel_metadata_list.append(
SlimDocument(
id=_build_doc_id(channel_id=channel_id, thread_ts=message_ts),
perm_sync_data={"channel_id": channel_id},
slim_doc_batch.append(
SlimDocument(
id=_build_doc_id(
channel_id=channel_id, thread_ts=message["ts"]
),
perm_sync_data={"channel_id": channel_id},
)
)
)
yield channel_metadata_list
yield slim_doc_batch
def _process_message(