Continue on some connector failures (#314)

Chris Weaver 2023-08-18 17:59:33 -07:00 committed by GitHub
parent 70d7ca5c73
commit f541a3ee85
4 changed files with 91 additions and 26 deletions

View File

@@ -157,6 +157,12 @@ DYNAMIC_CONFIG_STORE = os.environ.get(
 DYNAMIC_CONFIG_DIR_PATH = os.environ.get("DYNAMIC_CONFIG_DIR_PATH", "/home/storage")
 # notset, debug, info, warning, error, or critical
 LOG_LEVEL = os.environ.get("LOG_LEVEL", "info")
+# NOTE: Currently only supported in the Confluence and Google Drive connectors +
+# only handles some failures (Confluence = handles API call failures, Google
+# Drive = handles failures pulling files / parsing them)
+CONTINUE_ON_CONNECTOR_FAILURE = os.environ.get(
+    "CONTINUE_ON_CONNECTOR_FAILURE", ""
+).lower() not in ["false", ""]
 
 #####
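
The flag parsing above is worth a second look: only an unset/empty value or a literal "false" (any casing) disables the feature, and every other non-empty value enables it. A minimal sketch of the same expression, using a hypothetical helper name `_parse_flag`:

```python
def _parse_flag(raw: str) -> bool:
    # Mirrors the expression above: "" (unset) and "false"/"FALSE" disable;
    # any other non-empty string enables the flag.
    return raw.lower() not in ["false", ""]

assert _parse_flag("") is False
assert _parse_flag("FALSE") is False
assert _parse_flag("true") is True
assert _parse_flag("0") is True  # caveat: "0" still enables the flag
```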

View File

@@ -1,5 +1,5 @@
 from collections.abc import Callable
-from collections.abc import Generator
+from collections.abc import Collection
 from datetime import datetime
 from datetime import timezone
 from typing import Any
@@ -7,6 +7,7 @@ from urllib.parse import urlparse
 from atlassian import Confluence  # type:ignore
 
+from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.interfaces import GenerateDocumentsOutput
@@ -16,8 +17,11 @@ from danswer.connectors.interfaces import SecondsSinceUnixEpoch
 from danswer.connectors.models import ConnectorMissingCredentialError
 from danswer.connectors.models import Document
 from danswer.connectors.models import Section
+from danswer.utils.logger import setup_logger
 from danswer.utils.text_processing import parse_html_page_basic
 
+logger = setup_logger()
+
 # Potential Improvements
 # 1. If wiki page instead of space, do a search of all the children of the page instead of index all in the space
 # 2. Include attachments, etc
@@ -48,7 +52,7 @@ def extract_confluence_keys_from_url(wiki_url: str) -> tuple[str, str]:
 
 def _comment_dfs(
     comments_str: str,
-    comment_pages: Generator[dict[str, Any], None, None],
+    comment_pages: Collection[dict[str, Any]],
     confluence_client: Confluence,
 ) -> str:
     for comment_page in comment_pages:
@@ -72,8 +76,10 @@ class ConfluenceConnector(LoadConnector, PollConnector):
         self,
         wiki_page_url: str,
         batch_size: int = INDEX_BATCH_SIZE,
+        continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE,
     ) -> None:
         self.batch_size = batch_size
+        self.continue_on_failure = continue_on_failure
         self.wiki_base, self.space = extract_confluence_keys_from_url(wiki_page_url)
         self.confluence_client: Confluence | None = None
@@ -88,6 +94,57 @@ class ConfluenceConnector(LoadConnector, PollConnector):
             )
         return None
 
+    def _fetch_pages(
+        self,
+        confluence_client: Confluence,
+        start_ind: int,
+    ) -> Collection[dict[str, Any]]:
+        def _fetch(start_ind: int, batch_size: int) -> Collection[dict[str, Any]]:
+            return confluence_client.get_all_pages_from_space(
+                self.space,
+                start=start_ind,
+                limit=batch_size,
+                expand="body.storage.value,version",
+            )
+
+        try:
+            return _fetch(start_ind, self.batch_size)
+        except Exception as e:
+            if not self.continue_on_failure:
+                raise e
+
+        # error checking phase, only reachable if `self.continue_on_failure=True`
+        pages: list[dict[str, Any]] = []
+        for i in range(self.batch_size):
+            try:
+                pages.extend(_fetch(start_ind + i, 1))
+            except Exception:
+                logger.exception(
+                    "Ran into exception when fetching pages from Confluence"
+                )
+
+        return pages
+
+    def _fetch_comments(
+        self, confluence_client: Confluence, page_id: str
+    ) -> Collection[dict[str, Any]]:
+        try:
+            return confluence_client.get_page_child_by_type(
+                page_id,
+                type="comment",
+                start=None,
+                limit=None,
+                expand="body.storage.value",
+            )
+        except Exception as e:
+            if not self.continue_on_failure:
+                raise e
+
+            logger.exception(
+                "Ran into exception when fetching comments from Confluence"
+            )
+            return []
+
     def _get_doc_batch(
         self, start_ind: int, time_filter: Callable[[datetime], bool] | None = None
     ) -> tuple[list[Document], int]:
@@ -96,13 +153,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
         if self.confluence_client is None:
             raise ConnectorMissingCredentialError("Confluence")
 
-        batch = self.confluence_client.get_all_pages_from_space(
-            self.space,
-            start=start_ind,
-            limit=self.batch_size,
-            expand="body.storage.value,version",
-        )
+        batch = self._fetch_pages(self.confluence_client, start_ind)
 
         for page in batch:
             last_modified_str = page["version"]["when"]
             last_modified = datetime.fromisoformat(last_modified_str)
@@ -112,13 +163,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
             page_text = (
                 page.get("title", "") + "\n" + parse_html_page_basic(page_html)
            )
-            comment_pages = self.confluence_client.get_page_child_by_type(
-                page["id"],
-                type="comment",
-                start=None,
-                limit=None,
-                expand="body.storage.value",
-            )
+            comment_pages = self._fetch_comments(self.confluence_client, page["id"])
             comments_text = _comment_dfs("", comment_pages, self.confluence_client)
             page_text += comments_text
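
The recovery strategy in `_fetch_pages` is the core of this change: when a batch request fails, it retries the same window one page at a time, so only the pages that actually fail get dropped. A standalone sketch of that pattern, with a hypothetical `fetch(start, limit)` callable standing in for the Confluence API call:

```python
from collections.abc import Callable
from typing import Any

def fetch_with_fallback(
    fetch: Callable[[int, int], list[dict[str, Any]]],
    start: int,
    batch_size: int,
    continue_on_failure: bool,
) -> list[dict[str, Any]]:
    # First attempt: the whole batch in one call.
    try:
        return fetch(start, batch_size)
    except Exception:
        if not continue_on_failure:
            raise

    # Fallback: one item per request, skipping (and, in the real
    # connector, logging) whichever individual fetches still fail.
    results: list[dict[str, Any]] = []
    for i in range(batch_size):
        try:
            results.extend(fetch(start + i, 1))
        except Exception:
            continue
    return results
```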

View File

@@ -11,6 +11,7 @@ from google.oauth2.credentials import Credentials  # type: ignore
 from googleapiclient import discovery  # type: ignore
 from PyPDF2 import PdfReader
 
+from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
 from danswer.configs.app_configs import GOOGLE_DRIVE_FOLLOW_SHORTCUTS
 from danswer.configs.app_configs import GOOGLE_DRIVE_INCLUDE_SHARED
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
@@ -293,11 +294,13 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
         batch_size: int = INDEX_BATCH_SIZE,
         include_shared: bool = GOOGLE_DRIVE_INCLUDE_SHARED,
         follow_shortcuts: bool = GOOGLE_DRIVE_FOLLOW_SHORTCUTS,
+        continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE,
     ) -> None:
         self.folder_paths = folder_paths or []
         self.batch_size = batch_size
         self.include_shared = include_shared
         self.follow_shortcuts = follow_shortcuts
+        self.continue_on_failure = continue_on_failure
         self.creds: Credentials | None = None
 
     @staticmethod
@@ -376,18 +379,28 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
         for files_batch in file_batches:
             doc_batch = []
             for file in files_batch:
-                text_contents = extract_text(file, service)
-                full_context = file["name"] + " - " + text_contents
-                doc_batch.append(
-                    Document(
-                        id=file["webViewLink"],
-                        sections=[Section(link=file["webViewLink"], text=full_context)],
-                        source=DocumentSource.GOOGLE_DRIVE,
-                        semantic_identifier=file["name"],
-                        metadata={},
-                    )
-                )
+                try:
+                    text_contents = extract_text(file, service)
+                    full_context = file["name"] + " - " + text_contents
+                    doc_batch.append(
+                        Document(
+                            id=file["webViewLink"],
+                            sections=[
+                                Section(link=file["webViewLink"], text=full_context)
+                            ],
+                            source=DocumentSource.GOOGLE_DRIVE,
+                            semantic_identifier=file["name"],
+                            metadata={},
+                        )
+                    )
+                except Exception as e:
+                    if not self.continue_on_failure:
+                        raise e
+
+                    logger.exception(
+                        "Ran into exception when pulling a file from Google Drive"
+                    )
 
             yield doc_batch
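
Since both constructors now take `continue_on_failure` with the env-driven default, the behavior can also be opted into per instance. A hedged usage sketch (the import path is assumed from the package layout, not shown in this diff):

```python
from danswer.connectors.google_drive.connector import GoogleDriveConnector

# continue_on_failure=True: files that fail extraction or parsing are
# logged and skipped instead of aborting the entire indexing run.
# folder_paths=[] is an assumption here, meaning "no folder filter".
connector = GoogleDriveConnector(folder_paths=[], continue_on_failure=True)
```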

View File

@@ -68,6 +68,7 @@ services:
       - API_TYPE_OPENAI=${API_TYPE_OPENAI:-}
       - API_VERSION_OPENAI=${API_VERSION_OPENAI:-}
       - AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-}
+      - CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-}
       - DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-}
       - DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-}
       - LOG_LEVEL=${LOG_LEVEL:-info}
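
With the `:-` (default-to-empty) substitution above, the flag stays off unless `CONTINUE_ON_CONNECTOR_FAILURE` is set in the shell or in a `.env` file alongside the compose file, e.g. `CONTINUE_ON_CONNECTOR_FAILURE=true`.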