added logging and bugfixing to conf (#3167)

* standardized escaping of CQL strings

* think i found it

* fix

* should be fixed

* added handling for special linking behavior in confluence

* Update onyx_confluence.py

* Update onyx_confluence.py

---------

Co-authored-by: rkuo-danswer <rkuo@danswer.ai>
This commit is contained in:
hagen-danswer 2024-11-20 10:40:21 -08:00 committed by GitHub
parent 645e7e828e
commit e89dcd7f84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 193 additions and 129 deletions

View File

@ -70,7 +70,7 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
) -> None: ) -> None:
self.batch_size = batch_size self.batch_size = batch_size
self.continue_on_failure = continue_on_failure self.continue_on_failure = continue_on_failure
self.confluence_client: OnyxConfluence | None = None self._confluence_client: OnyxConfluence | None = None
self.is_cloud = is_cloud self.is_cloud = is_cloud
# Remove trailing slash from wiki_base if present # Remove trailing slash from wiki_base if present
@ -97,13 +97,21 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
self.cql_label_filter = "" self.cql_label_filter = ""
if labels_to_skip: if labels_to_skip:
labels_to_skip = list(set(labels_to_skip)) labels_to_skip = list(set(labels_to_skip))
comma_separated_labels = ",".join(f"'{label}'" for label in labels_to_skip) comma_separated_labels = ",".join(
f"'{quote(label)}'" for label in labels_to_skip
)
self.cql_label_filter = f" and label not in ({comma_separated_labels})" self.cql_label_filter = f" and label not in ({comma_separated_labels})"
@property
def confluence_client(self) -> OnyxConfluence:
if self._confluence_client is None:
raise ConnectorMissingCredentialError("Confluence")
return self._confluence_client
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
# see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py
# for a list of other hidden constructor args # for a list of other hidden constructor args
self.confluence_client = build_confluence_client( self._confluence_client = build_confluence_client(
credentials_json=credentials, credentials_json=credentials,
is_cloud=self.is_cloud, is_cloud=self.is_cloud,
wiki_base=self.wiki_base, wiki_base=self.wiki_base,
@ -111,24 +119,21 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
return None return None
def _get_comment_string_for_page_id(self, page_id: str) -> str: def _get_comment_string_for_page_id(self, page_id: str) -> str:
if self.confluence_client is None:
raise ConnectorMissingCredentialError("Confluence")
comment_string = "" comment_string = ""
comment_cql = f"type=comment and container='{page_id}'" comment_cql = f"type=comment and container='{page_id}'"
comment_cql += self.cql_label_filter comment_cql += self.cql_label_filter
expand = ",".join(_COMMENT_EXPANSION_FIELDS) expand = ",".join(_COMMENT_EXPANSION_FIELDS)
for comments in self.confluence_client.paginated_cql_page_retrieval( for comment in self.confluence_client.paginated_cql_retrieval(
cql=comment_cql, cql=comment_cql,
expand=expand, expand=expand,
): ):
for comment in comments:
comment_string += "\nComment:\n" comment_string += "\nComment:\n"
comment_string += extract_text_from_confluence_html( comment_string += extract_text_from_confluence_html(
confluence_client=self.confluence_client, confluence_client=self.confluence_client,
confluence_object=comment, confluence_object=comment,
fetched_titles=set(),
) )
return comment_string return comment_string
@ -141,9 +146,6 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
If it's a page, it extracts the text, adds the comments for the document text. If it's a page, it extracts the text, adds the comments for the document text.
If it's an attachment, it just downloads the attachment and converts that into a document. If it's an attachment, it just downloads the attachment and converts that into a document.
""" """
if self.confluence_client is None:
raise ConnectorMissingCredentialError("Confluence")
# The url and the id are the same # The url and the id are the same
object_url = build_confluence_document_id( object_url = build_confluence_document_id(
self.wiki_base, confluence_object["_links"]["webui"], self.is_cloud self.wiki_base, confluence_object["_links"]["webui"], self.is_cloud
@ -153,16 +155,19 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
# Extract text from page # Extract text from page
if confluence_object["type"] == "page": if confluence_object["type"] == "page":
object_text = extract_text_from_confluence_html( object_text = extract_text_from_confluence_html(
self.confluence_client, confluence_object confluence_client=self.confluence_client,
confluence_object=confluence_object,
fetched_titles={confluence_object.get("title", "")},
) )
# Add comments to text # Add comments to text
object_text += self._get_comment_string_for_page_id(confluence_object["id"]) object_text += self._get_comment_string_for_page_id(confluence_object["id"])
elif confluence_object["type"] == "attachment": elif confluence_object["type"] == "attachment":
object_text = attachment_to_content( object_text = attachment_to_content(
self.confluence_client, confluence_object confluence_client=self.confluence_client, attachment=confluence_object
) )
if object_text is None: if object_text is None:
# This only happens for attachments that are not parseable
return None return None
# Get space name # Get space name
@ -193,20 +198,16 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
) )
def _fetch_document_batches(self) -> GenerateDocumentsOutput: def _fetch_document_batches(self) -> GenerateDocumentsOutput:
if self.confluence_client is None:
raise ConnectorMissingCredentialError("Confluence")
doc_batch: list[Document] = [] doc_batch: list[Document] = []
confluence_page_ids: list[str] = [] confluence_page_ids: list[str] = []
page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter
# Fetch pages as Documents # Fetch pages as Documents
for page_batch in self.confluence_client.paginated_cql_page_retrieval( for page in self.confluence_client.paginated_cql_retrieval(
cql=page_query, cql=page_query,
expand=",".join(_PAGE_EXPANSION_FIELDS), expand=",".join(_PAGE_EXPANSION_FIELDS),
limit=self.batch_size, limit=self.batch_size,
): ):
for page in page_batch:
confluence_page_ids.append(page["id"]) confluence_page_ids.append(page["id"])
doc = self._convert_object_to_document(page) doc = self._convert_object_to_document(page)
if doc is not None: if doc is not None:
@ -220,11 +221,10 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
attachment_cql = f"type=attachment and container='{confluence_page_id}'" attachment_cql = f"type=attachment and container='{confluence_page_id}'"
attachment_cql += self.cql_label_filter attachment_cql += self.cql_label_filter
# TODO: maybe should add time filter as well? # TODO: maybe should add time filter as well?
for attachments in self.confluence_client.paginated_cql_page_retrieval( for attachment in self.confluence_client.paginated_cql_retrieval(
cql=attachment_cql, cql=attachment_cql,
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS), expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
): ):
for attachment in attachments:
doc = self._convert_object_to_document(attachment) doc = self._convert_object_to_document(attachment)
if doc is not None: if doc is not None:
doc_batch.append(doc) doc_batch.append(doc)
@ -255,19 +255,15 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
start: SecondsSinceUnixEpoch | None = None, start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None,
) -> GenerateSlimDocumentOutput: ) -> GenerateSlimDocumentOutput:
if self.confluence_client is None:
raise ConnectorMissingCredentialError("Confluence")
doc_metadata_list: list[SlimDocument] = [] doc_metadata_list: list[SlimDocument] = []
restrictions_expand = ",".join(_RESTRICTIONS_EXPANSION_FIELDS) restrictions_expand = ",".join(_RESTRICTIONS_EXPANSION_FIELDS)
page_query = self.cql_page_query + self.cql_label_filter page_query = self.cql_page_query + self.cql_label_filter
for pages in self.confluence_client.cql_paginate_all_expansions( for page in self.confluence_client.cql_paginate_all_expansions(
cql=page_query, cql=page_query,
expand=restrictions_expand, expand=restrictions_expand,
): ):
for page in pages:
# If the page has restrictions, add them to the perm_sync_data # If the page has restrictions, add them to the perm_sync_data
# These will be used by doc_sync.py to sync permissions # These will be used by doc_sync.py to sync permissions
perm_sync_data = { perm_sync_data = {
@ -287,11 +283,10 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
) )
attachment_cql = f"type=attachment and container='{page['id']}'" attachment_cql = f"type=attachment and container='{page['id']}'"
attachment_cql += self.cql_label_filter attachment_cql += self.cql_label_filter
for attachments in self.confluence_client.cql_paginate_all_expansions( for attachment in self.confluence_client.cql_paginate_all_expansions(
cql=attachment_cql, cql=attachment_cql,
expand=restrictions_expand, expand=restrictions_expand,
): ):
for attachment in attachments:
doc_metadata_list.append( doc_metadata_list.append(
SlimDocument( SlimDocument(
id=build_confluence_document_id( id=build_confluence_document_id(

View File

@ -20,6 +20,10 @@ F = TypeVar("F", bound=Callable[..., Any])
RATE_LIMIT_MESSAGE_LOWERCASE = "Rate limit exceeded".lower() RATE_LIMIT_MESSAGE_LOWERCASE = "Rate limit exceeded".lower()
# https://jira.atlassian.com/browse/CONFCLOUD-76433
_PROBLEMATIC_EXPANSIONS = "body.storage.value"
_REPLACEMENT_EXPANSIONS = "body.view.value"
class ConfluenceRateLimitError(Exception): class ConfluenceRateLimitError(Exception):
pass pass
@ -141,7 +145,7 @@ class OnyxConfluence(Confluence):
def _paginate_url( def _paginate_url(
self, url_suffix: str, limit: int | None = None self, url_suffix: str, limit: int | None = None
) -> Iterator[list[dict[str, Any]]]: ) -> Iterator[dict[str, Any]]:
""" """
This will paginate through the top level query. This will paginate through the top level query.
""" """
@ -153,46 +157,43 @@ class OnyxConfluence(Confluence):
while url_suffix: while url_suffix:
try: try:
logger.debug(f"Making confluence call to {url_suffix}")
next_response = self.get(url_suffix) next_response = self.get(url_suffix)
except Exception as e: except Exception as e:
logger.exception("Error in danswer_cql: \n") logger.warning(f"Error in confluence call to {url_suffix}")
# If the problematic expansion is in the url, replace it
# with the replacement expansion and try again
# If that fails, raise the error
if _PROBLEMATIC_EXPANSIONS not in url_suffix:
logger.exception(f"Error in confluence call to {url_suffix}")
raise e raise e
yield next_response.get("results", []) logger.warning(
f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}"
" and trying again."
)
url_suffix = url_suffix.replace(
_PROBLEMATIC_EXPANSIONS,
_REPLACEMENT_EXPANSIONS,
)
continue
# yield the results individually
yield from next_response.get("results", [])
url_suffix = next_response.get("_links", {}).get("next") url_suffix = next_response.get("_links", {}).get("next")
def paginated_groups_retrieval( def paginated_cql_retrieval(
self,
limit: int | None = None,
) -> Iterator[list[dict[str, Any]]]:
return self._paginate_url("rest/api/group", limit)
def paginated_group_members_retrieval(
self,
group_name: str,
limit: int | None = None,
) -> Iterator[list[dict[str, Any]]]:
group_name = quote(group_name)
return self._paginate_url(f"rest/api/group/{group_name}/member", limit)
def paginated_cql_user_retrieval(
self, self,
cql: str, cql: str,
expand: str | None = None, expand: str | None = None,
limit: int | None = None, limit: int | None = None,
) -> Iterator[list[dict[str, Any]]]: ) -> Iterator[dict[str, Any]]:
"""
The content/search endpoint can be used to fetch pages, attachments, and comments.
"""
expand_string = f"&expand={expand}" if expand else "" expand_string = f"&expand={expand}" if expand else ""
return self._paginate_url( yield from self._paginate_url(
f"rest/api/search/user?cql={cql}{expand_string}", limit
)
def paginated_cql_page_retrieval(
self,
cql: str,
expand: str | None = None,
limit: int | None = None,
) -> Iterator[list[dict[str, Any]]]:
expand_string = f"&expand={expand}" if expand else ""
return self._paginate_url(
f"rest/api/content/search?cql={cql}{expand_string}", limit f"rest/api/content/search?cql={cql}{expand_string}", limit
) )
@ -201,7 +202,7 @@ class OnyxConfluence(Confluence):
cql: str, cql: str,
expand: str | None = None, expand: str | None = None,
limit: int | None = None, limit: int | None = None,
) -> Iterator[list[dict[str, Any]]]: ) -> Iterator[dict[str, Any]]:
""" """
This function will paginate through the top level query first, then This function will paginate through the top level query first, then
paginate through all of the expansions. paginate through all of the expansions.
@ -221,6 +222,44 @@ class OnyxConfluence(Confluence):
for item in data: for item in data:
_traverse_and_update(item) _traverse_and_update(item)
for results in self.paginated_cql_page_retrieval(cql, expand, limit): for confluence_object in self.paginated_cql_retrieval(cql, expand, limit):
_traverse_and_update(results) _traverse_and_update(confluence_object)
yield results yield confluence_object
def paginated_cql_user_retrieval(
self,
cql: str,
expand: str | None = None,
limit: int | None = None,
) -> Iterator[dict[str, Any]]:
"""
The search/user endpoint can be used to fetch users.
It's a separate endpoint from the content/search endpoint used only for users.
Otherwise it's very similar to the content/search endpoint.
"""
expand_string = f"&expand={expand}" if expand else ""
yield from self._paginate_url(
f"rest/api/search/user?cql={cql}{expand_string}", limit
)
def paginated_groups_retrieval(
self,
limit: int | None = None,
) -> Iterator[dict[str, Any]]:
"""
This is not an SQL-like query.
It's a confluence specific endpoint that can be used to fetch groups.
"""
yield from self._paginate_url("rest/api/group", limit)
def paginated_group_members_retrieval(
self,
group_name: str,
limit: int | None = None,
) -> Iterator[dict[str, Any]]:
"""
This is not an SQL-like query.
It's a confluence specific endpoint that can be used to fetch the members of a group.
"""
group_name = quote(group_name)
yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit)

View File

@ -2,6 +2,7 @@ import io
from datetime import datetime from datetime import datetime
from datetime import timezone from datetime import timezone
from typing import Any from typing import Any
from urllib.parse import quote
import bs4 import bs4
@ -71,7 +72,9 @@ def _get_user(confluence_client: OnyxConfluence, user_id: str) -> str:
def extract_text_from_confluence_html( def extract_text_from_confluence_html(
confluence_client: OnyxConfluence, confluence_object: dict[str, Any] confluence_client: OnyxConfluence,
confluence_object: dict[str, Any],
fetched_titles: set[str],
) -> str: ) -> str:
"""Parse a Confluence html page and replace the 'user Id' by the real """Parse a Confluence html page and replace the 'user Id' by the real
User Display Name User Display Name
@ -79,7 +82,7 @@ def extract_text_from_confluence_html(
Args: Args:
confluence_object (dict): The confluence object as a dict confluence_object (dict): The confluence object as a dict
confluence_client (Confluence): Confluence client confluence_client (Confluence): Confluence client
fetched_titles (set[str]): The titles of the pages that have already been fetched
Returns: Returns:
str: loaded and formatted Confluence page str: loaded and formatted Confluence page
""" """
@ -101,38 +104,72 @@ def extract_text_from_confluence_html(
# Include @ sign for tagging, more clear for LLM # Include @ sign for tagging, more clear for LLM
user.replaceWith("@" + _get_user(confluence_client, user_id)) user.replaceWith("@" + _get_user(confluence_client, user_id))
for html_page_reference in soup.findAll("ri:page"): for html_page_reference in soup.findAll("ac:structured-macro"):
# Wrap this in a try-except because there are some pages that might not exist # Here, we only want to process page within page macros
try: if html_page_reference.attrs.get("ac:name") != "include":
page_title = html_page_reference.attrs["ri:content-title"]
if not page_title:
continue continue
page_query = f"type=page and title='{page_title}'" page_data = html_page_reference.find("ri:page")
if not page_data:
logger.warning(
f"Skipping retrieval of {html_page_reference} because page data is missing"
)
continue
page_title = page_data.attrs.get("ri:content-title")
if not page_title:
# only fetch pages that have a title
logger.warning(
f"Skipping retrieval of {html_page_reference} because it has no title"
)
continue
if page_title in fetched_titles:
# prevent recursive fetching of pages
logger.debug(f"Skipping {page_title} because it has already been fetched")
continue
fetched_titles.add(page_title)
# Wrap this in a try-except because there are some pages that might not exist
try:
page_query = f"type=page and title='{quote(page_title)}'"
page_contents: dict[str, Any] | None = None page_contents: dict[str, Any] | None = None
# Confluence enforces title uniqueness, so we should only get one result here # Confluence enforces title uniqueness, so we should only get one result here
for page_batch in confluence_client.paginated_cql_page_retrieval( for page in confluence_client.paginated_cql_retrieval(
cql=page_query, cql=page_query,
expand="body.storage.value", expand="body.storage.value",
limit=1, limit=1,
): ):
page_contents = page_batch[0] page_contents = page
break break
except Exception: except Exception as e:
logger.warning( logger.warning(
f"Error getting page contents for object {confluence_object}" f"Error getting page contents for object {confluence_object}: {e}"
) )
continue continue
if not page_contents: if not page_contents:
continue continue
text_from_page = extract_text_from_confluence_html( text_from_page = extract_text_from_confluence_html(
confluence_client, page_contents confluence_client=confluence_client,
confluence_object=page_contents,
fetched_titles=fetched_titles,
) )
html_page_reference.replaceWith(text_from_page) html_page_reference.replaceWith(text_from_page)
for html_link_body in soup.findAll("ac:link-body"):
# This extracts the text from inline links in the page so they can be
# represented in the document text as plain text
try:
text_from_link = html_link_body.text
html_link_body.replaceWith(f"(LINK TEXT: {text_from_link})")
except Exception as e:
logger.warning(f"Error processing ac:link-body: {e}")
return format_document_soup(soup) return format_document_soup(soup)

View File

@ -1,5 +1,3 @@
from typing import Any
from danswer.connectors.confluence.onyx_confluence import OnyxConfluence from danswer.connectors.confluence.onyx_confluence import OnyxConfluence
from danswer.connectors.confluence.utils import build_confluence_client from danswer.connectors.confluence.utils import build_confluence_client
from danswer.connectors.confluence.utils import get_user_email_from_username__server from danswer.connectors.confluence.utils import get_user_email_from_username__server
@ -15,12 +13,8 @@ def _get_group_members_email_paginated(
confluence_client: OnyxConfluence, confluence_client: OnyxConfluence,
group_name: str, group_name: str,
) -> set[str]: ) -> set[str]:
members: list[dict[str, Any]] = []
for member_batch in confluence_client.paginated_group_members_retrieval(group_name):
members.extend(member_batch)
group_member_emails: set[str] = set() group_member_emails: set[str] = set()
for member in members: for member in confluence_client.paginated_group_members_retrieval(group_name):
email = member.get("email") email = member.get("email")
if not email: if not email:
user_name = member.get("username") user_name = member.get("username")
@ -47,8 +41,7 @@ def confluence_group_sync(
# Get all group names # Get all group names
group_names: list[str] = [] group_names: list[str] = []
for group_batch in confluence_client.paginated_groups_retrieval(): for group in confluence_client.paginated_groups_retrieval():
for group in group_batch:
if group_name := group.get("name"): if group_name := group.get("name"):
group_names.append(group_name) group_names.append(group_name)