mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-04-08 20:08:36 +02:00
Confluence permission sync fix for server deployment (#2784)
* initial commit * Made perm sync with with cql * filter fix * undo connector changes * fixed everything * whoops
This commit is contained in:
parent
89eaa8bc30
commit
494fda906d
@ -1,18 +0,0 @@
|
||||
from typing import Any
|
||||
|
||||
from atlassian import Confluence # type:ignore
|
||||
|
||||
|
||||
def build_confluence_client(
|
||||
connector_specific_config: dict[str, Any], raw_credentials_json: dict[str, Any]
|
||||
) -> Confluence:
|
||||
is_cloud = connector_specific_config.get("is_cloud", False)
|
||||
return Confluence(
|
||||
api_version="cloud" if is_cloud else "latest",
|
||||
# Remove trailing slash from wiki_base if present
|
||||
url=connector_specific_config["wiki_base"].rstrip("/"),
|
||||
# passing in username causes issues for Confluence data center
|
||||
username=raw_credentials_json["confluence_username"] if is_cloud else None,
|
||||
password=raw_credentials_json["confluence_access_token"] if is_cloud else None,
|
||||
token=raw_credentials_json["confluence_access_token"] if not is_cloud else None,
|
||||
)
|
@ -1,12 +1,18 @@
|
||||
"""
|
||||
Rules defined here:
|
||||
https://confluence.atlassian.com/conf85/check-who-can-view-a-page-1283360557.html
|
||||
"""
|
||||
from typing import Any
|
||||
from urllib.parse import parse_qs
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from atlassian import Confluence # type:ignore
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from danswer.access.models import ExternalAccess
|
||||
from danswer.connectors.confluence.confluence_utils import (
|
||||
build_confluence_document_id,
|
||||
)
|
||||
from danswer.connectors.confluence.connector import DanswerConfluence
|
||||
from danswer.connectors.confluence.rate_limit_handler import (
|
||||
make_confluence_call_handle_rate_limit,
|
||||
)
|
||||
@ -14,36 +20,136 @@ from danswer.db.models import ConnectorCredentialPair
|
||||
from danswer.db.users import batch_add_non_web_user_if_not_exists__no_commit
|
||||
from danswer.utils.logger import setup_logger
|
||||
from ee.danswer.db.document import upsert_document_external_perms__no_commit
|
||||
from ee.danswer.external_permissions.confluence.confluence_sync_utils import (
|
||||
from ee.danswer.external_permissions.confluence.sync_utils import (
|
||||
build_confluence_client,
|
||||
)
|
||||
|
||||
from ee.danswer.external_permissions.confluence.sync_utils import (
|
||||
get_user_email_from_username__server,
|
||||
)
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
_VIEWSPACE_PERMISSION_TYPE = "VIEWSPACE"
|
||||
_REQUEST_PAGINATION_LIMIT = 100
|
||||
|
||||
|
||||
def _extract_user_email(subjects: dict[str, Any]) -> str | None:
|
||||
# If the subject is a user, then return the user's email
|
||||
user = subjects.get("user", {})
|
||||
result = user.get("results", [{}])[0]
|
||||
return result.get("email")
|
||||
def _get_server_space_permissions(
|
||||
confluence_client: DanswerConfluence, space_key: str
|
||||
) -> ExternalAccess:
|
||||
get_space_permissions = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_space_permissions
|
||||
)
|
||||
|
||||
permissions = get_space_permissions(space_key)
|
||||
|
||||
viewspace_permissions = []
|
||||
for permission_category in permissions:
|
||||
if permission_category.get("type") == _VIEWSPACE_PERMISSION_TYPE:
|
||||
viewspace_permissions.extend(
|
||||
permission_category.get("spacePermissions", [])
|
||||
)
|
||||
|
||||
user_names = set()
|
||||
group_names = set()
|
||||
for permission in viewspace_permissions:
|
||||
if user_name := permission.get("userName"):
|
||||
user_names.add(user_name)
|
||||
if group_name := permission.get("groupName"):
|
||||
group_names.add(group_name)
|
||||
|
||||
user_emails = set()
|
||||
for user_name in user_names:
|
||||
user_email = get_user_email_from_username__server(confluence_client, user_name)
|
||||
if user_email:
|
||||
user_emails.add(user_email)
|
||||
else:
|
||||
logger.warning(f"Email for user {user_name} not found in Confluence")
|
||||
|
||||
return ExternalAccess(
|
||||
external_user_emails=user_emails,
|
||||
external_user_group_ids=group_names,
|
||||
# TODO: Check if the space is publicly accessible
|
||||
# Currently, we assume the space is not public
|
||||
# We need to check if anonymous access is turned on for the site and space
|
||||
# This information is paywalled so it remains unimplemented
|
||||
is_public=False,
|
||||
)
|
||||
|
||||
|
||||
def _extract_group_name(subjects: dict[str, Any]) -> str | None:
|
||||
# If the subject is a group, then return the group's name
|
||||
group = subjects.get("group", {})
|
||||
result = group.get("results", [{}])[0]
|
||||
return result.get("name")
|
||||
def _get_cloud_space_permissions(
|
||||
confluence_client: DanswerConfluence, space_key: str
|
||||
) -> ExternalAccess:
|
||||
get_space_permissions = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_space
|
||||
)
|
||||
space_permissions_result = get_space_permissions(
|
||||
space_key=space_key, expand="permissions"
|
||||
)
|
||||
space_permissions = space_permissions_result.get("permissions", [])
|
||||
|
||||
user_emails = set()
|
||||
group_names = set()
|
||||
is_externally_public = False
|
||||
for permission in space_permissions:
|
||||
subs = permission.get("subjects")
|
||||
if subs:
|
||||
# If there are subjects, then there are explicit users or groups with access
|
||||
if email := subs.get("user", {}).get("results", [{}])[0].get("email"):
|
||||
user_emails.add(email)
|
||||
if group_name := subs.get("group", {}).get("results", [{}])[0].get("name"):
|
||||
group_names.add(group_name)
|
||||
else:
|
||||
# If there are no subjects, then the permission is for everyone
|
||||
if permission.get("operation", {}).get(
|
||||
"operation"
|
||||
) == "read" and permission.get("anonymousAccess", False):
|
||||
# If the permission specifies read access for anonymous users, then
|
||||
# the space is publicly accessible
|
||||
is_externally_public = True
|
||||
|
||||
return ExternalAccess(
|
||||
external_user_emails=user_emails,
|
||||
external_user_group_ids=group_names,
|
||||
is_public=is_externally_public,
|
||||
)
|
||||
|
||||
|
||||
def _is_public_read_permission(permission: dict[str, Any]) -> bool:
|
||||
# If the permission is a public read permission, then return True
|
||||
operation = permission.get("operation", {})
|
||||
operation_value = operation.get("operation")
|
||||
anonymous_access = permission.get("anonymousAccess", False)
|
||||
return operation_value == "read" and anonymous_access
|
||||
def _get_space_permissions(
|
||||
confluence_client: DanswerConfluence,
|
||||
is_cloud: bool,
|
||||
) -> dict[str, ExternalAccess]:
|
||||
# Gets all the spaces in the Confluence instance
|
||||
get_all_spaces = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_all_spaces
|
||||
)
|
||||
all_space_keys = []
|
||||
start = 0
|
||||
while True:
|
||||
spaces_batch = get_all_spaces(start=start, limit=_REQUEST_PAGINATION_LIMIT)
|
||||
for space in spaces_batch.get("results", []):
|
||||
all_space_keys.append(space.get("key"))
|
||||
|
||||
if len(spaces_batch.get("results", [])) < _REQUEST_PAGINATION_LIMIT:
|
||||
break
|
||||
|
||||
start += len(spaces_batch.get("results", []))
|
||||
|
||||
# Gets the permissions for each space
|
||||
space_permissions_by_space_key: dict[str, ExternalAccess] = {}
|
||||
for space_key in all_space_keys:
|
||||
if is_cloud:
|
||||
space_permissions = _get_cloud_space_permissions(
|
||||
confluence_client=confluence_client, space_key=space_key
|
||||
)
|
||||
else:
|
||||
space_permissions = _get_server_space_permissions(
|
||||
confluence_client=confluence_client, space_key=space_key
|
||||
)
|
||||
|
||||
# Stores the permissions for each space
|
||||
space_permissions_by_space_key[space_key] = space_permissions
|
||||
|
||||
return space_permissions_by_space_key
|
||||
|
||||
|
||||
def _extract_read_access_restrictions(
|
||||
@ -75,50 +181,7 @@ def _extract_read_access_restrictions(
|
||||
return read_access_user_emails, read_access_group_names
|
||||
|
||||
|
||||
def _get_space_permissions(
|
||||
db_session: Session,
|
||||
confluence_client: Confluence,
|
||||
space_id: str,
|
||||
) -> ExternalAccess:
|
||||
get_space_permissions = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_space_permissions
|
||||
)
|
||||
|
||||
space_permissions_result = get_space_permissions(space_id)
|
||||
logger.debug(f"space_permissions_result: {space_permissions_result}")
|
||||
|
||||
space_permissions = space_permissions_result.get("permissions", [])
|
||||
user_emails = set()
|
||||
# Confluence enforces that group names are unique
|
||||
group_names = set()
|
||||
is_externally_public = False
|
||||
for permission in space_permissions:
|
||||
subjects = permission.get("subjects")
|
||||
if subjects:
|
||||
# If there are subjects, then there are explicit users or groups with access
|
||||
if email := _extract_user_email(subjects):
|
||||
user_emails.add(email)
|
||||
if group_name := _extract_group_name(subjects):
|
||||
group_names.add(group_name)
|
||||
else:
|
||||
# If there are no subjects, then the permission is for everyone
|
||||
if _is_public_read_permission(permission):
|
||||
# If the permission specifies read access for anonymous users, then
|
||||
# the space is publicly accessible
|
||||
is_externally_public = True
|
||||
|
||||
batch_add_non_web_user_if_not_exists__no_commit(
|
||||
db_session=db_session, emails=list(user_emails)
|
||||
)
|
||||
return ExternalAccess(
|
||||
external_user_emails=user_emails,
|
||||
external_user_group_ids=group_names,
|
||||
is_public=is_externally_public,
|
||||
)
|
||||
|
||||
|
||||
def _get_page_specific_restrictions(
|
||||
db_session: Session,
|
||||
page: dict[str, Any],
|
||||
) -> ExternalAccess | None:
|
||||
user_emails, group_names = _extract_read_access_restrictions(
|
||||
@ -131,9 +194,6 @@ def _get_page_specific_restrictions(
|
||||
if is_space_public:
|
||||
return None
|
||||
|
||||
batch_add_non_web_user_if_not_exists__no_commit(
|
||||
db_session=db_session, emails=list(user_emails)
|
||||
)
|
||||
return ExternalAccess(
|
||||
external_user_emails=set(user_emails),
|
||||
external_user_group_ids=set(group_names),
|
||||
@ -143,7 +203,7 @@ def _get_page_specific_restrictions(
|
||||
|
||||
|
||||
def _fetch_attachment_document_ids_for_page_paginated(
|
||||
confluence_client: Confluence, page: dict[str, Any]
|
||||
confluence_client: DanswerConfluence, page: dict[str, Any]
|
||||
) -> list[str]:
|
||||
"""
|
||||
Starts by just extracting the first page of attachments from
|
||||
@ -184,11 +244,11 @@ def _fetch_attachment_document_ids_for_page_paginated(
|
||||
|
||||
|
||||
def _fetch_all_pages_paginated(
|
||||
confluence_client: Confluence,
|
||||
space_id: str,
|
||||
confluence_client: DanswerConfluence,
|
||||
cql_query: str,
|
||||
) -> list[dict[str, Any]]:
|
||||
get_all_pages_from_space = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_all_pages_from_space
|
||||
get_all_pages = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.danswer_cql
|
||||
)
|
||||
|
||||
# For each page, this fetches the page's attachments and restrictions.
|
||||
@ -196,39 +256,43 @@ def _fetch_all_pages_paginated(
|
||||
"children.attachment",
|
||||
"restrictions.read.restrictions.user",
|
||||
"restrictions.read.restrictions.group",
|
||||
"space",
|
||||
]
|
||||
expansion_string = ",".join(expansion_strings)
|
||||
|
||||
all_pages = []
|
||||
start = 0
|
||||
all_pages: list[dict[str, Any]] = []
|
||||
cursor = None
|
||||
while True:
|
||||
pages_dict = get_all_pages_from_space(
|
||||
space=space_id,
|
||||
start=start,
|
||||
limit=_REQUEST_PAGINATION_LIMIT,
|
||||
response = get_all_pages(
|
||||
cql=cql_query,
|
||||
expand=expansion_string,
|
||||
cursor=cursor,
|
||||
limit=_REQUEST_PAGINATION_LIMIT,
|
||||
)
|
||||
all_pages.extend(pages_dict)
|
||||
|
||||
response_size = len(pages_dict)
|
||||
if response_size < _REQUEST_PAGINATION_LIMIT:
|
||||
all_pages.extend(response.get("results", []))
|
||||
|
||||
# Handle pagination
|
||||
next_cursor = response.get("_links", {}).get("next", "")
|
||||
cursor = parse_qs(urlparse(next_cursor).query).get("cursor", [None])[0]
|
||||
|
||||
if not cursor:
|
||||
break
|
||||
start += response_size
|
||||
|
||||
return all_pages
|
||||
|
||||
|
||||
def _fetch_all_page_restrictions_for_space(
|
||||
db_session: Session,
|
||||
confluence_client: Confluence,
|
||||
space_id: str,
|
||||
) -> dict[str, ExternalAccess | None]:
|
||||
confluence_client: DanswerConfluence,
|
||||
cql_query: str,
|
||||
space_permissions_by_space_key: dict[str, ExternalAccess],
|
||||
) -> dict[str, ExternalAccess]:
|
||||
all_pages = _fetch_all_pages_paginated(
|
||||
confluence_client=confluence_client,
|
||||
space_id=space_id,
|
||||
cql_query=cql_query,
|
||||
)
|
||||
|
||||
document_restrictions: dict[str, ExternalAccess | None] = {}
|
||||
document_restrictions: dict[str, ExternalAccess] = {}
|
||||
for page in all_pages:
|
||||
"""
|
||||
This assigns the same permissions to all attachments of a page and
|
||||
@ -257,10 +321,17 @@ def _fetch_all_page_restrictions_for_space(
|
||||
|
||||
# Get the page's specific restrictions
|
||||
page_permissions = _get_page_specific_restrictions(
|
||||
db_session=db_session,
|
||||
page=page,
|
||||
)
|
||||
|
||||
if not page_permissions:
|
||||
# If there are no page specific restrictions,
|
||||
# the page inherits the space's restrictions
|
||||
page_permissions = space_permissions_by_space_key.get(page["space"]["key"])
|
||||
if not page_permissions:
|
||||
# If nothing is in the dict, then the space has not been indexed, so we move on
|
||||
continue
|
||||
|
||||
# Apply the page's specific restrictions to the page and its attachments
|
||||
for document_id in document_ids:
|
||||
document_restrictions[document_id] = page_permissions
|
||||
@ -268,6 +339,19 @@ def _fetch_all_page_restrictions_for_space(
|
||||
return document_restrictions
|
||||
|
||||
|
||||
def _build_cql_query_from_connector_config(
|
||||
cc_pair: ConnectorCredentialPair,
|
||||
) -> str:
|
||||
cql_query = cc_pair.connector.connector_specific_config.get("cql_query")
|
||||
if cql_query:
|
||||
return cql_query
|
||||
|
||||
space_id = cc_pair.connector.connector_specific_config.get("space")
|
||||
if space_id:
|
||||
return f"type=page and space='{space_id}'"
|
||||
return "type=page"
|
||||
|
||||
|
||||
def confluence_doc_sync(
|
||||
db_session: Session,
|
||||
cc_pair: ConnectorCredentialPair,
|
||||
@ -279,26 +363,32 @@ def confluence_doc_sync(
|
||||
already populated
|
||||
"""
|
||||
confluence_client = build_confluence_client(
|
||||
cc_pair.connector.connector_specific_config, cc_pair.credential.credential_json
|
||||
connector_specific_config=cc_pair.connector.connector_specific_config,
|
||||
credentials_json=cc_pair.credential.credential_json,
|
||||
)
|
||||
space_permissions = _get_space_permissions(
|
||||
db_session=db_session,
|
||||
confluence_client=confluence_client,
|
||||
space_id=cc_pair.connector.connector_specific_config["space"],
|
||||
)
|
||||
fresh_doc_permissions = _fetch_all_page_restrictions_for_space(
|
||||
db_session=db_session,
|
||||
confluence_client=confluence_client,
|
||||
space_id=cc_pair.connector.connector_specific_config["space"],
|
||||
)
|
||||
for doc_id, page_specific_access in fresh_doc_permissions.items():
|
||||
# If there are no page specific restrictions, then
|
||||
# the page inherits the space's restrictions
|
||||
page_access = page_specific_access or space_permissions
|
||||
|
||||
cql_query = _build_cql_query_from_connector_config(cc_pair)
|
||||
is_cloud = cc_pair.connector.connector_specific_config.get("is_cloud", False)
|
||||
|
||||
space_permissions_by_space_key = _get_space_permissions(
|
||||
confluence_client=confluence_client,
|
||||
is_cloud=is_cloud,
|
||||
)
|
||||
|
||||
permissions_by_doc_id = _fetch_all_page_restrictions_for_space(
|
||||
confluence_client=confluence_client,
|
||||
cql_query=cql_query,
|
||||
space_permissions_by_space_key=space_permissions_by_space_key,
|
||||
)
|
||||
|
||||
all_emails = set()
|
||||
for doc_id, page_specific_access in permissions_by_doc_id.items():
|
||||
upsert_document_external_perms__no_commit(
|
||||
db_session=db_session,
|
||||
doc_id=doc_id,
|
||||
external_access=page_access,
|
||||
external_access=page_specific_access,
|
||||
source_type=cc_pair.connector.source,
|
||||
)
|
||||
all_emails.update(page_specific_access.external_user_emails)
|
||||
|
||||
batch_add_non_web_user_if_not_exists__no_commit(db_session, list(all_emails))
|
||||
|
@ -12,9 +12,12 @@ from danswer.db.users import batch_add_non_web_user_if_not_exists__no_commit
|
||||
from danswer.utils.logger import setup_logger
|
||||
from ee.danswer.db.external_perm import ExternalUserGroup
|
||||
from ee.danswer.db.external_perm import replace_user__ext_group_for_cc_pair__no_commit
|
||||
from ee.danswer.external_permissions.confluence.confluence_sync_utils import (
|
||||
from ee.danswer.external_permissions.confluence.sync_utils import (
|
||||
build_confluence_client,
|
||||
)
|
||||
from ee.danswer.external_permissions.confluence.sync_utils import (
|
||||
get_user_email_from_username__server,
|
||||
)
|
||||
|
||||
|
||||
logger = setup_logger()
|
||||
@ -50,11 +53,12 @@ def _get_confluence_group_names_paginated(
|
||||
def _get_group_members_email_paginated(
|
||||
confluence_client: Confluence,
|
||||
group_name: str,
|
||||
) -> list[str]:
|
||||
is_cloud: bool,
|
||||
) -> set[str]:
|
||||
get_group_members = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_group_members
|
||||
)
|
||||
group_member_emails: list[str] = []
|
||||
group_member_emails: set[str] = set()
|
||||
start = 0
|
||||
while True:
|
||||
try:
|
||||
@ -66,12 +70,22 @@ def _get_group_members_email_paginated(
|
||||
return group_member_emails
|
||||
raise e
|
||||
|
||||
group_member_emails.extend(
|
||||
[member.get("email") for member in members if member.get("email")]
|
||||
)
|
||||
for member in members:
|
||||
if is_cloud:
|
||||
email = member.get("email")
|
||||
elif user_name := member.get("username"):
|
||||
email = get_user_email_from_username__server(
|
||||
confluence_client, user_name
|
||||
)
|
||||
|
||||
if email:
|
||||
group_member_emails.add(email)
|
||||
|
||||
if len(members) < _PAGE_SIZE:
|
||||
break
|
||||
|
||||
start += _PAGE_SIZE
|
||||
|
||||
return group_member_emails
|
||||
|
||||
|
||||
@ -80,22 +94,25 @@ def confluence_group_sync(
|
||||
cc_pair: ConnectorCredentialPair,
|
||||
) -> None:
|
||||
confluence_client = build_confluence_client(
|
||||
cc_pair.connector.connector_specific_config, cc_pair.credential.credential_json
|
||||
connector_specific_config=cc_pair.connector.connector_specific_config,
|
||||
credentials_json=cc_pair.credential.credential_json,
|
||||
)
|
||||
|
||||
danswer_groups: list[ExternalUserGroup] = []
|
||||
is_cloud = cc_pair.connector.connector_specific_config.get("is_cloud", False)
|
||||
# Confluence enforces that group names are unique
|
||||
for group_name in _get_confluence_group_names_paginated(confluence_client):
|
||||
group_member_emails = _get_group_members_email_paginated(
|
||||
confluence_client, group_name
|
||||
confluence_client, group_name, is_cloud
|
||||
)
|
||||
group_members = batch_add_non_web_user_if_not_exists__no_commit(
|
||||
db_session=db_session, emails=group_member_emails
|
||||
db_session=db_session, emails=list(group_member_emails)
|
||||
)
|
||||
if group_members:
|
||||
danswer_groups.append(
|
||||
ExternalUserGroup(
|
||||
id=group_name, user_ids=[user.id for user in group_members]
|
||||
id=group_name,
|
||||
user_ids=[user.id for user in group_members],
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -0,0 +1,40 @@
|
||||
from typing import Any
|
||||
|
||||
from danswer.connectors.confluence.connector import DanswerConfluence
|
||||
from danswer.connectors.confluence.rate_limit_handler import (
|
||||
make_confluence_call_handle_rate_limit,
|
||||
)
|
||||
|
||||
_USER_EMAIL_CACHE: dict[str, str | None] = {}
|
||||
|
||||
|
||||
def build_confluence_client(
|
||||
connector_specific_config: dict[str, Any], credentials_json: dict[str, Any]
|
||||
) -> DanswerConfluence:
|
||||
is_cloud = connector_specific_config.get("is_cloud", False)
|
||||
return DanswerConfluence(
|
||||
api_version="cloud" if is_cloud else "latest",
|
||||
# Remove trailing slash from wiki_base if present
|
||||
url=connector_specific_config["wiki_base"].rstrip("/"),
|
||||
# passing in username causes issues for Confluence data center
|
||||
username=credentials_json["confluence_username"] if is_cloud else None,
|
||||
password=credentials_json["confluence_access_token"] if is_cloud else None,
|
||||
token=credentials_json["confluence_access_token"] if not is_cloud else None,
|
||||
)
|
||||
|
||||
|
||||
def get_user_email_from_username__server(
|
||||
confluence_client: DanswerConfluence, user_name: str
|
||||
) -> str | None:
|
||||
global _USER_EMAIL_CACHE
|
||||
get_user_info = make_confluence_call_handle_rate_limit(
|
||||
confluence_client.get_mobile_parameters
|
||||
)
|
||||
if _USER_EMAIL_CACHE.get(user_name) is None:
|
||||
try:
|
||||
response = get_user_info(user_name)
|
||||
email = response.get("email")
|
||||
except Exception:
|
||||
email = None
|
||||
_USER_EMAIL_CACHE[user_name] = email
|
||||
return _USER_EMAIL_CACHE[user_name]
|
Loading…
x
Reference in New Issue
Block a user