Mirror of https://github.com/danswer-ai/danswer.git, synced 2025-09-27 12:29:41 +02:00
support indexing attachments as separate docs when not part of a page (#2194)
* support indexing attachments as separate docs when not part of a page
* fix time filter, fix batch handling, fix returned number of attachments processed
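The change is easiest to see as a reordering of what the connector's generators yield: page-document batches come first, attachments that no page referenced are collected along the way, and those leftovers are emitted afterward as their own document batches. A toy sketch of that two-phase pattern (names invented for this illustration, not taken from the connector):

```python
from typing import Any, Iterator


def two_phase_batches(
    pages: list[Any], leftovers: list[Any], batch_size: int
) -> Iterator[list[Any]]:
    # Phase 1: page documents, batch by batch.
    for i in range(0, len(pages), batch_size):
        yield pages[i : i + batch_size]
    # Phase 2: attachments not referenced by any page, same batch size.
    for i in range(0, len(leftovers), batch_size):
        yield leftovers[i : i + batch_size]


print(list(two_phase_batches([1, 2, 3], ["a", "b"], batch_size=2)))
# -> [[1, 2], [3], ['a', 'b']]
```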
@@ -213,6 +213,19 @@ def _comment_dfs(
     return comments_str


+def _datetime_from_string(datetime_string: str) -> datetime:
+    datetime_object = datetime.fromisoformat(datetime_string)
+
+    if datetime_object.tzinfo is None:
+        # If no timezone info, assume it is UTC
+        datetime_object = datetime_object.replace(tzinfo=timezone.utc)
+    else:
+        # If not in UTC, translate it
+        datetime_object = datetime_object.astimezone(timezone.utc)
+
+    return datetime_object
+
+
 class RecursiveIndexer:
     def __init__(
         self,
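A standalone repro of the helper added above: naive ISO-8601 timestamps are assumed to be UTC, and offset-aware ones are converted to UTC, so every later comparison against the poll window happens in a single timezone.

```python
from datetime import datetime, timezone


def _datetime_from_string(datetime_string: str) -> datetime:
    datetime_object = datetime.fromisoformat(datetime_string)
    if datetime_object.tzinfo is None:
        # If no timezone info, assume it is UTC
        datetime_object = datetime_object.replace(tzinfo=timezone.utc)
    else:
        # If not in UTC, translate it
        datetime_object = datetime_object.astimezone(timezone.utc)
    return datetime_object


print(_datetime_from_string("2024-08-20T12:00:00"))        # 2024-08-20 12:00:00+00:00
print(_datetime_from_string("2024-08-20T12:00:00+02:00"))  # 2024-08-20 10:00:00+00:00
```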
@@ -537,15 +550,18 @@ class ConfluenceConnector(LoadConnector, PollConnector):

     def _fetch_attachments(
         self, confluence_client: Confluence, page_id: str, files_in_used: list[str]
-    ) -> str:
+    ) -> tuple[str, list[dict[str, Any]]]:
+        unused_attachments = []
+
         get_attachments_from_content = make_confluence_call_handle_rate_limit(
             confluence_client.get_attachments_from_content
         )
         files_attachment_content: list = []

         try:
+            expand = "history.lastUpdated,metadata.labels"
             attachments_container = get_attachments_from_content(
-                page_id, start=0, limit=500
+                page_id, start=0, limit=500, expand=expand
             )
             for attachment in attachments_container["results"]:
                 if attachment["metadata"]["mediaType"] in [
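The new `expand` string asks the Confluence REST API to inline the fields that the connector reads later when turning unused attachments into documents. A sketch of the payload subset this code depends on (values are illustrative; the field paths are exactly the ones dereferenced in this diff, not the full REST response):

```python
attachment = {
    "title": "design.pdf",
    "metadata": {
        "mediaType": "application/pdf",
        "comment": "Q3 design doc",
        "labels": {"results": [{"name": "architecture"}]},
    },
    "extensions": {"fileSize": 123456},
    "history": {
        "lastUpdated": {"when": "2024-08-20T12:00:00+00:00"},
        "createdBy": {"email": "author@example.com"},
    },
    "_links": {"download": "/download/attachments/123/design.pdf"},
}
```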
@@ -558,9 +574,6 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                 ]:
                     continue

-                if attachment["title"] not in files_in_used:
-                    continue
-
                 download_link = confluence_client.url + attachment["_links"]["download"]

                 attachment_size = attachment["extensions"]["fileSize"]
@@ -572,7 +585,10 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                     )
                     continue

-                download_link = confluence_client.url + attachment["_links"]["download"]
+                if attachment["title"] not in files_in_used:
+                    unused_attachments.append(attachment)
+                    continue
+
                 response = confluence_client._session.get(download_link)

                 if response.status_code == 200:
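With this hunk the per-attachment triage order becomes: skip unsupported media types, skip oversized files, then divert attachments no page references into `unused_attachments` for later indexing, and only download the rest inline. A minimal model of that order (the media set and threshold are invented stand-ins for this sketch):

```python
MEDIA_SKIP = {"image/png", "image/jpeg"}  # assumption: stand-in for the real list
SIZE_LIMIT = 10_000_000                   # assumption: stand-in threshold


def triage(attachment: dict, files_in_used: list[str], unused: list[dict]) -> str:
    if attachment["metadata"]["mediaType"] in MEDIA_SKIP:
        return "skip: media type"
    if attachment["extensions"]["fileSize"] > SIZE_LIMIT:
        return "skip: too large"
    if attachment["title"] not in files_in_used:
        unused.append(attachment)  # becomes its own Document later
        return "defer: not used on this page"
    return "download inline"


unused: list[dict] = []
att = {
    "title": "x.pdf",
    "metadata": {"mediaType": "application/pdf"},
    "extensions": {"fileSize": 100},
}
print(triage(att, files_in_used=[], unused=unused))  # -> defer: not used on this page
```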
@@ -588,92 +604,155 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                 f"Ran into exception when fetching attachments from Confluence: {e}"
             )

-        return "\n".join(files_attachment_content)
+        return "\n".join(files_attachment_content), unused_attachments

     def _get_doc_batch(
         self, start_ind: int, time_filter: Callable[[datetime], bool] | None = None
-    ) -> tuple[list[Document], int]:
+    ) -> tuple[list[Document], list[dict[str, Any]], int]:
         doc_batch: list[Document] = []
+        unused_attachments = []

         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")
         batch = self._fetch_pages(self.confluence_client, start_ind)

         for page in batch:
-            last_modified_str = page["version"]["when"]
+            last_modified = _datetime_from_string(page["version"]["when"])
             author = cast(str | None, page["version"].get("by", {}).get("email"))
-            last_modified = datetime.fromisoformat(last_modified_str)

-            if last_modified.tzinfo is None:
-                # If no timezone info, assume it is UTC
-                last_modified = last_modified.replace(tzinfo=timezone.utc)
-            else:
-                # If not in UTC, translate it
-                last_modified = last_modified.astimezone(timezone.utc)
-
-            if time_filter is None or time_filter(last_modified):
-                page_id = page["id"]
-
-                if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
-                    page_labels = self._fetch_labels(self.confluence_client, page_id)
-
-                # check disallowed labels
-                if self.labels_to_skip:
-                    label_intersection = self.labels_to_skip.intersection(page_labels)
-                    if label_intersection:
-                        logger.info(
-                            f"Page with ID '{page_id}' has a label which has been "
-                            f"designated as disallowed: {label_intersection}. Skipping."
-                        )
-
-                        continue
-
-                page_html = (
-                    page["body"]
-                    .get("storage", page["body"].get("view", {}))
-                    .get("value")
-                )
-                page_url = self.wiki_base + page["_links"]["webui"]
-                if not page_html:
-                    logger.debug("Page is empty, skipping: %s", page_url)
-                    continue
-                page_text = parse_html_page(page_html, self.confluence_client)
-
-                files_in_used = get_used_attachments(page_html, self.confluence_client)
-                attachment_text = self._fetch_attachments(
-                    self.confluence_client, page_id, files_in_used
-                )
-                page_text += attachment_text
-                comments_text = self._fetch_comments(self.confluence_client, page_id)
-                page_text += comments_text
-                doc_metadata: dict[str, str | list[str]] = {
-                    "Wiki Space Name": self.space
-                }
-                if not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING and page_labels:
-                    doc_metadata["labels"] = page_labels
-
-                doc_batch.append(
-                    Document(
-                        id=page_url,
-                        sections=[Section(link=page_url, text=page_text)],
-                        source=DocumentSource.CONFLUENCE,
-                        semantic_identifier=page["title"],
-                        doc_updated_at=last_modified,
-                        primary_owners=(
-                            [BasicExpertInfo(email=author)] if author else None
-                        ),
-                        metadata=doc_metadata,
-                    )
-                )
-        return doc_batch, len(batch)
+            if time_filter and not time_filter(last_modified):
+                continue
+
+            page_id = page["id"]
+
+            if self.labels_to_skip or not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
+                page_labels = self._fetch_labels(self.confluence_client, page_id)
+
+            # check disallowed labels
+            if self.labels_to_skip:
+                label_intersection = self.labels_to_skip.intersection(page_labels)
+                if label_intersection:
+                    logger.info(
+                        f"Page with ID '{page_id}' has a label which has been "
+                        f"designated as disallowed: {label_intersection}. Skipping."
+                    )
+                    continue
+
+            page_html = (
+                page["body"].get("storage", page["body"].get("view", {})).get("value")
+            )
+            page_url = self.wiki_base + page["_links"]["webui"]
+            if not page_html:
+                logger.debug("Page is empty, skipping: %s", page_url)
+                continue
+            page_text = parse_html_page(page_html, self.confluence_client)
+
+            files_in_used = get_used_attachments(page_html, self.confluence_client)
+            attachment_text, unused_page_attachments = self._fetch_attachments(
+                self.confluence_client, page_id, files_in_used
+            )
+            unused_attachments.extend(unused_page_attachments)
+
+            page_text += attachment_text
+            comments_text = self._fetch_comments(self.confluence_client, page_id)
+            page_text += comments_text
+            doc_metadata: dict[str, str | list[str]] = {"Wiki Space Name": self.space}
+            if not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING and page_labels:
+                doc_metadata["labels"] = page_labels
+
+            doc_batch.append(
+                Document(
+                    id=page_url,
+                    sections=[Section(link=page_url, text=page_text)],
+                    source=DocumentSource.CONFLUENCE,
+                    semantic_identifier=page["title"],
+                    doc_updated_at=last_modified,
+                    primary_owners=(
+                        [BasicExpertInfo(email=author)] if author else None
+                    ),
+                    metadata=doc_metadata,
+                )
+            )
+
+        return (
+            doc_batch,
+            unused_attachments,
+            len(batch),
+        )
+
+    def _get_attachment_batch(
+        self,
+        start_ind: int,
+        attachments: list[dict[str, Any]],
+        time_filter: Callable[[datetime], bool] | None = None,
+    ) -> tuple[list[Document], int]:
+        doc_batch: list[Document] = []
+
+        if self.confluence_client is None:
+            raise ConnectorMissingCredentialError("Confluence")
+
+        end_ind = min(start_ind + self.batch_size, len(attachments))
+
+        for attachment in attachments[start_ind:end_ind]:
+            last_updated = _datetime_from_string(
+                attachment["history"]["lastUpdated"]["when"]
+            )
+
+            if time_filter and not time_filter(last_updated):
+                continue
+
+            download_url = self.confluence_client.url + attachment["_links"]["download"]
+
+            response = self.confluence_client._session.get(download_url)
+            if response.status_code != 200:
+                continue
+
+            extracted_text = extract_file_text(
+                attachment["title"], io.BytesIO(response.content), False
+            )
+
+            creator_email = attachment["history"]["createdBy"]["email"]
+
+            comment = attachment["metadata"].get("comment", "")
+            doc_metadata: dict[str, str | list[str]] = {"comment": comment}
+
+            attachment_labels: list[str] = []
+            if not CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING:
+                for label in attachment["metadata"]["labels"]["results"]:
+                    attachment_labels.append(label["name"])
+
+            doc_metadata["labels"] = attachment_labels
+
+            doc_batch.append(
+                Document(
+                    id=download_url,
+                    sections=[Section(link=download_url, text=extracted_text)],
+                    source=DocumentSource.CONFLUENCE,
+                    semantic_identifier=attachment["title"],
+                    doc_updated_at=last_updated,
+                    primary_owners=(
+                        [BasicExpertInfo(email=creator_email)]
+                        if creator_email
+                        else None
+                    ),
+                    metadata=doc_metadata,
+                )
+            )
+
+        return doc_batch, end_ind - start_ind

     def load_from_state(self) -> GenerateDocumentsOutput:
+        unused_attachments = []
+
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")

         start_ind = 0
         while True:
-            doc_batch, num_pages = self._get_doc_batch(start_ind)
+            doc_batch, unused_attachments_batch, num_pages = self._get_doc_batch(
+                start_ind
+            )
+            unused_attachments.extend(unused_attachments_batch)
             start_ind += num_pages
             if doc_batch:
                 yield doc_batch
@@ -681,9 +760,23 @@ class ConfluenceConnector(LoadConnector, PollConnector):
             if num_pages < self.batch_size:
                 break

+        start_ind = 0
+        while True:
+            attachment_batch, num_attachments = self._get_attachment_batch(
+                start_ind, unused_attachments
+            )
+            start_ind += num_attachments
+            if attachment_batch:
+                yield attachment_batch
+
+            if num_attachments < self.batch_size:
+                break
+
     def poll_source(
         self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
     ) -> GenerateDocumentsOutput:
+        unused_attachments = []
+
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")

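Note that `_get_attachment_batch` returns `end_ind - start_ind`, the number of attachments scanned, rather than the number of documents produced; the drain loop above advances `start_ind` by that count, so items dropped by the time filter or a failed download still move the window forward, and the loop still terminates on a short batch. A toy model of that contract:

```python
def get_batch(start: int, items: list[int], batch_size: int) -> tuple[list[int], int]:
    end = min(start + batch_size, len(items))
    docs = [x for x in items[start:end] if x % 2 == 0]  # stand-in for filtering
    return docs, end - start  # count scanned, not count kept


items, start = [1, 2, 3, 4, 5], 0
while True:
    docs, n = get_batch(start, items, batch_size=2)
    start += n
    print(docs)   # [2], then [4], then []
    if n < 2:     # a short batch means the list is exhausted
        break
```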
@@ -692,9 +785,11 @@ class ConfluenceConnector(LoadConnector, PollConnector):

         start_ind = 0
         while True:
-            doc_batch, num_pages = self._get_doc_batch(
+            doc_batch, unused_attachments_batch, num_pages = self._get_doc_batch(
                 start_ind, time_filter=lambda t: start_time <= t <= end_time
             )
+            unused_attachments.extend(unused_attachments_batch)
+
             start_ind += num_pages
             if doc_batch:
                 yield doc_batch
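The poll-window filter is inclusive on both ends and compares UTC-normalized datetimes, which is why `_datetime_from_string` matters. A minimal illustration with invented times:

```python
from datetime import datetime, timezone

start_time = datetime(2024, 8, 1, tzinfo=timezone.utc)
end_time = datetime(2024, 8, 31, tzinfo=timezone.utc)
time_filter = lambda t: start_time <= t <= end_time  # mirrors the lambda in the diff

print(time_filter(datetime(2024, 8, 15, tzinfo=timezone.utc)))  # True
print(time_filter(datetime(2024, 9, 1, tzinfo=timezone.utc)))   # False
```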
@@ -702,6 +797,20 @@ class ConfluenceConnector(LoadConnector, PollConnector):
             if num_pages < self.batch_size:
                 break

+        start_ind = 0
+        while True:
+            attachment_batch, num_attachments = self._get_attachment_batch(
+                start_ind,
+                unused_attachments,
+                time_filter=lambda t: start_time <= t <= end_time,
+            )
+            start_ind += num_attachments
+            if attachment_batch:
+                yield attachment_batch
+
+            if num_attachments < self.batch_size:
+                break
+

 if __name__ == "__main__":
     connector = ConfluenceConnector(os.environ["CONFLUENCE_TEST_SPACE_URL"])
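A hypothetical smoke test in the spirit of the `__main__` stub above (the credential env var names and module path are assumptions, not shown in this diff): polling a window now yields page batches first, then batches of standalone attachment documents.

```python
import os
import time

from danswer.connectors.confluence.connector import ConfluenceConnector

connector = ConfluenceConnector(os.environ["CONFLUENCE_TEST_SPACE_URL"])
connector.load_credentials(
    {
        # assumed key names, for illustration only
        "confluence_username": os.environ["CONFLUENCE_USER_NAME"],
        "confluence_access_token": os.environ["CONFLUENCE_ACCESS_TOKEN"],
    }
)

end = time.time()
for batch in connector.poll_source(end - 86400, end):  # last 24 hours
    print([doc.semantic_identifier for doc in batch])
```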