diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index c124e710639..24ced9c1821 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -1,4 +1,7 @@ +from collections.abc import Callable from collections.abc import Generator +from datetime import datetime +from datetime import timezone from typing import Any from urllib.parse import urlparse @@ -9,6 +12,8 @@ from danswer.configs.constants import DocumentSource from danswer.configs.constants import HTML_SEPARATOR from danswer.connectors.interfaces import GenerateDocumentsOutput from danswer.connectors.interfaces import LoadConnector +from danswer.connectors.interfaces import PollConnector +from danswer.connectors.interfaces import SecondsSinceUnixEpoch from danswer.connectors.models import Document from danswer.connectors.models import Section @@ -18,6 +23,13 @@ from danswer.connectors.models import Section # 3. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost +class ConfluenceClientNotSetUpError(PermissionError): + def __init__(self) -> None: + super().__init__( + "Confluence Client is not set up, was load_credentials called?" + ) + + def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str]: """Sample https://danswer.atlassian.net/wiki/spaces/1234abcd/overview @@ -62,7 +74,7 @@ def _comment_dfs( return comments_str -class ConfluenceConnector(LoadConnector): +class ConfluenceConnector(LoadConnector, PollConnector): def __init__( self, wiki_page_url: str, @@ -83,24 +95,26 @@ class ConfluenceConnector(LoadConnector): ) return None - def load_from_state(self) -> GenerateDocumentsOutput: + def _get_doc_batch( + self, start_ind: int, time_filter: Callable[[datetime], bool] | None = None + ) -> tuple[list[Document], int]: + doc_batch: list[Document] = [] + if self.confluence_client is None: - raise PermissionError( - "Confluence Client is not set up, was load_credentials called?" - ) + raise ConfluenceClientNotSetUpError() - start_ind = 0 - while True: - doc_batch: list[Document] = [] + batch = self.confluence_client.get_all_pages_from_space( + self.space, + start=start_ind, + limit=self.batch_size, + expand="body.storage.value,version", + ) - batch = self.confluence_client.get_all_pages_from_space( - self.space, - start=start_ind, - limit=self.batch_size, - expand="body.storage.value", - ) + for page in batch: + last_modified_str = page["version"]["when"] + last_modified = datetime.fromisoformat(last_modified_str) - for page in batch: + if time_filter is None or time_filter(last_modified): page_html = page["body"]["storage"]["value"] soup = BeautifulSoup(page_html, "html.parser") page_text = page.get("title", "") + "\n" + soup.get_text(HTML_SEPARATOR) @@ -125,10 +139,39 @@ class ConfluenceConnector(LoadConnector): metadata={}, ) ) - yield doc_batch + return doc_batch, len(batch) - start_ind += len(batch) - if len(batch) < self.batch_size: + def load_from_state(self) -> GenerateDocumentsOutput: + if self.confluence_client is None: + raise ConfluenceClientNotSetUpError() + + start_ind = 0 + while True: + doc_batch, num_pages = self._get_doc_batch(start_ind) + start_ind += num_pages + if doc_batch: + yield doc_batch + + if num_pages < self.batch_size: + break + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> GenerateDocumentsOutput: + if self.confluence_client is None: + raise ConfluenceClientNotSetUpError() + + start_time = datetime.fromtimestamp(start, tz=timezone.utc) + end_time = datetime.fromtimestamp(end, tz=timezone.utc) + + start_ind = 0 + while True: + doc_batch, num_pages = self._get_doc_batch( + start_ind, time_filter=lambda t: start_time <= t <= end_time + ) + start_ind += num_pages + if doc_batch: + yield doc_batch + + if num_pages < self.batch_size: break - if doc_batch: - yield doc_batch