Confluence Connector to Sync Child pages only (#1629)

---------

Co-authored-by: Varun Gaur <vgaur@roku.com>
Co-authored-by: hagen-danswer <hagen@danswer.ai>
Co-authored-by: pablodanswer <pablo@danswer.ai>
This commit is contained in:
Varun Gaur 2024-07-10 16:17:03 -05:00 committed by GitHub
parent 09a11b5e1a
commit 6c51f001dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 251 additions and 46 deletions

View File

@ -37,16 +37,18 @@ from danswer.utils.logger import setup_logger
logger = setup_logger() logger = setup_logger()
# Potential Improvements # Potential Improvements
# 1. If wiki page instead of space, do a search of all the children of the page instead of index all in the space # 1. Include attachments, etc
# 2. Include attachments, etc # 2. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost
# 3. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost
def _extract_confluence_keys_from_cloud_url(wiki_url: str) -> tuple[str, str]: def _extract_confluence_keys_from_cloud_url(wiki_url: str) -> tuple[str, str, str]:
"""Sample """Sample
https://danswer.atlassian.net/wiki/spaces/1234abcd/overview URL w/ page: https://danswer.atlassian.net/wiki/spaces/1234abcd/pages/5678efgh/overview
URL w/o page: https://danswer.atlassian.net/wiki/spaces/ASAM/overview
wiki_base is https://danswer.atlassian.net/wiki wiki_base is https://danswer.atlassian.net/wiki
space is 1234abcd space is 1234abcd
page_id is 5678efgh
""" """
parsed_url = urlparse(wiki_url) parsed_url = urlparse(wiki_url)
wiki_base = ( wiki_base = (
@ -55,18 +57,25 @@ def _extract_confluence_keys_from_cloud_url(wiki_url: str) -> tuple[str, str]:
+ parsed_url.netloc + parsed_url.netloc
+ parsed_url.path.split("/spaces")[0] + parsed_url.path.split("/spaces")[0]
) )
space = parsed_url.path.split("/")[3]
return wiki_base, space path_parts = parsed_url.path.split("/")
space = path_parts[3]
page_id = path_parts[5] if len(path_parts) > 5 else ""
return wiki_base, space, page_id
def _extract_confluence_keys_from_datacenter_url(wiki_url: str) -> tuple[str, str]: def _extract_confluence_keys_from_datacenter_url(wiki_url: str) -> tuple[str, str, str]:
"""Sample """Sample
https://danswer.ai/confluence/display/1234abcd/overview URL w/ page https://danswer.ai/confluence/display/1234abcd/pages/5678efgh/overview
URL w/o page https://danswer.ai/confluence/display/1234abcd/overview
wiki_base is https://danswer.ai/confluence wiki_base is https://danswer.ai/confluence
space is 1234abcd space is 1234abcd
page_id is 5678efgh
""" """
# /display/ is always right before the space and at the end of the base url # /display/ is always right before the space and at the end of the base url
DISPLAY = "/display/" DISPLAY = "/display/"
PAGE = "/pages/"
parsed_url = urlparse(wiki_url) parsed_url = urlparse(wiki_url)
wiki_base = ( wiki_base = (
@ -76,10 +85,13 @@ def _extract_confluence_keys_from_datacenter_url(wiki_url: str) -> tuple[str, st
+ parsed_url.path.split(DISPLAY)[0] + parsed_url.path.split(DISPLAY)[0]
) )
space = DISPLAY.join(parsed_url.path.split(DISPLAY)[1:]).split("/")[0] space = DISPLAY.join(parsed_url.path.split(DISPLAY)[1:]).split("/")[0]
return wiki_base, space page_id = ""
if (content := parsed_url.path.split(PAGE)) and len(content) > 1:
page_id = content[1]
return wiki_base, space, page_id
def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str, bool]: def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str, str, bool]:
is_confluence_cloud = ( is_confluence_cloud = (
".atlassian.net/wiki/spaces/" in wiki_url ".atlassian.net/wiki/spaces/" in wiki_url
or ".jira.com/wiki/spaces/" in wiki_url or ".jira.com/wiki/spaces/" in wiki_url
@ -87,15 +99,19 @@ def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str, bool]:
try: try:
if is_confluence_cloud: if is_confluence_cloud:
wiki_base, space = _extract_confluence_keys_from_cloud_url(wiki_url) wiki_base, space, page_id = _extract_confluence_keys_from_cloud_url(
wiki_url
)
else: else:
wiki_base, space = _extract_confluence_keys_from_datacenter_url(wiki_url) wiki_base, space, page_id = _extract_confluence_keys_from_datacenter_url(
wiki_url
)
except Exception as e: except Exception as e:
error_msg = f"Not a valid Confluence Wiki Link, unable to extract wiki base and space names. Exception: {e}" error_msg = f"Not a valid Confluence Wiki Link, unable to extract wiki base, space, and page id. Exception: {e}"
logger.error(error_msg) logger.error(error_msg)
raise ValueError(error_msg) raise ValueError(error_msg)
return wiki_base, space, is_confluence_cloud return wiki_base, space, page_id, is_confluence_cloud
@lru_cache() @lru_cache()
@ -196,10 +212,135 @@ def _comment_dfs(
return comments_str return comments_str
class RecursiveIndexer:
    """Fetches a Confluence page's entire descendant tree up front.

    The tree rooted at ``origin_page_id`` is walked breadth-first at
    construction time and cached in ``self.pages``; callers then page
    through the cached result with ``get_pages``.
    """

    def __init__(
        self,
        batch_size: int,
        confluence_client: Confluence,
        index_origin: bool,
        origin_page_id: str,
    ) -> None:
        # NOTE(review): the fetch size is pinned to 1 and the `batch_size`
        # argument is ignored (the commented-out value below) — presumably a
        # workaround for unreliable batched child-page fetches; confirm
        # before honoring the parameter.
        self.batch_size = 1
        # batch_size
        self.confluence_client = confluence_client
        self.index_origin = index_origin
        self.origin_page_id = origin_page_id
        # Eagerly walk the whole tree once; get_pages() serves slices of it.
        self.pages = self.recurse_children_pages(0, self.origin_page_id)

    def get_pages(self, ind: int, size: int) -> list[dict]:
        """Return the ``ind``-th batch of ``size`` cached pages (empty past the end)."""
        if ind * size > len(self.pages):
            return []
        return self.pages[ind * size : (ind + 1) * size]

    def _fetch_origin_page(
        self,
    ) -> dict[str, Any]:
        """Fetch the origin page itself; returns ``{}`` (and logs) on failure."""
        get_page_by_id = make_confluence_call_handle_rate_limit(
            self.confluence_client.get_page_by_id
        )
        try:
            origin_page = get_page_by_id(
                self.origin_page_id, expand="body.storage.value,version"
            )
            return origin_page
        except Exception as e:
            logger.warning(
                f"Appending origin page with id {self.origin_page_id} failed: {e}"
            )
            return {}

    def recurse_children_pages(
        self,
        start_ind: int,
        page_id: str,
    ) -> list[dict[str, Any]]:
        """Breadth-first fetch of every descendant of ``page_id``.

        Optionally appends the origin page itself when ``self.index_origin``
        is set. Returns the flattened list of page dicts.
        """
        pages: list[dict[str, Any]] = []
        current_level_pages: list[dict[str, Any]] = []
        next_level_pages: list[dict[str, Any]] = []

        # Initial fetch of first level children
        index = start_ind
        while batch := self._fetch_single_depth_child_pages(
            index, self.batch_size, page_id
        ):
            current_level_pages.extend(batch)
            index += len(batch)
        pages.extend(current_level_pages)

        # Recursively index children and children's children, etc.
        while current_level_pages:
            for child in current_level_pages:
                child_index = 0
                while child_batch := self._fetch_single_depth_child_pages(
                    child_index, self.batch_size, child["id"]
                ):
                    next_level_pages.extend(child_batch)
                    child_index += len(child_batch)
            pages.extend(next_level_pages)
            current_level_pages = next_level_pages
            next_level_pages = []

        if self.index_origin:
            # _fetch_origin_page handles its own errors and returns {} on
            # failure, so the old try/except here was dead code; only keep a
            # real (non-empty) page so downstream consumers never see an
            # empty dict when reading e.g. page["version"].
            origin_page = self._fetch_origin_page()
            if origin_page:
                pages.append(origin_page)

        return pages

    def _fetch_single_depth_child_pages(
        self, start_ind: int, batch_size: int, page_id: str
    ) -> list[dict[str, Any]]:
        """Fetch one batch of direct children of ``page_id``.

        Falls back to fetching pages one at a time when the batched call
        fails; an individual failure is logged and then re-raised.
        """
        child_pages: list[dict[str, Any]] = []

        get_page_child_by_type = make_confluence_call_handle_rate_limit(
            self.confluence_client.get_page_child_by_type
        )

        try:
            child_page = get_page_child_by_type(
                page_id,
                type="page",
                start=start_ind,
                limit=batch_size,
                expand="body.storage.value,version",
            )
            child_pages.extend(child_page)
            return child_pages
        except Exception:
            logger.warning(
                f"Batch failed with page {page_id} at offset {start_ind} "
                f"with size {batch_size}, processing pages individually..."
            )
            for i in range(batch_size):
                ind = start_ind + i
                try:
                    child_page = get_page_child_by_type(
                        page_id,
                        type="page",
                        start=ind,
                        limit=1,
                        expand="body.storage.value,version",
                    )
                    child_pages.extend(child_page)
                except Exception as e:
                    logger.warning(f"Page {page_id} at offset {ind} failed: {e}")
                    raise e
            return child_pages
class ConfluenceConnector(LoadConnector, PollConnector): class ConfluenceConnector(LoadConnector, PollConnector):
def __init__( def __init__(
self, self,
wiki_page_url: str, wiki_page_url: str,
index_origin: bool = True,
batch_size: int = INDEX_BATCH_SIZE, batch_size: int = INDEX_BATCH_SIZE,
continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE, continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE,
# if a page has one of the labels specified in this list, we will just # if a page has one of the labels specified in this list, we will just
@ -210,11 +351,27 @@ class ConfluenceConnector(LoadConnector, PollConnector):
self.batch_size = batch_size self.batch_size = batch_size
self.continue_on_failure = continue_on_failure self.continue_on_failure = continue_on_failure
self.labels_to_skip = set(labels_to_skip) self.labels_to_skip = set(labels_to_skip)
self.wiki_base, self.space, self.is_cloud = extract_confluence_keys_from_url( self.recursive_indexer: RecursiveIndexer | None = None
wiki_page_url self.index_origin = index_origin
) (
self.wiki_base,
self.space,
self.page_id,
self.is_cloud,
) = extract_confluence_keys_from_url(wiki_page_url)
self.space_level_scan = False
self.confluence_client: Confluence | None = None self.confluence_client: Confluence | None = None
if self.page_id is None or self.page_id == "":
self.space_level_scan = True
logger.info(
f"wiki_base: {self.wiki_base}, space: {self.space}, page_id: {self.page_id},"
+ f" space_level_scan: {self.space_level_scan}, origin: {self.index_origin}"
)
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
username = credentials["confluence_username"] username = credentials["confluence_username"]
access_token = credentials["confluence_access_token"] access_token = credentials["confluence_access_token"]
@ -232,8 +389,8 @@ class ConfluenceConnector(LoadConnector, PollConnector):
self, self,
confluence_client: Confluence, confluence_client: Confluence,
start_ind: int, start_ind: int,
) -> Collection[dict[str, Any]]: ) -> list[dict[str, Any]]:
def _fetch(start_ind: int, batch_size: int) -> Collection[dict[str, Any]]: def _fetch_space(start_ind: int, batch_size: int) -> list[dict[str, Any]]:
get_all_pages_from_space = make_confluence_call_handle_rate_limit( get_all_pages_from_space = make_confluence_call_handle_rate_limit(
confluence_client.get_all_pages_from_space confluence_client.get_all_pages_from_space
) )
@ -242,9 +399,11 @@ class ConfluenceConnector(LoadConnector, PollConnector):
self.space, self.space,
start=start_ind, start=start_ind,
limit=batch_size, limit=batch_size,
status="current" status=(
if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES "current"
else None, if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES
else None
),
expand="body.storage.value,version", expand="body.storage.value,version",
) )
except Exception: except Exception:
@ -263,9 +422,11 @@ class ConfluenceConnector(LoadConnector, PollConnector):
self.space, self.space,
start=start_ind + i, start=start_ind + i,
limit=1, limit=1,
status="current" status=(
if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES "current"
else None, if CONFLUENCE_CONNECTOR_INDEX_ONLY_ACTIVE_PAGES
else None
),
expand="body.storage.value,version", expand="body.storage.value,version",
) )
) )
@ -286,17 +447,41 @@ class ConfluenceConnector(LoadConnector, PollConnector):
return view_pages return view_pages
def _fetch_page(start_ind: int, batch_size: int) -> list[dict[str, Any]]:
if self.recursive_indexer is None:
self.recursive_indexer = RecursiveIndexer(
origin_page_id=self.page_id,
batch_size=self.batch_size,
confluence_client=self.confluence_client,
index_origin=self.index_origin,
)
return self.recursive_indexer.get_pages(start_ind, batch_size)
pages: list[dict[str, Any]] = []
try: try:
return _fetch(start_ind, self.batch_size) pages = (
_fetch_space(start_ind, self.batch_size)
if self.space_level_scan
else _fetch_page(start_ind, self.batch_size)
)
return pages
except Exception as e: except Exception as e:
if not self.continue_on_failure: if not self.continue_on_failure:
raise e raise e
# error checking phase, only reachable if `self.continue_on_failure=True` # error checking phase, only reachable if `self.continue_on_failure=True`
pages: list[dict[str, Any]] = []
for i in range(self.batch_size): for i in range(self.batch_size):
try: try:
pages.extend(_fetch(start_ind + i, 1)) pages = (
_fetch_space(start_ind, self.batch_size)
if self.space_level_scan
else _fetch_page(start_ind, self.batch_size)
)
return pages
except Exception: except Exception:
logger.exception( logger.exception(
"Ran into exception when fetching pages from Confluence" "Ran into exception when fetching pages from Confluence"
@ -308,6 +493,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
get_page_child_by_type = make_confluence_call_handle_rate_limit( get_page_child_by_type = make_confluence_call_handle_rate_limit(
confluence_client.get_page_child_by_type confluence_client.get_page_child_by_type
) )
try: try:
comment_pages = cast( comment_pages = cast(
Collection[dict[str, Any]], Collection[dict[str, Any]],
@ -356,7 +542,14 @@ class ConfluenceConnector(LoadConnector, PollConnector):
page_id, start=0, limit=500 page_id, start=0, limit=500
) )
for attachment in attachments_container["results"]: for attachment in attachments_container["results"]:
if attachment["metadata"]["mediaType"] in ["image/jpeg", "image/png"]: if attachment["metadata"]["mediaType"] in [
"image/jpeg",
"image/png",
"image/gif",
"image/svg+xml",
"video/mp4",
"video/quicktime",
]:
continue continue
if attachment["title"] not in files_in_used: if attachment["title"] not in files_in_used:
@ -367,9 +560,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
if response.status_code == 200: if response.status_code == 200:
extract = extract_file_text( extract = extract_file_text(
attachment["title"], attachment["title"], io.BytesIO(response.content), False
io.BytesIO(response.content),
break_on_unprocessable=False,
) )
files_attachment_content.append(extract) files_attachment_content.append(extract)
@ -389,8 +580,8 @@ class ConfluenceConnector(LoadConnector, PollConnector):
if self.confluence_client is None: if self.confluence_client is None:
raise ConnectorMissingCredentialError("Confluence") raise ConnectorMissingCredentialError("Confluence")
batch = self._fetch_pages(self.confluence_client, start_ind) batch = self._fetch_pages(self.confluence_client, start_ind)
for page in batch: for page in batch:
last_modified_str = page["version"]["when"] last_modified_str = page["version"]["when"]
author = cast(str | None, page["version"].get("by", {}).get("email")) author = cast(str | None, page["version"].get("by", {}).get("email"))
@ -405,6 +596,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
if time_filter is None or time_filter(last_modified): if time_filter is None or time_filter(last_modified):
page_id = page["id"] page_id = page["id"]
if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING: if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
page_labels = self._fetch_labels(self.confluence_client, page_id) page_labels = self._fetch_labels(self.confluence_client, page_id)
@ -416,6 +608,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
f"Page with ID '{page_id}' has a label which has been " f"Page with ID '{page_id}' has a label which has been "
f"designated as disallowed: {label_intersection}. Skipping." f"designated as disallowed: {label_intersection}. Skipping."
) )
continue continue
page_html = ( page_html = (
@ -436,7 +629,6 @@ class ConfluenceConnector(LoadConnector, PollConnector):
page_text += attachment_text page_text += attachment_text
comments_text = self._fetch_comments(self.confluence_client, page_id) comments_text = self._fetch_comments(self.confluence_client, page_id)
page_text += comments_text page_text += comments_text
doc_metadata: dict[str, str | list[str]] = { doc_metadata: dict[str, str | list[str]] = {
"Wiki Space Name": self.space "Wiki Space Name": self.space
} }
@ -450,9 +642,9 @@ class ConfluenceConnector(LoadConnector, PollConnector):
source=DocumentSource.CONFLUENCE, source=DocumentSource.CONFLUENCE,
semantic_identifier=page["title"], semantic_identifier=page["title"],
doc_updated_at=last_modified, doc_updated_at=last_modified,
primary_owners=[BasicExpertInfo(email=author)] primary_owners=(
if author [BasicExpertInfo(email=author)] if author else None
else None, ),
metadata=doc_metadata, metadata=doc_metadata,
) )
) )

View File

@ -2,7 +2,10 @@
import * as Yup from "yup"; import * as Yup from "yup";
import { ConfluenceIcon, TrashIcon } from "@/components/icons/icons"; import { ConfluenceIcon, TrashIcon } from "@/components/icons/icons";
import { TextFormField } from "@/components/admin/connectors/Field"; import {
BooleanFormField,
TextFormField,
} from "@/components/admin/connectors/Field";
import { HealthCheckBanner } from "@/components/health/healthcheck"; import { HealthCheckBanner } from "@/components/health/healthcheck";
import { CredentialForm } from "@/components/admin/connectors/CredentialForm"; import { CredentialForm } from "@/components/admin/connectors/CredentialForm";
import { import {
@ -207,13 +210,17 @@ const Main = () => {
<p className="text-sm mb-4"> <p className="text-sm mb-4">
Specify any link to a Confluence page below and click Specify any link to a Confluence page below and click
&quot;Index&quot; to Index. Based on the provided link, we will &quot;Index&quot; to Index. Based on the provided link, we will
index the ENTIRE SPACE, not just the specified page. For example, index either the entire page and its subpages OR the entire space.
entering{" "} For example, entering{" "}
<i> <i>
https://danswer.atlassian.net/wiki/spaces/Engineering/overview https://danswer.atlassian.net/wiki/spaces/Engineering/overview
</i>{" "} </i>{" "}
and clicking the Index button will index the whole{" "} and clicking the Index button will index the whole{" "}
<i>Engineering</i> Confluence space. <i>Engineering</i> Confluence space, but entering
https://danswer.atlassian.net/wiki/spaces/Engineering/pages/164331/example+page
will index that page&apos;s children (and optionally, itself). Use
the checkbox below to determine whether or not to index the parent
page in addition to its children.
</p> </p>
{confluenceConnectorIndexingStatuses.length > 0 && ( {confluenceConnectorIndexingStatuses.length > 0 && (
@ -274,9 +281,8 @@ const Main = () => {
<Divider /> <Divider />
</> </>
)} )}
<Card className="mt-4"> <Card className="mt-4">
<h2 className="font-bold mb-3">Add a New Space</h2> <h2 className="font-bold mb-3">Add a New Space or Page</h2>
<ConnectorForm<ConfluenceConfig> <ConnectorForm<ConfluenceConfig>
nameBuilder={(values) => nameBuilder={(values) =>
`ConfluenceConnector-${values.wiki_page_url}` `ConfluenceConnector-${values.wiki_page_url}`
@ -289,15 +295,21 @@ const Main = () => {
formBody={ formBody={
<> <>
<TextFormField name="wiki_page_url" label="Confluence URL:" /> <TextFormField name="wiki_page_url" label="Confluence URL:" />
<BooleanFormField
name="index_origin"
label="(For pages) Index the page itself"
/>
</> </>
} }
validationSchema={Yup.object().shape({ validationSchema={Yup.object().shape({
wiki_page_url: Yup.string().required( wiki_page_url: Yup.string().required(
"Please enter any link to your confluence e.g. https://danswer.atlassian.net/wiki/spaces/Engineering/overview" "Please enter any link to a Confluence space or Page e.g. https://danswer.atlassian.net/wiki/spaces/Engineering/overview"
), ),
index_origin: Yup.boolean(),
})} })}
initialValues={{ initialValues={{
wiki_page_url: "", wiki_page_url: "",
index_origin: true,
}} }}
refreshFreq={10 * 60} // 10 minutes refreshFreq={10 * 60} // 10 minutes
credentialId={confluenceCredential.id} credentialId={confluenceCredential.id}

View File

@ -133,6 +133,7 @@ export interface BookstackConfig {}
export interface ConfluenceConfig { export interface ConfluenceConfig {
wiki_page_url: string; wiki_page_url: string;
index_origin?: boolean;
} }
export interface JiraConfig { export interface JiraConfig {