diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index d6814ab97..1c348df6a 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -37,16 +37,18 @@ from danswer.utils.logger import setup_logger logger = setup_logger() # Potential Improvements -# 1. If wiki page instead of space, do a search of all the children of the page instead of index all in the space -# 2. Include attachments, etc -# 3. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost +# 1. Include attachments, etc +# 2. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost -def _extract_confluence_keys_from_cloud_url(wiki_url: str) -> tuple[str, str]: +def _extract_confluence_keys_from_cloud_url(wiki_url: str) -> tuple[str, str, str]: """Sample - https://danswer.atlassian.net/wiki/spaces/1234abcd/overview + URL w/ page: https://danswer.atlassian.net/wiki/spaces/1234abcd/pages/5678efgh/overview + URL w/o page: https://danswer.atlassian.net/wiki/spaces/ASAM/overview + wiki_base is https://danswer.atlassian.net/wiki space is 1234abcd + page_id is 5678efgh """ parsed_url = urlparse(wiki_url) wiki_base = ( @@ -55,18 +57,25 @@ def _extract_confluence_keys_from_cloud_url(wiki_url: str) -> tuple[str, str]: + parsed_url.netloc + parsed_url.path.split("/spaces")[0] ) - space = parsed_url.path.split("/")[3] - return wiki_base, space + + path_parts = parsed_url.path.split("/") + space = path_parts[3] + + page_id = path_parts[5] if len(path_parts) > 5 else "" + return wiki_base, space, page_id -def _extract_confluence_keys_from_datacenter_url(wiki_url: str) -> tuple[str, str]: +def _extract_confluence_keys_from_datacenter_url(wiki_url: str) -> tuple[str, str, str]: """Sample - https://danswer.ai/confluence/display/1234abcd/overview + URL w/ page https://danswer.ai/confluence/display/1234abcd/pages/5678efgh/overview + URL w/o page https://danswer.ai/confluence/display/1234abcd/overview wiki_base is https://danswer.ai/confluence space is 1234abcd + page_id is 5678efgh """ - # /display/ is always right before the space and at the end of the base url + # /display/ is always right before the space and at the end of the base print() DISPLAY = "/display/" + PAGE = "/pages/" parsed_url = urlparse(wiki_url) wiki_base = ( @@ -76,10 +85,13 @@ def _extract_confluence_keys_from_datacenter_url(wiki_url: str) -> tuple[str, st + parsed_url.path.split(DISPLAY)[0] ) space = DISPLAY.join(parsed_url.path.split(DISPLAY)[1:]).split("/")[0] - return wiki_base, space + page_id = "" + if (content := parsed_url.path.split(PAGE)) and len(content) > 1: + page_id = content[1] + return wiki_base, space, page_id -def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str, bool]: +def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str, str, bool]: is_confluence_cloud = ( ".atlassian.net/wiki/spaces/" in wiki_url or ".jira.com/wiki/spaces/" in wiki_url @@ -87,15 +99,19 @@ def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str, bool]: try: if is_confluence_cloud: - wiki_base, space = _extract_confluence_keys_from_cloud_url(wiki_url) + wiki_base, space, page_id = _extract_confluence_keys_from_cloud_url( + wiki_url + ) else: - wiki_base, space = _extract_confluence_keys_from_datacenter_url(wiki_url) + wiki_base, space, page_id = _extract_confluence_keys_from_datacenter_url( + wiki_url + ) except Exception as e: - error_msg = f"Not a valid Confluence Wiki Link, unable to extract wiki base and space names. Exception: {e}" + error_msg = f"Not a valid Confluence Wiki Link, unable to extract wiki base, space, and page id. Exception: {e}" logger.error(error_msg) raise ValueError(error_msg) - return wiki_base, space, is_confluence_cloud + return wiki_base, space, page_id, is_confluence_cloud @lru_cache() @@ -196,10 +212,135 @@ def _comment_dfs( return comments_str +class RecursiveIndexer: + def __init__( + self, + batch_size: int, + confluence_client: Confluence, + index_origin: bool, + origin_page_id: str, + ) -> None: + self.batch_size = 1 + # batch_size + self.confluence_client = confluence_client + self.index_origin = index_origin + self.origin_page_id = origin_page_id + self.pages = self.recurse_children_pages(0, self.origin_page_id) + + def get_pages(self, ind: int, size: int) -> list[dict]: + if ind * size > len(self.pages): + return [] + return self.pages[ind * size : (ind + 1) * size] + + def _fetch_origin_page( + self, + ) -> dict[str, Any]: + get_page_by_id = make_confluence_call_handle_rate_limit( + self.confluence_client.get_page_by_id + ) + try: + origin_page = get_page_by_id( + self.origin_page_id, expand="body.storage.value,version" + ) + return origin_page + except Exception as e: + logger.warning( + f"Appending orgin page with id {self.origin_page_id} failed: {e}" + ) + return {} + + def recurse_children_pages( + self, + start_ind: int, + page_id: str, + ) -> list[dict[str, Any]]: + pages: list[dict[str, Any]] = [] + current_level_pages: list[dict[str, Any]] = [] + next_level_pages: list[dict[str, Any]] = [] + + # Initial fetch of first level children + index = start_ind + while batch := self._fetch_single_depth_child_pages( + index, self.batch_size, page_id + ): + current_level_pages.extend(batch) + index += len(batch) + + pages.extend(current_level_pages) + + # Recursively index children and children's children, etc. + while current_level_pages: + for child in current_level_pages: + child_index = 0 + while child_batch := self._fetch_single_depth_child_pages( + child_index, self.batch_size, child["id"] + ): + next_level_pages.extend(child_batch) + child_index += len(child_batch) + + pages.extend(next_level_pages) + current_level_pages = next_level_pages + next_level_pages = [] + + if self.index_origin: + try: + origin_page = self._fetch_origin_page() + pages.append(origin_page) + except Exception as e: + logger.warning(f"Appending origin page with id {page_id} failed: {e}") + + return pages + + def _fetch_single_depth_child_pages( + self, start_ind: int, batch_size: int, page_id: str + ) -> list[dict[str, Any]]: + child_pages: list[dict[str, Any]] = [] + + get_page_child_by_type = make_confluence_call_handle_rate_limit( + self.confluence_client.get_page_child_by_type + ) + + try: + child_page = get_page_child_by_type( + page_id, + type="page", + start=start_ind, + limit=batch_size, + expand="body.storage.value,version", + ) + + child_pages.extend(child_page) + return child_pages + + except Exception: + logger.warning( + f"Batch failed with page {page_id} at offset {start_ind} " + f"with size {batch_size}, processing pages individually..." + ) + + for i in range(batch_size): + ind = start_ind + i + try: + child_page = get_page_child_by_type( + page_id, + type="page", + start=ind, + limit=1, + expand="body.storage.value,version", + ) + child_pages.extend(child_page) + except Exception as e: + logger.warning(f"Page {page_id} at offset {ind} failed: {e}") + raise e + + return child_pages + + class ConfluenceConnector(LoadConnector, PollConnector): def __init__( self, wiki_page_url: str, + index_origin: bool = True, batch_size: int = INDEX_BATCH_SIZE, continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE, # if a page has one of the labels specified in this list, we will just @@ -210,11 +351,27 @@ class ConfluenceConnector(LoadConnector, PollConnector): self.batch_size = batch_size self.continue_on_failure = continue_on_failure self.labels_to_skip = set(labels_to_skip) - self.wiki_base, self.space, self.is_cloud = extract_confluence_keys_from_url( - wiki_page_url - ) + self.recursive_indexer: RecursiveIndexer | None = None + self.index_origin = index_origin + ( + self.wiki_base, + self.space, + self.page_id, + self.is_cloud, + ) = extract_confluence_keys_from_url(wiki_page_url) + + self.space_level_scan = False + self.confluence_client: Confluence | None = None + if self.page_id is None or self.page_id == "": + self.space_level_scan = True + + logger.info( + f"wiki_base: {self.wiki_base}, space: {self.space}, page_id: {self.page_id}," + + f" space_level_scan: {self.space_level_scan}, origin: {self.index_origin}" + ) + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: username = credentials["confluence_username"] access_token = credentials["confluence_access_token"] @@ -232,8 +389,8 @@ class ConfluenceConnector(LoadConnector, PollConnector): self, confluence_client: Confluence, start_ind: int, - ) -> Collection[dict[str, Any]]: - def _fetch(start_ind: int, batch_size: int) -> Collection[dict[str, Any]]: + ) -> list[dict[str, Any]]: + def _fetch_space(start_ind: int, batch_size: int) -> list[dict[str, Any]]: get_all_pages_from_space = make_confluence_call_handle_rate_limit( confluence_client.get_all_pages_from_space ) @@ -242,9 +399,11 @@ class ConfluenceConnector(LoadConnector, PollConnector): self.space, start=start_ind, limit=batch_size, - status="current" - if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES - else None, + status=( + "current" + if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES + else None + ), expand="body.storage.value,version", ) except Exception: @@ -263,9 +422,11 @@ class ConfluenceConnector(LoadConnector, PollConnector): self.space, start=start_ind + i, limit=1, - status="current" - if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES - else None, + status=( + "current" + if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES + else None + ), expand="body.storage.value,version", ) ) @@ -286,17 +447,41 @@ class ConfluenceConnector(LoadConnector, PollConnector): return view_pages + def _fetch_page(start_ind: int, batch_size: int) -> list[dict[str, Any]]: + if self.recursive_indexer is None: + self.recursive_indexer = RecursiveIndexer( + origin_page_id=self.page_id, + batch_size=self.batch_size, + confluence_client=self.confluence_client, + index_origin=self.index_origin, + ) + + return self.recursive_indexer.get_pages(start_ind, batch_size) + + pages: list[dict[str, Any]] = [] + try: - return _fetch(start_ind, self.batch_size) + pages = ( + _fetch_space(start_ind, self.batch_size) + if self.space_level_scan + else _fetch_page(start_ind, self.batch_size) + ) + return pages + except Exception as e: if not self.continue_on_failure: raise e # error checking phase, only reachable if `self.continue_on_failure=True` - pages: list[dict[str, Any]] = [] for i in range(self.batch_size): try: - pages.extend(_fetch(start_ind + i, 1)) + pages = ( + _fetch_space(start_ind, self.batch_size) + if self.space_level_scan + else _fetch_page(start_ind, self.batch_size) + ) + return pages + except Exception: logger.exception( "Ran into exception when fetching pages from Confluence" @@ -308,6 +493,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): get_page_child_by_type = make_confluence_call_handle_rate_limit( confluence_client.get_page_child_by_type ) + try: comment_pages = cast( Collection[dict[str, Any]], @@ -356,7 +542,14 @@ class ConfluenceConnector(LoadConnector, PollConnector): page_id, start=0, limit=500 ) for attachment in attachments_container["results"]: - if attachment["metadata"]["mediaType"] in ["image/jpeg", "image/png"]: + if attachment["metadata"]["mediaType"] in [ + "image/jpeg", + "image/png", + "image/gif", + "image/svg+xml", + "video/mp4", + "video/quicktime", + ]: continue if attachment["title"] not in files_in_used: @@ -367,9 +560,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): if response.status_code == 200: extract = extract_file_text( - attachment["title"], - io.BytesIO(response.content), - break_on_unprocessable=False, + attachment["title"], io.BytesIO(response.content), False ) files_attachment_content.append(extract) @@ -389,8 +580,8 @@ class ConfluenceConnector(LoadConnector, PollConnector): if self.confluence_client is None: raise ConnectorMissingCredentialError("Confluence") - batch = self._fetch_pages(self.confluence_client, start_ind) + for page in batch: last_modified_str = page["version"]["when"] author = cast(str | None, page["version"].get("by", {}).get("email")) @@ -405,6 +596,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): if time_filter is None or time_filter(last_modified): page_id = page["id"] + if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING: page_labels = self._fetch_labels(self.confluence_client, page_id) @@ -416,6 +608,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): f"Page with ID '{page_id}' has a label which has been " f"designated as disallowed: {label_intersection}. Skipping." ) + continue page_html = ( @@ -436,7 +629,6 @@ class ConfluenceConnector(LoadConnector, PollConnector): page_text += attachment_text comments_text = self._fetch_comments(self.confluence_client, page_id) page_text += comments_text - doc_metadata: dict[str, str | list[str]] = { "Wiki Space Name": self.space } @@ -450,9 +642,9 @@ class ConfluenceConnector(LoadConnector, PollConnector): source=DocumentSource.CONFLUENCE, semantic_identifier=page["title"], doc_updated_at=last_modified, - primary_owners=[BasicExpertInfo(email=author)] - if author - else None, + primary_owners=( + [BasicExpertInfo(email=author)] if author else None + ), metadata=doc_metadata, ) ) diff --git a/web/src/app/admin/connectors/confluence/page.tsx b/web/src/app/admin/connectors/confluence/page.tsx index 25c69cdbd..981d63e95 100644 --- a/web/src/app/admin/connectors/confluence/page.tsx +++ b/web/src/app/admin/connectors/confluence/page.tsx @@ -2,7 +2,10 @@ import * as Yup from "yup"; import { ConfluenceIcon, TrashIcon } from "@/components/icons/icons"; -import { TextFormField } from "@/components/admin/connectors/Field"; +import { + BooleanFormField, + TextFormField, +} from "@/components/admin/connectors/Field"; import { HealthCheckBanner } from "@/components/health/healthcheck"; import { CredentialForm } from "@/components/admin/connectors/CredentialForm"; import { @@ -207,13 +210,17 @@ const Main = () => {
Specify any link to a Confluence page below and click "Index" to Index. Based on the provided link, we will - index the ENTIRE SPACE, not just the specified page. For example, - entering{" "} + index either the entire page and its subpages OR the entire space. + For example, entering{" "} https://danswer.atlassian.net/wiki/spaces/Engineering/overview {" "} and clicking the Index button will index the whole{" "} - Engineering Confluence space. + Engineering Confluence space, but entering + https://danswer.atlassian.net/wiki/spaces/Engineering/pages/164331/example+page + will index that page's children (and optionally, itself). Use + the checkbox below to determine whether or not to index the parent + page in addition to its children.
{confluenceConnectorIndexingStatuses.length > 0 && ( @@ -274,9 +281,8 @@ const Main = () => {