diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py
index f61fc9770..9b6f455a8 100644
--- a/backend/danswer/configs/app_configs.py
+++ b/backend/danswer/configs/app_configs.py
@@ -157,6 +157,12 @@ DYNAMIC_CONFIG_STORE = os.environ.get(
 DYNAMIC_CONFIG_DIR_PATH = os.environ.get("DYNAMIC_CONFIG_DIR_PATH", "/home/storage")
 # notset, debug, info, warning, error, or critical
 LOG_LEVEL = os.environ.get("LOG_LEVEL", "info")
+# NOTE: Currently only supported in the Confluence and Google Drive connectors.
+# Only handles some failures (Confluence = API call failures, Google Drive =
+# failures pulling / parsing files)
+CONTINUE_ON_CONNECTOR_FAILURE = os.environ.get(
+    "CONTINUE_ON_CONNECTOR_FAILURE", ""
+).lower() not in ["false", ""]
 
 
 #####
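The flag parsing above is worth spelling out: the feature is off when the variable is unset or set (in any casing) to "false", and on for any other non-empty value. A minimal sketch of the same rule, using a hypothetical `_env_flag` helper that is not part of the codebase:

```python
import os

def _env_flag(name: str) -> bool:
    # "" (unset) and "false"/"FALSE"/... disable the flag; any other
    # non-empty value enables it -- including surprising ones like "0".
    return os.environ.get(name, "").lower() not in ["false", ""]

# CONTINUE_ON_CONNECTOR_FAILURE=true  -> enabled
# CONTINUE_ON_CONNECTOR_FAILURE=false -> disabled
# variable unset                      -> disabled
```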
diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py
index e165e1a36..b8b2296fc 100644
--- a/backend/danswer/connectors/confluence/connector.py
+++ b/backend/danswer/connectors/confluence/connector.py
@@ -1,5 +1,5 @@
 from collections.abc import Callable
-from collections.abc import Generator
+from collections.abc import Collection
 from datetime import datetime
 from datetime import timezone
 from typing import Any
@@ -7,6 +7,7 @@ from urllib.parse import urlparse
 
 from atlassian import Confluence  # type:ignore
 
+from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.interfaces import GenerateDocumentsOutput
@@ -16,8 +17,11 @@ from danswer.connectors.interfaces import SecondsSinceUnixEpoch
 from danswer.connectors.models import ConnectorMissingCredentialError
 from danswer.connectors.models import Document
 from danswer.connectors.models import Section
+from danswer.utils.logger import setup_logger
 from danswer.utils.text_processing import parse_html_page_basic
 
+logger = setup_logger()
+
 # Potential Improvements
 # 1. If wiki page instead of space, do a search of all the children of the page instead of index all in the space
 # 2. Include attachments, etc
@@ -48,7 +52,7 @@ def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str]:
 
 def _comment_dfs(
     comments_str: str,
-    comment_pages: Generator[dict[str, Any], None, None],
+    comment_pages: Collection[dict[str, Any]],
     confluence_client: Confluence,
 ) -> str:
     for comment_page in comment_pages:
@@ -72,8 +76,10 @@ class ConfluenceConnector(LoadConnector, PollConnector):
         self,
         wiki_page_url: str,
         batch_size: int = INDEX_BATCH_SIZE,
+        continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE,
     ) -> None:
         self.batch_size = batch_size
+        self.continue_on_failure = continue_on_failure
         self.wiki_base, self.space = extract_confluence_keys_from_url(wiki_page_url)
         self.confluence_client: Confluence | None = None
 
@@ -88,6 +94,57 @@ class ConfluenceConnector(LoadConnector, PollConnector):
         )
         return None
 
+    def _fetch_pages(
+        self,
+        confluence_client: Confluence,
+        start_ind: int,
+    ) -> Collection[dict[str, Any]]:
+        def _fetch(start_ind: int, batch_size: int) -> Collection[dict[str, Any]]:
+            return confluence_client.get_all_pages_from_space(
+                self.space,
+                start=start_ind,
+                limit=batch_size,
+                expand="body.storage.value,version",
+            )
+
+        try:
+            return _fetch(start_ind, self.batch_size)
+        except Exception as e:
+            if not self.continue_on_failure:
+                raise e
+
+        # error checking phase, only reachable if `self.continue_on_failure=True`
+        pages: list[dict[str, Any]] = []
+        for i in range(self.batch_size):
+            try:
+                pages.extend(_fetch(start_ind + i, 1))
+            except Exception:
+                logger.exception(
+                    "Ran into exception when fetching pages from Confluence"
+                )
+
+        return pages
+
+    def _fetch_comments(
+        self, confluence_client: Confluence, page_id: str
+    ) -> Collection[dict[str, Any]]:
+        try:
+            return confluence_client.get_page_child_by_type(
+                page_id,
+                type="comment",
+                start=None,
+                limit=None,
+                expand="body.storage.value",
+            )
+        except Exception as e:
+            if not self.continue_on_failure:
+                raise e
+
+            logger.exception(
+                "Ran into exception when fetching comments from Confluence"
+            )
+            return []
+
     def _get_doc_batch(
         self, start_ind: int, time_filter: Callable[[datetime], bool] | None = None
     ) -> tuple[list[Document], int]:
@@ -96,13 +153,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")
 
-        batch = self.confluence_client.get_all_pages_from_space(
-            self.space,
-            start=start_ind,
-            limit=self.batch_size,
-            expand="body.storage.value,version",
-        )
-
+        batch = self._fetch_pages(self.confluence_client, start_ind)
         for page in batch:
             last_modified_str = page["version"]["when"]
             last_modified = datetime.fromisoformat(last_modified_str)
@@ -112,13 +163,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
                 page_text = (
                     page.get("title", "") + "\n" + parse_html_page_basic(page_html)
                 )
-                comment_pages = self.confluence_client.get_page_child_by_type(
-                    page["id"],
-                    type="comment",
-                    start=None,
-                    limit=None,
-                    expand="body.storage.value",
-                )
+                comment_pages = self._fetch_comments(self.confluence_client, page["id"])
                 comments_text = _comment_dfs("", comment_pages, self.confluence_client)
                 page_text += comments_text
 
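The `_fetch_pages` fallback above is the interesting piece: fetch the whole batch in one API call, and only if that call fails (with `continue_on_failure` enabled) re-request the pages one at a time, so a single bad page can't sink the rest of the batch. A self-contained sketch of the pattern under toy assumptions — `fetch_with_fallback` and its arguments are illustrative, not connector API:

```python
import logging
from collections.abc import Callable
from typing import Any

logger = logging.getLogger(__name__)

def fetch_with_fallback(
    fetch: Callable[[int, int], list[dict[str, Any]]],  # (start, limit) -> items
    start: int,
    batch_size: int,
    continue_on_failure: bool,
) -> list[dict[str, Any]]:
    try:
        # happy path: one call for the whole batch
        return fetch(start, batch_size)
    except Exception:
        if not continue_on_failure:
            raise

    # degraded path: one call per item, skipping only the items that fail
    items: list[dict[str, Any]] = []
    for offset in range(batch_size):
        try:
            items.extend(fetch(start + offset, 1))
        except Exception:
            logger.exception("Skipping item at index %d", start + offset)
    return items
```

The cost of the degraded path is up to `batch_size` extra API calls, but it only runs after the batched call has already failed.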
diff --git a/backend/danswer/connectors/google_drive/connector.py b/backend/danswer/connectors/google_drive/connector.py
index dff8aa515..88b4681b4 100644
--- a/backend/danswer/connectors/google_drive/connector.py
+++ b/backend/danswer/connectors/google_drive/connector.py
@@ -11,6 +11,7 @@ from google.oauth2.credentials import Credentials  # type: ignore
 from googleapiclient import discovery  # type: ignore
 from PyPDF2 import PdfReader
 
+from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
 from danswer.configs.app_configs import GOOGLE_DRIVE_FOLLOW_SHORTCUTS
 from danswer.configs.app_configs import GOOGLE_DRIVE_INCLUDE_SHARED
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
@@ -293,11 +294,13 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
         batch_size: int = INDEX_BATCH_SIZE,
         include_shared: bool = GOOGLE_DRIVE_INCLUDE_SHARED,
         follow_shortcuts: bool = GOOGLE_DRIVE_FOLLOW_SHORTCUTS,
+        continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE,
     ) -> None:
         self.folder_paths = folder_paths or []
         self.batch_size = batch_size
         self.include_shared = include_shared
         self.follow_shortcuts = follow_shortcuts
+        self.continue_on_failure = continue_on_failure
         self.creds: Credentials | None = None
 
     @staticmethod
@@ -376,18 +379,28 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
         for files_batch in file_batches:
             doc_batch = []
             for file in files_batch:
-                text_contents = extract_text(file, service)
-                full_context = file["name"] + " - " + text_contents
+                try:
+                    text_contents = extract_text(file, service)
+                    full_context = file["name"] + " - " + text_contents
 
-                doc_batch.append(
-                    Document(
-                        id=file["webViewLink"],
-                        sections=[Section(link=file["webViewLink"], text=full_context)],
-                        source=DocumentSource.GOOGLE_DRIVE,
-                        semantic_identifier=file["name"],
-                        metadata={},
+                    doc_batch.append(
+                        Document(
+                            id=file["webViewLink"],
+                            sections=[
+                                Section(link=file["webViewLink"], text=full_context)
+                            ],
+                            source=DocumentSource.GOOGLE_DRIVE,
+                            semantic_identifier=file["name"],
+                            metadata={},
+                        )
+                    )
+                except Exception as e:
+                    if not self.continue_on_failure:
+                        raise e
+
+                    logger.exception(
+                        "Ran into exception when pulling a file from Google Drive"
                     )
-                )
 
             yield doc_batch
 
diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml
index 5b89b8ad9..130077ac9 100644
--- a/deployment/docker_compose/docker-compose.dev.yml
+++ b/deployment/docker_compose/docker-compose.dev.yml
@@ -68,6 +68,7 @@ services:
       - API_TYPE_OPENAI=${API_TYPE_OPENAI:-}
       - API_VERSION_OPENAI=${API_VERSION_OPENAI:-}
       - AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-}
+      - CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-}
       - DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-}
       - DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-}
       - LOG_LEVEL=${LOG_LEVEL:-info}
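With the compose change, setting `CONTINUE_ON_CONNECTOR_FAILURE=true` in the deployment's `.env` file turns the behavior on via the environment. The diff also threads the flag through both constructors, so it can be set per connector; a sketch assuming the URL is a placeholder and that `GoogleDriveConnector` defaults `folder_paths` to all accessible files:

```python
from danswer.connectors.confluence.connector import ConfluenceConnector
from danswer.connectors.google_drive.connector import GoogleDriveConnector

# Skip Confluence pages whose API calls fail instead of aborting the run
confluence = ConfluenceConnector(
    "https://example.atlassian.net/wiki/spaces/ENG/overview",  # placeholder URL
    continue_on_failure=True,
)

# Skip Drive files that fail to download or parse
drive = GoogleDriveConnector(continue_on_failure=True)
```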