From f67b5356faec99efa18a87c9fa90a5d4e0084bd5 Mon Sep 17 00:00:00 2001 From: skylares <93623871+skylares@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:12:57 -0500 Subject: [PATCH] Create google drive e2e test (#3635) * Create e2e google drive test * Drive sync issue * Add endpoints for group syncing * google e2e fixes/improvements and add xfail to zendesk tests * mypy errors * Key change * Small changes * Merged main to fix group sync issue * Update test_permission_sync.py * Update google_drive_api_utils.py * Update test_zendesk_connector.py --------- Co-authored-by: hagen-danswer --- backend/onyx/server/documents/cc_pair.py | 75 ++++ .../common_utils/managers/cc_pair.py | 76 +++- .../google/google_drive_api_utils.py | 186 ++++++++++ .../google/test_permission_sync.py | 332 ++++++++++++++++++ 4 files changed, 659 insertions(+), 10 deletions(-) create mode 100644 backend/tests/integration/connector_job_tests/google/google_drive_api_utils.py create mode 100644 backend/tests/integration/connector_job_tests/google/test_permission_sync.py diff --git a/backend/onyx/server/documents/cc_pair.py b/backend/onyx/server/documents/cc_pair.py index e54b7a918a74..c3a3540f7c7b 100644 --- a/backend/onyx/server/documents/cc_pair.py +++ b/backend/onyx/server/documents/cc_pair.py @@ -15,6 +15,9 @@ from onyx.background.celery.celery_utils import get_deletion_attempt_snapshot from onyx.background.celery.tasks.doc_permission_syncing.tasks import ( try_creating_permissions_sync_task, ) +from onyx.background.celery.tasks.external_group_syncing.tasks import ( + try_creating_external_group_sync_task, +) from onyx.background.celery.tasks.pruning.tasks import ( try_creating_prune_generator_task, ) @@ -443,6 +446,78 @@ def sync_cc_pair( ) +@router.get("/admin/cc-pair/{cc_pair_id}/sync-groups") +def get_cc_pair_latest_group_sync( + cc_pair_id: int, + user: User = Depends(current_curator_or_admin_user), + db_session: Session = Depends(get_session), +) -> datetime | None: + cc_pair = 
get_connector_credential_pair_from_id_for_user( + cc_pair_id=cc_pair_id, + db_session=db_session, + user=user, + get_editable=False, + ) + if not cc_pair: + raise HTTPException( + status_code=400, + detail="cc_pair not found for current user's permissions", + ) + + return cc_pair.last_time_external_group_sync + + +@router.post("/admin/cc-pair/{cc_pair_id}/sync-groups") +def sync_cc_pair_groups( + cc_pair_id: int, + user: User = Depends(current_curator_or_admin_user), + db_session: Session = Depends(get_session), + tenant_id: str | None = Depends(get_current_tenant_id), +) -> StatusResponse[list[int]]: + """Triggers group sync on a particular cc_pair immediately""" + + cc_pair = get_connector_credential_pair_from_id_for_user( + cc_pair_id=cc_pair_id, + db_session=db_session, + user=user, + get_editable=False, + ) + if not cc_pair: + raise HTTPException( + status_code=400, + detail="Connection not found for current user's permissions", + ) + + r = get_redis_client(tenant_id=tenant_id) + + redis_connector = RedisConnector(tenant_id, cc_pair_id) + if redis_connector.external_group_sync.fenced: + raise HTTPException( + status_code=HTTPStatus.CONFLICT, + detail="External group sync task already in progress.", + ) + + logger.info( + f"External group sync cc_pair={cc_pair_id} " + f"connector_id={cc_pair.connector_id} " + f"credential_id={cc_pair.credential_id} " + f"{cc_pair.connector.name} connector." 
+ ) + tasks_created = try_creating_external_group_sync_task( + primary_app, cc_pair_id, r, CURRENT_TENANT_ID_CONTEXTVAR.get() + ) + if not tasks_created: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail="External group sync task creation failed.", + ) + + return StatusResponse( + success=True, + message="Successfully created the external group sync task.", + ) + + @router.get("/admin/cc-pair/{cc_pair_id}/get-docs-sync-status") def get_docs_sync_status( cc_pair_id: int, diff --git a/backend/tests/integration/common_utils/managers/cc_pair.py b/backend/tests/integration/common_utils/managers/cc_pair.py index dc8e534012a2..44c029d7687d 100644 --- a/backend/tests/integration/common_utils/managers/cc_pair.py +++ b/backend/tests/integration/common_utils/managers/cc_pair.py @@ -432,30 +432,61 @@ class CCPairManager: if user_performing_action else GENERAL_HEADERS, ) - # if result.status_code != 409: result.raise_for_status() + group_sync_result = requests.post( + url=f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair.id}/sync-groups", + headers=user_performing_action.headers + if user_performing_action + else GENERAL_HEADERS, + ) + if group_sync_result.status_code != 409: + group_sync_result.raise_for_status() + @staticmethod - def get_sync_task( + def get_doc_sync_task( cc_pair: DATestCCPair, user_performing_action: DATestUser | None = None, ) -> datetime | None: - response = requests.get( + doc_sync_response = requests.get( url=f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair.id}/sync-permissions", headers=user_performing_action.headers if user_performing_action else GENERAL_HEADERS, ) - response.raise_for_status() - response_str = response.json() + doc_sync_response.raise_for_status() + doc_sync_response_str = doc_sync_response.json() # If the response itself is a datetime string, parse it - if not isinstance(response_str, str): + if not isinstance(doc_sync_response_str, str): return None try: - return datetime.fromisoformat(response_str) + 
return datetime.fromisoformat(doc_sync_response_str) + except ValueError: + return None + + @staticmethod + def get_group_sync_task( + cc_pair: DATestCCPair, + user_performing_action: DATestUser | None = None, + ) -> datetime | None: + group_sync_response = requests.get( + url=f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair.id}/sync-groups", + headers=user_performing_action.headers + if user_performing_action + else GENERAL_HEADERS, + ) + group_sync_response.raise_for_status() + group_sync_response_str = group_sync_response.json() + + # If the response itself is a datetime string, parse it + if not isinstance(group_sync_response_str, str): + return None + + try: + return datetime.fromisoformat(group_sync_response_str) except ValueError: return None @@ -498,15 +529,37 @@ class CCPairManager: timeout: float = MAX_DELAY, number_of_updated_docs: int = 0, user_performing_action: DATestUser | None = None, + # Sometimes waiting for a group sync is not necessary + should_wait_for_group_sync: bool = True, + # Sometimes waiting for a vespa sync is not necessary + should_wait_for_vespa_sync: bool = True, ) -> None: """after: The task register time must be after this time.""" + doc_synced = False + group_synced = False start = time.monotonic() while True: - last_synced = CCPairManager.get_sync_task(cc_pair, user_performing_action) - if last_synced and last_synced > after: - print(f"last_synced: {last_synced}") + # We are treating both syncs as part of one larger permission sync job + doc_last_synced = CCPairManager.get_doc_sync_task( + cc_pair, user_performing_action + ) + group_last_synced = CCPairManager.get_group_sync_task( + cc_pair, user_performing_action + ) + + if not doc_synced and doc_last_synced and doc_last_synced > after: + print(f"doc_last_synced: {doc_last_synced}") print(f"sync command start time: {after}") print(f"permission sync complete: cc_pair={cc_pair.id}") + doc_synced = True + + if not group_synced and group_last_synced and group_last_synced > after: + 
print(f"group_last_synced: {group_last_synced}") + print(f"sync command start time: {after}") + print(f"group sync complete: cc_pair={cc_pair.id}") + group_synced = True + + if doc_synced and (group_synced or not should_wait_for_group_sync): break elapsed = time.monotonic() - start @@ -524,6 +577,9 @@ class CCPairManager: # this shouldnt be necessary but something is off with the timing for the sync jobs time.sleep(5) + if not should_wait_for_vespa_sync: + return + print("waiting for vespa sync") # wait for the vespa sync to complete once the permission sync is complete start = time.monotonic() diff --git a/backend/tests/integration/connector_job_tests/google/google_drive_api_utils.py b/backend/tests/integration/connector_job_tests/google/google_drive_api_utils.py new file mode 100644 index 000000000000..368e4ab7eb16 --- /dev/null +++ b/backend/tests/integration/connector_job_tests/google/google_drive_api_utils.py @@ -0,0 +1,186 @@ +from typing import Any +from uuid import uuid4 + +from google.oauth2.service_account import Credentials + +from onyx.connectors.google_utils.resources import get_drive_service +from onyx.connectors.google_utils.resources import get_google_docs_service +from onyx.connectors.google_utils.resources import GoogleDocsService +from onyx.connectors.google_utils.resources import GoogleDriveService + + +GOOGLE_SCOPES = { + "google_drive": [ + "https://www.googleapis.com/auth/drive", + "https://www.googleapis.com/auth/admin.directory.group", + "https://www.googleapis.com/auth/admin.directory.user", + ], +} + + +def _create_doc_service(drive_service: GoogleDriveService) -> GoogleDocsService: + docs_service = get_google_docs_service( + creds=drive_service._http.credentials, + user_email=drive_service._http.credentials._subject, + ) + return docs_service + + +class GoogleDriveManager: + @staticmethod + def create_impersonated_drive_service( + service_account_key: dict, impersonated_user_email: str + ) -> GoogleDriveService: + """Gets a drive service 
that impersonates a specific user""" + credentials = Credentials.from_service_account_info( + service_account_key, + scopes=GOOGLE_SCOPES["google_drive"], + subject=impersonated_user_email, + ) + + service = get_drive_service(credentials, impersonated_user_email) + + # Verify impersonation + about = service.about().get(fields="user").execute() + if about.get("user", {}).get("emailAddress") != impersonated_user_email: + raise ValueError( + f"Failed to impersonate {impersonated_user_email}. " + f"Instead got {about.get('user', {}).get('emailAddress')}" + ) + return service + + @staticmethod + def create_shared_drive( + drive_service: GoogleDriveService, admin_email: str, test_id: str + ) -> str: + """ + Creates a shared drive and returns the drive's ID + """ + try: + about = drive_service.about().get(fields="user").execute() + creating_user = about["user"]["emailAddress"] + + # Verify we're still impersonating the admin + if creating_user != admin_email: + raise ValueError( + f"Expected to create drive as {admin_email}, but instead created drive as {creating_user}" + ) + + drive_metadata = {"name": f"perm_sync_drive_{test_id}"} + + request_id = str(uuid4()) + drive = ( + drive_service.drives() + .create( + body=drive_metadata, + requestId=request_id, + fields="id,name,capabilities", + ) + .execute() + ) + + return drive["id"] + except Exception as e: + print(f"Error creating shared drive: {str(e)}") + raise + + @staticmethod + def create_empty_doc( + drive_service: Any, + drive_id: str, + ) -> str: + """ + Creates an empty document in the given drive and returns the document's ID + """ + file_metadata = { + "name": f"perm_sync_doc_{drive_id}_{str(uuid4())}", + "mimeType": "application/vnd.google-apps.document", + "parents": [drive_id], + } + file = ( + drive_service.files() + .create(body=file_metadata, supportsAllDrives=True) + .execute() + ) + + return file["id"] + + @staticmethod + def append_text_to_doc( + drive_service: GoogleDriveService, doc_id: str, text: str 
+ ) -> None: + docs_service = _create_doc_service(drive_service) + + docs_service.documents().batchUpdate( + documentId=doc_id, + body={ + "requests": [{"insertText": {"location": {"index": 1}, "text": text}}] + }, + ).execute() + + @staticmethod + def update_file_permissions( + drive_service: Any, file_id: str, email: str, role: str = "reader" + ) -> None: + permission = {"type": "user", "role": role, "emailAddress": email} + drive_service.permissions().create( + fileId=file_id, + body=permission, + supportsAllDrives=True, + sendNotificationEmail=False, + ).execute() + + @staticmethod + def remove_file_permissions(drive_service: Any, file_id: str, email: str) -> None: + permissions = ( + drive_service.permissions() + .list(fileId=file_id, supportsAllDrives=True) + .execute() + ) + # TODO: This is a hacky way to remove permissions. Removes anyone with reader role. + # Need to find a way to map a user's email to a permission id. + # The permissions.get returns a permissionID but email field is None, + # something to do with it being a group or domain wide delegation. 
+ for permission in permissions.get("permissions", []): + if permission.get("role") == "reader": + drive_service.permissions().delete( + fileId=file_id, + permissionId=permission["id"], + supportsAllDrives=True, + ).execute() + break + + @staticmethod + def make_file_public(drive_service: Any, file_id: str) -> None: + permission = {"type": "anyone", "role": "reader"} + drive_service.permissions().create( + fileId=file_id, body=permission, supportsAllDrives=True + ).execute() + + @staticmethod + def cleanup_drive(drive_service: Any, drive_id: str) -> None: + try: + # Delete up to 2 files that match our pattern + file_name_prefix = f"perm_sync_doc_{drive_id}" + files = ( + drive_service.files() + .list( + q=f"name contains '{file_name_prefix}'", + driveId=drive_id, + includeItemsFromAllDrives=True, + supportsAllDrives=True, + corpora="drive", + fields="files(id)", + ) + .execute() + ) + + for file in files.get("files", []): + drive_service.files().delete( + fileId=file["id"], supportsAllDrives=True + ).execute() + + # Then delete the drive + drive_service.drives().delete(driveId=drive_id).execute() + except Exception as e: + print(f"Error cleaning up drive {drive_id}: {e}") diff --git a/backend/tests/integration/connector_job_tests/google/test_permission_sync.py b/backend/tests/integration/connector_job_tests/google/test_permission_sync.py new file mode 100644 index 000000000000..58f58a6c5b92 --- /dev/null +++ b/backend/tests/integration/connector_job_tests/google/test_permission_sync.py @@ -0,0 +1,332 @@ +import json +import os +from collections.abc import Generator +from datetime import datetime +from datetime import timezone +from uuid import uuid4 + +import pytest + +from onyx.configs.constants import DocumentSource +from onyx.connectors.google_utils.resources import GoogleDriveService +from onyx.connectors.google_utils.shared_constants import ( + DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY, +) +from onyx.connectors.google_utils.shared_constants import ( + 
DB_CREDENTIALS_PRIMARY_ADMIN_KEY, +) +from onyx.connectors.models import InputType +from onyx.db.enums import AccessType +from tests.integration.common_utils.managers.cc_pair import CCPairManager +from tests.integration.common_utils.managers.connector import ConnectorManager +from tests.integration.common_utils.managers.credential import CredentialManager +from tests.integration.common_utils.managers.document_search import ( + DocumentSearchManager, +) +from tests.integration.common_utils.managers.llm_provider import LLMProviderManager +from tests.integration.common_utils.managers.user import UserManager +from tests.integration.common_utils.test_models import DATestCCPair +from tests.integration.common_utils.test_models import DATestConnector +from tests.integration.common_utils.test_models import DATestCredential +from tests.integration.common_utils.test_models import DATestUser +from tests.integration.common_utils.vespa import vespa_fixture +from tests.integration.connector_job_tests.google.google_drive_api_utils import ( + GoogleDriveManager, +) + + +@pytest.fixture() +def google_drive_test_env_setup() -> ( + Generator[ + tuple[ + GoogleDriveService, str, DATestCCPair, DATestUser, DATestUser, DATestUser + ], + None, + None, + ] +): + # Creating an admin user (first user created is automatically an admin) + admin_user: DATestUser = UserManager.create(email="admin@onyx-test.com") + # Creating a non-admin user + test_user_1: DATestUser = UserManager.create(email="test_user_1@onyx-test.com") + # Creating a non-admin user + test_user_2: DATestUser = UserManager.create(email="test_user_2@onyx-test.com") + + service_account_key = os.environ["FULL_CONTROL_DRIVE_SERVICE_ACCOUNT"] + drive_id: str | None = None + + try: + credentials = { + DB_CREDENTIALS_PRIMARY_ADMIN_KEY: admin_user.email, + DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY: service_account_key, + } + + # Setup Google Drive + drive_service = GoogleDriveManager.create_impersonated_drive_service( + 
json.loads(service_account_key), admin_user.email + ) + test_id = str(uuid4()) + drive_id = GoogleDriveManager.create_shared_drive( + drive_service, admin_user.email, test_id + ) + + # Setup Onyx infrastructure + LLMProviderManager.create(user_performing_action=admin_user) + + before = datetime.now(timezone.utc) + credential: DATestCredential = CredentialManager.create( + source=DocumentSource.GOOGLE_DRIVE, + credential_json=credentials, + user_performing_action=admin_user, + ) + connector: DATestConnector = ConnectorManager.create( + name="Google Drive Test", + input_type=InputType.POLL, + source=DocumentSource.GOOGLE_DRIVE, + connector_specific_config={ + "shared_drive_urls": f"https://drive.google.com/drive/folders/{drive_id}" + }, + access_type=AccessType.SYNC, + user_performing_action=admin_user, + ) + cc_pair: DATestCCPair = CCPairManager.create( + credential_id=credential.id, + connector_id=connector.id, + access_type=AccessType.SYNC, + user_performing_action=admin_user, + ) + CCPairManager.wait_for_indexing_completion( + cc_pair=cc_pair, after=before, user_performing_action=admin_user + ) + + yield drive_service, drive_id, cc_pair, admin_user, test_user_1, test_user_2 + + except json.JSONDecodeError: + pytest.skip("FULL_CONTROL_DRIVE_SERVICE_ACCOUNT is not valid JSON") + finally: + # Cleanup drive and file + if drive_id is not None: + GoogleDriveManager.cleanup_drive(drive_service, drive_id) + + +@pytest.mark.xfail(reason="Needs to be tested for flakiness") +def test_google_permission_sync( + reset: None, + vespa_client: vespa_fixture, + google_drive_test_env_setup: tuple[ + GoogleDriveService, str, DATestCCPair, DATestUser, DATestUser, DATestUser + ], +) -> None: + ( + drive_service, + drive_id, + cc_pair, + admin_user, + test_user_1, + test_user_2, + ) = google_drive_test_env_setup + + # ----------------------BASELINE TEST---------------------- + before = datetime.now(timezone.utc) + + # Create empty test doc in drive + doc_id_1 = 
GoogleDriveManager.create_empty_doc(drive_service, drive_id) + + # Append text to doc + doc_text_1 = "The secret number is 12345" + GoogleDriveManager.append_text_to_doc(drive_service, doc_id_1, doc_text_1) + + # run indexing + CCPairManager.run_once(cc_pair, admin_user) + CCPairManager.wait_for_indexing_completion( + cc_pair=cc_pair, after=before, user_performing_action=admin_user + ) + + # run permission sync + CCPairManager.sync( + cc_pair=cc_pair, + user_performing_action=admin_user, + ) + CCPairManager.wait_for_sync( + cc_pair=cc_pair, + after=before, + number_of_updated_docs=1, + user_performing_action=admin_user, + ) + + # Verify admin has access to document + admin_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=admin_user + ) + assert doc_text_1 in [result.strip("\ufeff") for result in admin_results] + + # Verify test_user_1 cannot access document + user1_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=test_user_1 + ) + assert doc_text_1 not in [result.strip("\ufeff") for result in user1_results] + + # ----------------------GRANT USER 1 DOC PERMISSIONS TEST-------------------------- + before = datetime.now(timezone.utc) + + # Grant user 1 access to document 1 + GoogleDriveManager.update_file_permissions( + drive_service=drive_service, + file_id=doc_id_1, + email=test_user_1.email, + role="reader", + ) + + # Create a second doc in the drive which user 1 should not have access to + doc_id_2 = GoogleDriveManager.create_empty_doc(drive_service, drive_id) + doc_text_2 = "The secret number is 67890" + GoogleDriveManager.append_text_to_doc(drive_service, doc_id_2, doc_text_2) + + # Run indexing + CCPairManager.run_once(cc_pair, admin_user) + CCPairManager.wait_for_indexing_completion( + cc_pair=cc_pair, + after=before, + user_performing_action=admin_user, + ) + + # Run permission sync + CCPairManager.sync( + cc_pair=cc_pair, + user_performing_action=admin_user, + ) 
+ CCPairManager.wait_for_sync( + cc_pair=cc_pair, + after=before, + number_of_updated_docs=1, + user_performing_action=admin_user, + ) + + # Verify admin can access both documents + admin_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=admin_user + ) + assert {doc_text_1, doc_text_2} == { + result.strip("\ufeff") for result in admin_results + } + + # Verify user 1 can access document 1 + user1_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=test_user_1 + ) + assert doc_text_1 in [result.strip("\ufeff") for result in user1_results] + + # Verify user 1 cannot access document 2 + user1_results_2 = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=test_user_1 + ) + assert doc_text_2 not in [result.strip("\ufeff") for result in user1_results_2] + + # ----------------------REMOVE USER 1 DOC PERMISSIONS TEST-------------------------- + before = datetime.now(timezone.utc) + + # Remove user 1 access to document 1 + GoogleDriveManager.remove_file_permissions( + drive_service=drive_service, file_id=doc_id_1, email=test_user_1.email + ) + # Run permission sync + CCPairManager.sync( + cc_pair=cc_pair, + user_performing_action=admin_user, + ) + CCPairManager.wait_for_sync( + cc_pair=cc_pair, + after=before, + number_of_updated_docs=1, + user_performing_action=admin_user, + ) + + # Verify admin can access both documents + admin_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=admin_user + ) + assert {doc_text_1, doc_text_2} == { + result.strip("\ufeff") for result in admin_results + } + + # Verify user 1 cannot access either document + user1_results = DocumentSearchManager.search_documents( + query="secret numbers", user_performing_action=test_user_1 + ) + assert {result.strip("\ufeff") for result in user1_results} == set() + + # ----------------------GRANT USER 1 DRIVE PERMISSIONS 
TEST-------------------------- + before = datetime.now(timezone.utc) + + # Grant user 1 access to drive + GoogleDriveManager.update_file_permissions( + drive_service=drive_service, + file_id=drive_id, + email=test_user_1.email, + role="reader", + ) + + # Run permission sync + CCPairManager.sync( + cc_pair=cc_pair, + user_performing_action=admin_user, + ) + + CCPairManager.wait_for_sync( + cc_pair=cc_pair, + after=before, + number_of_updated_docs=2, + user_performing_action=admin_user, + # if we are only updating the group definition for this test we use this variable, + # since it doesn't result in a vespa sync so we don't want to wait for it + should_wait_for_vespa_sync=False, + ) + + # Verify user 1 can access both documents + user1_results = DocumentSearchManager.search_documents( + query="secret numbers", user_performing_action=test_user_1 + ) + assert {doc_text_1, doc_text_2} == { + result.strip("\ufeff") for result in user1_results + } + + # ----------------------MAKE DRIVE PUBLIC TEST-------------------------- + before = datetime.now(timezone.utc) + + # Unable to make drive itself public as Google's security policies prevent this, so we make the documents public instead + GoogleDriveManager.make_file_public(drive_service, doc_id_1) + GoogleDriveManager.make_file_public(drive_service, doc_id_2) + + # Run permission sync + CCPairManager.sync( + cc_pair=cc_pair, + user_performing_action=admin_user, + ) + CCPairManager.wait_for_sync( + cc_pair=cc_pair, + after=before, + number_of_updated_docs=2, + user_performing_action=admin_user, + ) + + # Verify all users can access both documents + admin_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=admin_user + ) + assert {doc_text_1, doc_text_2} == { + result.strip("\ufeff") for result in admin_results + } + + user1_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=test_user_1 + ) + assert {doc_text_1, doc_text_2} == { +
result.strip("\ufeff") for result in user1_results + } + + user2_results = DocumentSearchManager.search_documents( + query="secret number", user_performing_action=test_user_2 + ) + assert {doc_text_1, doc_text_2} == { + result.strip("\ufeff") for result in user2_results + }