gmail refactor + permission syncing (#3021)

* initial frontend changes and shared google refactoring

* gmail connector is reworked

* added permission syncing for gmail

* tested!

* Added tests for gmail connector

* fixed tests and mypy

* temp fix

* testing done!

* rename

* test fixes maybe?

* removed irrelevant tests

* anotha one

* refactoring changes

* refactor finished

* maybe these fixes work

* dumps

* final fixes
This commit is contained in:
hagen-danswer 2024-11-04 10:06:23 -08:00 committed by GitHub
parent 8e55566f66
commit 2cd1e6be00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1655 additions and 1121 deletions

View File

@ -21,6 +21,8 @@ env:
# Google
GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON_STR: ${{ secrets.GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON_STR }}
GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR: ${{ secrets.GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR }}
GOOGLE_GMAIL_SERVICE_ACCOUNT_JSON_STR: ${{ secrets.GOOGLE_GMAIL_SERVICE_ACCOUNT_JSON_STR }}
GOOGLE_GMAIL_OAUTH_CREDENTIALS_JSON_STR: ${{ secrets.GOOGLE_GMAIL_OAUTH_CREDENTIALS_JSON_STR }}
jobs:
connectors-check:

View File

@ -1,283 +1,360 @@
import re
import time
from base64 import urlsafe_b64decode
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
from typing import Dict
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from googleapiclient import discovery # type: ignore
from googleapiclient.errors import HttpError # type: ignore
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from danswer.connectors.gmail.connector_auth import (
get_gmail_creds_for_authorized_user,
)
from danswer.connectors.gmail.connector_auth import (
get_gmail_creds_for_service_account,
)
from danswer.connectors.gmail.constants import (
DB_CREDENTIALS_DICT_DELEGATED_USER_KEY,
)
from danswer.connectors.gmail.constants import DB_CREDENTIALS_DICT_TOKEN_KEY
from danswer.connectors.gmail.constants import (
GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
from danswer.connectors.google_utils.google_auth import get_google_creds
from danswer.connectors.google_utils.google_utils import execute_paginated_retrieval
from danswer.connectors.google_utils.resources import get_admin_service
from danswer.connectors.google_utils.resources import get_gmail_service
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from danswer.connectors.google_utils.shared_constants import MISSING_SCOPES_ERROR_STR
from danswer.connectors.google_utils.shared_constants import ONYX_SCOPE_INSTRUCTIONS
from danswer.connectors.google_utils.shared_constants import SLIM_BATCH_SIZE
from danswer.connectors.google_utils.shared_constants import USER_FIELDS
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import GenerateSlimDocumentOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.interfaces import SlimConnector
from danswer.connectors.models import BasicExpertInfo
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.connectors.models import SlimDocument
from danswer.utils.logger import setup_logger
from danswer.utils.retry_wrapper import retry_builder
logger = setup_logger()
# This is for the initial list call to get the thread ids
THREAD_LIST_FIELDS = "nextPageToken, threads(id)"
def _execute_with_retry(request: Any) -> Any:
max_attempts = 10
attempt = 0
# These are the fields to retrieve using the ID from the initial list call
PARTS_FIELDS = "parts(body(data), mimeType)"
PAYLOAD_FIELDS = f"payload(headers, {PARTS_FIELDS})"
MESSAGES_FIELDS = f"messages(id, {PAYLOAD_FIELDS})"
THREADS_FIELDS = f"threads(id, {MESSAGES_FIELDS})"
THREAD_FIELDS = f"id, {MESSAGES_FIELDS}"
while attempt < max_attempts:
# Note for reasons unknown, the Google API will sometimes return a 429
# and even after waiting the retry period, it will return another 429.
# It could be due to a few possibilities:
# 1. Other things are also requesting from the Gmail API with the same key
# 2. It's a rolling rate limit so the moment we get some amount of requests cleared, we hit it again very quickly
# 3. The retry-after has a maximum and we've already hit the limit for the day
# or it's something else...
try:
return request.execute()
except HttpError as error:
attempt += 1
EMAIL_FIELDS = [
"cc",
"bcc",
"from",
"to",
]
if error.resp.status == 429:
# Attempt to get 'Retry-After' from headers
retry_after = error.resp.get("Retry-After")
if retry_after:
sleep_time = int(retry_after)
else:
# Extract 'Retry after' timestamp from error message
match = re.search(
r"Retry after (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)",
str(error),
add_retries = retry_builder(tries=50, max_delay=30)
def _build_time_range_query(
    time_range_start: SecondsSinceUnixEpoch | None = None,
    time_range_end: SecondsSinceUnixEpoch | None = None,
) -> str | None:
    """Build a Gmail search query restricting results to the given epoch range.

    A bound that is None or 0 is treated as unbounded on that side. Returns
    None when neither bound applies (i.e. no query restriction is needed).
    """
    clauses: list[str] = []
    if time_range_start:
        clauses.append(f"after:{int(time_range_start)}")
    if time_range_end:
        clauses.append(f"before:{int(time_range_end)}")
    return " ".join(clauses) if clauses else None
def _clean_email_and_extract_name(email: str) -> tuple[str, str | None]:
email = email.strip()
if "<" in email and ">" in email:
# Handle format: "Display Name <email@domain.com>"
display_name = email[: email.find("<")].strip()
email_address = email[email.find("<") + 1 : email.find(">")].strip()
return email_address, display_name if display_name else None
else:
# Handle plain email address
return email.strip(), None
def _get_owners_from_emails(emails: dict[str, str | None]) -> list[BasicExpertInfo]:
    """Convert {email: display_name} pairs into BasicExpertInfo records.

    The last whitespace-separated token of a display name becomes the last
    name and everything before it the first name; entries without a display
    name get neither.
    """
    experts: list[BasicExpertInfo] = []
    for address, display_name in emails.items():
        first: str | None = None
        last: str | None = None
        if display_name:
            tokens = display_name.split(" ")
            # NOTE(review): a single-token name yields first_name == "" here;
            # this intentionally matches the original behavior.
            first = " ".join(tokens[:-1])
            last = tokens[-1]
        experts.append(
            BasicExpertInfo(email=address, first_name=first, last_name=last)
        )
    return experts
def _get_message_body(payload: dict[str, Any]) -> str:
parts = payload.get("parts", [])
message_body = ""
for part in parts:
mime_type = part.get("mimeType")
body = part.get("body")
if mime_type == "text/plain" and body:
data = body.get("data", "")
text = urlsafe_b64decode(data).decode()
message_body += text
return message_body
def message_to_section(message: Dict[str, Any]) -> tuple[Section, dict[str, Any]]:
    """Convert a single Gmail message into a Section plus its header metadata.

    Returns the Section (deep link + body text + header summary) and a
    metadata dict holding the recipient/sender fields, subject, date
    ("updated_at"), and label ids ("labels", a list) when present.

    Fixes vs. the original: the return annotation claimed dict[str, str]
    although "labels" holds a list, and a header missing its "name" key would
    raise AttributeError on `.lower()`.
    """
    link = f"https://mail.google.com/mail/u/0/#inbox/{message['id']}"

    payload = message.get("payload", {})
    headers = payload.get("headers", [])
    metadata: dict[str, Any] = {}
    for header in headers:
        # Guard against headers that come back without a name.
        name = (header.get("name") or "").lower()
        value = header.get("value")
        if name in EMAIL_FIELDS:
            metadata[name] = value
        if name == "subject":
            metadata["subject"] = value
        if name == "date":
            metadata["updated_at"] = value

    if labels := message.get("labelIds"):
        metadata["labels"] = labels

    message_data = ""
    for name, value in metadata.items():
        # updated_at isn't useful context for the LLM, so keep it out of the text
        if name != "updated_at":
            message_data += f"{name}: {value}\n"

    message_body_text: str = _get_message_body(payload)

    return Section(link=link, text=message_body_text + message_data), metadata
def thread_to_document(full_thread: Dict[str, Any]) -> Document | None:
all_messages = full_thread.get("messages", [])
if not all_messages:
return None
sections = []
semantic_identifier = ""
updated_at = None
from_emails: dict[str, str | None] = {}
other_emails: dict[str, str | None] = {}
for message in all_messages:
section, message_metadata = message_to_section(message)
sections.append(section)
for name, value in message_metadata.items():
if name in EMAIL_FIELDS:
email, display_name = _clean_email_and_extract_name(value)
if name == "from":
from_emails[email] = (
display_name if not from_emails.get(email) else None
)
else:
other_emails[email] = (
display_name if not other_emails.get(email) else None
)
if match:
retry_after_timestamp = match.group(1)
retry_after_dt = datetime.strptime(
retry_after_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=timezone.utc)
current_time = datetime.now(timezone.utc)
sleep_time = max(
int((retry_after_dt - current_time).total_seconds()),
0,
)
else:
logger.error(
f"No Retry-After header or timestamp found in error message: {error}"
)
sleep_time = 60
sleep_time += 3 # Add a buffer to be safe
# If we haven't set the semantic identifier yet, set it to the subject of the first message
if not semantic_identifier:
semantic_identifier = message_metadata.get("subject", "")
logger.info(
f"Rate limit exceeded. Attempt {attempt}/{max_attempts}. Sleeping for {sleep_time} seconds."
)
time.sleep(sleep_time)
if message_metadata.get("updated_at"):
updated_at = message_metadata.get("updated_at")
else:
raise
updated_at_datetime = None
if updated_at:
updated_at_datetime = time_str_to_utc(updated_at)
# If we've exhausted all attempts
raise Exception(f"Failed to execute request after {max_attempts} attempts")
id = full_thread.get("id")
if not id:
raise ValueError("Thread ID is required")
primary_owners = _get_owners_from_emails(from_emails)
secondary_owners = _get_owners_from_emails(other_emails)
return Document(
id=id,
semantic_identifier=semantic_identifier,
sections=sections,
source=DocumentSource.GMAIL,
# This is used to perform permission sync
primary_owners=primary_owners,
secondary_owners=secondary_owners,
doc_updated_at=updated_at_datetime,
# Not adding emails to metadata because it's already in the sections
metadata={},
)
class GmailConnector(LoadConnector, PollConnector):
class GmailConnector(LoadConnector, PollConnector, SlimConnector):
def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
self.batch_size = batch_size
self.creds: OAuthCredentials | ServiceAccountCredentials | None = None
self._creds: OAuthCredentials | ServiceAccountCredentials | None = None
self._primary_admin_email: str | None = None
@property
def primary_admin_email(self) -> str:
    """The admin email supplied via load_credentials; raises if accessed early."""
    email = self._primary_admin_email
    if email is None:
        raise RuntimeError(
            "Primary admin email missing, "
            "should not call this property "
            "before calling load_credentials"
        )
    return email
@property
def google_domain(self) -> str:
    """Domain portion of the primary admin email; raises if accessed before
    load_credentials has populated the admin email."""
    if self._primary_admin_email is None:
        raise RuntimeError(
            "Primary admin email missing, "
            "should not call this property "
            "before calling load_credentials"
        )
    _, _, domain = self._primary_admin_email.rpartition("@")
    return domain
@property
def creds(self) -> OAuthCredentials | ServiceAccountCredentials:
    """Google credentials set by load_credentials; raises if accessed early."""
    loaded = self._creds
    if loaded is None:
        raise RuntimeError(
            "Creds missing, "
            "should not call this property "
            "before calling load_credentials"
        )
    return loaded
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, str] | None:
"""Checks for two different types of credentials.
(1) A credential which holds a token acquired via a user going through
the Google OAuth flow.
(2) A credential which holds a service account key JSON file, which
can then be used to impersonate any user in the workspace.
"""
creds: OAuthCredentials | ServiceAccountCredentials | None = None
new_creds_dict = None
if DB_CREDENTIALS_DICT_TOKEN_KEY in credentials:
access_token_json_str = cast(
str, credentials[DB_CREDENTIALS_DICT_TOKEN_KEY]
)
creds = get_gmail_creds_for_authorized_user(
token_json_str=access_token_json_str
)
primary_admin_email = credentials[DB_CREDENTIALS_PRIMARY_ADMIN_KEY]
self._primary_admin_email = primary_admin_email
# tell caller to update token stored in DB if it has changed
# (e.g. the token has been refreshed)
new_creds_json_str = creds.to_json() if creds else ""
if new_creds_json_str != access_token_json_str:
new_creds_dict = {DB_CREDENTIALS_DICT_TOKEN_KEY: new_creds_json_str}
if GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY in credentials:
service_account_key_json_str = credentials[
GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY
]
creds = get_gmail_creds_for_service_account(
service_account_key_json_str=service_account_key_json_str
)
# "Impersonate" a user if one is specified
delegated_user_email = cast(
str | None, credentials.get(DB_CREDENTIALS_DICT_DELEGATED_USER_KEY)
)
if delegated_user_email:
creds = creds.with_subject(delegated_user_email) if creds else None # type: ignore
if creds is None:
raise PermissionError(
"Unable to access Gmail - unknown credential structure."
)
self.creds = creds
self._creds, new_creds_dict = get_google_creds(
credentials=credentials,
source=DocumentSource.GMAIL,
)
return new_creds_dict
def _get_email_body(self, payload: dict[str, Any]) -> str:
    """Decode and concatenate all text/plain parts of an email payload.

    Parts with a non-plain MIME type, or with no body at all, are skipped.
    The previous code only checked the MIME type and assumed `body` was
    present, raising AttributeError (`None.get`) on a bodiless part — the
    sibling module-level `_get_message_body` already guards this.
    """
    email_body = ""
    for part in payload.get("parts", []):
        mime_type = part.get("mimeType")
        body = part.get("body")
        if mime_type == "text/plain" and body:
            data = body.get("data", "")
            email_body += urlsafe_b64decode(data).decode()
    return email_body
def _get_all_user_emails(self) -> list[str]:
    """Return the primary email address of every user in the Google domain."""
    admin_service = get_admin_service(self.creds, self.primary_admin_email)
    return [
        user["primaryEmail"]
        for user in execute_paginated_retrieval(
            retrieval_function=admin_service.users().list,
            list_key="users",
            fields=USER_FIELDS,
            domain=self.google_domain,
        )
        if user.get("primaryEmail")
    ]
def _email_to_document(self, full_email: Dict[str, Any]) -> Document:
    """Convert a fully-fetched Gmail message into a Document.

    Selected headers become metadata, the Date header (when present) drives
    doc_updated_at, and the subject is used as title / semantic identifier.
    """
    email_id = full_email["id"]
    payload = full_email["payload"]
    headers = payload.get("headers")
    labels = full_email.get("labelIds", [])

    interesting_headers = ("from", "to", "subject", "date", "cc", "bcc")
    metadata = {}
    for header in headers or []:
        header_name = header.get("name").lower()
        if header_name in interesting_headers:
            metadata[header_name] = header.get("value")

    # Header summary is built before labels are merged into metadata.
    email_data = "".join(f"{key}: {val}\n" for key, val in metadata.items())
    metadata["labels"] = labels

    logger.debug(f"{email_data}")

    email_body_text: str = self._get_email_body(payload)
    date_str = metadata.get("date")
    email_updated_at = time_str_to_utc(date_str) if date_str else None

    link = f"https://mail.google.com/mail/u/0/#inbox/{email_id}"
    return Document(
        id=email_id,
        sections=[Section(link=link, text=email_data + email_body_text)],
        source=DocumentSource.GMAIL,
        title=metadata.get("subject"),
        semantic_identifier=metadata.get("subject", "Untitled Email"),
        doc_updated_at=email_updated_at,
        metadata=metadata,
    )
@staticmethod
def _build_time_range_query(
    time_range_start: SecondsSinceUnixEpoch | None = None,
    time_range_end: SecondsSinceUnixEpoch | None = None,
) -> str | None:
    """Build a Gmail `after:`/`before:` search query from epoch bounds.

    A bound that is None or 0 is omitted; returns None when no bound applies.
    """
    pieces: list[str] = []
    if time_range_start:
        pieces.append(f"after:{int(time_range_start)}")
    if time_range_end:
        pieces.append(f"before:{int(time_range_end)}")
    return " ".join(pieces) if pieces else None
def _fetch_mails_from_gmail(
def _fetch_threads(
self,
time_range_start: SecondsSinceUnixEpoch | None = None,
time_range_end: SecondsSinceUnixEpoch | None = None,
) -> GenerateDocumentsOutput:
if self.creds is None:
raise PermissionError("Not logged into Gmail")
page_token = ""
query = GmailConnector._build_time_range_query(time_range_start, time_range_end)
service = discovery.build("gmail", "v1", credentials=self.creds)
while page_token is not None:
result = _execute_with_retry(
service.users()
.messages()
.list(
userId="me",
pageToken=page_token,
q=query,
maxResults=self.batch_size,
)
)
page_token = result.get("nextPageToken")
messages = result.get("messages", [])
doc_batch = []
for message in messages:
message_id = message["id"]
msg = _execute_with_retry(
service.users()
.messages()
.get(userId="me", id=message_id, format="full")
)
doc = self._email_to_document(msg)
query = _build_time_range_query(time_range_start, time_range_end)
doc_batch = []
for user_email in self._get_all_user_emails():
gmail_service = get_gmail_service(self.creds, user_email)
for thread in execute_paginated_retrieval(
retrieval_function=gmail_service.users().threads().list,
list_key="threads",
userId=user_email,
fields=THREAD_LIST_FIELDS,
q=query,
):
full_thread = add_retries(
lambda: gmail_service.users()
.threads()
.get(
userId=user_email,
id=thread["id"],
fields=THREAD_FIELDS,
)
.execute()
)()
doc = thread_to_document(full_thread)
if doc is None:
continue
doc_batch.append(doc)
if len(doc_batch) > 0:
yield doc_batch
if len(doc_batch) > self.batch_size:
yield doc_batch
doc_batch = []
if doc_batch:
yield doc_batch
def _fetch_slim_threads(
    self,
    time_range_start: SecondsSinceUnixEpoch | None = None,
    time_range_end: SecondsSinceUnixEpoch | None = None,
) -> GenerateSlimDocumentOutput:
    """Yield batches of SlimDocuments (thread id + owning user for permission
    sync) for every user in the domain, optionally time-restricted."""
    query = _build_time_range_query(time_range_start, time_range_end)
    batch: list[SlimDocument] = []
    for user_email in self._get_all_user_emails():
        gmail_service = get_gmail_service(self.creds, user_email)
        for thread in execute_paginated_retrieval(
            retrieval_function=gmail_service.users().threads().list,
            list_key="threads",
            userId=user_email,
            fields=THREAD_LIST_FIELDS,
            q=query,
        ):
            batch.append(
                SlimDocument(
                    id=thread["id"],
                    perm_sync_data={"user_email": user_email},
                )
            )
            if len(batch) > SLIM_BATCH_SIZE:
                yield batch
                batch = []
    if batch:
        yield batch
def load_from_state(self) -> GenerateDocumentsOutput:
yield from self._fetch_mails_from_gmail()
try:
yield from self._fetch_threads()
except Exception as e:
if MISSING_SCOPES_ERROR_STR in str(e):
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
raise e
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
yield from self._fetch_mails_from_gmail(start, end)
try:
yield from self._fetch_threads(start, end)
except Exception as e:
if MISSING_SCOPES_ERROR_STR in str(e):
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
raise e
def retrieve_all_slim_documents(
    self,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
) -> GenerateSlimDocumentOutput:
    """SlimConnector entry point: delegate to _fetch_slim_threads, translating
    missing-scope errors into an actionable PermissionError."""
    try:
        yield from self._fetch_slim_threads(start, end)
    except Exception as e:
        if MISSING_SCOPES_ERROR_STR not in str(e):
            raise e
        raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
if __name__ == "__main__":
    import json
    import os

    # Ad-hoc demo driver: load a service-account key from disk and print the
    # first batch of documents.
    # NOTE(review): the key JSON is stored under DB_CREDENTIALS_DICT_TOKEN_KEY
    # (the OAuth-token slot) and load_credentials is handed the parsed key
    # JSON rather than credentials_dict itself — presumably intentional for
    # local testing, but worth confirming against load_credentials' contract.
    service_account_json_path = os.environ.get("GOOGLE_SERVICE_ACCOUNT_KEY_JSON_PATH")
    if not service_account_json_path:
        raise ValueError(
            "Please set GOOGLE_SERVICE_ACCOUNT_KEY_JSON_PATH environment variable"
        )
    with open(service_account_json_path) as f:
        creds = json.load(f)

    credentials_dict = {
        DB_CREDENTIALS_DICT_TOKEN_KEY: json.dumps(creds),
    }
    # Optionally impersonate a specific workspace user.
    delegated_user = os.environ.get("GMAIL_DELEGATED_USER")
    if delegated_user:
        credentials_dict[DB_CREDENTIALS_DICT_DELEGATED_USER_KEY] = delegated_user

    connector = GmailConnector()
    connector.load_credentials(
        json.loads(credentials_dict[DB_CREDENTIALS_DICT_TOKEN_KEY])
    )
    # Print only the first batch, then stop.
    document_batch_generator = connector.load_from_state()
    for document_batch in document_batch_generator:
        print(document_batch)
        break
    pass

View File

@ -1,197 +0,0 @@
import json
from typing import cast
from urllib.parse import parse_qs
from urllib.parse import ParseResult
from urllib.parse import urlparse
from google.auth.transport.requests import Request # type: ignore
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from google_auth_oauthlib.flow import InstalledAppFlow # type: ignore
from sqlalchemy.orm import Session
from danswer.configs.app_configs import WEB_DOMAIN
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import KV_CRED_KEY
from danswer.configs.constants import KV_GMAIL_CRED_KEY
from danswer.configs.constants import KV_GMAIL_SERVICE_ACCOUNT_KEY
from danswer.connectors.gmail.constants import (
DB_CREDENTIALS_DICT_DELEGATED_USER_KEY,
)
from danswer.connectors.gmail.constants import DB_CREDENTIALS_DICT_TOKEN_KEY
from danswer.connectors.gmail.constants import (
GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from danswer.connectors.gmail.constants import SCOPES
from danswer.db.credentials import update_credential_json
from danswer.db.models import User
from danswer.key_value_store.factory import get_kv_store
from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey
from danswer.utils.logger import setup_logger
logger = setup_logger()
def _build_frontend_gmail_redirect() -> str:
    """URL of the frontend callback page for the Gmail OAuth flow."""
    return WEB_DOMAIN + "/admin/connectors/gmail/auth/callback"
def get_gmail_creds_for_authorized_user(
    token_json_str: str,
) -> OAuthCredentials | None:
    """Rehydrate OAuth credentials from their stored JSON, refreshing them
    when expired; returns None if no valid credential can be produced."""
    creds = OAuthCredentials.from_authorized_user_info(
        json.loads(token_json_str), SCOPES
    )
    if creds.valid:
        return creds

    if creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except Exception as e:
            logger.exception(f"Failed to refresh gmail access token due to: {e}")
            return None
        if creds.valid:
            logger.notice("Refreshed Gmail tokens.")
            return creds

    return None
def get_gmail_creds_for_service_account(
    service_account_key_json_str: str,
) -> ServiceAccountCredentials | None:
    """Build service-account credentials from their stored JSON key.

    Refreshes the credentials when they are not usable, then returns them
    only if valid. The previous condition (`not creds.valid or not
    creds.expired`) skipped the refresh only for credentials that were
    simultaneously valid AND expired — a state that cannot occur — and so
    needlessly refreshed healthy credentials; refresh only when the
    credentials are invalid or expired.
    """
    service_account_key = json.loads(service_account_key_json_str)
    creds = ServiceAccountCredentials.from_service_account_info(
        service_account_key, scopes=SCOPES
    )
    if not creds.valid or creds.expired:
        creds.refresh(Request())
    return creds if creds.valid else None
def verify_csrf(credential_id: int, state: str) -> None:
    """Compare the OAuth callback `state` against the CSRF token stored for
    this credential; raises PermissionError on mismatch."""
    expected = get_kv_store().load(KV_CRED_KEY.format(str(credential_id)))
    if expected != state:
        raise PermissionError(
            "State from Gmail Connector callback does not match expected"
        )
def get_gmail_auth_url(credential_id: int) -> str:
    """Create a Google consent-screen URL for the Gmail OAuth flow and stash
    the flow's CSRF `state` (encrypted) under this credential's KV key."""
    credential_json = json.loads(str(get_kv_store().load(KV_GMAIL_CRED_KEY)))
    flow = InstalledAppFlow.from_client_config(
        credential_json,
        scopes=SCOPES,
        redirect_uri=_build_frontend_gmail_redirect(),
    )
    auth_url, _ = flow.authorization_url(prompt="consent")

    parsed_url = cast(ParseResult, urlparse(auth_url))
    params = parse_qs(parsed_url.query)
    csrf_state = params.get("state", [None])[0]
    get_kv_store().store(
        KV_CRED_KEY.format(credential_id), csrf_state, encrypt=True
    )  # type: ignore
    return str(auth_url)
def get_auth_url(credential_id: int) -> str:
    """Generic-named alias kept for existing callers.

    The body was a byte-for-byte duplicate of get_gmail_auth_url; delegate so
    the two entry points cannot drift apart.
    """
    return get_gmail_auth_url(credential_id)
def update_gmail_credential_access_tokens(
    auth_code: str,
    credential_id: int,
    user: User,
    db_session: Session,
) -> OAuthCredentials | None:
    """Exchange an OAuth authorization code for tokens and persist them on the
    credential row; returns the credentials, or None if persistence failed."""
    flow = InstalledAppFlow.from_client_config(
        get_google_app_gmail_cred().model_dump(),
        scopes=SCOPES,
        redirect_uri=_build_frontend_gmail_redirect(),
    )
    flow.fetch_token(code=auth_code)

    creds = flow.credentials
    saved = update_credential_json(
        credential_id,
        {DB_CREDENTIALS_DICT_TOKEN_KEY: creds.to_json()},
        user,
        db_session,
    )
    return creds if saved else None
def build_service_account_creds(
    delegated_user_email: str | None = None,
) -> CredentialBase:
    """Package the stored Gmail service-account key (and optional user to
    impersonate) into an admin-public CredentialBase."""
    credential_dict = {
        GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY: (
            get_gmail_service_account_key().json()
        ),
    }
    if delegated_user_email:
        credential_dict[DB_CREDENTIALS_DICT_DELEGATED_USER_KEY] = delegated_user_email

    return CredentialBase(
        source=DocumentSource.GMAIL,
        credential_json=credential_dict,
        admin_public=True,
    )
def get_google_app_gmail_cred() -> GoogleAppCredentials:
    """Load the Gmail OAuth app credentials from the key-value store."""
    creds_str = str(get_kv_store().load(KV_GMAIL_CRED_KEY))
    return GoogleAppCredentials(**json.loads(creds_str))


def upsert_google_app_gmail_cred(app_credentials: GoogleAppCredentials) -> None:
    """Store (encrypted) the Gmail OAuth app credentials."""
    get_kv_store().store(KV_GMAIL_CRED_KEY, app_credentials.json(), encrypt=True)


def delete_google_app_gmail_cred() -> None:
    """Remove the Gmail OAuth app credentials from the key-value store."""
    get_kv_store().delete(KV_GMAIL_CRED_KEY)


def get_gmail_service_account_key() -> GoogleServiceAccountKey:
    """Load the Gmail service-account key from the key-value store."""
    creds_str = str(get_kv_store().load(KV_GMAIL_SERVICE_ACCOUNT_KEY))
    return GoogleServiceAccountKey(**json.loads(creds_str))


def upsert_gmail_service_account_key(
    service_account_key: GoogleServiceAccountKey,
) -> None:
    """Store (encrypted) the Gmail service-account key."""
    get_kv_store().store(
        KV_GMAIL_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True
    )


def upsert_service_account_key(service_account_key: GoogleServiceAccountKey) -> None:
    """Generic-named alias; the body was a duplicate of
    upsert_gmail_service_account_key, so delegate to keep one code path."""
    upsert_gmail_service_account_key(service_account_key)


def delete_gmail_service_account_key() -> None:
    """Remove the Gmail service-account key from the key-value store."""
    get_kv_store().delete(KV_GMAIL_SERVICE_ACCOUNT_KEY)


def delete_service_account_key() -> None:
    """Generic-named alias; delegates to delete_gmail_service_account_key."""
    delete_gmail_service_account_key()

View File

@ -1,4 +0,0 @@
DB_CREDENTIALS_DICT_TOKEN_KEY = "gmail_tokens"
GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY = "gmail_service_account_key"
DB_CREDENTIALS_DICT_DELEGATED_USER_KEY = "gmail_delegated_user"
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

View File

@ -5,26 +5,27 @@ from google.oauth2.credentials import Credentials as OAuthCredentials # type: i
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.connectors.google_drive.connector_auth import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from danswer.connectors.google_drive.connector_auth import get_google_drive_creds
from danswer.connectors.google_drive.constants import MISSING_SCOPES_ERROR_STR
from danswer.connectors.google_drive.constants import ONYX_SCOPE_INSTRUCTIONS
from danswer.connectors.google_drive.constants import SCOPE_DOC_URL
from danswer.connectors.google_drive.constants import SLIM_BATCH_SIZE
from danswer.connectors.google_drive.constants import USER_FIELDS
from danswer.configs.constants import DocumentSource
from danswer.connectors.google_drive.doc_conversion import (
convert_drive_item_to_document,
)
from danswer.connectors.google_drive.file_retrieval import crawl_folders_for_files
from danswer.connectors.google_drive.file_retrieval import get_files_in_my_drive
from danswer.connectors.google_drive.file_retrieval import get_files_in_shared_drive
from danswer.connectors.google_drive.google_utils import execute_paginated_retrieval
from danswer.connectors.google_drive.models import GoogleDriveFileType
from danswer.connectors.google_drive.resources import get_admin_service
from danswer.connectors.google_drive.resources import get_drive_service
from danswer.connectors.google_drive.resources import get_google_docs_service
from danswer.connectors.google_utils.google_auth import get_google_creds
from danswer.connectors.google_utils.google_utils import execute_paginated_retrieval
from danswer.connectors.google_utils.resources import get_admin_service
from danswer.connectors.google_utils.resources import get_drive_service
from danswer.connectors.google_utils.resources import get_google_docs_service
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from danswer.connectors.google_utils.shared_constants import MISSING_SCOPES_ERROR_STR
from danswer.connectors.google_utils.shared_constants import ONYX_SCOPE_INSTRUCTIONS
from danswer.connectors.google_utils.shared_constants import SCOPE_DOC_URL
from danswer.connectors.google_utils.shared_constants import SLIM_BATCH_SIZE
from danswer.connectors.google_utils.shared_constants import USER_FIELDS
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import GenerateSlimDocumentOutput
from danswer.connectors.interfaces import LoadConnector
@ -105,7 +106,6 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
self.shared_folder_ids = _extract_ids_from_urls(shared_folder_url_list)
self._primary_admin_email: str | None = None
self.google_domain: str | None = None
self._creds: OAuthCredentials | ServiceAccountCredentials | None = None
@ -121,6 +121,16 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
)
return self._primary_admin_email
@property
def google_domain(self) -> str:
if self._primary_admin_email is None:
raise RuntimeError(
"Primary admin email missing, "
"should not call this property "
"before calling load_credentials"
)
return self._primary_admin_email.split("@")[-1]
@property
def creds(self) -> OAuthCredentials | ServiceAccountCredentials:
if self._creds is None:
@ -136,10 +146,12 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, str] | None:
primary_admin_email = credentials[DB_CREDENTIALS_PRIMARY_ADMIN_KEY]
self.google_domain = primary_admin_email.split("@")[1]
self._primary_admin_email = primary_admin_email
self._creds, new_creds_dict = get_google_drive_creds(credentials)
self._creds, new_creds_dict = get_google_creds(
credentials=credentials,
source=DocumentSource.GOOGLE_DRIVE,
)
return new_creds_dict
def _get_all_user_emails(self) -> list[str]:

View File

@ -1,238 +0,0 @@
import json
from typing import cast
from urllib.parse import parse_qs
from urllib.parse import ParseResult
from urllib.parse import urlparse
from google.auth.transport.requests import Request # type: ignore
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from google_auth_oauthlib.flow import InstalledAppFlow # type: ignore
from googleapiclient.discovery import build # type: ignore
from sqlalchemy.orm import Session
from danswer.configs.app_configs import WEB_DOMAIN
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import KV_CRED_KEY
from danswer.configs.constants import KV_GOOGLE_DRIVE_CRED_KEY
from danswer.configs.constants import KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
from danswer.connectors.google_drive.constants import MISSING_SCOPES_ERROR_STR
from danswer.connectors.google_drive.constants import ONYX_SCOPE_INSTRUCTIONS
from danswer.db.credentials import update_credential_json
from danswer.db.models import User
from danswer.key_value_store.factory import get_kv_store
from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey
from danswer.utils.logger import setup_logger
logger = setup_logger()
# NOTE: do not need https://www.googleapis.com/auth/documents.readonly
# this is counted under `/auth/drive.readonly`
GOOGLE_DRIVE_SCOPES = [
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/drive.metadata.readonly",
"https://www.googleapis.com/auth/admin.directory.group.readonly",
"https://www.googleapis.com/auth/admin.directory.user.readonly",
]
DB_CREDENTIALS_DICT_TOKEN_KEY = "google_drive_tokens"
DB_CREDENTIALS_PRIMARY_ADMIN_KEY = "google_drive_primary_admin"
def _build_frontend_google_drive_redirect() -> str:
    """URL of the frontend callback page for the Google Drive OAuth flow."""
    return WEB_DOMAIN + "/admin/connectors/google-drive/auth/callback"
def get_google_drive_creds_for_authorized_user(
    token_json_str: str, scopes: list[str]
) -> OAuthCredentials | None:
    """Rehydrate Drive OAuth credentials from stored JSON, refreshing expired
    ones; returns None when no valid credential can be produced."""
    creds = OAuthCredentials.from_authorized_user_info(
        json.loads(token_json_str), scopes
    )
    if creds.valid:
        return creds

    if creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except Exception as e:
            logger.exception(f"Failed to refresh google drive access token due to: {e}")
            return None
        if creds.valid:
            logger.notice("Refreshed Google Drive tokens.")
            return creds

    return None
def get_google_drive_creds(
    credentials: dict[str, str], scopes: list[str] = GOOGLE_DRIVE_SCOPES
) -> tuple[ServiceAccountCredentials | OAuthCredentials, dict[str, str] | None]:
    """Checks for two different types of credentials.

    (1) A credential which holds a token acquired via a user going through
    the Google OAuth flow.
    (2) A credential which holds a service account key JSON file, which
    can then be used to impersonate any user in the workspace.

    Returns:
        (creds, new_creds_dict): usable credentials, plus a replacement
        credential dict to persist when the OAuth token was refreshed
        (None otherwise).

    Raises:
        PermissionError: if neither credential type yields usable creds.
    """
    oauth_creds = None
    service_creds = None
    new_creds_dict = None
    if DB_CREDENTIALS_DICT_TOKEN_KEY in credentials:
        # OAuth path: rebuild user credentials from the stored token JSON
        access_token_json_str = cast(str, credentials[DB_CREDENTIALS_DICT_TOKEN_KEY])
        oauth_creds = get_google_drive_creds_for_authorized_user(
            token_json_str=access_token_json_str, scopes=scopes
        )
        # tell caller to update token stored in DB if it has changed
        # (e.g. the token has been refreshed)
        new_creds_json_str = oauth_creds.to_json() if oauth_creds else ""
        if new_creds_json_str != access_token_json_str:
            new_creds_dict = {
                DB_CREDENTIALS_DICT_TOKEN_KEY: new_creds_json_str,
                DB_CREDENTIALS_PRIMARY_ADMIN_KEY: credentials[
                    DB_CREDENTIALS_PRIMARY_ADMIN_KEY
                ],
            }
    elif KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY in credentials:
        # Service-account path: build credentials straight from the key JSON
        service_account_key_json_str = credentials[KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY]
        service_account_key = json.loads(service_account_key_json_str)
        service_creds = ServiceAccountCredentials.from_service_account_info(
            service_account_key, scopes=scopes
        )
        # NOTE(review): refreshes when creds are invalid OR not yet expired;
        # "or service_creds.expired" may have been intended — confirm.
        if not service_creds.valid or not service_creds.expired:
            service_creds.refresh(Request())
        if not service_creds.valid:
            raise PermissionError(
                "Unable to access Google Drive - service account credentials are invalid."
            )
    creds: ServiceAccountCredentials | OAuthCredentials | None = (
        oauth_creds or service_creds
    )
    if creds is None:
        raise PermissionError(
            "Unable to access Google Drive - unknown credential structure."
        )
    return creds, new_creds_dict
def verify_csrf(credential_id: int, state: str) -> None:
    """Check the OAuth callback `state` against the CSRF token stored for this credential."""
    expected_state = get_kv_store().load(KV_CRED_KEY.format(str(credential_id)))
    if state != expected_state:
        raise PermissionError(
            "State from Google Drive Connector callback does not match expected"
        )
def get_auth_url(credential_id: int) -> str:
    """Build the Google OAuth consent URL and persist its CSRF `state` token."""
    kv_store = get_kv_store()
    app_cred_json = json.loads(str(kv_store.load(KV_GOOGLE_DRIVE_CRED_KEY)))
    flow = InstalledAppFlow.from_client_config(
        app_cred_json,
        scopes=GOOGLE_DRIVE_SCOPES,
        redirect_uri=_build_frontend_google_drive_redirect(),
    )
    auth_url, _ = flow.authorization_url(prompt="consent")

    # Persist the `state` query parameter so verify_csrf can validate the callback.
    query = cast(ParseResult, urlparse(auth_url)).query
    state_value = parse_qs(query).get("state", [None])[0]
    kv_store.store(KV_CRED_KEY.format(credential_id), state_value, encrypt=True)  # type: ignore
    return str(auth_url)
def update_credential_access_tokens(
    auth_code: str,
    credential_id: int,
    user: User,
    db_session: Session,
) -> OAuthCredentials | None:
    """Exchange an OAuth authorization code for tokens and persist them.

    Also records the authorizing user's email as the connector's primary admin.

    Returns:
        The new OAuth credentials, or None if the DB update failed.

    Raises:
        PermissionError: if the granted token is missing required scopes.
    """
    app_credentials = get_google_app_cred()
    flow = InstalledAppFlow.from_client_config(
        app_credentials.model_dump(),
        scopes=GOOGLE_DRIVE_SCOPES,
        redirect_uri=_build_frontend_google_drive_redirect(),
    )
    flow.fetch_token(code=auth_code)
    creds = flow.credentials
    token_json_str = creds.to_json()
    # Get user email from Google API so we know who
    # the primary admin is for this connector
    try:
        admin_service = build("drive", "v3", credentials=creds)
        user_info = (
            admin_service.about()
            .get(
                fields="user(emailAddress)",
            )
            .execute()
        )
        email = user_info.get("user", {}).get("emailAddress")
    except Exception as e:
        # Translate scope errors into actionable instructions for the admin
        if MISSING_SCOPES_ERROR_STR in str(e):
            raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
        raise e
    new_creds_dict = {
        DB_CREDENTIALS_DICT_TOKEN_KEY: token_json_str,
        DB_CREDENTIALS_PRIMARY_ADMIN_KEY: email,
    }
    if not update_credential_json(credential_id, new_creds_dict, user, db_session):
        return None
    return creds
def build_service_account_creds(
    source: DocumentSource,
    primary_admin_email: str | None = None,
) -> CredentialBase:
    """Package the stored service account key as an admin-public CredentialBase.

    NOTE(review): the `source` parameter is accepted but ignored — the result
    is always tagged DocumentSource.GOOGLE_DRIVE. Confirm whether it should
    use `source` (the newer google_utils version does).
    """
    service_account_key = get_service_account_key()
    credential_dict = {
        KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY: service_account_key.json(),
    }
    # Falsy (empty-string) admin emails are intentionally skipped as well.
    if primary_admin_email:
        credential_dict[DB_CREDENTIALS_PRIMARY_ADMIN_KEY] = primary_admin_email
    return CredentialBase(
        credential_json=credential_dict,
        admin_public=True,
        source=DocumentSource.GOOGLE_DRIVE,
    )
def get_google_app_cred() -> GoogleAppCredentials:
    """Load the Google Drive OAuth app credential from the key-value store."""
    raw_creds = get_kv_store().load(KV_GOOGLE_DRIVE_CRED_KEY)
    return GoogleAppCredentials(**json.loads(str(raw_creds)))
def upsert_google_app_cred(app_credentials: GoogleAppCredentials) -> None:
    """Store (encrypted) the Google Drive OAuth app credential, overwriting any existing one."""
    serialized = app_credentials.json()
    get_kv_store().store(KV_GOOGLE_DRIVE_CRED_KEY, serialized, encrypt=True)
def delete_google_app_cred() -> None:
    """Remove the stored Google Drive OAuth app credential."""
    kv_store = get_kv_store()
    kv_store.delete(KV_GOOGLE_DRIVE_CRED_KEY)
def get_service_account_key() -> GoogleServiceAccountKey:
    """Load the Google Drive service account key from the key-value store."""
    raw_key = get_kv_store().load(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY)
    return GoogleServiceAccountKey(**json.loads(str(raw_key)))
def upsert_service_account_key(service_account_key: GoogleServiceAccountKey) -> None:
    """Store (encrypted) the Google Drive service account key, overwriting any existing one."""
    serialized = service_account_key.json()
    get_kv_store().store(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY, serialized, encrypt=True)
def delete_service_account_key() -> None:
    """Remove the stored Google Drive service account key."""
    kv_store = get_kv_store()
    kv_store.delete(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY)

View File

@ -2,35 +2,3 @@ UNSUPPORTED_FILE_TYPE_CONTENT = "" # keep empty for now
DRIVE_FOLDER_TYPE = "application/vnd.google-apps.folder"
DRIVE_SHORTCUT_TYPE = "application/vnd.google-apps.shortcut"
DRIVE_FILE_TYPE = "application/vnd.google-apps.file"
FILE_FIELDS = (
"nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
"shortcutDetails, owners(emailAddress))"
)
SLIM_FILE_FIELDS = (
"nextPageToken, files(mimeType, id, name, permissions(emailAddress, type), "
"permissionIds, webViewLink, owners(emailAddress))"
)
FOLDER_FIELDS = "nextPageToken, files(id, name, permissions, modifiedTime, webViewLink, shortcutDetails)"
USER_FIELDS = "nextPageToken, users(primaryEmail)"
# these errors don't represent a failure in the connector, but simply files
# that can't / shouldn't be indexed
ERRORS_TO_CONTINUE_ON = [
"cannotExportFile",
"exportSizeLimitExceeded",
"cannotDownloadFile",
]
# Error message substrings
MISSING_SCOPES_ERROR_STR = "client not authorized for any of the scopes requested"
# Documentation and error messages
SCOPE_DOC_URL = "https://docs.danswer.dev/connectors/google_drive/overview"
ONYX_SCOPE_INSTRUCTIONS = (
"You have upgraded Danswer without updating the Google Drive scopes. "
f"Please refer to the documentation to learn how to update the scopes: {SCOPE_DOC_URL}"
)
# Batch sizes
SLIM_BATCH_SIZE = 500

View File

@ -8,13 +8,12 @@ from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import IGNORE_FOR_QA
from danswer.connectors.google_drive.constants import DRIVE_SHORTCUT_TYPE
from danswer.connectors.google_drive.constants import ERRORS_TO_CONTINUE_ON
from danswer.connectors.google_drive.constants import UNSUPPORTED_FILE_TYPE_CONTENT
from danswer.connectors.google_drive.models import GDriveMimeType
from danswer.connectors.google_drive.models import GoogleDriveFileType
from danswer.connectors.google_drive.resources import GoogleDocsService
from danswer.connectors.google_drive.resources import GoogleDriveService
from danswer.connectors.google_drive.section_extraction import get_document_sections
from danswer.connectors.google_utils.resources import GoogleDocsService
from danswer.connectors.google_utils.resources import GoogleDriveService
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.file_processing.extract_file_text import docx_to_text
@ -26,6 +25,14 @@ from danswer.utils.logger import setup_logger
logger = setup_logger()
# these errors don't represent a failure in the connector, but simply files
# that can't / shouldn't be indexed
ERRORS_TO_CONTINUE_ON = [
"cannotExportFile",
"exportSizeLimitExceeded",
"cannotDownloadFile",
]
def _extract_sections_basic(
file: dict[str, str], service: GoogleDriveService

View File

@ -6,16 +6,23 @@ from googleapiclient.discovery import Resource # type: ignore
from danswer.connectors.google_drive.constants import DRIVE_FOLDER_TYPE
from danswer.connectors.google_drive.constants import DRIVE_SHORTCUT_TYPE
from danswer.connectors.google_drive.constants import FILE_FIELDS
from danswer.connectors.google_drive.constants import FOLDER_FIELDS
from danswer.connectors.google_drive.constants import SLIM_FILE_FIELDS
from danswer.connectors.google_drive.google_utils import execute_paginated_retrieval
from danswer.connectors.google_drive.models import GoogleDriveFileType
from danswer.connectors.google_utils.google_utils import execute_paginated_retrieval
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.utils.logger import setup_logger
logger = setup_logger()
FILE_FIELDS = (
"nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
"shortcutDetails, owners(emailAddress))"
)
SLIM_FILE_FIELDS = (
"nextPageToken, files(mimeType, id, name, permissions(emailAddress, type), "
"permissionIds, webViewLink, owners(emailAddress))"
)
FOLDER_FIELDS = "nextPageToken, files(id, name, permissions, modifiedTime, webViewLink, shortcutDetails)"
def _generate_time_range_filter(
start: SecondsSinceUnixEpoch | None = None,

View File

@ -1,35 +0,0 @@
from collections.abc import Callable
from collections.abc import Iterator
from typing import Any
from danswer.connectors.google_drive.models import GoogleDriveFileType
from danswer.utils.retry_wrapper import retry_builder
# Google Drive APIs are quite flakey and may 500 for an
# extended period of time. Trying to combat here by adding a very
# long retry period (~20 minutes of trying every minute)
add_retries = retry_builder(tries=50, max_delay=30)
def execute_paginated_retrieval(
    retrieval_function: Callable,
    list_key: str,
    **kwargs: Any,
) -> Iterator[GoogleDriveFileType]:
    """Execute a paginated retrieval from Google Drive API

    Args:
        retrieval_function: The specific list function to call (e.g., service.files().list)
        list_key: key in each response page whose items are yielded
        **kwargs: Arguments to pass to the list function
    """
    next_page_token = ""
    while next_page_token is not None:
        request_kwargs = kwargs.copy()
        if next_page_token:
            request_kwargs["pageToken"] = next_page_token
        # Call directly: the immediately-invoked `(lambda: ...)()` wrapper was
        # a leftover from a removed retry decorator and added nothing.
        results = retrieval_function(**request_kwargs).execute()
        next_page_token = results.get("nextPageToken")
        for item in results.get(list_key, []):
            yield item

View File

@ -2,7 +2,7 @@ from typing import Any
from pydantic import BaseModel
from danswer.connectors.google_drive.resources import GoogleDocsService
from danswer.connectors.google_utils.resources import GoogleDocsService
from danswer.connectors.models import Section

View File

@ -0,0 +1,107 @@
import json
from typing import cast
from google.auth.transport.requests import Request # type: ignore
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from danswer.configs.constants import DocumentSource
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_TOKEN_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
GOOGLE_SCOPES,
)
from danswer.utils.logger import setup_logger
logger = setup_logger()
def get_google_oauth_creds(
    token_json_str: str, source: DocumentSource
) -> OAuthCredentials | None:
    """Rebuild OAuth credentials for the given Google source from stored token JSON.

    Returns valid credentials (refreshing them when expired and refreshable),
    or None when the token cannot be used.
    """
    creds_json = json.loads(token_json_str)
    creds = OAuthCredentials.from_authorized_user_info(
        info=creds_json,
        scopes=GOOGLE_SCOPES[source],
    )
    if creds.valid:
        return creds

    if creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
            if creds.valid:
                # Log the actual source: this helper is shared by Drive and
                # Gmail, so the previous hard-coded "Google Drive" wording
                # was misleading for Gmail credentials.
                logger.notice(f"Refreshed {source} tokens.")
                return creds
        except Exception:
            logger.exception(f"Failed to refresh {source} access token due to:")
            return None

    return None
def get_google_creds(
    credentials: dict[str, str],
    source: DocumentSource,
) -> tuple[ServiceAccountCredentials | OAuthCredentials, dict[str, str] | None]:
    """Checks for two different types of credentials.

    (1) A credential which holds a token acquired via a user going through
    the Google OAuth flow.
    (2) A credential which holds a service account key JSON file, which
    can then be used to impersonate any user in the workspace.

    Returns:
        (creds, new_creds_dict): usable credentials, plus a replacement
        credential dict to persist when the OAuth token was refreshed
        (None otherwise).

    Raises:
        PermissionError: if no usable credential can be produced.
    """
    oauth_creds = None
    service_creds = None
    new_creds_dict = None
    if DB_CREDENTIALS_DICT_TOKEN_KEY in credentials:
        # OAUTH
        access_token_json_str = cast(str, credentials[DB_CREDENTIALS_DICT_TOKEN_KEY])
        oauth_creds = get_google_oauth_creds(
            token_json_str=access_token_json_str, source=source
        )
        # tell caller to update token stored in DB if it has changed
        # (e.g. the token has been refreshed)
        new_creds_json_str = oauth_creds.to_json() if oauth_creds else ""
        if new_creds_json_str != access_token_json_str:
            new_creds_dict = {
                DB_CREDENTIALS_DICT_TOKEN_KEY: new_creds_json_str,
                DB_CREDENTIALS_PRIMARY_ADMIN_KEY: credentials[
                    DB_CREDENTIALS_PRIMARY_ADMIN_KEY
                ],
            }
    elif DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY in credentials:
        # SERVICE ACCOUNT
        service_account_key_json_str = credentials[
            DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY
        ]
        service_account_key = json.loads(service_account_key_json_str)
        service_creds = ServiceAccountCredentials.from_service_account_info(
            service_account_key, scopes=GOOGLE_SCOPES[source]
        )
        # NOTE(review): refreshes when creds are invalid OR not yet expired;
        # "or service_creds.expired" may have been intended — confirm.
        if not service_creds.valid or not service_creds.expired:
            service_creds.refresh(Request())
        if not service_creds.valid:
            raise PermissionError(
                f"Unable to access {source} - service account credentials are invalid."
            )
    creds: ServiceAccountCredentials | OAuthCredentials | None = (
        oauth_creds or service_creds
    )
    if creds is None:
        raise PermissionError(
            f"Unable to access {source} - unknown credential structure."
        )
    return creds, new_creds_dict

View File

@ -0,0 +1,207 @@
import json
from typing import cast
from urllib.parse import parse_qs
from urllib.parse import ParseResult
from urllib.parse import urlparse
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google_auth_oauthlib.flow import InstalledAppFlow # type: ignore
from googleapiclient.discovery import build # type: ignore
from sqlalchemy.orm import Session
from danswer.configs.app_configs import WEB_DOMAIN
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import KV_CRED_KEY
from danswer.configs.constants import KV_GMAIL_CRED_KEY
from danswer.configs.constants import KV_GMAIL_SERVICE_ACCOUNT_KEY
from danswer.configs.constants import KV_GOOGLE_DRIVE_CRED_KEY
from danswer.configs.constants import KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_TOKEN_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
GOOGLE_SCOPES,
)
from danswer.connectors.google_utils.shared_constants import (
MISSING_SCOPES_ERROR_STR,
)
from danswer.connectors.google_utils.shared_constants import (
ONYX_SCOPE_INSTRUCTIONS,
)
from danswer.db.credentials import update_credential_json
from danswer.db.models import User
from danswer.key_value_store.factory import get_kv_store
from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey
from danswer.utils.logger import setup_logger
logger = setup_logger()
def _build_frontend_google_drive_redirect() -> str:
    """Frontend callback URL Google redirects to after the OAuth consent screen."""
    return "{}/admin/connectors/google-drive/auth/callback".format(WEB_DOMAIN)
def verify_csrf(credential_id: int, state: str) -> None:
    """Raise PermissionError unless the callback `state` matches the stored CSRF token."""
    stored_state = get_kv_store().load(KV_CRED_KEY.format(str(credential_id)))
    if stored_state != state:
        raise PermissionError(
            "State from Google Drive Connector callback does not match expected"
        )
def update_credential_access_tokens(
    auth_code: str,
    credential_id: int,
    user: User,
    db_session: Session,
    source: DocumentSource,
) -> OAuthCredentials | None:
    """Exchange an OAuth authorization code for tokens and persist them.

    Also records the authorizing user's email as the connector's primary admin.

    Returns:
        The new OAuth credentials, or None if the DB update failed.

    Raises:
        PermissionError: if the granted token is missing required scopes.
    """
    app_credentials = get_google_app_cred(source)
    flow = InstalledAppFlow.from_client_config(
        app_credentials.model_dump(),
        # BUG FIX: GOOGLE_SCOPES is a dict keyed by DocumentSource; passing
        # the whole dict as `scopes` is wrong — select this source's scopes.
        scopes=GOOGLE_SCOPES[source],
        redirect_uri=_build_frontend_google_drive_redirect(),
    )
    flow.fetch_token(code=auth_code)
    creds = flow.credentials
    token_json_str = creds.to_json()
    # Get user email from Google API so we know who
    # the primary admin is for this connector
    # NOTE(review): this always uses the Drive "about" endpoint, even when
    # source is GMAIL — confirm Gmail tokens carry a scope that permits this.
    try:
        admin_service = build("drive", "v3", credentials=creds)
        user_info = (
            admin_service.about()
            .get(
                fields="user(emailAddress)",
            )
            .execute()
        )
        email = user_info.get("user", {}).get("emailAddress")
    except Exception as e:
        if MISSING_SCOPES_ERROR_STR in str(e):
            raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
        raise e
    new_creds_dict = {
        DB_CREDENTIALS_DICT_TOKEN_KEY: token_json_str,
        DB_CREDENTIALS_PRIMARY_ADMIN_KEY: email,
    }
    if not update_credential_json(credential_id, new_creds_dict, user, db_session):
        return None
    return creds
def build_service_account_creds(
    source: DocumentSource,
    primary_admin_email: str | None = None,
) -> CredentialBase:
    """Package the stored service account key for `source` as an admin-public CredentialBase."""
    sa_key = get_service_account_key(source=source)
    credential_dict = {DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY: sa_key.json()}

    # Falsy (empty-string) admin emails are skipped, matching prior behavior.
    if primary_admin_email:
        credential_dict[DB_CREDENTIALS_PRIMARY_ADMIN_KEY] = primary_admin_email

    return CredentialBase(
        credential_json=credential_dict,
        admin_public=True,
        source=source,
    )
def get_auth_url(credential_id: int) -> str:
    """Build the Google Drive OAuth consent URL and persist its CSRF `state` token.

    Note: this loads the Drive app credential from the KV store, so it is
    Drive-specific even though other helpers in this module take a source.
    """
    creds_str = str(get_kv_store().load(KV_GOOGLE_DRIVE_CRED_KEY))
    credential_json = json.loads(creds_str)
    flow = InstalledAppFlow.from_client_config(
        credential_json,
        # BUG FIX: GOOGLE_SCOPES is a dict keyed by DocumentSource; passing
        # the whole dict as `scopes` is wrong. This function works on the
        # Drive app credential, so use the Drive scopes.
        scopes=GOOGLE_SCOPES[DocumentSource.GOOGLE_DRIVE],
        redirect_uri=_build_frontend_google_drive_redirect(),
    )
    auth_url, _ = flow.authorization_url(prompt="consent")
    parsed_url = cast(ParseResult, urlparse(auth_url))
    params = parse_qs(parsed_url.query)
    # Persist the state parameter so verify_csrf can validate the callback.
    get_kv_store().store(
        KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True
    )  # type: ignore
    return str(auth_url)
def get_google_app_cred(source: DocumentSource) -> GoogleAppCredentials:
    """Load the OAuth app credential for the given Google source from the KV store."""
    if source == DocumentSource.GOOGLE_DRIVE:
        kv_key = KV_GOOGLE_DRIVE_CRED_KEY
    elif source == DocumentSource.GMAIL:
        kv_key = KV_GMAIL_CRED_KEY
    else:
        raise ValueError(f"Unsupported source: {source}")
    return GoogleAppCredentials(**json.loads(str(get_kv_store().load(kv_key))))
def upsert_google_app_cred(
    app_credentials: GoogleAppCredentials, source: DocumentSource
) -> None:
    """Store (encrypted) the OAuth app credential for the given Google source."""
    if source == DocumentSource.GOOGLE_DRIVE:
        kv_key = KV_GOOGLE_DRIVE_CRED_KEY
    elif source == DocumentSource.GMAIL:
        kv_key = KV_GMAIL_CRED_KEY
    else:
        raise ValueError(f"Unsupported source: {source}")
    get_kv_store().store(kv_key, app_credentials.json(), encrypt=True)
def delete_google_app_cred(source: DocumentSource) -> None:
    """Remove the stored OAuth app credential for the given Google source."""
    if source == DocumentSource.GOOGLE_DRIVE:
        kv_key = KV_GOOGLE_DRIVE_CRED_KEY
    elif source == DocumentSource.GMAIL:
        kv_key = KV_GMAIL_CRED_KEY
    else:
        raise ValueError(f"Unsupported source: {source}")
    get_kv_store().delete(kv_key)
def get_service_account_key(source: DocumentSource) -> GoogleServiceAccountKey:
    """Load the service account key for the given Google source from the KV store."""
    if source == DocumentSource.GOOGLE_DRIVE:
        kv_key = KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
    elif source == DocumentSource.GMAIL:
        kv_key = KV_GMAIL_SERVICE_ACCOUNT_KEY
    else:
        raise ValueError(f"Unsupported source: {source}")
    return GoogleServiceAccountKey(**json.loads(str(get_kv_store().load(kv_key))))
def upsert_service_account_key(
    service_account_key: GoogleServiceAccountKey, source: DocumentSource
) -> None:
    """Store (encrypted) the service account key for the given Google source."""
    if source == DocumentSource.GOOGLE_DRIVE:
        kv_key = KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
    elif source == DocumentSource.GMAIL:
        kv_key = KV_GMAIL_SERVICE_ACCOUNT_KEY
    else:
        raise ValueError(f"Unsupported source: {source}")
    get_kv_store().store(kv_key, service_account_key.json(), encrypt=True)
def delete_service_account_key(source: DocumentSource) -> None:
    """Remove the stored service account key for the given Google source."""
    if source == DocumentSource.GOOGLE_DRIVE:
        kv_key = KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
    elif source == DocumentSource.GMAIL:
        kv_key = KV_GMAIL_SERVICE_ACCOUNT_KEY
    else:
        raise ValueError(f"Unsupported source: {source}")
    get_kv_store().delete(kv_key)

View File

@ -0,0 +1,102 @@
import re
import time
from collections.abc import Callable
from collections.abc import Iterator
from datetime import datetime
from datetime import timezone
from typing import Any
from googleapiclient.errors import HttpError # type: ignore
from danswer.connectors.google_drive.models import GoogleDriveFileType
from danswer.utils.logger import setup_logger
from danswer.utils.retry_wrapper import retry_builder
logger = setup_logger()
# Google Drive APIs are quite flakey and may 500 for an
# extended period of time. Trying to combat here by adding a very
# long retry period (~20 minutes of trying every minute)
add_retries = retry_builder(tries=50, max_delay=30)
def _execute_with_retry(request: Any) -> Any:
    """Execute a Google API request, retrying up to 10 times on HTTP 429.

    Sleep time is taken from the Retry-After header when present, otherwise
    parsed from a "Retry after <timestamp>" substring in the error message,
    otherwise a 60-second fallback. Non-429 HttpErrors are raised immediately.

    Raises:
        Exception: when all retry attempts are exhausted.
    """
    max_attempts = 10
    attempt = 0
    while attempt < max_attempts:
        # Note for reasons unknown, the Google API will sometimes return a 429
        # and even after waiting the retry period, it will return another 429.
        # It could be due to a few possibilities:
        # 1. Other things are also requesting from the Gmail API with the same key
        # 2. It's a rolling rate limit so the moment we get some amount of requests cleared, we hit it again very quickly
        # 3. The retry-after has a maximum and we've already hit the limit for the day
        # or it's something else...
        try:
            return request.execute()
        except HttpError as error:
            attempt += 1
            if error.resp.status == 429:
                # Attempt to get 'Retry-After' from headers
                retry_after = error.resp.get("Retry-After")
                if retry_after:
                    sleep_time = int(retry_after)
                else:
                    # Extract 'Retry after' timestamp from error message
                    match = re.search(
                        r"Retry after (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)",
                        str(error),
                    )
                    if match:
                        retry_after_timestamp = match.group(1)
                        retry_after_dt = datetime.strptime(
                            retry_after_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ"
                        ).replace(tzinfo=timezone.utc)
                        current_time = datetime.now(timezone.utc)
                        # Never sleep a negative duration if the timestamp has passed
                        sleep_time = max(
                            int((retry_after_dt - current_time).total_seconds()),
                            0,
                        )
                    else:
                        logger.error(
                            f"No Retry-After header or timestamp found in error message: {error}"
                        )
                        sleep_time = 60
                sleep_time += 3  # Add a buffer to be safe
                logger.info(
                    f"Rate limit exceeded. Attempt {attempt}/{max_attempts}. Sleeping for {sleep_time} seconds."
                )
                time.sleep(sleep_time)
            else:
                raise
    # If we've exhausted all attempts
    raise Exception(f"Failed to execute request after {max_attempts} attempts")
def execute_paginated_retrieval(
    retrieval_function: Callable,
    list_key: str,
    **kwargs: Any,
) -> Iterator[GoogleDriveFileType]:
    """Yield items from every page of a Google Drive API list call.

    Args:
        retrieval_function: The specific list function to call (e.g., service.files().list)
        list_key: response key whose entries are yielded from each page
        **kwargs: Arguments to pass to the list function
    """
    page_token: Any = ""
    while page_token is not None:
        call_kwargs = dict(kwargs)
        if page_token:
            call_kwargs["pageToken"] = page_token
        # Wrap the API call in the long retry helper to survive flaky 500s.
        response = add_retries(lambda: retrieval_function(**call_kwargs).execute())()
        page_token = response.get("nextPageToken")
        yield from response.get(list_key, [])

View File

@ -16,12 +16,16 @@ class AdminService(Resource):
pass
class GmailService(Resource):
pass
def _get_google_service(
service_name: str,
service_version: str,
creds: ServiceAccountCredentials | OAuthCredentials,
user_email: str | None = None,
) -> GoogleDriveService:
) -> GoogleDriveService | GoogleDocsService | AdminService | GmailService:
if isinstance(creds, ServiceAccountCredentials):
creds = creds.with_subject(user_email)
service = build(service_name, service_version, credentials=creds)
@ -50,3 +54,10 @@ def get_admin_service(
user_email: str,
) -> AdminService:
return _get_google_service("admin", "directory_v1", creds, user_email)
def get_gmail_service(
    creds: ServiceAccountCredentials | OAuthCredentials,
    user_email: str,
) -> GmailService:
    """Build a Gmail v1 API client; service-account creds impersonate user_email."""
    return _get_google_service(
        service_name="gmail",
        service_version="v1",
        creds=creds,
        user_email=user_email,
    )

View File

@ -0,0 +1,40 @@
from danswer.configs.constants import DocumentSource
# NOTE: do not need https://www.googleapis.com/auth/documents.readonly
# this is counted under `/auth/drive.readonly`
GOOGLE_SCOPES = {
DocumentSource.GOOGLE_DRIVE: [
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/drive.metadata.readonly",
"https://www.googleapis.com/auth/admin.directory.group.readonly",
"https://www.googleapis.com/auth/admin.directory.user.readonly",
],
DocumentSource.GMAIL: [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/admin.directory.user.readonly",
"https://www.googleapis.com/auth/admin.directory.group.readonly",
],
}
# This is the Oauth token
DB_CREDENTIALS_DICT_TOKEN_KEY = "google_tokens"
# This is the service account key
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY = "google_service_account_key"
# The email saved for both auth types
DB_CREDENTIALS_PRIMARY_ADMIN_KEY = "google_primary_admin"
USER_FIELDS = "nextPageToken, users(primaryEmail)"
# Error message substrings
MISSING_SCOPES_ERROR_STR = "client not authorized for any of the scopes requested"
# Documentation and error messages
SCOPE_DOC_URL = "https://docs.danswer.dev/connectors/google_drive/overview"
ONYX_SCOPE_INSTRUCTIONS = (
"You have upgraded Danswer without updating the Google Drive scopes. "
f"Please refer to the documentation to learn how to update the scopes: {SCOPE_DOC_URL}"
)
# This is the maximum number of threads that can be retrieved at once
SLIM_BATCH_SIZE = 500

View File

@ -11,8 +11,8 @@ from sqlalchemy.sql.expression import or_
from danswer.auth.schemas import UserRole
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
from danswer.connectors.gmail.constants import (
GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import Credential
@ -427,9 +427,7 @@ def delete_gmail_service_account_credentials(
) -> None:
credentials = fetch_credentials(db_session=db_session, user=user)
for credential in credentials:
if credential.credential_json.get(
GMAIL_DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY
):
if credential.credential_json.get(DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY):
db_session.delete(credential)
db_session.commit()

View File

@ -22,35 +22,38 @@ from danswer.background.celery.versioned_apps.primary import app as primary_app
from danswer.configs.app_configs import ENABLED_CONNECTOR_TYPES
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import FileOrigin
from danswer.connectors.gmail.connector_auth import delete_gmail_service_account_key
from danswer.connectors.gmail.connector_auth import delete_google_app_gmail_cred
from danswer.connectors.gmail.connector_auth import get_gmail_auth_url
from danswer.connectors.gmail.connector_auth import get_gmail_service_account_key
from danswer.connectors.gmail.connector_auth import get_google_app_gmail_cred
from danswer.connectors.gmail.connector_auth import (
update_gmail_credential_access_tokens,
from danswer.connectors.google_utils.google_auth import (
get_google_oauth_creds,
)
from danswer.connectors.gmail.connector_auth import (
upsert_gmail_service_account_key,
from danswer.connectors.google_utils.google_kv import (
build_service_account_creds,
)
from danswer.connectors.gmail.connector_auth import upsert_google_app_gmail_cred
from danswer.connectors.google_drive.connector_auth import build_service_account_creds
from danswer.connectors.google_drive.connector_auth import DB_CREDENTIALS_DICT_TOKEN_KEY
from danswer.connectors.google_drive.connector_auth import delete_google_app_cred
from danswer.connectors.google_drive.connector_auth import delete_service_account_key
from danswer.connectors.google_drive.connector_auth import get_auth_url
from danswer.connectors.google_drive.connector_auth import get_google_app_cred
from danswer.connectors.google_drive.connector_auth import (
get_google_drive_creds_for_authorized_user,
from danswer.connectors.google_utils.google_kv import (
delete_google_app_cred,
)
from danswer.connectors.google_drive.connector_auth import get_service_account_key
from danswer.connectors.google_drive.connector_auth import GOOGLE_DRIVE_SCOPES
from danswer.connectors.google_drive.connector_auth import (
from danswer.connectors.google_utils.google_kv import (
delete_service_account_key,
)
from danswer.connectors.google_utils.google_kv import get_auth_url
from danswer.connectors.google_utils.google_kv import (
get_google_app_cred,
)
from danswer.connectors.google_utils.google_kv import (
get_service_account_key,
)
from danswer.connectors.google_utils.google_kv import (
update_credential_access_tokens,
)
from danswer.connectors.google_drive.connector_auth import upsert_google_app_cred
from danswer.connectors.google_drive.connector_auth import upsert_service_account_key
from danswer.connectors.google_drive.connector_auth import verify_csrf
from danswer.connectors.google_utils.google_kv import (
upsert_google_app_cred,
)
from danswer.connectors.google_utils.google_kv import (
upsert_service_account_key,
)
from danswer.connectors.google_utils.google_kv import verify_csrf
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_TOKEN_KEY,
)
from danswer.db.connector import create_connector
from danswer.db.connector import delete_connector
from danswer.db.connector import fetch_connector_by_id
@ -125,7 +128,7 @@ def check_google_app_gmail_credentials_exist(
_: User = Depends(current_curator_or_admin_user),
) -> dict[str, str]:
try:
return {"client_id": get_google_app_gmail_cred().web.client_id}
return {"client_id": get_google_app_cred(DocumentSource.GMAIL).web.client_id}
except KvKeyNotFoundError:
raise HTTPException(status_code=404, detail="Google App Credentials not found")
@ -135,7 +138,7 @@ def upsert_google_app_gmail_credentials(
app_credentials: GoogleAppCredentials, _: User = Depends(current_admin_user)
) -> StatusResponse:
try:
upsert_google_app_gmail_cred(app_credentials)
upsert_google_app_cred(app_credentials, DocumentSource.GMAIL)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -150,7 +153,7 @@ def delete_google_app_gmail_credentials(
db_session: Session = Depends(get_session),
) -> StatusResponse:
try:
delete_google_app_gmail_cred()
delete_google_app_cred(DocumentSource.GMAIL)
cleanup_gmail_credentials(db_session=db_session)
except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -165,7 +168,9 @@ def check_google_app_credentials_exist(
_: User = Depends(current_curator_or_admin_user),
) -> dict[str, str]:
try:
return {"client_id": get_google_app_cred().web.client_id}
return {
"client_id": get_google_app_cred(DocumentSource.GOOGLE_DRIVE).web.client_id
}
except KvKeyNotFoundError:
raise HTTPException(status_code=404, detail="Google App Credentials not found")
@ -175,7 +180,7 @@ def upsert_google_app_credentials(
app_credentials: GoogleAppCredentials, _: User = Depends(current_admin_user)
) -> StatusResponse:
try:
upsert_google_app_cred(app_credentials)
upsert_google_app_cred(app_credentials, DocumentSource.GOOGLE_DRIVE)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -190,7 +195,7 @@ def delete_google_app_credentials(
db_session: Session = Depends(get_session),
) -> StatusResponse:
try:
delete_google_app_cred()
delete_google_app_cred(DocumentSource.GOOGLE_DRIVE)
cleanup_google_drive_credentials(db_session=db_session)
except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -205,7 +210,11 @@ def check_google_service_gmail_account_key_exist(
_: User = Depends(current_curator_or_admin_user),
) -> dict[str, str]:
try:
return {"service_account_email": get_gmail_service_account_key().client_email}
return {
"service_account_email": get_service_account_key(
DocumentSource.GMAIL
).client_email
}
except KvKeyNotFoundError:
raise HTTPException(
status_code=404, detail="Google Service Account Key not found"
@ -217,7 +226,7 @@ def upsert_google_service_gmail_account_key(
service_account_key: GoogleServiceAccountKey, _: User = Depends(current_admin_user)
) -> StatusResponse:
try:
upsert_gmail_service_account_key(service_account_key)
upsert_service_account_key(service_account_key, DocumentSource.GMAIL)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -232,7 +241,7 @@ def delete_google_service_gmail_account_key(
db_session: Session = Depends(get_session),
) -> StatusResponse:
try:
delete_gmail_service_account_key()
delete_service_account_key(DocumentSource.GMAIL)
cleanup_gmail_credentials(db_session=db_session)
except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -247,7 +256,11 @@ def check_google_service_account_key_exist(
_: User = Depends(current_curator_or_admin_user),
) -> dict[str, str]:
try:
return {"service_account_email": get_service_account_key().client_email}
return {
"service_account_email": get_service_account_key(
DocumentSource.GOOGLE_DRIVE
).client_email
}
except KvKeyNotFoundError:
raise HTTPException(
status_code=404, detail="Google Service Account Key not found"
@ -259,7 +272,7 @@ def upsert_google_service_account_key(
service_account_key: GoogleServiceAccountKey, _: User = Depends(current_admin_user)
) -> StatusResponse:
try:
upsert_service_account_key(service_account_key)
upsert_service_account_key(service_account_key, DocumentSource.GOOGLE_DRIVE)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -274,7 +287,7 @@ def delete_google_service_account_key(
db_session: Session = Depends(get_session),
) -> StatusResponse:
try:
delete_service_account_key()
delete_service_account_key(DocumentSource.GOOGLE_DRIVE)
cleanup_google_drive_credentials(db_session=db_session)
except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -322,7 +335,7 @@ def upsert_gmail_service_account_credential(
try:
credential_base = build_service_account_creds(
DocumentSource.GMAIL,
primary_admin_email=service_account_credential_request.gmail_delegated_user,
primary_admin_email=service_account_credential_request.gmail_primary_admin,
)
except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
@ -349,9 +362,9 @@ def check_drive_tokens(
):
return AuthStatus(authenticated=False)
token_json_str = str(db_credentials.credential_json[DB_CREDENTIALS_DICT_TOKEN_KEY])
google_drive_creds = get_google_drive_creds_for_authorized_user(
google_drive_creds = get_google_oauth_creds(
token_json_str=token_json_str,
scopes=GOOGLE_DRIVE_SCOPES,
source=DocumentSource.GOOGLE_DRIVE,
)
if google_drive_creds is None:
return AuthStatus(authenticated=False)
@ -881,7 +894,7 @@ def gmail_auth(
httponly=True,
max_age=600,
)
return AuthUrl(auth_url=get_gmail_auth_url(int(credential_id)))
return AuthUrl(auth_url=get_auth_url(int(credential_id)))
@router.get("/connector/google-drive/authorize/{credential_id}")
@ -913,8 +926,8 @@ def gmail_callback(
credential_id = int(credential_id_cookie)
verify_csrf(credential_id, callback.state)
if (
update_gmail_credential_access_tokens(
callback.code, credential_id, user, db_session
update_credential_access_tokens(
callback.code, credential_id, user, db_session, DocumentSource.GMAIL
)
is None
):
@ -941,7 +954,7 @@ def google_drive_callback(
verify_csrf(credential_id, callback.state)
credentials: Credentials | None = update_credential_access_tokens(
callback.code, credential_id, user, db_session
callback.code, credential_id, user, db_session, DocumentSource.GOOGLE_DRIVE
)
if credentials is None:
raise HTTPException(

View File

@ -378,15 +378,15 @@ class GoogleServiceAccountKey(BaseModel):
class GoogleServiceAccountCredentialRequest(BaseModel):
google_drive_primary_admin: str | None = None # email of user to impersonate
gmail_delegated_user: str | None = None # email of user to impersonate
gmail_primary_admin: str | None = None # email of user to impersonate
@model_validator(mode="after")
def check_user_delegation(self) -> "GoogleServiceAccountCredentialRequest":
if (self.google_drive_primary_admin is None) == (
self.gmail_delegated_user is None
self.gmail_primary_admin is None
):
raise ValueError(
"Exactly one of google_drive_primary_admin or gmail_delegated_user must be set"
"Exactly one of google_drive_primary_admin or gmail_primary_admin must be set"
)
return self

View File

@ -197,10 +197,10 @@ def bulk_invite_users(
email_info = validate_email(email)
normalized_emails.append(email_info.normalized) # type: ignore
except (EmailUndeliverableError, EmailNotValidError):
except (EmailUndeliverableError, EmailNotValidError) as e:
raise HTTPException(
status_code=400,
detail="One or more emails in the list are invalid",
detail=f"Invalid email address: {email} - {str(e)}",
)
if MULTI_TENANT:

View File

@ -0,0 +1,68 @@
from datetime import datetime
from datetime import timezone
from sqlalchemy.orm import Session
from danswer.access.models import ExternalAccess
from danswer.connectors.gmail.connector import GmailConnector
from danswer.connectors.interfaces import GenerateSlimDocumentOutput
from danswer.db.models import ConnectorCredentialPair
from danswer.db.users import batch_add_non_web_user_if_not_exists__no_commit
from danswer.utils.logger import setup_logger
from ee.danswer.db.document import upsert_document_external_perms__no_commit
logger = setup_logger()
def _get_slim_doc_generator(
    cc_pair: ConnectorCredentialPair,
    gmail_connector: GmailConnector,
) -> GenerateSlimDocumentOutput:
    """Build a slim-document generator for the window since the last permission
    sync of this cc_pair (or from the epoch if no sync has happened yet)."""
    end_time = datetime.now(timezone.utc).timestamp()

    last_sync = cc_pair.last_time_perm_sync
    if last_sync:
        # last_time_perm_sync is stored naive; interpret it as UTC
        start_time = last_sync.replace(tzinfo=timezone.utc).timestamp()
    else:
        start_time = 0.0

    return gmail_connector.retrieve_all_slim_documents(start=start_time, end=end_time)
def gmail_doc_sync(
    db_session: Session,
    cc_pair: ConnectorCredentialPair,
) -> None:
    """
    Adds the external permissions to the documents in postgres.
    If a document doesn't already exist in postgres, we create it there so
    that when it gets indexed later, the permissions are already populated.
    """
    connector = GmailConnector(**cc_pair.connector.connector_specific_config)
    connector.load_credentials(cc_pair.credential.credential_json)

    for doc_batch in _get_slim_doc_generator(cc_pair, connector):
        for doc in doc_batch:
            perm_data = doc.perm_sync_data
            if perm_data is None:
                logger.warning(f"No permissions found for document {doc.id}")
                continue

            # Gmail docs are private to the mailbox owner; no email means
            # nothing to sync for this document.
            owner_email = perm_data.get("user_email")
            if not owner_email:
                continue

            access = ExternalAccess(
                external_user_emails={owner_email},
                external_user_group_ids=set(),
                is_public=False,
            )
            # Ensure the owner exists as a (non-web) user before attaching perms
            batch_add_non_web_user_if_not_exists__no_commit(
                db_session=db_session,
                emails=list(access.external_user_emails),
            )
            upsert_document_external_perms__no_commit(
                db_session=db_session,
                doc_id=doc.id,
                external_access=access,
                source_type=cc_pair.connector.source,
            )

View File

@ -6,8 +6,8 @@ from sqlalchemy.orm import Session
from danswer.access.models import ExternalAccess
from danswer.connectors.google_drive.connector import GoogleDriveConnector
from danswer.connectors.google_drive.google_utils import execute_paginated_retrieval
from danswer.connectors.google_drive.resources import get_drive_service
from danswer.connectors.google_utils.google_utils import execute_paginated_retrieval
from danswer.connectors.google_utils.resources import get_drive_service
from danswer.connectors.interfaces import GenerateSlimDocumentOutput
from danswer.connectors.models import SlimDocument
from danswer.db.models import ConnectorCredentialPair

View File

@ -1,8 +1,8 @@
from sqlalchemy.orm import Session
from danswer.connectors.google_drive.connector import GoogleDriveConnector
from danswer.connectors.google_drive.google_utils import execute_paginated_retrieval
from danswer.connectors.google_drive.resources import get_admin_service
from danswer.connectors.google_utils.google_utils import execute_paginated_retrieval
from danswer.connectors.google_utils.resources import get_admin_service
from danswer.db.models import ConnectorCredentialPair
from danswer.db.users import batch_add_non_web_user_if_not_exists__no_commit
from danswer.utils.logger import setup_logger

View File

@ -6,6 +6,7 @@ from danswer.configs.constants import DocumentSource
from danswer.db.models import ConnectorCredentialPair
from ee.danswer.external_permissions.confluence.doc_sync import confluence_doc_sync
from ee.danswer.external_permissions.confluence.group_sync import confluence_group_sync
from ee.danswer.external_permissions.gmail.doc_sync import gmail_doc_sync
from ee.danswer.external_permissions.google_drive.doc_sync import gdrive_doc_sync
from ee.danswer.external_permissions.google_drive.group_sync import gdrive_group_sync
from ee.danswer.external_permissions.slack.doc_sync import slack_doc_sync
@ -28,6 +29,7 @@ DOC_PERMISSIONS_FUNC_MAP: dict[DocumentSource, SyncFuncType] = {
DocumentSource.GOOGLE_DRIVE: gdrive_doc_sync,
DocumentSource.CONFLUENCE: confluence_doc_sync,
DocumentSource.SLACK: slack_doc_sync,
DocumentSource.GMAIL: gmail_doc_sync,
}
# These functions update:

View File

@ -0,0 +1,89 @@
import json
import os
from collections.abc import Callable
import pytest
from danswer.connectors.gmail.connector import GmailConnector
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_TOKEN_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
from tests.load_env_vars import load_env_vars
# Load environment variables at the module level
load_env_vars()
def parse_credentials(env_str: str) -> dict:
    """
    Parse a (possibly double-escaped) JSON string from environment variables
    into a Python dictionary.

    Args:
        env_str (str): The JSON string from environment variables; may carry
            an extra layer of backslash escaping and surrounding quotes.

    Returns:
        dict: Parsed OAuth credentials

    Raises:
        json.JSONDecodeError: if the string is not valid JSON even after
            un-escaping.
    """
    # First try parsing the string as-is
    try:
        return json.loads(env_str)
    except json.JSONDecodeError:
        # Only fall back on a parse failure; other exceptions (e.g. TypeError
        # for non-str input) should propagate rather than be masked.
        # Remove the extra escaping backslashes
        unescaped = env_str.replace('\\"', '"')
        # Remove leading / trailing quotes
        unescaped = unescaped.strip('"')
        # Now parse the cleaned-up JSON
        return json.loads(unescaped)
@pytest.fixture
def google_gmail_oauth_connector_factory() -> Callable[..., GmailConnector]:
    """Fixture returning a factory that builds GmailConnectors loaded with
    OAuth credentials taken from the environment."""

    def _connector_factory(
        primary_admin_email: str = "admin@onyx-test.com",
    ) -> GmailConnector:
        print("Creating GmailConnector with OAuth credentials")
        connector = GmailConnector()

        raw_creds = os.environ["GOOGLE_GMAIL_OAUTH_CREDENTIALS_JSON_STR"]
        # Re-serialize after parsing to normalize any double-escaping
        normalized_creds = json.dumps(parse_credentials(raw_creds))

        connector.load_credentials(
            {
                DB_CREDENTIALS_DICT_TOKEN_KEY: normalized_creds,
                DB_CREDENTIALS_PRIMARY_ADMIN_KEY: primary_admin_email,
            }
        )
        return connector

    return _connector_factory
@pytest.fixture
def google_gmail_service_acct_connector_factory() -> Callable[..., GmailConnector]:
    """Fixture returning a factory that builds GmailConnectors loaded with
    service account credentials taken from the environment."""

    def _connector_factory(
        primary_admin_email: str = "admin@onyx-test.com",
    ) -> GmailConnector:
        print("Creating GmailConnector with service account credentials")
        connector = GmailConnector()

        raw_creds = os.environ["GOOGLE_GMAIL_SERVICE_ACCOUNT_JSON_STR"]
        # Re-serialize after parsing to normalize any double-escaping
        normalized_creds = json.dumps(parse_credentials(raw_creds))

        # Load Service Account Credentials
        connector.load_credentials(
            {
                DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY: normalized_creds,
                DB_CREDENTIALS_PRIMARY_ADMIN_KEY: primary_admin_email,
            }
        )
        return connector

    return _connector_factory

View File

@ -0,0 +1,125 @@
from collections.abc import Callable
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
from danswer.connectors.gmail.connector import GmailConnector
from danswer.connectors.models import Document
from danswer.connectors.models import SlimDocument
_THREAD_1_START_TIME = 1730568700
_THREAD_1_END_TIME = 1730569000
"""
This thread was 4 emails long:
admin@onyx-test.com -> test-group-1@onyx-test.com (containing test_user_1 and test_user_2)
test_user_1@onyx-test.com -> admin@onyx-test.com
admin@onyx-test.com -> test_user_2@onyx-test.com + BCC: test_user_3@onyx-test.com
test_user_3@onyx-test.com -> admin@onyx-test.com
"""
_THREAD_1_BY_ID: dict[str, dict[str, Any]] = {
"192edefb315737c3": {
"email": "admin@onyx-test.com",
"sections_count": 4,
"primary_owners": set(
[
"admin@onyx-test.com",
"test_user_1@onyx-test.com",
"test_user_3@onyx-test.com",
]
),
"secondary_owners": set(
[
"test-group-1@onyx-test.com",
"admin@onyx-test.com",
"test_user_2@onyx-test.com",
"test_user_3@onyx-test.com",
]
),
},
"192edf020d2f5def": {
"email": "test_user_1@onyx-test.com",
"sections_count": 2,
"primary_owners": set(["admin@onyx-test.com", "test_user_1@onyx-test.com"]),
"secondary_owners": set(["test-group-1@onyx-test.com", "admin@onyx-test.com"]),
},
"192edf020ae90aab": {
"email": "test_user_2@onyx-test.com",
"sections_count": 2,
"primary_owners": set(["admin@onyx-test.com"]),
"secondary_owners": set(
["test-group-1@onyx-test.com", "test_user_2@onyx-test.com"]
),
},
"192edf18316015fa": {
"email": "test_user_3@onyx-test.com",
"sections_count": 2,
"primary_owners": set(["admin@onyx-test.com", "test_user_3@onyx-test.com"]),
"secondary_owners": set(
[
"admin@onyx-test.com",
"test_user_2@onyx-test.com",
"test_user_3@onyx-test.com",
]
),
},
}
@patch(
    "danswer.file_processing.extract_file_text.get_unstructured_api_key",
    return_value=None,
)
def test_slim_docs_retrieval(
    mock_get_api_key: MagicMock,
    google_gmail_service_acct_connector_factory: Callable[..., GmailConnector],
) -> None:
    """Slim retrieval over thread 1's time window should surface all four
    emails, each tagged with the owning mailbox's email address."""
    print("\n\nRunning test_slim_docs_retrieval")
    connector = google_gmail_service_acct_connector_factory()

    slim_docs: list[SlimDocument] = []
    for batch in connector.retrieve_all_slim_documents(
        _THREAD_1_START_TIME, _THREAD_1_END_TIME
    ):
        slim_docs.extend(batch)

    assert len(slim_docs) == 4

    for slim_doc in slim_docs:
        perm_info = slim_doc.perm_sync_data
        assert isinstance(perm_info, dict)
        assert _THREAD_1_BY_ID[slim_doc.id]["email"] == perm_info["user_email"]
@patch(
    "danswer.file_processing.extract_file_text.get_unstructured_api_key",
    return_value=None,
)
def test_docs_retrieval(
    mock_get_api_key: MagicMock,
    google_gmail_service_acct_connector_factory: Callable[..., GmailConnector],
) -> None:
    """Full retrieval over thread 1's time window should return all four
    emails with the expected section counts and owner sets."""
    print("\n\nRunning test_docs_retrieval")
    connector = google_gmail_service_acct_connector_factory()
    retrieved_docs: list[Document] = []
    for doc_batch in connector.poll_source(_THREAD_1_START_TIME, _THREAD_1_END_TIME):
        retrieved_docs.extend(doc_batch)

    assert len(retrieved_docs) == 4

    for doc in retrieved_docs:
        doc_id = doc.id
        # Reset per document: previously these were only assigned inside the
        # `if` guards, so a doc with no owners would silently compare against
        # the previous iteration's values (or NameError on the first doc).
        retrieved_primary_owner_emails: set = set()
        retrieved_secondary_owner_emails: set = set()
        if doc.primary_owners:
            retrieved_primary_owner_emails = set(
                [owner.email for owner in doc.primary_owners]
            )
        if doc.secondary_owners:
            retrieved_secondary_owner_emails = set(
                [owner.email for owner in doc.secondary_owners]
            )
        assert _THREAD_1_BY_ID[doc_id]["sections_count"] == len(doc.sections)
        assert (
            _THREAD_1_BY_ID[doc_id]["primary_owners"] == retrieved_primary_owner_emails
        )
        assert (
            _THREAD_1_BY_ID[doc_id]["secondary_owners"]
            == retrieved_secondary_owner_emails
        )

View File

@ -4,27 +4,17 @@ from collections.abc import Callable
import pytest
from danswer.configs.constants import KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
from danswer.connectors.google_drive.connector import GoogleDriveConnector
from danswer.connectors.google_drive.connector_auth import DB_CREDENTIALS_DICT_TOKEN_KEY
from danswer.connectors.google_drive.connector_auth import (
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_TOKEN_KEY,
)
from danswer.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_PRIMARY_ADMIN_KEY,
)
def load_env_vars(env_file: str = ".env") -> None:
current_dir = os.path.dirname(os.path.abspath(__file__))
env_path = os.path.join(current_dir, env_file)
try:
with open(env_path, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("=", 1)
os.environ[key] = value.strip()
print("Successfully loaded environment variables")
except FileNotFoundError:
print(f"File {env_file} not found")
from tests.load_env_vars import load_env_vars
# Load environment variables at the module level
@ -65,6 +55,7 @@ def google_drive_oauth_connector_factory() -> Callable[..., GoogleDriveConnector
my_drive_emails: str | None = None,
shared_folder_urls: str | None = None,
) -> GoogleDriveConnector:
print("Creating GoogleDriveConnector with OAuth credentials")
connector = GoogleDriveConnector(
include_shared_drives=include_shared_drives,
shared_drive_urls=shared_drive_urls,
@ -113,7 +104,7 @@ def google_drive_service_acct_connector_factory() -> (
# Load Service Account Credentials
connector.load_credentials(
{
KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY: refried_json_string,
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY: refried_json_string,
DB_CREDENTIALS_PRIMARY_ADMIN_KEY: primary_admin_email,
}
)

View File

@ -102,7 +102,7 @@ def test_include_my_drives_only(
retrieved_docs.extend(doc_batch)
# Should only get everyone's My Drives
expected_file_ids = DRIVE_ID_MAPPING["ADMIN"]
expected_file_ids = list(range(0, 5)) # Admin's My Drive only
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,
@ -131,12 +131,7 @@ def test_drive_one_only(
retrieved_docs.extend(doc_batch)
# We ignore shared_drive_urls if include_shared_drives is False
expected_file_ids = (
DRIVE_ID_MAPPING["SHARED_DRIVE_1"]
+ DRIVE_ID_MAPPING["FOLDER_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
)
expected_file_ids = list(range(20, 40)) # Shared Drive 1 and its folders
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,
@ -166,14 +161,9 @@ def test_folder_and_shared_drive(
# Should
expected_file_ids = (
DRIVE_ID_MAPPING["ADMIN"]
+ DRIVE_ID_MAPPING["SHARED_DRIVE_1"]
+ DRIVE_ID_MAPPING["FOLDER_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
+ DRIVE_ID_MAPPING["FOLDER_2"]
+ DRIVE_ID_MAPPING["FOLDER_2_1"]
+ DRIVE_ID_MAPPING["FOLDER_2_2"]
list(range(0, 5)) # Admin's My Drive
+ list(range(20, 40)) # Shared Drive 1 and its folders
+ list(range(45, 60)) # Folder 2 and its subfolders
)
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
@ -205,12 +195,9 @@ def test_folders_only(
for doc_batch in connector.poll_source(0, time.time()):
retrieved_docs.extend(doc_batch)
expected_file_ids = (
DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
+ DRIVE_ID_MAPPING["FOLDER_2_1"]
+ DRIVE_ID_MAPPING["FOLDER_2_2"]
)
expected_file_ids = list(range(30, 40)) + list( # Folders 1_1 and 1_2
range(50, 60)
) # Folders 2_1 and 2_2
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,

View File

@ -105,12 +105,7 @@ def test_include_my_drives_only(
retrieved_docs.extend(doc_batch)
# Should only get everyone's My Drives
expected_file_ids = (
DRIVE_ID_MAPPING["ADMIN"]
+ DRIVE_ID_MAPPING["TEST_USER_1"]
+ DRIVE_ID_MAPPING["TEST_USER_2"]
+ DRIVE_ID_MAPPING["TEST_USER_3"]
)
expected_file_ids = list(range(0, 20)) # All My Drives
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,
@ -137,12 +132,7 @@ def test_drive_one_only(
retrieved_docs.extend(doc_batch)
# We ignore shared_drive_urls if include_shared_drives is False
expected_file_ids = (
DRIVE_ID_MAPPING["SHARED_DRIVE_1"]
+ DRIVE_ID_MAPPING["FOLDER_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
)
expected_file_ids = list(range(20, 40)) # Shared Drive 1 and its folders
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,
@ -174,17 +164,9 @@ def test_folder_and_shared_drive(
# Should
expected_file_ids = (
DRIVE_ID_MAPPING["ADMIN"]
+ DRIVE_ID_MAPPING["TEST_USER_1"]
+ DRIVE_ID_MAPPING["TEST_USER_2"]
+ DRIVE_ID_MAPPING["TEST_USER_3"]
+ DRIVE_ID_MAPPING["SHARED_DRIVE_1"]
+ DRIVE_ID_MAPPING["FOLDER_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
+ DRIVE_ID_MAPPING["FOLDER_2"]
+ DRIVE_ID_MAPPING["FOLDER_2_1"]
+ DRIVE_ID_MAPPING["FOLDER_2_2"]
list(range(0, 20)) # All My Drives
+ list(range(20, 40)) # Shared Drive 1 and its folders
+ list(range(45, 60)) # Folder 2 and its subfolders
)
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
@ -216,12 +198,9 @@ def test_folders_only(
for doc_batch in connector.poll_source(0, time.time()):
retrieved_docs.extend(doc_batch)
expected_file_ids = (
DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
+ DRIVE_ID_MAPPING["FOLDER_2_1"]
+ DRIVE_ID_MAPPING["FOLDER_2_2"]
)
expected_file_ids = list(range(30, 40)) + list( # Folders 1_1 and 1_2
range(50, 60)
) # Folders 2_1 and 2_2
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,
@ -250,9 +229,9 @@ def test_specific_emails(
for doc_batch in connector.poll_source(0, time.time()):
retrieved_docs.extend(doc_batch)
expected_file_ids = (
DRIVE_ID_MAPPING["TEST_USER_1"] + DRIVE_ID_MAPPING["TEST_USER_3"]
)
expected_file_ids = list(range(5, 10)) + list(
range(15, 20)
) # TEST_USER_1 and TEST_USER_3 My Drives
assert_retrieved_docs_match_expected(
retrieved_docs=retrieved_docs,
expected_file_ids=expected_file_ids,

View File

@ -5,13 +5,11 @@ from unittest.mock import patch
from danswer.access.models import ExternalAccess
from danswer.connectors.google_drive.connector import GoogleDriveConnector
from danswer.connectors.google_drive.google_utils import execute_paginated_retrieval
from danswer.connectors.google_drive.resources import get_admin_service
from danswer.connectors.google_utils.google_utils import execute_paginated_retrieval
from danswer.connectors.google_utils.resources import get_admin_service
from ee.danswer.external_permissions.google_drive.doc_sync import (
_get_permissions_from_slim_doc,
)
from tests.daily.connectors.google_drive.helpers import ACCESS_MAPPING
from tests.daily.connectors.google_drive.helpers import DRIVE_ID_MAPPING
from tests.daily.connectors.google_drive.helpers import EMAIL_MAPPING
from tests.daily.connectors.google_drive.helpers import file_name_template
from tests.daily.connectors.google_drive.helpers import print_discrepencies
@ -130,19 +128,19 @@ def test_all_permissions(
print(file_name, external_access)
expected_file_range = (
DRIVE_ID_MAPPING["ADMIN"]
+ DRIVE_ID_MAPPING["TEST_USER_1"]
+ DRIVE_ID_MAPPING["TEST_USER_2"]
+ DRIVE_ID_MAPPING["TEST_USER_3"]
+ DRIVE_ID_MAPPING["SHARED_DRIVE_1"]
+ DRIVE_ID_MAPPING["FOLDER_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_1"]
+ DRIVE_ID_MAPPING["FOLDER_1_2"]
+ DRIVE_ID_MAPPING["SHARED_DRIVE_2"]
+ DRIVE_ID_MAPPING["FOLDER_2"]
+ DRIVE_ID_MAPPING["FOLDER_2_1"]
+ DRIVE_ID_MAPPING["FOLDER_2_2"]
+ DRIVE_ID_MAPPING["SECTIONS"]
list(range(0, 5)) # Admin's My Drive
+ list(range(5, 10)) # TEST_USER_1's My Drive
+ list(range(10, 15)) # TEST_USER_2's My Drive
+ list(range(15, 20)) # TEST_USER_3's My Drive
+ list(range(20, 25)) # Shared Drive 1
+ list(range(25, 30)) # Folder 1
+ list(range(30, 35)) # Folder 1_1
+ list(range(35, 40)) # Folder 1_2
+ list(range(40, 45)) # Shared Drive 2
+ list(range(45, 50)) # Folder 2
+ list(range(50, 55)) # Folder 2_1
+ list(range(55, 60)) # Folder 2_2
+ [61] # Sections
)
# Should get everything
@ -154,26 +152,33 @@ def test_all_permissions(
assert_correct_access_for_user(
user_email=EMAIL_MAPPING["ADMIN"],
expected_access_ids=ACCESS_MAPPING["ADMIN"],
expected_access_ids=list(range(0, 5)) # Admin's My Drive
+ list(range(20, 60)) # All shared drive content
+ [61], # Sections
group_map=group_map,
retrieved_access_map=access_map,
)
assert_correct_access_for_user(
user_email=EMAIL_MAPPING["TEST_USER_1"],
expected_access_ids=ACCESS_MAPPING["TEST_USER_1"],
expected_access_ids=list(range(5, 10)) # TEST_USER_1's My Drive
+ list(range(20, 40)) # Shared Drive 1 and its folders
+ list(range(0, 2)), # Access to some of Admin's files
group_map=group_map,
retrieved_access_map=access_map,
)
assert_correct_access_for_user(
user_email=EMAIL_MAPPING["TEST_USER_2"],
expected_access_ids=ACCESS_MAPPING["TEST_USER_2"],
expected_access_ids=list(range(10, 15)) # TEST_USER_2's My Drive
+ list(range(25, 40)) # Folder 1 and its subfolders
+ list(range(50, 55)) # Folder 2_1
+ list(range(45, 47)), # Some files in Folder 2
group_map=group_map,
retrieved_access_map=access_map,
)
assert_correct_access_for_user(
user_email=EMAIL_MAPPING["TEST_USER_3"],
expected_access_ids=ACCESS_MAPPING["TEST_USER_3"],
expected_access_ids=list(range(15, 20)), # TEST_USER_3's My Drive only
group_map=group_map,
retrieved_access_map=access_map,
)

View File

@ -0,0 +1,16 @@
import os
def load_env_vars(env_file: str = ".env") -> None:
    """Load KEY=VALUE pairs from `env_file` (resolved relative to this module's
    directory) into os.environ.

    Blank lines, `#` comments, and lines without an `=` separator are skipped.
    Missing files are reported but do not raise.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    env_path = os.path.join(current_dir, env_file)

    try:
        with open(env_path, "r") as f:
            for line in f:
                line = line.strip()
                # Skip blanks, comments, and malformed lines (previously a
                # line without "=" crashed the unpack with ValueError)
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                os.environ[key.strip()] = value.strip()
        print("Successfully loaded environment variables")
    except FileNotFoundError:
        print(f"File {env_file} not found")

View File

@ -1,205 +1,42 @@
import datetime
import pytest
from pytest_mock import MockFixture
import json
import os
from danswer.configs.constants import DocumentSource
from danswer.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from danswer.connectors.gmail.connector import GmailConnector
from danswer.connectors.gmail.connector import _build_time_range_query
from danswer.connectors.gmail.connector import thread_to_document
from danswer.connectors.models import Document
def test_email_to_document() -> None:
connector = GmailConnector()
email_id = "18cabedb1ea46b03"
email_subject = "Danswer Test Subject"
email_sender = "Google <no-reply@accounts.google.com>"
email_recipient = "test.mail@gmail.com"
email_date = "Wed, 27 Dec 2023 15:38:49 GMT"
email_labels = ["UNREAD", "IMPORTANT", "CATEGORY_UPDATES", "STARRED", "INBOX"]
full_email = {
"id": email_id,
"threadId": email_id,
"labelIds": email_labels,
"snippet": "A new sign-in. We noticed a new sign-in to your Google Account. If this was you, you don&#39;t need to do",
"payload": {
"partId": "",
"mimeType": "multipart/alternative",
"filename": "",
"headers": [
{"name": "Delivered-To", "value": email_recipient},
{"name": "Date", "value": email_date},
{
"name": "Message-ID",
"value": "<OhMtIhHwNS1NoOQRSQEWqw@notifications.google.com>",
},
{"name": "Subject", "value": email_subject},
{"name": "From", "value": email_sender},
{"name": "To", "value": email_recipient},
],
"body": {"size": 0},
"parts": [
{
"partId": "0",
"mimeType": "text/plain",
"filename": "",
"headers": [
{
"name": "Content-Type",
"value": 'text/plain; charset="UTF-8"; format=flowed; delsp=yes',
},
{"name": "Content-Transfer-Encoding", "value": "base64"},
],
"body": {
"size": 9,
"data": "dGVzdCBkYXRh",
},
},
{
"partId": "1",
"mimeType": "text/html",
"filename": "",
"headers": [
{"name": "Content-Type", "value": 'text/html; charset="UTF-8"'},
{
"name": "Content-Transfer-Encoding",
"value": "quoted-printable",
},
],
"body": {
"size": 9,
"data": "dGVzdCBkYXRh",
},
},
],
},
"sizeEstimate": 12048,
"historyId": "697762",
"internalDate": "1703691529000",
}
doc = connector._email_to_document(full_email)
def test_thread_to_document() -> None:
json_path = os.path.join(os.path.dirname(__file__), "thread.json")
with open(json_path, "r") as f:
full_email_thread = json.load(f)
doc = thread_to_document(full_email_thread)
assert type(doc) == Document
assert doc.source == DocumentSource.GMAIL
assert doc.title == "Danswer Test Subject"
assert doc.semantic_identifier == "Email Chain 1"
assert doc.doc_updated_at == datetime.datetime(
2023, 12, 27, 15, 38, 49, tzinfo=datetime.timezone.utc
2024, 11, 2, 17, 34, 55, tzinfo=datetime.timezone.utc
)
assert doc.metadata == {
"labels": email_labels,
"from": email_sender,
"to": email_recipient,
"date": email_date,
"subject": email_subject,
}
def test_fetch_mails_from_gmail_empty(mocker: MockFixture) -> None:
mock_discovery = mocker.patch("danswer.connectors.gmail.connector.discovery")
mock_discovery.build.return_value.users.return_value.messages.return_value.list.return_value.execute.return_value = {
"messages": []
}
connector = GmailConnector()
connector.creds = mocker.Mock()
with pytest.raises(StopIteration):
next(connector.load_from_state())
def test_fetch_mails_from_gmail(mocker: MockFixture) -> None:
mock_discovery = mocker.patch("danswer.connectors.gmail.connector.discovery")
email_id = "18cabedb1ea46b03"
email_subject = "Danswer Test Subject"
email_sender = "Google <no-reply@accounts.google.com>"
email_recipient = "test.mail@gmail.com"
mock_discovery.build.return_value.users.return_value.messages.return_value.list.return_value.execute.return_value = {
"messages": [{"id": email_id, "threadId": email_id}],
"nextPageToken": "14473313008248105741",
"resultSizeEstimate": 201,
}
mock_discovery.build.return_value.users.return_value.messages.return_value.get.return_value.execute.return_value = {
"id": email_id,
"threadId": email_id,
"labelIds": ["UNREAD", "IMPORTANT", "CATEGORY_UPDATES", "STARRED", "INBOX"],
"snippet": "A new sign-in. We noticed a new sign-in to your Google Account. If this was you, you don&#39;t need to do",
"payload": {
"partId": "",
"mimeType": "multipart/alternative",
"filename": "",
"headers": [
{"name": "Delivered-To", "value": email_recipient},
{"name": "Date", "value": "Wed, 27 Dec 2023 15:38:49 GMT"},
{
"name": "Message-ID",
"value": "<OhMtIhHwNS1NoOQRSQEWqw@notifications.google.com>",
},
{"name": "Subject", "value": email_subject},
{"name": "From", "value": email_sender},
{"name": "To", "value": email_recipient},
],
"body": {"size": 0},
"parts": [
{
"partId": "0",
"mimeType": "text/plain",
"filename": "",
"headers": [
{
"name": "Content-Type",
"value": 'text/plain; charset="UTF-8"; format=flowed; delsp=yes',
},
{"name": "Content-Transfer-Encoding", "value": "base64"},
],
"body": {
"size": 9,
"data": "dGVzdCBkYXRh",
},
},
{
"partId": "1",
"mimeType": "text/html",
"filename": "",
"headers": [
{"name": "Content-Type", "value": 'text/html; charset="UTF-8"'},
{
"name": "Content-Transfer-Encoding",
"value": "quoted-printable",
},
],
"body": {
"size": 9,
"data": "dGVzdCBkYXRh",
},
},
],
},
"sizeEstimate": 12048,
"historyId": "697762",
"internalDate": "1703691529000",
}
connector = GmailConnector()
connector.creds = mocker.Mock()
docs = next(connector.load_from_state())
assert len(docs) == 1
doc: Document = docs[0]
assert type(doc) == Document
assert doc.id == email_id
assert doc.title == email_subject
assert email_recipient in doc.sections[0].text
assert email_sender in doc.sections[0].text
assert len(doc.sections) == 4
assert doc.metadata == {}
def test_build_time_range_query() -> None:
time_range_start = 1703066296.159339
time_range_end = 1704984791.657404
query = GmailConnector._build_time_range_query(time_range_start, time_range_end)
query = _build_time_range_query(time_range_start, time_range_end)
assert query == "after:1703066296 before:1704984791"
query = GmailConnector._build_time_range_query(time_range_start, None)
query = _build_time_range_query(time_range_start, None)
assert query == "after:1703066296"
query = GmailConnector._build_time_range_query(None, time_range_end)
query = _build_time_range_query(None, time_range_end)
assert query == "before:1704984791"
query = GmailConnector._build_time_range_query(0.0, time_range_end)
query = _build_time_range_query(0.0, time_range_end)
assert query == "before:1704984791"
query = GmailConnector._build_time_range_query(None, None)
query = _build_time_range_query(None, None)
assert query is None

View File

@ -0,0 +1,349 @@
{
"id": "192edefb315737c3",
"messages": [
{
"id": "192edeff0dc743cf",
"payload": {
"headers": [
{
"name": "MIME-Version",
"value": "1.0"
},
{
"name": "Date",
"value": "Sat, 2 Nov 2024 10:32:57 -0700"
},
{
"name": "Message-ID",
"value": "<CABnEGTWbSYxvRDsxnXy1b2iQF=peGsHuOmrOcixpQFCJ9EBHHg@mail.gmail.com>"
},
{
"name": "Subject",
"value": "Email Chain 1"
},
{
"name": "From",
"value": "Test Admin Admin <admin@onyx-test.com>"
},
{
"name": "To",
"value": "test-group-1@onyx-test.com"
},
{
"name": "Content-Type",
"value": "multipart/alternative; boundary=\"0000000000004480480625f17117\""
}
],
"parts": [
{
"mimeType": "text/plain",
"body": {
"data": "VGhpcyBpcyBlbWFpbCAxIGluIGNoYWluIDENCg=="
}
},
{
"mimeType": "text/html",
"body": {
"data": "PGRpdiBkaXI9Imx0ciI-VGhpcyBpcyBlbWFpbCAxIGluIGNoYWluIDE8L2Rpdj4NCg=="
}
}
]
}
},
{
"id": "192edf07fbcc8b2c",
"payload": {
"headers": [
{
"name": "Delivered-To",
"value": "admin@onyx-test.com"
},
{
"name": "Received",
"value": "by 2002:a59:b3cc:0:b0:491:1bbc:5e54 with SMTP id g12csp1873533vqt; Sat, 2 Nov 2024 10:33:34 -0700 (PDT)"
},
{
"name": "X-Received",
"value": "by 2002:a05:6102:1284:b0:4a9:555b:fb50 with SMTP id ada2fe7eead31-4a9555bfd21mr8428882137.20.1730568814436; Sat, 02 Nov 2024 10:33:34 -0700 (PDT)"
},
{
"name": "ARC-Seal",
"value": "i=1; a=rsa-sha256; t=1730568814; cv=none; d=google.com; s=arc-20240605; b=A75GBczY/LN8OhNdpZ1VM3opx5VWU3HWYnwCIL9TLBqEpNz2X74TXNkCevJkImB3VF BkFY7gHg7d8oGdsQvUp2EEdRBXKoYT8P4PTc3ZSD2W8LYU2XCudIbA5xtGObELmI0h0f bCXT8dE7m6hGJPTg0WPSlkvGs2bY52bmSbCbrnrA/Mx/oyxYPzwv5cMw3CLMXo/8nOLO FAzrnMTKRqYtn/QvYjUne7PpVSYPk0Edg5261/jn9qatyyL8VePU4FriQTffjAC85Ayc jikVA5QnsYO79aXJE0SIw4xBHwtOgmyWhU9TPw2NfuQHZWrm39JudUYlmZb8MV4VpX6p otxw=="
},
{
"name": "ARC-Message-Signature",
"value": "i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20240605; h=to:subject:message-id:date:from:in-reply-to:references:mime-version :dkim-signature; bh=9Eo5wYdnqXP3axXBCAwTODK4DvptOqG5RNct/xfBak4=; fh=/JhVJcrFVXWWzpGRY8HXA/cCDTQzCntn8VCeyDmjzic=; b=bkhR3iHOUD64TOG3Mqfd9BMT/2IF9gHEjHZWR/tet5J05UKFhk2d4k69wuSLNJcxlF dB6zzgt1vvEnCbSV+XBCEG1zW76T/sN6Ldn7+5xomsGFYvTZsW4E7OJqxkedfdpFeWwc eBlgX765wnBs4ztktDhK6gO8igWx3CaYH5wbX72DV4wqcQpDNpMqNHK7sHrlOG2YJGzV 7i3tli4dJqu1zgQK+lo1or1QQyadFzhbwX2iFdSLTNSNR3s70kqqBOT69lDMv84dfKCp +hXE0uwjOY/9lGG9rO1/e5WWEDC2BSZ7wzjvvyBRjDG+lavBqTggUizd8W+MlRYXONAX t7Kg==; dara=google.com"
},
{
"name": "ARC-Authentication-Results",
"value": "i=1; mx.google.com; dkim=pass header.i=@onyx-test-com.20230601.gappssmtp.com header.s=20230601 header.b=Z57TqzI7; spf=none (google.com: test_user_1@onyx-test.com does not designate permitted sender hosts) smtp.mailfrom=test_user_1@onyx-test.com; dara=pass header.i=@onyx-test.com"
},
{
"name": "Return-Path",
"value": "<test_user_1@onyx-test.com>"
},
{
"name": "Received",
"value": "from mail-sor-f41.google.com (mail-sor-f41.google.com. [209.85.220.41]) by mx.google.com with SMTPS id a1e0cc1a2514c-855dae589a1sor1192309241.6.2024.11.02.10.33.34 for <admin@onyx-test.com> (Google Transport Security); Sat, 02 Nov 2024 10:33:34 -0700 (PDT)"
},
{
"name": "Received-SPF",
"value": "none (google.com: test_user_1@onyx-test.com does not designate permitted sender hosts) client-ip=209.85.220.41;"
},
{
"name": "Authentication-Results",
"value": "mx.google.com; dkim=pass header.i=@onyx-test-com.20230601.gappssmtp.com header.s=20230601 header.b=Z57TqzI7; spf=none (google.com: test_user_1@onyx-test.com does not designate permitted sender hosts) smtp.mailfrom=test_user_1@onyx-test.com; dara=pass header.i=@onyx-test.com"
},
{
"name": "DKIM-Signature",
"value": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=onyx-test-com.20230601.gappssmtp.com; s=20230601; t=1730568814; x=1731173614; darn=onyx-test.com; h=to:subject:message-id:date:from:in-reply-to:references:mime-version :from:to:cc:subject:date:message-id:reply-to; bh=9Eo5wYdnqXP3axXBCAwTODK4DvptOqG5RNct/xfBak4=; b=Z57TqzI7sEwwOumQx0z6YhibC1x2CHlNmBjwyQT1mNOUScZbzo6nmH8Ydo7slsTfgZ rgwKEEYkf/CYlFWGUEzGzc22jVUCSMjNMFB0nEtfj+GPJaNjDR9FxjFLTUfSq64H/RCI eO9+oEAJHaa5QmceX2yiSJFXNqmVEMJNT+K6CnlbN5gW6CUD2tBt46vW83PVJgxKMc76 A7/eaDxdZDLUvpjHes4SvM7x0eBM9t7w9wb/jEjGqA54HI2YHVcxM4HJxrbCChYn8UoG 7+UOpfOmHTZLdLYgMtSqYanJ3BTENEdyVp2LIOZOhlUT7Hbr9esyeVyy765XTuRAWxmo DGPQ=="
},
{
"name": "X-Google-DKIM-Signature",
"value": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; t=1730568814; x=1731173614; h=to:subject:message-id:date:from:in-reply-to:references:mime-version :x-gm-message-state:from:to:cc:subject:date:message-id:reply-to; bh=9Eo5wYdnqXP3axXBCAwTODK4DvptOqG5RNct/xfBak4=; b=fxuobWT2rW8kvQ14LUHbJEJOdCM4uBP+Obo7jL4w0BvwLrBNNbMPqMUc8d8u17dnS7 gczFCprOr5PZnVNmOZMQvmRTJ6poTkWOGQhsOyDOSLNI0IzuaN2wh9qjmFez6Z9nTx3f Lo0I0uahwzNkExywHC9x0H3NOZlS4074qkyLJObgnOHa5vml8SEcChMuzOQuCSU9wNjO t26urEoct8LArf0K/xztjxpEpDCgnf4Cr/KmZfi4/2Sjv4jwQzkLVuiwADraHIJbLv1m UMNs92dakWYK0cBbuwOx/sYpUWWyhVmv6Q0LqXzJjtpY4Z0zsnpI2UCrkAdAOSh7geEJ LCnw=="
},
{
"name": "X-Gm-Message-State",
"value": "AOJu0YyCYZOHIzoRHgMd7foUCpX2JYDwPS2XsTjWiMkkR364/mhFKFsQ vixTj7QM6pDecoDxn8pS0btM7b8z+cwo/8hFiYNgp26wK5L0aGymu+M8OuEk/73fuEthWVV0eko B9LvS5+qixa/oNO/HkRJpVTQmAH7OTT25KeZJj0Dd3x1JqsrfiNE="
},
{
"name": "X-Google-Smtp-Source",
"value": "AGHT+IHCMrQhOT9sgPUOQJL1oVfxMruiLg3BZ5DXqKMdQ7PYF2puka6Ovabv3BPg08CeyS1ovKydIdwHT2uleZkkAaU="
},
{
"name": "X-Received",
"value": "by 2002:a05:6102:5092:b0:4a3:e05e:f6a3 with SMTP id ada2fe7eead31-4a900e11589mr14462681137.3.1730568813787; Sat, 02 Nov 2024 10:33:33 -0700 (PDT)"
},
{
"name": "MIME-Version",
"value": "1.0"
},
{
"name": "References",
"value": "<CABnEGTWbSYxvRDsxnXy1b2iQF=peGsHuOmrOcixpQFCJ9EBHHg@mail.gmail.com>"
},
{
"name": "In-Reply-To",
"value": "<CABnEGTWbSYxvRDsxnXy1b2iQF=peGsHuOmrOcixpQFCJ9EBHHg@mail.gmail.com>"
},
{
"name": "From",
"value": "test_user_1 1 <test_user_1@onyx-test.com>"
},
{
"name": "Date",
"value": "Sat, 2 Nov 2024 10:33:22 -0700"
},
{
"name": "Message-ID",
"value": "<CANSSAx8n6=Kr4sQaGVYaKj63Hdb4=NCffD6OhAADYm+2fe7_dw@mail.gmail.com>"
},
{
"name": "Subject",
"value": "Re: Email Chain 1"
},
{
"name": "To",
"value": "Test Admin Admin <admin@onyx-test.com>"
},
{
"name": "Content-Type",
"value": "multipart/alternative; boundary=\"00000000000067dbf70625f1730f\""
}
],
"parts": [
{
"mimeType": "text/plain",
"body": {
"data": "VGhpcyBpcyBlbWFpbCAyIGluIGNoYWluIDENCg0KT24gU2F0LCBOb3YgMiwgMjAyNCBhdCAxMDozM-KAr0FNIFRlc3QgQWRtaW4gQWRtaW4gPGFkbWluQG9ueXgtdGVzdC5jb20-DQp3cm90ZToNCg0KPiBUaGlzIGlzIGVtYWlsIDEgaW4gY2hhaW4gMQ0KPg0K"
}
},
{
"mimeType": "text/html",
"body": {
"data": "PGRpdiBkaXI9Imx0ciI-VGhpcyBpcyBlbWFpbCAyIGluIGNoYWluIDE8L2Rpdj48YnI-PGRpdiBjbGFzcz0iZ21haWxfcXVvdGUiPjxkaXYgZGlyPSJsdHIiIGNsYXNzPSJnbWFpbF9hdHRyIj5PbiBTYXQsIE5vdiAyLCAyMDI0IGF0IDEwOjMz4oCvQU0gVGVzdCBBZG1pbiBBZG1pbiAmbHQ7PGEgaHJlZj0ibWFpbHRvOmFkbWluQG9ueXgtdGVzdC5jb20iPmFkbWluQG9ueXgtdGVzdC5jb208L2E-Jmd0OyB3cm90ZTo8YnI-PC9kaXY-PGJsb2NrcXVvdGUgY2xhc3M9ImdtYWlsX3F1b3RlIiBzdHlsZT0ibWFyZ2luOjBweCAwcHggMHB4IDAuOGV4O2JvcmRlci1sZWZ0OjFweCBzb2xpZCByZ2IoMjA0LDIwNCwyMDQpO3BhZGRpbmctbGVmdDoxZXgiPjxkaXYgZGlyPSJsdHIiPlRoaXMgaXMgZW1haWwgMSBpbiBjaGFpbiAxPC9kaXY-DQo8L2Jsb2NrcXVvdGU-PC9kaXY-DQo="
}
}
]
}
},
{
"id": "192edf157175fcec",
"payload": {
"headers": [
{
"name": "MIME-Version",
"value": "1.0"
},
{
"name": "Date",
"value": "Sat, 2 Nov 2024 10:34:29 -0700"
},
{
"name": "References",
"value": "<CABnEGTWbSYxvRDsxnXy1b2iQF=peGsHuOmrOcixpQFCJ9EBHHg@mail.gmail.com> <CANSSAx8n6=Kr4sQaGVYaKj63Hdb4=NCffD6OhAADYm+2fe7_dw@mail.gmail.com>"
},
{
"name": "In-Reply-To",
"value": "<CANSSAx8n6=Kr4sQaGVYaKj63Hdb4=NCffD6OhAADYm+2fe7_dw@mail.gmail.com>"
},
{
"name": "Bcc",
"value": "test_user_3@onyx-test.com"
},
{
"name": "Message-ID",
"value": "<CABnEGTUEDvhfyOWTCauhTCn5mVXGp6p1=yw65RUsGu8E=c2k4g@mail.gmail.com>"
},
{
"name": "Subject",
"value": "Fwd: Email Chain 1"
},
{
"name": "From",
"value": "Test Admin Admin <admin@onyx-test.com>"
},
{
"name": "To",
"value": "test_user_2 2 <test_user_2@onyx-test.com>"
},
{
"name": "Content-Type",
"value": "multipart/alternative; boundary=\"000000000000bf7afd0625f1764f\""
}
],
"parts": [
{
"mimeType": "text/plain",
"body": {
"data": "VGhpcyBpcyBlbWFpbCAzIGluIGNoYWluIDENCg0KLS0tLS0tLS0tLSBGb3J3YXJkZWQgbWVzc2FnZSAtLS0tLS0tLS0NCkZyb206IHRlc3RfdXNlcl8xIDEgPHRlc3RfdXNlcl8xQG9ueXgtdGVzdC5jb20-DQpEYXRlOiBTYXQsIE5vdiAyLCAyMDI0IGF0IDEwOjMz4oCvQU0NClN1YmplY3Q6IFJlOiBFbWFpbCBDaGFpbiAxDQpUbzogVGVzdCBBZG1pbiBBZG1pbiA8YWRtaW5Ab255eC10ZXN0LmNvbT4NCg0KDQpUaGlzIGlzIGVtYWlsIDIgaW4gY2hhaW4gMQ0KDQpPbiBTYXQsIE5vdiAyLCAyMDI0IGF0IDEwOjMz4oCvQU0gVGVzdCBBZG1pbiBBZG1pbiA8YWRtaW5Ab255eC10ZXN0LmNvbT4NCndyb3RlOg0KDQo-IFRoaXMgaXMgZW1haWwgMSBpbiBjaGFpbiAxDQo-DQo="
}
},
{
"mimeType": "text/html",
"body": {
"data": "PGRpdiBkaXI9Imx0ciI-PGRpdiBkaXI9Imx0ciI-VGhpcyBpcyBlbWFpbCAzIGluIGNoYWluIDE8L2Rpdj48YnI-PGRpdiBjbGFzcz0iZ21haWxfcXVvdGUiPjxkaXYgZGlyPSJsdHIiIGNsYXNzPSJnbWFpbF9hdHRyIj4tLS0tLS0tLS0tIEZvcndhcmRlZCBtZXNzYWdlIC0tLS0tLS0tLTxicj5Gcm9tOiA8c3Ryb25nIGNsYXNzPSJnbWFpbF9zZW5kZXJuYW1lIiBkaXI9ImF1dG8iPnRlc3RfdXNlcl8xIDE8L3N0cm9uZz4gPHNwYW4gZGlyPSJhdXRvIj4mbHQ7PGEgaHJlZj0ibWFpbHRvOnRlc3RfdXNlcl8xQG9ueXgtdGVzdC5jb20iPnRlc3RfdXNlcl8xQG9ueXgtdGVzdC5jb208L2E-Jmd0Ozwvc3Bhbj48YnI-RGF0ZTogU2F0LCBOb3YgMiwgMjAyNCBhdCAxMDozM-KAr0FNPGJyPlN1YmplY3Q6IFJlOiBFbWFpbCBDaGFpbiAxPGJyPlRvOiBUZXN0IEFkbWluIEFkbWluICZsdDs8YSBocmVmPSJtYWlsdG86YWRtaW5Ab255eC10ZXN0LmNvbSI-YWRtaW5Ab255eC10ZXN0LmNvbTwvYT4mZ3Q7PGJyPjwvZGl2Pjxicj48YnI-PGRpdiBkaXI9Imx0ciI-VGhpcyBpcyBlbWFpbCAyIGluIGNoYWluIDE8L2Rpdj48YnI-PGRpdiBjbGFzcz0iZ21haWxfcXVvdGUiPjxkaXYgZGlyPSJsdHIiIGNsYXNzPSJnbWFpbF9hdHRyIj5PbiBTYXQsIE5vdiAyLCAyMDI0IGF0IDEwOjMz4oCvQU0gVGVzdCBBZG1pbiBBZG1pbiAmbHQ7PGEgaHJlZj0ibWFpbHRvOmFkbWluQG9ueXgtdGVzdC5jb20iIHRhcmdldD0iX2JsYW5rIj5hZG1pbkBvbnl4LXRlc3QuY29tPC9hPiZndDsgd3JvdGU6PGJyPjwvZGl2PjxibG9ja3F1b3RlIGNsYXNzPSJnbWFpbF9xdW90ZSIgc3R5bGU9Im1hcmdpbjowcHggMHB4IDBweCAwLjhleDtib3JkZXItbGVmdDoxcHggc29saWQgcmdiKDIwNCwyMDQsMjA0KTtwYWRkaW5nLWxlZnQ6MWV4Ij48ZGl2IGRpcj0ibHRyIj5UaGlzIGlzIGVtYWlsIDEgaW4gY2hhaW4gMTwvZGl2Pg0KPC9ibG9ja3F1b3RlPjwvZGl2Pg0KPC9kaXY-PC9kaXY-DQo="
}
}
]
}
},
{
"id": "192edf1e8f7ecbb4",
"payload": {
"headers": [
{
"name": "Delivered-To",
"value": "admin@onyx-test.com"
},
{
"name": "Received",
"value": "by 2002:a59:b3cc:0:b0:491:1bbc:5e54 with SMTP id g12csp1874156vqt; Sat, 2 Nov 2024 10:35:07 -0700 (PDT)"
},
{
"name": "X-Received",
"value": "by 2002:a05:6122:319c:b0:50d:81f9:5210 with SMTP id 71dfb90a1353d-5105d128958mr15853812e0c.13.1730568906834; Sat, 02 Nov 2024 10:35:06 -0700 (PDT)"
},
{
"name": "ARC-Seal",
"value": "i=1; a=rsa-sha256; t=1730568906; cv=none; d=google.com; s=arc-20240605; b=JUd7S6ql1poKM5ox92op2g2Z67AS8sEkp5f/S+Mr5+7KSichsjAwixWg/YhhRhvaY/ UcykrbdaAeWfCuGtJgSq1nr1z5hB3iAltv/D2XCdJdOXzVDpVvaV9lT/YU6266VKtsnq gFVKfjyMe/MnNKvDITQL67A2gRvhiR3XWxwEVvrMArMpUb9bbudlF/5L3MQY4BCIvWLL 9uBv1ZnclghscsxspoG3CkULkGqHGUTKq6bPoUn/hOljiVdsVVagoOwhbDEcyMRKUDnm 2t3H7iiujhlBIDbRoLJR/6C+A6AMyNKPAFA3axM6EXrTOADMZ8a0JqFj8O4rktYpRV+d zHxQ=="
},
{
"name": "ARC-Message-Signature",
"value": "i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20240605; h=to:subject:message-id:date:from:in-reply-to:references:mime-version :dkim-signature; bh=K0g0X/4URFSC1nuXjI7ZESJA66WnWcqwgfHOUDQ/kQo=; fh=/JhVJcrFVXWWzpGRY8HXA/cCDTQzCntn8VCeyDmjzic=; b=IarHhl5g5tjBhlMRRXo6WwTzaFOI4Q3w4ebNunftDUHwzV7Qu1hY0y7r3SRNaBb+qD ZncYUI6PF/Oo7eMG65IloXfu+kHUI8NJMaoERUWgEk21Tj6cOSRO4x/W6V5PSX7a4lWZ K1cNdAlaiWI09Esv07Vel975Bgrd+XiCwoVgJAAslHOJ2bZwSYWzvwLqdkCRVrAGJQ9/ I80kvOnNVesIFdIR6SGrhdz8xNIIoe60k8PjJRzkmzy/tEeKCYBz6W+NW4xoIaAVmKUw RvjI8JozUVkGzh+LLyx64MakPCZPWM+ft+D35JodarYh+KesF+HV/Oe7rjaw7JXZ1WoE OdJQ==; dara=google.com"
},
{
"name": "ARC-Authentication-Results",
"value": "i=1; mx.google.com; dkim=pass header.i=@onyx-test-com.20230601.gappssmtp.com header.s=20230601 header.b=1U8JkCbL; spf=none (google.com: test_user_3@onyx-test.com does not designate permitted sender hosts) smtp.mailfrom=test_user_3@onyx-test.com; dara=pass header.i=@onyx-test.com"
},
{
"name": "Return-Path",
"value": "<test_user_3@onyx-test.com>"
},
{
"name": "Received",
"value": "from mail-sor-f41.google.com (mail-sor-f41.google.com. [209.85.220.41]) by mx.google.com with SMTPS id 71dfb90a1353d-5106f3f9037sor1051490e0c.7.2024.11.02.10.35.06 for <admin@onyx-test.com> (Google Transport Security); Sat, 02 Nov 2024 10:35:06 -0700 (PDT)"
},
{
"name": "Received-SPF",
"value": "none (google.com: test_user_3@onyx-test.com does not designate permitted sender hosts) client-ip=209.85.220.41;"
},
{
"name": "Authentication-Results",
"value": "mx.google.com; dkim=pass header.i=@onyx-test-com.20230601.gappssmtp.com header.s=20230601 header.b=1U8JkCbL; spf=none (google.com: test_user_3@onyx-test.com does not designate permitted sender hosts) smtp.mailfrom=test_user_3@onyx-test.com; dara=pass header.i=@onyx-test.com"
},
{
"name": "DKIM-Signature",
"value": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=onyx-test-com.20230601.gappssmtp.com; s=20230601; t=1730568906; x=1731173706; darn=onyx-test.com; h=to:subject:message-id:date:from:in-reply-to:references:mime-version :from:to:cc:subject:date:message-id:reply-to; bh=K0g0X/4URFSC1nuXjI7ZESJA66WnWcqwgfHOUDQ/kQo=; b=1U8JkCbLjicGtH7otVX3QjKv/XK5fGnmOIVMTD/b9cO1w8ai2GwCuJbBo+z1IuGqto aRuNCcEqUIaFvVFiezvhL9xg7scIwHHvLOrSpmc0h0JMSx8q4kKaUGKEJpewsYvkStmr DYv/cUIeaPTIChSuUDV7FVMhf7jIyIaYry3i9/EIlw+on18nD30C9kXwds5yWW8XGvtR /OUuSdgJzuoNmypUt8v9Ebqd+LP23YTs+78/G1Ag+JjugxxF+C9cm7SxmooWueukRkm8 o8nQO5QVx/y/xsCZdM2XXcKCLcZIntuY48amlfFyIqrhG1/DEM6htD64meMGctNTptQf jHrw=="
},
{
"name": "X-Google-DKIM-Signature",
"value": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; t=1730568906; x=1731173706; h=to:subject:message-id:date:from:in-reply-to:references:mime-version :x-gm-message-state:from:to:cc:subject:date:message-id:reply-to; bh=K0g0X/4URFSC1nuXjI7ZESJA66WnWcqwgfHOUDQ/kQo=; b=J4+ozlusGGM1Hn95EZkDeYbExgkyOlAdcY6LcV4Wx1zeI78HtEXGgvqcZ5sP7HzS1X /A3i7WkgmjpC9bU2/zKLrfXDvYQ7udQwTJtKsKaUo4O65Al7Wtgz8e8rBDYikhqEEAZQ GbEwqp+qa+v0T4rPhkQKd4zpIE3AUd3eh5u5iF/UEYc1NcyV35uMGWRP4jOK6F67MwS7 73MgObcGqmBH48I4K+ITYAkNEMGOBpY6fheGxCxyDpcG5gbf8swlWX2Dd0EM9H72o+Xb jvAslOq1lZzPZUgyyZJ2wVEASxF8S7depiOLcTPKwsw+pgXIMAUBExBvu0u4PhO0qG+z pftQ=="
},
{
"name": "X-Gm-Message-State",
"value": "AOJu0Yy2r0aT3w7HBU7t0JGla+x3AddG9WdnQT06r6T/HGZwZ9Wp9TUs Orb/HMtgvXivtYFkG14NJkMTBO4EqSynmzaxAvEheDXB1uYE2LS21XoqrvycvYQh3GUHBwUdS8L lE6BUjm4TJfXlZWAqKRxg4C0j1UFSuVdkXf6P1GCsdyKKTeS6A9eohw=="
},
{
"name": "X-Google-Smtp-Source",
"value": "AGHT+IHXTB7Ar9w/Q3G3gCT19SVELYvWl30pNGuNiTmkYZgMWFS7YUWTkG/DS4/mrjMRXpYuclOLHv8BeOmw9Jovkr4="
},
{
"name": "X-Received",
"value": "by 2002:a05:6102:3a10:b0:4a9:49:26d2 with SMTP id ada2fe7eead31-4a90109fb68mr15589362137.29.1730568906301; Sat, 02 Nov 2024 10:35:06 -0700 (PDT)"
},
{
"name": "MIME-Version",
"value": "1.0"
},
{
"name": "References",
"value": "<CABnEGTWbSYxvRDsxnXy1b2iQF=peGsHuOmrOcixpQFCJ9EBHHg@mail.gmail.com> <CANSSAx8n6=Kr4sQaGVYaKj63Hdb4=NCffD6OhAADYm+2fe7_dw@mail.gmail.com> <CABnEGTUEDvhfyOWTCauhTCn5mVXGp6p1=yw65RUsGu8E=c2k4g@mail.gmail.com>"
},
{
"name": "In-Reply-To",
"value": "<CABnEGTUEDvhfyOWTCauhTCn5mVXGp6p1=yw65RUsGu8E=c2k4g@mail.gmail.com>"
},
{
"name": "From",
"value": "test_user_3 3 <test_user_3@onyx-test.com>"
},
{
"name": "Date",
"value": "Sat, 2 Nov 2024 10:34:55 -0700"
},
{
"name": "Message-ID",
"value": "<CACcF+8GU1V2_CcYsUFNOh0+oSkMG=oN-ioyPPXRsD+0Ghr-u-Q@mail.gmail.com>"
},
{
"name": "Subject",
"value": "Re: Email Chain 1"
},
{
"name": "To",
"value": "Test Admin Admin <admin@onyx-test.com>"
},
{
"name": "Content-Type",
"value": "multipart/alternative; boundary=\"000000000000eb82a70625f178cf\""
}
],
"parts": [
{
"mimeType": "text/plain",
"body": {
"data": "VGhpcyBpcyBlbWFpbCA0IGluIGNoYWluIDENCg0KT24gU2F0LCBOb3YgMiwgMjAyNCBhdCAxMDozNOKAr0FNIFRlc3QgQWRtaW4gQWRtaW4gPGFkbWluQG9ueXgtdGVzdC5jb20-DQp3cm90ZToNCg0KPiBUaGlzIGlzIGVtYWlsIDMgaW4gY2hhaW4gMQ0KPg0KPiAtLS0tLS0tLS0tIEZvcndhcmRlZCBtZXNzYWdlIC0tLS0tLS0tLQ0KPiBGcm9tOiB0ZXN0X3VzZXJfMSAxIDx0ZXN0X3VzZXJfMUBvbnl4LXRlc3QuY29tPg0KPiBEYXRlOiBTYXQsIE5vdiAyLCAyMDI0IGF0IDEwOjMz4oCvQU0NCj4gU3ViamVjdDogUmU6IEVtYWlsIENoYWluIDENCj4gVG86IFRlc3QgQWRtaW4gQWRtaW4gPGFkbWluQG9ueXgtdGVzdC5jb20-DQo-DQo-DQo-IFRoaXMgaXMgZW1haWwgMiBpbiBjaGFpbiAxDQo-DQo-IE9uIFNhdCwgTm92IDIsIDIwMjQgYXQgMTA6MzPigK9BTSBUZXN0IEFkbWluIEFkbWluIDxhZG1pbkBvbnl4LXRlc3QuY29tPg0KPiB3cm90ZToNCj4NCj4-IFRoaXMgaXMgZW1haWwgMSBpbiBjaGFpbiAxDQo-Pg0KPg0K"
}
},
{
"mimeType": "text/html",
"body": {
"data": "PGRpdiBkaXI9Imx0ciI-VGhpcyBpcyBlbWFpbCA0IGluIGNoYWluIDE8YnIgY2xhc3M9ImdtYWlsLUFwcGxlLWludGVyY2hhbmdlLW5ld2xpbmUiPjwvZGl2Pjxicj48ZGl2IGNsYXNzPSJnbWFpbF9xdW90ZSI-PGRpdiBkaXI9Imx0ciIgY2xhc3M9ImdtYWlsX2F0dHIiPk9uIFNhdCwgTm92IDIsIDIwMjQgYXQgMTA6MzTigK9BTSBUZXN0IEFkbWluIEFkbWluICZsdDs8YSBocmVmPSJtYWlsdG86YWRtaW5Ab255eC10ZXN0LmNvbSI-YWRtaW5Ab255eC10ZXN0LmNvbTwvYT4mZ3Q7IHdyb3RlOjxicj48L2Rpdj48YmxvY2txdW90ZSBjbGFzcz0iZ21haWxfcXVvdGUiIHN0eWxlPSJtYXJnaW46MHB4IDBweCAwcHggMC44ZXg7Ym9yZGVyLWxlZnQ6MXB4IHNvbGlkIHJnYigyMDQsMjA0LDIwNCk7cGFkZGluZy1sZWZ0OjFleCI-PGRpdiBkaXI9Imx0ciI-PGRpdiBkaXI9Imx0ciI-VGhpcyBpcyBlbWFpbCAzIGluIGNoYWluIDE8L2Rpdj48YnI-PGRpdiBjbGFzcz0iZ21haWxfcXVvdGUiPjxkaXYgZGlyPSJsdHIiIGNsYXNzPSJnbWFpbF9hdHRyIj4tLS0tLS0tLS0tIEZvcndhcmRlZCBtZXNzYWdlIC0tLS0tLS0tLTxicj5Gcm9tOiA8c3Ryb25nIGNsYXNzPSJnbWFpbF9zZW5kZXJuYW1lIiBkaXI9ImF1dG8iPnRlc3RfdXNlcl8xIDE8L3N0cm9uZz4gPHNwYW4gZGlyPSJhdXRvIj4mbHQ7PGEgaHJlZj0ibWFpbHRvOnRlc3RfdXNlcl8xQG9ueXgtdGVzdC5jb20iIHRhcmdldD0iX2JsYW5rIj50ZXN0X3VzZXJfMUBvbnl4LXRlc3QuY29tPC9hPiZndDs8L3NwYW4-PGJyPkRhdGU6IFNhdCwgTm92IDIsIDIwMjQgYXQgMTA6MzPigK9BTTxicj5TdWJqZWN0OiBSZTogRW1haWwgQ2hhaW4gMTxicj5UbzogVGVzdCBBZG1pbiBBZG1pbiAmbHQ7PGEgaHJlZj0ibWFpbHRvOmFkbWluQG9ueXgtdGVzdC5jb20iIHRhcmdldD0iX2JsYW5rIj5hZG1pbkBvbnl4LXRlc3QuY29tPC9hPiZndDs8YnI-PC9kaXY-PGJyPjxicj48ZGl2IGRpcj0ibHRyIj5UaGlzIGlzIGVtYWlsIDIgaW4gY2hhaW4gMTwvZGl2Pjxicj48ZGl2IGNsYXNzPSJnbWFpbF9xdW90ZSI-PGRpdiBkaXI9Imx0ciIgY2xhc3M9ImdtYWlsX2F0dHIiPk9uIFNhdCwgTm92IDIsIDIwMjQgYXQgMTA6MzPigK9BTSBUZXN0IEFkbWluIEFkbWluICZsdDs8YSBocmVmPSJtYWlsdG86YWRtaW5Ab255eC10ZXN0LmNvbSIgdGFyZ2V0PSJfYmxhbmsiPmFkbWluQG9ueXgtdGVzdC5jb208L2E-Jmd0OyB3cm90ZTo8YnI-PC9kaXY-PGJsb2NrcXVvdGUgY2xhc3M9ImdtYWlsX3F1b3RlIiBzdHlsZT0ibWFyZ2luOjBweCAwcHggMHB4IDAuOGV4O2JvcmRlci1sZWZ0OjFweCBzb2xpZCByZ2IoMjA0LDIwNCwyMDQpO3BhZGRpbmctbGVmdDoxZXgiPjxkaXYgZGlyPSJsdHIiPlRoaXMgaXMgZW1haWwgMSBpbiBjaGFpbiAxPC9kaXY-DQo8L2Jsb2NrcXVvdGU-PC9kaXY-DQo8L2Rpdj48L2Rpdj4NCjwvYmxvY2txdW90ZT48L2Rpdj4NCg=="
}
}
]
}
}
]
}

View File

@ -142,8 +142,8 @@ export default function AddConnector({
const { popup, setPopup } = usePopup();
// Hooks for Google Drive and Gmail credentials
const { liveGDriveCredential } = useGoogleDriveCredentials();
const { liveGmailCredential } = useGmailCredentials();
const { liveGDriveCredential } = useGoogleDriveCredentials(connector);
const { liveGmailCredential } = useGmailCredentials(connector);
// Check if credential is activated
const credentialActivated =

View File

@ -413,7 +413,7 @@ export const DriveAuthSection = ({
<Form>
<TextFormField
name="google_drive_primary_admin"
label="User email to impersonate:"
label="Primary Admin Email:"
subtext="Enter the email of the user whose Google Drive access you want to delegate to the service account."
/>
<div className="flex">

View File

@ -10,6 +10,7 @@ import { GMAIL_AUTH_IS_ADMIN_COOKIE_NAME } from "@/lib/constants";
import Cookies from "js-cookie";
import { TextFormField } from "@/components/admin/connectors/Field";
import { Form, Formik } from "formik";
import { User } from "@/lib/types";
import CardSection from "@/components/admin/CardSection";
import {
Credential,
@ -293,9 +294,10 @@ interface DriveCredentialSectionProps {
setPopup: (popupSpec: PopupSpec | null) => void;
refreshCredentials: () => void;
connectorExists: boolean;
user: User | null;
}
export const GmailOAuthSection = ({
export const GmailAuthSection = ({
gmailPublicCredential,
gmailServiceAccountCredential,
serviceAccountKeyData,
@ -303,6 +305,7 @@ export const GmailOAuthSection = ({
setPopup,
refreshCredentials,
connectorExists,
user,
}: DriveCredentialSectionProps) => {
const router = useRouter();
@ -342,24 +345,22 @@ export const GmailOAuthSection = ({
return (
<div>
<p className="text-sm mb-2">
When using a Gmail Service Account, you can either have Danswer act as
the service account itself OR you can specify an account for the
service account to impersonate.
When using a Gmail Service Account, you must specify the email of the
primary admin that you would like the service account to impersonate.
<br />
<br />
If you want to use the service account itself, leave the{" "}
<b>&apos;User email to impersonate&apos;</b> field blank when
submitting. If you do choose this option, make sure you have shared
the documents you want to index with the service account.
For this connector to index all users Gmail, the primary admin email
should be an owner/admin of the Google Organization that being
indexed.
</p>
<CardSection>
<Formik
initialValues={{
gmail_delegated_user: "",
gmail_primary_admin: user?.email || "",
}}
validationSchema={Yup.object().shape({
gmail_delegated_user: Yup.string().optional(),
gmail_primary_admin: Yup.string().required(),
})}
onSubmit={async (values, formikHelpers) => {
formikHelpers.setSubmitting(true);
@ -372,7 +373,7 @@ export const GmailOAuthSection = ({
"Content-Type": "application/json",
},
body: JSON.stringify({
gmail_delegated_user: values.gmail_delegated_user,
gmail_primary_admin: values.gmail_primary_admin,
}),
}
);
@ -395,9 +396,9 @@ export const GmailOAuthSection = ({
{({ isSubmitting }) => (
<Form>
<TextFormField
name="gmail_delegated_user"
label="[Optional] User email to impersonate:"
subtext="If left blank, Danswer will use the service account itself."
name="gmail_primary_admin"
label="Primary Admin Email:"
subtext="You must provide an admin/owner account to retrieve all org emails."
/>
<div className="flex">
<button

View File

@ -10,7 +10,7 @@ import {
GmailCredentialJson,
GmailServiceAccountCredentialJson,
} from "@/lib/connectors/credentials";
import { GmailOAuthSection, GmailJsonUploadSection } from "./Credential";
import { GmailAuthSection, GmailJsonUploadSection } from "./Credential";
import {
usePublicCredentials,
useConnectorCredentialIndexingStatus,
@ -20,7 +20,7 @@ import { GmailConfig } from "@/lib/connectors/connectors";
import { useUser } from "@/components/user/UserProvider";
export const GmailMain = () => {
const { isLoadingUser, isAdmin } = useUser();
const { isLoadingUser, isAdmin, user } = useUser();
const {
data: appCredentialData,
@ -142,7 +142,7 @@ export const GmailMain = () => {
<Title className="mb-2 mt-6 ml-auto mr-auto">
Step 2: Authenticate with Danswer
</Title>
<GmailOAuthSection
<GmailAuthSection
setPopup={setPopup}
refreshCredentials={refreshCredentials}
gmailPublicCredential={gmailPublicCredential}
@ -150,6 +150,7 @@ export const GmailMain = () => {
appCredentialData={appCredentialData}
serviceAccountKeyData={serviceAccountKeyData}
connectorExists={gmailConnectorIndexingStatuses.length > 0}
user={user}
/>
</>
)}

View File

@ -12,7 +12,7 @@ import {
GoogleDriveServiceAccountCredentialJson,
} from "@/lib/connectors/credentials";
export const useGmailCredentials = () => {
export const useGmailCredentials = (connector: string) => {
const {
data: credentialsData,
isLoading: isCredentialsLoading,
@ -23,7 +23,9 @@ export const useGmailCredentials = () => {
const gmailPublicCredential: Credential<GmailCredentialJson> | undefined =
credentialsData?.find(
(credential) =>
credential.credential_json?.gmail_tokens && credential.admin_public
credential.credential_json?.google_service_account_key &&
credential.admin_public &&
credential.source === connector
);
const gmailServiceAccountCredential:
@ -36,18 +38,20 @@ export const useGmailCredentials = () => {
gmailPublicCredential || gmailServiceAccountCredential;
return {
liveGmailCredential,
liveGmailCredential: liveGmailCredential,
};
};
export const useGoogleDriveCredentials = () => {
export const useGoogleDriveCredentials = (connector: string) => {
const { data: credentialsData } = usePublicCredentials();
const googleDrivePublicCredential:
| Credential<GoogleDriveCredentialJson>
| undefined = credentialsData?.find(
(credential) =>
credential.credential_json?.google_drive_tokens && credential.admin_public
credential.credential_json?.google_service_account_key &&
credential.admin_public &&
credential.source === connector
);
const googleDriveServiceAccountCredential:
@ -60,6 +64,6 @@ export const useGoogleDriveCredentials = () => {
googleDrivePublicCredential || googleDriveServiceAccountCredential;
return {
liveGDriveCredential,
liveGDriveCredential: liveGDriveCredential,
};
};

View File

@ -13,5 +13,6 @@ export const autoSyncConfigBySource: Record<
> = {
confluence: {},
google_drive: {},
gmail: {},
slack: {},
};

View File

@ -54,6 +54,7 @@ export interface SlackCredentialJson {
export interface GmailCredentialJson {
gmail_tokens: string;
gmail_primary_admin: string;
}
export interface GoogleDriveCredentialJson {
@ -63,7 +64,7 @@ export interface GoogleDriveCredentialJson {
export interface GmailServiceAccountCredentialJson {
gmail_service_account_key: string;
gmail_delegated_user: string;
gmail_primary_admin: string;
}
export interface GoogleDriveServiceAccountCredentialJson {
@ -339,7 +340,7 @@ export const credentialDisplayNames: Record<string, string> = {
// Gmail Service Account
gmail_service_account_key: "Gmail Service Account Key",
gmail_delegated_user: "Gmail Delegated User",
gmail_primary_admin: "Gmail Primary Admin",
// Google Drive Service Account
google_drive_service_account_key: "Google Drive Service Account Key",

View File

@ -278,6 +278,7 @@ export type ConfigurableSources = Exclude<
export const validAutoSyncSources = [
"confluence",
"google_drive",
"gmail",
"slack",
] as const;
export type ValidAutoSyncSources = (typeof validAutoSyncSources)[number];