Create google drive e2e test (#3635)

* Create e2e google drive test

* Drive sync issue

* Add endpoints for group syncing

* google e2e fixes/improvements and add xfail to zendesk tests

* mypy errors

* Key change

* Small changes

* Merged main to fix group sync issue

* Update test_permission_sync.py

* Update google_drive_api_utils.py

* Update test_zendesk_connector.py

---------

Co-authored-by: hagen-danswer <hagen@danswer.ai>
This commit is contained in:
skylares
2025-01-28 17:12:57 -05:00
committed by GitHub
parent 9bdb581220
commit f67b5356fa
4 changed files with 659 additions and 10 deletions

View File

@@ -15,6 +15,9 @@ from onyx.background.celery.celery_utils import get_deletion_attempt_snapshot
from onyx.background.celery.tasks.doc_permission_syncing.tasks import (
try_creating_permissions_sync_task,
)
from onyx.background.celery.tasks.external_group_syncing.tasks import (
try_creating_external_group_sync_task,
)
from onyx.background.celery.tasks.pruning.tasks import (
try_creating_prune_generator_task,
)
@@ -443,6 +446,78 @@ def sync_cc_pair(
)
@router.get("/admin/cc-pair/{cc_pair_id}/sync-groups")
def get_cc_pair_latest_group_sync(
cc_pair_id: int,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> datetime | None:
cc_pair = get_connector_credential_pair_from_id_for_user(
cc_pair_id=cc_pair_id,
db_session=db_session,
user=user,
get_editable=False,
)
if not cc_pair:
raise HTTPException(
status_code=400,
detail="cc_pair not found for current user's permissions",
)
return cc_pair.last_time_external_group_sync
@router.post("/admin/cc-pair/{cc_pair_id}/sync-groups")
def sync_cc_pair_groups(
cc_pair_id: int,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
tenant_id: str | None = Depends(get_current_tenant_id),
) -> StatusResponse[list[int]]:
"""Triggers group sync on a particular cc_pair immediately"""
cc_pair = get_connector_credential_pair_from_id_for_user(
cc_pair_id=cc_pair_id,
db_session=db_session,
user=user,
get_editable=False,
)
if not cc_pair:
raise HTTPException(
status_code=400,
detail="Connection not found for current user's permissions",
)
r = get_redis_client(tenant_id=tenant_id)
redis_connector = RedisConnector(tenant_id, cc_pair_id)
if redis_connector.external_group_sync.fenced:
raise HTTPException(
status_code=HTTPStatus.CONFLICT,
detail="External group sync task already in progress.",
)
logger.info(
f"External group sync cc_pair={cc_pair_id} "
f"connector_id={cc_pair.connector_id} "
f"credential_id={cc_pair.credential_id} "
f"{cc_pair.connector.name} connector."
)
tasks_created = try_creating_external_group_sync_task(
primary_app, cc_pair_id, r, CURRENT_TENANT_ID_CONTEXTVAR.get()
)
if not tasks_created:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="External group sync task creation failed.",
)
return StatusResponse(
success=True,
message="Successfully created the external group sync task.",
)
@router.get("/admin/cc-pair/{cc_pair_id}/get-docs-sync-status")
def get_docs_sync_status(
cc_pair_id: int,

View File

@@ -432,30 +432,61 @@ class CCPairManager:
if user_performing_action
else GENERAL_HEADERS,
)
#
if result.status_code != 409:
result.raise_for_status()
group_sync_result = requests.post(
url=f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair.id}/sync-groups",
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
if group_sync_result.status_code != 409:
group_sync_result.raise_for_status()
@staticmethod
def get_sync_task(
def get_doc_sync_task(
cc_pair: DATestCCPair,
user_performing_action: DATestUser | None = None,
) -> datetime | None:
response = requests.get(
doc_sync_response = requests.get(
url=f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair.id}/sync-permissions",
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
response.raise_for_status()
response_str = response.json()
doc_sync_response.raise_for_status()
doc_sync_response_str = doc_sync_response.json()
# If the response itself is a datetime string, parse it
if not isinstance(response_str, str):
if not isinstance(doc_sync_response_str, str):
return None
try:
return datetime.fromisoformat(response_str)
return datetime.fromisoformat(doc_sync_response_str)
except ValueError:
return None
@staticmethod
def get_group_sync_task(
cc_pair: DATestCCPair,
user_performing_action: DATestUser | None = None,
) -> datetime | None:
group_sync_response = requests.get(
url=f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair.id}/sync-groups",
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
group_sync_response.raise_for_status()
group_sync_response_str = group_sync_response.json()
# If the response itself is a datetime string, parse it
if not isinstance(group_sync_response_str, str):
return None
try:
return datetime.fromisoformat(group_sync_response_str)
except ValueError:
return None
@@ -498,15 +529,37 @@ class CCPairManager:
timeout: float = MAX_DELAY,
number_of_updated_docs: int = 0,
user_performing_action: DATestUser | None = None,
# Sometimes waiting for a group sync is not necessary
should_wait_for_group_sync: bool = True,
# Sometimes waiting for a vespa sync is not necessary
should_wait_for_vespa_sync: bool = True,
) -> None:
"""after: The task register time must be after this time."""
doc_synced = False
group_synced = False
start = time.monotonic()
while True:
last_synced = CCPairManager.get_sync_task(cc_pair, user_performing_action)
if last_synced and last_synced > after:
print(f"last_synced: {last_synced}")
# We are treating both syncs as part of one larger permission sync job
doc_last_synced = CCPairManager.get_doc_sync_task(
cc_pair, user_performing_action
)
group_last_synced = CCPairManager.get_group_sync_task(
cc_pair, user_performing_action
)
if not doc_synced and doc_last_synced and doc_last_synced > after:
print(f"doc_last_synced: {doc_last_synced}")
print(f"sync command start time: {after}")
print(f"permission sync complete: cc_pair={cc_pair.id}")
doc_synced = True
if not group_synced and group_last_synced and group_last_synced > after:
print(f"group_last_synced: {group_last_synced}")
print(f"sync command start time: {after}")
print(f"group sync complete: cc_pair={cc_pair.id}")
group_synced = True
if doc_synced and (group_synced or not should_wait_for_group_sync):
break
elapsed = time.monotonic() - start
@@ -524,6 +577,9 @@ class CCPairManager:
# this shouldnt be necessary but something is off with the timing for the sync jobs
time.sleep(5)
if not should_wait_for_vespa_sync:
return
print("waiting for vespa sync")
# wait for the vespa sync to complete once the permission sync is complete
start = time.monotonic()

View File

@@ -0,0 +1,186 @@
from typing import Any
from uuid import uuid4
from google.oauth2.service_account import Credentials
from onyx.connectors.google_utils.resources import get_drive_service
from onyx.connectors.google_utils.resources import get_google_docs_service
from onyx.connectors.google_utils.resources import GoogleDocsService
from onyx.connectors.google_utils.resources import GoogleDriveService
GOOGLE_SCOPES = {
"google_drive": [
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/admin.directory.group",
"https://www.googleapis.com/auth/admin.directory.user",
],
}
def _create_doc_service(drive_service: GoogleDriveService) -> GoogleDocsService:
docs_service = get_google_docs_service(
creds=drive_service._http.credentials,
user_email=drive_service._http.credentials._subject,
)
return docs_service
class GoogleDriveManager:
@staticmethod
def create_impersonated_drive_service(
service_account_key: dict, impersonated_user_email: str
) -> GoogleDriveService:
"""Gets a drive service that impersonates a specific user"""
credentials = Credentials.from_service_account_info(
service_account_key,
scopes=GOOGLE_SCOPES["google_drive"],
subject=impersonated_user_email,
)
service = get_drive_service(credentials, impersonated_user_email)
# Verify impersonation
about = service.about().get(fields="user").execute()
if about.get("user", {}).get("emailAddress") != impersonated_user_email:
raise ValueError(
f"Failed to impersonate {impersonated_user_email}. "
f"Instead got {about.get('user', {}).get('emailAddress')}"
)
return service
@staticmethod
def create_shared_drive(
drive_service: GoogleDriveService, admin_email: str, test_id: str
) -> str:
"""
Creates a shared drive and returns the drive's ID
"""
try:
about = drive_service.about().get(fields="user").execute()
creating_user = about["user"]["emailAddress"]
# Verify we're still impersonating the admin
if creating_user != admin_email:
raise ValueError(
f"Expected to create drive as {admin_email}, but instead created drive as {creating_user}"
)
drive_metadata = {"name": f"perm_sync_drive_{test_id}"}
request_id = str(uuid4())
drive = (
drive_service.drives()
.create(
body=drive_metadata,
requestId=request_id,
fields="id,name,capabilities",
)
.execute()
)
return drive["id"]
except Exception as e:
print(f"Error creating shared drive: {str(e)}")
raise
@staticmethod
def create_empty_doc(
drive_service: Any,
drive_id: str,
) -> str:
"""
Creates an empty document in the given drive and returns the document's ID
"""
file_metadata = {
"name": f"perm_sync_doc_{drive_id}_{str(uuid4())}",
"mimeType": "application/vnd.google-apps.document",
"parents": [drive_id],
}
file = (
drive_service.files()
.create(body=file_metadata, supportsAllDrives=True)
.execute()
)
return file["id"]
@staticmethod
def append_text_to_doc(
drive_service: GoogleDriveService, doc_id: str, text: str
) -> None:
docs_service = _create_doc_service(drive_service)
docs_service.documents().batchUpdate(
documentId=doc_id,
body={
"requests": [{"insertText": {"location": {"index": 1}, "text": text}}]
},
).execute()
@staticmethod
def update_file_permissions(
drive_service: Any, file_id: str, email: str, role: str = "reader"
) -> None:
permission = {"type": "user", "role": role, "emailAddress": email}
drive_service.permissions().create(
fileId=file_id,
body=permission,
supportsAllDrives=True,
sendNotificationEmail=False,
).execute()
@staticmethod
def remove_file_permissions(drive_service: Any, file_id: str, email: str) -> None:
permissions = (
drive_service.permissions()
.list(fileId=file_id, supportsAllDrives=True)
.execute()
)
# TODO: This is a hacky way to remove permissions. Removes anyone with reader role.
# Need to find a way to map a user's email to a permission id.
# The permissions.get returns a permissionID but email field is None,
# something to do with it being a group or domain wide delegation.
for permission in permissions.get("permissions", []):
if permission.get("role") == "reader":
drive_service.permissions().delete(
fileId=file_id,
permissionId=permission["id"],
supportsAllDrives=True,
).execute()
break
@staticmethod
def make_file_public(drive_service: Any, file_id: str) -> None:
permission = {"type": "anyone", "role": "reader"}
drive_service.permissions().create(
fileId=file_id, body=permission, supportsAllDrives=True
).execute()
@staticmethod
def cleanup_drive(drive_service: Any, drive_id: str) -> None:
try:
# Delete up to 2 files that match our pattern
file_name_prefix = f"perm_sync_doc_{drive_id}"
files = (
drive_service.files()
.list(
q=f"name contains '{file_name_prefix}'",
driveId=drive_id,
includeItemsFromAllDrives=True,
supportsAllDrives=True,
corpora="drive",
fields="files(id)",
)
.execute()
)
for file in files.get("files", []):
drive_service.files().delete(
fileId=file["id"], supportsAllDrives=True
).execute()
# Then delete the drive
drive_service.drives().delete(driveId=drive_id).execute()
except Exception as e:
print(f"Error cleaning up drive {drive_id}: {e}")

View File

@@ -0,0 +1,332 @@
import json
import os
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from uuid import uuid4
import pytest
from onyx.configs.constants import DocumentSource
from onyx.connectors.google_utils.resources import GoogleDriveService
from onyx.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from onyx.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from onyx.connectors.models import InputType
from onyx.db.enums import AccessType
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.connector import ConnectorManager
from tests.integration.common_utils.managers.credential import CredentialManager
from tests.integration.common_utils.managers.document_search import (
DocumentSearchManager,
)
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestCCPair
from tests.integration.common_utils.test_models import DATestConnector
from tests.integration.common_utils.test_models import DATestCredential
from tests.integration.common_utils.test_models import DATestUser
from tests.integration.common_utils.vespa import vespa_fixture
from tests.integration.connector_job_tests.google.google_drive_api_utils import (
GoogleDriveManager,
)
@pytest.fixture()
def google_drive_test_env_setup() -> (
Generator[
tuple[
GoogleDriveService, str, DATestCCPair, DATestUser, DATestUser, DATestUser
],
None,
None,
]
):
# Creating an admin user (first user created is automatically an admin)
admin_user: DATestUser = UserManager.create(email="admin@onyx-test.com")
# Creating a non-admin user
test_user_1: DATestUser = UserManager.create(email="test_user_1@onyx-test.com")
# Creating a non-admin user
test_user_2: DATestUser = UserManager.create(email="test_user_2@onyx-test.com")
service_account_key = os.environ["FULL_CONTROL_DRIVE_SERVICE_ACCOUNT"]
drive_id: str | None = None
try:
credentials = {
DB_CREDENTIALS_PRIMARY_ADMIN_KEY: admin_user.email,
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY: service_account_key,
}
# Setup Google Drive
drive_service = GoogleDriveManager.create_impersonated_drive_service(
json.loads(service_account_key), admin_user.email
)
test_id = str(uuid4())
drive_id = GoogleDriveManager.create_shared_drive(
drive_service, admin_user.email, test_id
)
# Setup Onyx infrastructure
LLMProviderManager.create(user_performing_action=admin_user)
before = datetime.now(timezone.utc)
credential: DATestCredential = CredentialManager.create(
source=DocumentSource.GOOGLE_DRIVE,
credential_json=credentials,
user_performing_action=admin_user,
)
connector: DATestConnector = ConnectorManager.create(
name="Google Drive Test",
input_type=InputType.POLL,
source=DocumentSource.GOOGLE_DRIVE,
connector_specific_config={
"shared_drive_urls": f"https://drive.google.com/drive/folders/{drive_id}"
},
access_type=AccessType.SYNC,
user_performing_action=admin_user,
)
cc_pair: DATestCCPair = CCPairManager.create(
credential_id=credential.id,
connector_id=connector.id,
access_type=AccessType.SYNC,
user_performing_action=admin_user,
)
CCPairManager.wait_for_indexing_completion(
cc_pair=cc_pair, after=before, user_performing_action=admin_user
)
yield drive_service, drive_id, cc_pair, admin_user, test_user_1, test_user_2
except json.JSONDecodeError:
pytest.skip("FULL_CONTROL_DRIVE_SERVICE_ACCOUNT is not valid JSON")
finally:
# Cleanup drive and file
if drive_id is not None:
GoogleDriveManager.cleanup_drive(drive_service, drive_id)
@pytest.mark.xfail(reason="Needs to be tested for flakiness")
def test_google_permission_sync(
reset: None,
vespa_client: vespa_fixture,
google_drive_test_env_setup: tuple[
GoogleDriveService, str, DATestCCPair, DATestUser, DATestUser, DATestUser
],
) -> None:
(
drive_service,
drive_id,
cc_pair,
admin_user,
test_user_1,
test_user_2,
) = google_drive_test_env_setup
# ----------------------BASELINE TEST----------------------
before = datetime.now(timezone.utc)
# Create empty test doc in drive
doc_id_1 = GoogleDriveManager.create_empty_doc(drive_service, drive_id)
# Append text to doc
doc_text_1 = "The secret number is 12345"
GoogleDriveManager.append_text_to_doc(drive_service, doc_id_1, doc_text_1)
# run indexing
CCPairManager.run_once(cc_pair, admin_user)
CCPairManager.wait_for_indexing_completion(
cc_pair=cc_pair, after=before, user_performing_action=admin_user
)
# run permission sync
CCPairManager.sync(
cc_pair=cc_pair,
user_performing_action=admin_user,
)
CCPairManager.wait_for_sync(
cc_pair=cc_pair,
after=before,
number_of_updated_docs=1,
user_performing_action=admin_user,
)
# Verify admin has access to document
admin_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=admin_user
)
assert doc_text_1 in [result.strip("\ufeff") for result in admin_results]
# Verify test_user_1 cannot access document
user1_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=test_user_1
)
assert doc_text_1 not in [result.strip("\ufeff") for result in user1_results]
# ----------------------GRANT USER 1 DOC PERMISSIONS TEST--------------------------
before = datetime.now(timezone.utc)
# Grant user 1 access to document 1
GoogleDriveManager.update_file_permissions(
drive_service=drive_service,
file_id=doc_id_1,
email=test_user_1.email,
role="reader",
)
# Create a second doc in the drive which user 1 should not have access to
doc_id_2 = GoogleDriveManager.create_empty_doc(drive_service, drive_id)
doc_text_2 = "The secret number is 67890"
GoogleDriveManager.append_text_to_doc(drive_service, doc_id_2, doc_text_2)
# Run indexing
CCPairManager.run_once(cc_pair, admin_user)
CCPairManager.wait_for_indexing_completion(
cc_pair=cc_pair,
after=before,
user_performing_action=admin_user,
)
# Run permission sync
CCPairManager.sync(
cc_pair=cc_pair,
user_performing_action=admin_user,
)
CCPairManager.wait_for_sync(
cc_pair=cc_pair,
after=before,
number_of_updated_docs=1,
user_performing_action=admin_user,
)
# Verify admin can access both documents
admin_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=admin_user
)
assert {doc_text_1, doc_text_2} == {
result.strip("\ufeff") for result in admin_results
}
# Verify user 1 can access document 1
user1_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=test_user_1
)
assert doc_text_1 in [result.strip("\ufeff") for result in user1_results]
# Verify user 1 cannot access document 2
user1_results_2 = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=test_user_1
)
assert doc_text_2 not in [result.strip("\ufeff") for result in user1_results_2]
# ----------------------REMOVE USER 1 DOC PERMISSIONS TEST--------------------------
before = datetime.now(timezone.utc)
# Remove user 1 access to document 1
GoogleDriveManager.remove_file_permissions(
drive_service=drive_service, file_id=doc_id_1, email=test_user_1.email
)
# Run permission sync
CCPairManager.sync(
cc_pair=cc_pair,
user_performing_action=admin_user,
)
CCPairManager.wait_for_sync(
cc_pair=cc_pair,
after=before,
number_of_updated_docs=1,
user_performing_action=admin_user,
)
# Verify admin can access both documents
admin_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=admin_user
)
assert {doc_text_1, doc_text_2} == {
result.strip("\ufeff") for result in admin_results
}
# Verify user 1 cannot access either document
user1_results = DocumentSearchManager.search_documents(
query="secret numbers", user_performing_action=test_user_1
)
assert {result.strip("\ufeff") for result in user1_results} == set()
# ----------------------GRANT USER 1 DRIVE PERMISSIONS TEST--------------------------
before = datetime.now(timezone.utc)
# Grant user 1 access to drive
GoogleDriveManager.update_file_permissions(
drive_service=drive_service,
file_id=drive_id,
email=test_user_1.email,
role="reader",
)
# Run permission sync
CCPairManager.sync(
cc_pair=cc_pair,
user_performing_action=admin_user,
)
CCPairManager.wait_for_sync(
cc_pair=cc_pair,
after=before,
number_of_updated_docs=2,
user_performing_action=admin_user,
# if we are only updating the group definition for this test we use this varaiable,
# since it doesn't result in a vespa sync so we don't want to wait for it
should_wait_for_vespa_sync=False,
)
# Verify user 1 can access both documents
user1_results = DocumentSearchManager.search_documents(
query="secret numbers", user_performing_action=test_user_1
)
assert {doc_text_1, doc_text_2} == {
result.strip("\ufeff") for result in user1_results
}
# ----------------------MAKE DRIVE PUBLIC TEST--------------------------
before = datetime.now(timezone.utc)
# Unable to make drive itself public as Google's security policies prevent this, so we make the documents public instead
GoogleDriveManager.make_file_public(drive_service, doc_id_1)
GoogleDriveManager.make_file_public(drive_service, doc_id_2)
# Run permission sync
CCPairManager.sync(
cc_pair=cc_pair,
user_performing_action=admin_user,
)
CCPairManager.wait_for_sync(
cc_pair=cc_pair,
after=before,
number_of_updated_docs=2,
user_performing_action=admin_user,
)
# Verify all users can access both documents
admin_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=admin_user
)
assert {doc_text_1, doc_text_2} == {
result.strip("\ufeff") for result in admin_results
}
user1_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=test_user_1
)
assert {doc_text_1, doc_text_2} == {
result.strip("\ufeff") for result in user1_results
}
user2_results = DocumentSearchManager.search_documents(
query="secret number", user_performing_action=test_user_2
)
assert {doc_text_1, doc_text_2} == {
result.strip("\ufeff") for result in user2_results
}