DAN-141 Confluence Poll Connector (#114)

This commit is contained in:
Yuhong Sun 2023-06-24 00:01:09 -07:00 committed by GitHub
parent 34861013f8
commit 3701239283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,4 +1,7 @@
from collections.abc import Callable
from collections.abc import Generator from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from typing import Any from typing import Any
from urllib.parse import urlparse from urllib.parse import urlparse
@ -9,6 +12,8 @@ from danswer.configs.constants import DocumentSource
from danswer.configs.constants import HTML_SEPARATOR from danswer.configs.constants import HTML_SEPARATOR
from danswer.connectors.interfaces import GenerateDocumentsOutput from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.models import Document from danswer.connectors.models import Document
from danswer.connectors.models import Section from danswer.connectors.models import Section
@ -18,6 +23,13 @@ from danswer.connectors.models import Section
# 3. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost # 3. Segment into Sections for more accurate linking, can split by headers but make sure no text/ordering is lost
class ConfluenceClientNotSetUpError(PermissionError):
    """Raised when a connector method runs before `load_credentials` has
    initialized the Confluence client."""

    # Exact message preserved; callers/logs may match on it.
    _MESSAGE = "Confluence Client is not set up, was load_credentials called?"

    def __init__(self) -> None:
        super().__init__(self._MESSAGE)
def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str]: def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str]:
"""Sample """Sample
https://danswer.atlassian.net/wiki/spaces/1234abcd/overview https://danswer.atlassian.net/wiki/spaces/1234abcd/overview
@ -62,7 +74,7 @@ def _comment_dfs(
return comments_str return comments_str
class ConfluenceConnector(LoadConnector): class ConfluenceConnector(LoadConnector, PollConnector):
def __init__( def __init__(
self, self,
wiki_page_url: str, wiki_page_url: str,
@ -83,24 +95,26 @@ class ConfluenceConnector(LoadConnector):
) )
return None return None
def load_from_state(self) -> GenerateDocumentsOutput: def _get_doc_batch(
if self.confluence_client is None: self, start_ind: int, time_filter: Callable[[datetime], bool] | None = None
raise PermissionError( ) -> tuple[list[Document], int]:
"Confluence Client is not set up, was load_credentials called?"
)
start_ind = 0
while True:
doc_batch: list[Document] = [] doc_batch: list[Document] = []
if self.confluence_client is None:
raise ConfluenceClientNotSetUpError()
batch = self.confluence_client.get_all_pages_from_space( batch = self.confluence_client.get_all_pages_from_space(
self.space, self.space,
start=start_ind, start=start_ind,
limit=self.batch_size, limit=self.batch_size,
expand="body.storage.value", expand="body.storage.value,version",
) )
for page in batch: for page in batch:
last_modified_str = page["version"]["when"]
last_modified = datetime.fromisoformat(last_modified_str)
if time_filter is None or time_filter(last_modified):
page_html = page["body"]["storage"]["value"] page_html = page["body"]["storage"]["value"]
soup = BeautifulSoup(page_html, "html.parser") soup = BeautifulSoup(page_html, "html.parser")
page_text = page.get("title", "") + "\n" + soup.get_text(HTML_SEPARATOR) page_text = page.get("title", "") + "\n" + soup.get_text(HTML_SEPARATOR)
@ -125,10 +139,39 @@ class ConfluenceConnector(LoadConnector):
metadata={}, metadata={},
) )
) )
yield doc_batch return doc_batch, len(batch)
def load_from_state(self) -> GenerateDocumentsOutput:
    """Yield every page in the configured space as batches of Documents.

    Raises:
        ConfluenceClientNotSetUpError: if `load_credentials` was never called.
    """
    if self.confluence_client is None:
        raise ConfluenceClientNotSetUpError()

    cursor = 0
    while True:
        docs, fetched = self._get_doc_batch(cursor)
        cursor += fetched
        if docs:
            yield docs
        # A partial page of results means the space is exhausted.
        if fetched < self.batch_size:
            break
def poll_source(
    self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
    """Yield Document batches for pages last modified within [start, end].

    `start` and `end` are seconds since the Unix epoch; they are converted
    to timezone-aware UTC datetimes so they compare safely against the
    aware timestamps parsed from Confluence page versions.

    Raises:
        ConfluenceClientNotSetUpError: if `load_credentials` was never called.
    """
    if self.confluence_client is None:
        raise ConfluenceClientNotSetUpError()

    # Build the inclusive window once; it is invariant across batches.
    window_start = datetime.fromtimestamp(start, tz=timezone.utc)
    window_end = datetime.fromtimestamp(end, tz=timezone.utc)

    def _in_window(modified: datetime) -> bool:
        return window_start <= modified <= window_end

    cursor = 0
    while True:
        docs, fetched = self._get_doc_batch(cursor, time_filter=_in_window)
        cursor += fetched
        if docs:
            yield docs
        # A partial page of results means the space is exhausted.
        if fetched < self.batch_size:
            break