From b9b633bb74a26a2882e3289faddca3c7a317f3f1 Mon Sep 17 00:00:00 2001
From: rkuo-danswer
Date: Wed, 21 Aug 2024 10:15:13 -0700
Subject: [PATCH] support indexing attachments as separate docs when not part of a page (#2194)

* support indexing attachments as separate docs when not part of a page

* fix time filter, fix batch handling, fix returned number of attachments processed
---
 .../connectors/confluence/connector.py       | 247 +++++++++++++-----
 1 file changed, 178 insertions(+), 69 deletions(-)

diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py
index 3e366ead1..da8f42edc 100644
--- a/backend/danswer/connectors/confluence/connector.py
+++ b/backend/danswer/connectors/confluence/connector.py
@@ -213,6 +213,19 @@ def _comment_dfs(
     return comments_str
 
 
+def _datetime_from_string(datetime_string: str) -> datetime:
+    datetime_object = datetime.fromisoformat(datetime_string)
+
+    if datetime_object.tzinfo is None:
+        # If no timezone info, assume it is UTC
+        datetime_object = datetime_object.replace(tzinfo=timezone.utc)
+    else:
+        # If not in UTC, translate it
+        datetime_object = datetime_object.astimezone(timezone.utc)
+
+    return datetime_object
+
+
 class RecursiveIndexer:
     def __init__(
         self,
@@ -537,15 +550,18 @@ class ConfluenceConnector(LoadConnector, PollConnector):
 
     def _fetch_attachments(
         self, confluence_client: Confluence, page_id: str, files_in_used: list[str]
-    ) -> str:
+    ) -> tuple[str, list[dict[str, Any]]]:
+        unused_attachments = []
+
         get_attachments_from_content = make_confluence_call_handle_rate_limit(
             confluence_client.get_attachments_from_content
         )
         files_attachment_content: list = []
 
         try:
+            expand = "history.lastUpdated,metadata.labels"
             attachments_container = get_attachments_from_content(
-                page_id, start=0, limit=500
+                page_id, start=0, limit=500, expand=expand
             )
             for attachment in attachments_container["results"]:
                 if attachment["metadata"]["mediaType"] in [
@@ -558,9 +574,6 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                 ]:
                     continue
 
-                if attachment["title"] not in files_in_used:
-                    continue
-
                 download_link = confluence_client.url + attachment["_links"]["download"]
 
                 attachment_size = attachment["extensions"]["fileSize"]
@@ -572,7 +585,10 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                     )
                     continue
 
-                download_link = confluence_client.url + attachment["_links"]["download"]
+                if attachment["title"] not in files_in_used:
+                    unused_attachments.append(attachment)
+                    continue
+
                 response = confluence_client._session.get(download_link)
                 if response.status_code == 200:
@@ -588,92 +604,155 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                 f"Ran into exception when fetching attachments from Confluence: {e}"
             )
 
-        return "\n".join(files_attachment_content)
+        return "\n".join(files_attachment_content), unused_attachments
 
     def _get_doc_batch(
         self, start_ind: int, time_filter: Callable[[datetime], bool] | None = None
-    ) -> tuple[list[Document], int]:
+    ) -> tuple[list[Document], list[dict[str, Any]], int]:
         doc_batch: list[Document] = []
+        unused_attachments = []
 
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")
 
         batch = self._fetch_pages(self.confluence_client, start_ind)
         for page in batch:
-            last_modified_str = page["version"]["when"]
+            last_modified = _datetime_from_string(page["version"]["when"])
             author = cast(str | None, page["version"].get("by", {}).get("email"))
-            last_modified = datetime.fromisoformat(last_modified_str)
-            if last_modified.tzinfo is None:
-                # If no timezone info, assume it is UTC
-                last_modified = last_modified.replace(tzinfo=timezone.utc)
-            else:
-                # If not in UTC, translate it
-                last_modified = last_modified.astimezone(timezone.utc)
+            if time_filter and not time_filter(last_modified):
+                continue
 
-            if time_filter is None or time_filter(last_modified):
-                page_id = page["id"]
+            page_id = page["id"]
 
-                if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
-                    page_labels = self._fetch_labels(self.confluence_client, page_id)
+            if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
+                page_labels = self._fetch_labels(self.confluence_client, page_id)
 
-                # check disallowed labels
-                if self.labels_to_skip:
-                    label_intersection = self.labels_to_skip.intersection(page_labels)
-                    if label_intersection:
-                        logger.info(
-                            f"Page with ID '{page_id}' has a label which has been "
-                            f"designated as disallowed: {label_intersection}. Skipping."
-                        )
-
-                        continue
-
-                page_html = (
-                    page["body"]
-                    .get("storage", page["body"].get("view", {}))
-                    .get("value")
-                )
-                page_url = self.wiki_base + page["_links"]["webui"]
-                if not page_html:
-                    logger.debug("Page is empty, skipping: %s", page_url)
-                    continue
-                page_text = parse_html_page(page_html, self.confluence_client)
-
-                files_in_used = get_used_attachments(page_html, self.confluence_client)
-                attachment_text = self._fetch_attachments(
-                    self.confluence_client, page_id, files_in_used
-                )
-                page_text += attachment_text
-                comments_text = self._fetch_comments(self.confluence_client, page_id)
-                page_text += comments_text
-                doc_metadata: dict[str, str | list[str]] = {
-                    "Wiki Space Name": self.space
-                }
-                if not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING and page_labels:
-                    doc_metadata["labels"] = page_labels
-
-                doc_batch.append(
-                    Document(
-                        id=page_url,
-                        sections=[Section(link=page_url, text=page_text)],
-                        source=DocumentSource.CONFLUENCE,
-                        semantic_identifier=page["title"],
-                        doc_updated_at=last_modified,
-                        primary_owners=(
-                            [BasicExpertInfo(email=author)] if author else None
-                        ),
-                        metadata=doc_metadata,
-                    )
+            # check disallowed labels
+            if self.labels_to_skip:
+                label_intersection = self.labels_to_skip.intersection(page_labels)
+                if label_intersection:
+                    logger.info(
+                        f"Page with ID '{page_id}' has a label which has been "
+                        f"designated as disallowed: {label_intersection}. Skipping."
+                    )
+
+                    continue
+
+            page_html = (
+                page["body"].get("storage", page["body"].get("view", {})).get("value")
+            )
+            page_url = self.wiki_base + page["_links"]["webui"]
+            if not page_html:
+                logger.debug("Page is empty, skipping: %s", page_url)
+                continue
+            page_text = parse_html_page(page_html, self.confluence_client)
+
+            files_in_used = get_used_attachments(page_html, self.confluence_client)
+            attachment_text, unused_page_attachments = self._fetch_attachments(
+                self.confluence_client, page_id, files_in_used
+            )
+            unused_attachments.extend(unused_page_attachments)
+
+            page_text += attachment_text
+            comments_text = self._fetch_comments(self.confluence_client, page_id)
+            page_text += comments_text
+            doc_metadata: dict[str, str | list[str]] = {"Wiki Space Name": self.space}
+            if not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING and page_labels:
+                doc_metadata["labels"] = page_labels
+
+            doc_batch.append(
+                Document(
+                    id=page_url,
+                    sections=[Section(link=page_url, text=page_text)],
+                    source=DocumentSource.CONFLUENCE,
+                    semantic_identifier=page["title"],
+                    doc_updated_at=last_modified,
+                    primary_owners=(
+                        [BasicExpertInfo(email=author)] if author else None
+                    ),
+                    metadata=doc_metadata,
                 )
-        return doc_batch, len(batch)
+            )
+        return (
+            doc_batch,
+            unused_attachments,
+            len(batch),
+        )
+
+    def _get_attachment_batch(
+        self,
+        start_ind: int,
+        attachments: list[dict[str, Any]],
+        time_filter: Callable[[datetime], bool] | None = None,
+    ) -> tuple[list[Document], int]:
+        doc_batch: list[Document] = []
+
+        if self.confluence_client is None:
+            raise ConnectorMissingCredentialError("Confluence")
+
+        end_ind = min(start_ind + self.batch_size, len(attachments))
+
+        for attachment in attachments[start_ind:end_ind]:
+            last_updated = _datetime_from_string(
+                attachment["history"]["lastUpdated"]["when"]
+            )
+
+            if time_filter and not time_filter(last_updated):
+                continue
+
+            download_url = self.confluence_client.url + attachment["_links"]["download"]
+
+            response = self.confluence_client._session.get(download_url)
+            if response.status_code != 200:
+                continue
+
+            extracted_text = extract_file_text(
+                attachment["title"], io.BytesIO(response.content), False
+            )
+
+            creator_email = attachment["history"]["createdBy"]["email"]
+
+            comment = attachment["metadata"].get("comment", "")
+            doc_metadata: dict[str, str | list[str]] = {"comment": comment}
+
+            attachment_labels: list[str] = []
+            if not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
+                for label in attachment["metadata"]["labels"]["results"]:
+                    attachment_labels.append(label["name"])
+
+            doc_metadata["labels"] = attachment_labels
+
+            doc_batch.append(
+                Document(
+                    id=download_url,
+                    sections=[Section(link=download_url, text=extracted_text)],
+                    source=DocumentSource.CONFLUENCE,
+                    semantic_identifier=attachment["title"],
+                    doc_updated_at=last_updated,
+                    primary_owners=(
+                        [BasicExpertInfo(email=creator_email)]
+                        if creator_email
+                        else None
+                    ),
+                    metadata=doc_metadata,
+                )
+            )
+
+        return doc_batch, end_ind - start_ind
 
     def load_from_state(self) -> GenerateDocumentsOutput:
+        unused_attachments = []
+
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")
 
         start_ind = 0
         while True:
-            doc_batch, num_pages = self._get_doc_batch(start_ind)
+            doc_batch, unused_attachments_batch, num_pages = self._get_doc_batch(
+                start_ind
+            )
+            unused_attachments.extend(unused_attachments_batch)
             start_ind += num_pages
             if doc_batch:
                 yield doc_batch
@@ -681,9 +760,23 @@ class ConfluenceConnector(LoadConnector, PollConnector):
 
             if num_pages < self.batch_size:
                 break
 
+        start_ind = 0
+        while True:
+            attachment_batch, num_attachments = self._get_attachment_batch(
+                start_ind, unused_attachments
+            )
+            start_ind += num_attachments
+            if attachment_batch:
+                yield attachment_batch
+
+            if num_attachments < self.batch_size:
+                break
+
     def poll_source(
         self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
     ) -> GenerateDocumentsOutput:
+        unused_attachments = []
+
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")
 
@@ -692,9 +785,11 @@ class ConfluenceConnector(LoadConnector, PollConnector):
 
         start_ind = 0
         while True:
-            doc_batch, num_pages = self._get_doc_batch(
+            doc_batch, unused_attachments_batch, num_pages = self._get_doc_batch(
                 start_ind, time_filter=lambda t: start_time <= t <= end_time
             )
+            unused_attachments.extend(unused_attachments_batch)
+
             start_ind += num_pages
             if doc_batch:
                 yield doc_batch
@@ -702,6 +797,20 @@ class ConfluenceConnector(LoadConnector, PollConnector):
             if num_pages < self.batch_size:
                 break
 
+        start_ind = 0
+        while True:
+            attachment_batch, num_attachments = self._get_attachment_batch(
+                start_ind,
+                unused_attachments,
+                time_filter=lambda t: start_time <= t <= end_time,
+            )
+            start_ind += num_attachments
+            if attachment_batch:
+                yield attachment_batch
+
+            if num_attachments < self.batch_size:
+                break
+
 
 if __name__ == "__main__":
     connector = ConfluenceConnector(os.environ["CONFLUENCE_TEST_SPACE_URL"])
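
Note on the timezone handling: Confluence version timestamps may arrive naive or with a UTC offset, and the new _datetime_from_string helper normalizes both cases to timezone-aware UTC datetimes so that poll-window comparisons are well defined. A minimal standalone sketch of the same logic (the sample timestamps below are illustrative, not real API output):

    from datetime import datetime, timezone

    def to_utc(datetime_string: str) -> datetime:
        # Same normalization as _datetime_from_string in the patch:
        # naive timestamps are assumed to already be in UTC,
        # offset-aware ones are converted to UTC.
        dt = datetime.fromisoformat(datetime_string)
        if dt.tzinfo is None:
            return dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)

    print(to_utc("2024-08-21T10:15:13"))        # 2024-08-21 10:15:13+00:00
    print(to_utc("2024-08-21T10:15:13-07:00"))  # 2024-08-21 17:15:13+00:00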
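The behavioral core of the _fetch_attachments change is a partition: attachments whose titles appear in the page body (files_in_used) are still inlined into the page's document text, while the rest are now returned to the caller and indexed later as standalone documents. Distilled into a hypothetical helper (not part of the patch, shown only to make the split explicit):

    from typing import Any

    def partition_attachments(
        attachments: list[dict[str, Any]], files_in_used: list[str]
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        # "used" content is appended to the owning page's document;
        # "unused" attachments become separate Documents in a second pass.
        used = [a for a in attachments if a["title"] in files_in_used]
        unused = [a for a in attachments if a["title"] not in files_in_used]
        return used, unused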
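Both generators now make two passes with the same loop shape: page batches first (accumulating unused attachments as a side effect), then the collected attachments in slices of self.batch_size. Note that _get_attachment_batch reports end_ind - start_ind, the number of items examined, rather than len(doc_batch), the number of documents produced; because time_filter can drop items, counting produced documents would stall the cursor and end the loop early, which is what the "fix batch handling" and "fix returned number of attachments processed" bullets above refer to. A self-contained sketch of the loop contract, with a plain list standing in for the Confluence fetches:

    def consume_in_batches(items: list[int], batch_size: int) -> list[list[int]]:
        # Advance the cursor by the number of items examined and stop
        # once a batch comes back short, which only happens at the end.
        batches: list[list[int]] = []
        start_ind = 0
        while True:
            batch = items[start_ind : start_ind + batch_size]
            start_ind += len(batch)
            if batch:
                batches.append(batch)
            if len(batch) < batch_size:
                break
        return batches

    assert consume_in_batches(list(range(7)), 3) == [[0, 1, 2], [3, 4, 5], [6]]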
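poll_source applies the same window to both passes via time_filter=lambda t: start_time <= t <= end_time. The hunks do not show how start_time and end_time are built from the SecondsSinceUnixEpoch arguments, so the following is an assumption about that conversion, chosen to be consistent with the UTC-normalized values _datetime_from_string produces:

    from collections.abc import Callable
    from datetime import datetime, timezone

    def build_time_filter(start: float, end: float) -> Callable[[datetime], bool]:
        # Assumed construction: epoch seconds -> aware UTC datetimes, so the
        # comparison below is between timezone-aware values on both sides.
        start_time = datetime.fromtimestamp(start, tz=timezone.utc)
        end_time = datetime.fromtimestamp(end, tz=timezone.utc)
        return lambda t: start_time <= t <= end_time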