diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index 82174c8b9..03b91fa29 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -105,6 +105,7 @@ def _get_user(user_id: str, confluence_client: DanswerConfluence) -> str: confluence_client.get_user_details_by_accountid ) try: + logger.info(f"_get_user - get_user_details_by_accountid: id={user_id}") return get_user_details_by_accountid(user_id).get("displayName", user_not_found) except Exception as e: logger.warning( @@ -156,6 +157,9 @@ def _comment_dfs( comment_html, confluence_client ) try: + logger.info( + f"_comment_dfs - get_page_child_by_type: id={comment_page['id']}" + ) child_comment_pages = get_page_child_by_type( comment_page["id"], type="comment", @@ -212,13 +216,16 @@ class RecursiveIndexer: self.confluence_client.get_page_by_id ) try: + logger.info( + f"_fetch_origin_page - get_page_by_id: id={self.origin_page_id}" + ) origin_page = get_page_by_id( self.origin_page_id, expand="body.storage.value,version,space" ) return origin_page - except Exception as e: - logger.warning( - f"Appending origin page with id {self.origin_page_id} failed: {e}" + except Exception: + logger.exception( + f"Appending origin page with id {self.origin_page_id} failed." 
) return {} @@ -230,6 +237,10 @@ class RecursiveIndexer: queue: list[str] = [page_id] visited_pages: set[str] = set() + get_page_by_id = make_confluence_call_handle_rate_limit( + self.confluence_client.get_page_by_id + ) + get_page_child_by_type = make_confluence_call_handle_rate_limit( self.confluence_client.get_page_child_by_type ) @@ -242,12 +253,15 @@ class RecursiveIndexer: try: # Fetch the page itself - page = self.confluence_client.get_page_by_id( + logger.info( + f"recurse_children_pages - get_page_by_id: id={current_page_id}" + ) + page = get_page_by_id( current_page_id, expand="body.storage.value,version,space" ) pages.append(page) - except Exception as e: - logger.warning(f"Failed to fetch page {current_page_id}: {e}") + except Exception: + logger.exception(f"Failed to fetch page {current_page_id}.") continue if not self.index_recursively: @@ -256,6 +270,9 @@ class RecursiveIndexer: # Fetch child pages start = 0 while True: + logger.info( + f"recurse_children_pages - get_page_child_by_type: id={current_page_id}" + ) child_pages_response = get_page_child_by_type( current_page_id, type="page", @@ -323,11 +340,17 @@ class ConfluenceConnector(LoadConnector, PollConnector): def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: username = credentials["confluence_username"] access_token = credentials["confluence_access_token"] + + # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py + # for a list of other hidden constructor args self.confluence_client = DanswerConfluence( url=self.wiki_base, username=username if self.is_cloud else None, password=access_token if self.is_cloud else None, token=access_token if not self.is_cloud else None, + backoff_and_retry=True, + max_backoff_retries=60, + max_backoff_seconds=60, ) return None @@ -354,6 +377,9 @@ class ConfluenceConnector(LoadConnector, PollConnector): ) try: + logger.info( + f"_fetch_space - get_all_pages: cursor={cursor} 
limit={batch_size}" + ) response = get_all_pages( cql=self.cql_query, cursor=cursor, @@ -380,6 +406,9 @@ class ConfluenceConnector(LoadConnector, PollConnector): view_pages: list[dict[str, Any]] = [] for _ in range(self.batch_size): try: + logger.info( + f"_fetch_space - get_all_pages: cursor={cursor} limit=1" + ) response = get_all_pages( cql=self.cql_query, cursor=cursor, @@ -406,6 +435,9 @@ class ConfluenceConnector(LoadConnector, PollConnector): f"Page failed with cql {self.cql_query} with cursor {cursor}, " f"trying alternative expand option: {e}" ) + logger.info( + f"_fetch_space - get_all_pages - trying alternative expand: cursor={cursor} limit=1" + ) response = get_all_pages( cql=self.cql_query, cursor=cursor, @@ -464,6 +496,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): ) try: + logger.info(f"_fetch_comments - get_page_child_by_type: id={page_id}") comment_pages = list( get_page_child_by_type( page_id, @@ -478,9 +511,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): if not self.continue_on_failure: raise e - logger.exception( - "Ran into exception when fetching comments from Confluence" - ) + logger.exception("Fetching comments from Confluence exceptioned") return "" def _fetch_labels(self, confluence_client: Confluence, page_id: str) -> list[str]: @@ -488,13 +519,14 @@ class ConfluenceConnector(LoadConnector, PollConnector): confluence_client.get_page_labels ) try: + logger.info(f"_fetch_labels - get_page_labels: id={page_id}") labels_response = get_page_labels(page_id) return [label["name"] for label in labels_response["results"]] except Exception as e: if not self.continue_on_failure: raise e - logger.exception("Ran into exception when fetching labels from Confluence") + logger.exception("Fetching labels from Confluence exceptioned") return [] @classmethod @@ -531,6 +563,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): ) return None + logger.info(f"_attachment_to_content - _session.get: link={download_link}") 
response = confluence_client._session.get(download_link) if response.status_code != 200: logger.warning( @@ -589,9 +622,7 @@ class ConfluenceConnector(LoadConnector, PollConnector): return "", [] if not self.continue_on_failure: raise e - logger.exception( - f"Ran into exception when fetching attachments from Confluence: {e}" - ) + logger.exception("Fetching attachments from Confluence exceptioned.") return "\n".join(files_attachment_content), unused_attachments diff --git a/backend/danswer/connectors/confluence/rate_limit_handler.py b/backend/danswer/connectors/confluence/rate_limit_handler.py index c05754bb1..8dbdeba1a 100644 --- a/backend/danswer/connectors/confluence/rate_limit_handler.py +++ b/backend/danswer/connectors/confluence/rate_limit_handler.py @@ -5,11 +5,8 @@ from typing import Any from typing import cast from typing import TypeVar -from redis.exceptions import ConnectionError from requests import HTTPError -from danswer.connectors.interfaces import BaseConnector -from danswer.redis.redis_pool import get_redis_client from danswer.utils.logger import setup_logger logger = setup_logger() @@ -25,110 +22,198 @@ class ConfluenceRateLimitError(Exception): pass +# commenting out while we try using confluence's rate limiter instead +# # https://developer.atlassian.com/cloud/confluence/rate-limiting/ +# def make_confluence_call_handle_rate_limit(confluence_call: F) -> F: +# def wrapped_call(*args: list[Any], **kwargs: Any) -> Any: +# max_retries = 5 +# starting_delay = 5 +# backoff = 2 + +# # max_delay is used when the server doesn't hand back "Retry-After" +# # and we have to decide the retry delay ourselves +# max_delay = 30 # Atlassian uses max_delay = 30 in their examples + +# # max_retry_after is used when we do get a "Retry-After" header +# max_retry_after = 300 # should we really cap the maximum retry delay? 
+ +# NEXT_RETRY_KEY = BaseConnector.REDIS_KEY_PREFIX + "confluence_next_retry" + +# # for testing purposes, rate limiting is written to fall back to a simpler +# # rate limiting approach when redis is not available +# r = get_redis_client() + +# for attempt in range(max_retries): +# try: +# # if multiple connectors are waiting for the next attempt, there could be an issue +# # where many connectors are "released" onto the server at the same time. +# # That's not ideal ... but coming up with a mechanism for queueing +# # all of these connectors is a bigger problem that we want to take on +# # right now +# try: +# next_attempt = r.get(NEXT_RETRY_KEY) +# if next_attempt is None: +# next_attempt = 0 +# else: +# next_attempt = int(cast(int, next_attempt)) + +# # TODO: all connectors need to be interruptible moving forward +# while time.monotonic() < next_attempt: +# time.sleep(1) +# except ConnectionError: +# pass + +# return confluence_call(*args, **kwargs) +# except HTTPError as e: +# # Check if the response or headers are None to avoid potential AttributeError +# if e.response is None or e.response.headers is None: +# logger.warning("HTTPError with `None` as response or as headers") +# raise e + +# retry_after_header = e.response.headers.get("Retry-After") +# if ( +# e.response.status_code == 429 +# or RATE_LIMIT_MESSAGE_LOWERCASE in e.response.text.lower() +# ): +# retry_after = None +# if retry_after_header is not None: +# try: +# retry_after = int(retry_after_header) +# except ValueError: +# pass + +# if retry_after is not None: +# if retry_after > max_retry_after: +# logger.warning( +# f"Clamping retry_after from {retry_after} to {max_delay} seconds..." +# ) +# retry_after = max_delay + +# logger.warning( +# f"Rate limit hit. Retrying after {retry_after} seconds..." +# ) +# try: +# r.set( +# NEXT_RETRY_KEY, +# math.ceil(time.monotonic() + retry_after), +# ) +# except ConnectionError: +# pass +# else: +# logger.warning( +# "Rate limit hit. 
Retrying with exponential backoff..." +# ) +# delay = min(starting_delay * (backoff**attempt), max_delay) +# delay_until = math.ceil(time.monotonic() + delay) + +# try: +# r.set(NEXT_RETRY_KEY, delay_until) +# except ConnectionError: +# while time.monotonic() < delay_until: +# time.sleep(1) +# else: +# # re-raise, let caller handle +# raise +# except AttributeError as e: +# # Some error within the Confluence library, unclear why it fails. +# # Users reported it to be intermittent, so just retry +# logger.warning(f"Confluence Internal Error, retrying... {e}") +# delay = min(starting_delay * (backoff**attempt), max_delay) +# delay_until = math.ceil(time.monotonic() + delay) +# try: +# r.set(NEXT_RETRY_KEY, delay_until) +# except ConnectionError: +# while time.monotonic() < delay_until: +# time.sleep(1) + +# if attempt == max_retries - 1: +# raise e + +# return cast(F, wrapped_call) + + +def _handle_http_error(e: HTTPError, attempt: int) -> int: + MIN_DELAY = 2 + MAX_DELAY = 60 + STARTING_DELAY = 5 + BACKOFF = 2 + + # Check if the response or headers are None to avoid potential AttributeError + if e.response is None or e.response.headers is None: + logger.warning("HTTPError with `None` as response or as headers") + raise e + + if ( + e.response.status_code != 429 + and RATE_LIMIT_MESSAGE_LOWERCASE not in e.response.text.lower() + ): + raise e + + retry_after = None + + retry_after_header = e.response.headers.get("Retry-After") + if retry_after_header is not None: + try: + retry_after = int(retry_after_header) + if retry_after > MAX_DELAY: + logger.warning( + f"Clamping retry_after from {retry_after} to {MAX_DELAY} seconds..." + ) + retry_after = MAX_DELAY + if retry_after < MIN_DELAY: + retry_after = MIN_DELAY + except ValueError: + pass + + if retry_after is not None: + logger.warning( + f"Rate limiting with retry header. Retrying after {retry_after} seconds..." + ) + delay = retry_after + else: + logger.warning( + "Rate limiting without retry header. 
Retrying with exponential backoff..." + ) + delay = min(STARTING_DELAY * (BACKOFF**attempt), MAX_DELAY) + + delay_until = math.ceil(time.monotonic() + delay) + return delay_until + + # https://developer.atlassian.com/cloud/confluence/rate-limiting/ +# this uses the native rate limiting option provided by the +# confluence client and otherwise applies a simpler set of error handling def make_confluence_call_handle_rate_limit(confluence_call: F) -> F: def wrapped_call(*args: list[Any], **kwargs: Any) -> Any: - max_retries = 5 - starting_delay = 5 - backoff = 2 + MAX_RETRIES = 5 - # max_delay is used when the server doesn't hand back "Retry-After" - # and we have to decide the retry delay ourselves - max_delay = 30 # Atlassian uses max_delay = 30 in their examples + TIMEOUT = 3600 + timeout_at = time.monotonic() + TIMEOUT - # max_retry_after is used when we do get a "Retry-After" header - max_retry_after = 300 # should we really cap the maximum retry delay? + for attempt in range(MAX_RETRIES): + if time.monotonic() > timeout_at: + raise TimeoutError( + f"Confluence call attempts took longer than {TIMEOUT} seconds." + ) - NEXT_RETRY_KEY = BaseConnector.REDIS_KEY_PREFIX + "confluence_next_retry" - - # for testing purposes, rate limiting is written to fall back to a simpler - # rate limiting approach when redis is not available - r = get_redis_client() - - for attempt in range(max_retries): try: - # if multiple connectors are waiting for the next attempt, there could be an issue - # where many connectors are "released" onto the server at the same time. - # That's not ideal ... 
but coming up with a mechanism for queueing - # all of these connectors is a bigger problem that we want to take on - # right now - try: - next_attempt = r.get(NEXT_RETRY_KEY) - if next_attempt is None: - next_attempt = 0 - else: - next_attempt = int(cast(int, next_attempt)) - - # TODO: all connectors need to be interruptible moving forward - while time.monotonic() < next_attempt: - time.sleep(1) - except ConnectionError: - pass - + # we're relying more on the client to rate limit itself + # and applying our own retries in a more specific set of circumstances return confluence_call(*args, **kwargs) except HTTPError as e: - # Check if the response or headers are None to avoid potential AttributeError - if e.response is None or e.response.headers is None: - logger.warning("HTTPError with `None` as response or as headers") - raise e - - retry_after_header = e.response.headers.get("Retry-After") - if ( - e.response.status_code == 429 - or RATE_LIMIT_MESSAGE_LOWERCASE in e.response.text.lower() - ): - retry_after = None - if retry_after_header is not None: - try: - retry_after = int(retry_after_header) - except ValueError: - pass - - if retry_after is not None: - if retry_after > max_retry_after: - logger.warning( - f"Clamping retry_after from {retry_after} to {max_delay} seconds..." - ) - retry_after = max_delay - - logger.warning( - f"Rate limit hit. Retrying after {retry_after} seconds..." - ) - try: - r.set( - NEXT_RETRY_KEY, - math.ceil(time.monotonic() + retry_after), - ) - except ConnectionError: - pass - else: - logger.warning( - "Rate limit hit. Retrying with exponential backoff..." 
- ) - delay = min(starting_delay * (backoff**attempt), max_delay) - delay_until = math.ceil(time.monotonic() + delay) - - try: - r.set(NEXT_RETRY_KEY, delay_until) - except ConnectionError: - while time.monotonic() < delay_until: - time.sleep(1) - else: - # re-raise, let caller handle - raise + delay_until = _handle_http_error(e, attempt) + while time.monotonic() < delay_until: + # in the future, check a signal here to exit + time.sleep(1) except AttributeError as e: # Some error within the Confluence library, unclear why it fails. # Users reported it to be intermittent, so just retry - logger.warning(f"Confluence Internal Error, retrying... {e}") - delay = min(starting_delay * (backoff**attempt), max_delay) - delay_until = math.ceil(time.monotonic() + delay) - try: - r.set(NEXT_RETRY_KEY, delay_until) - except ConnectionError: - while time.monotonic() < delay_until: - time.sleep(1) - - if attempt == max_retries - 1: + if attempt == MAX_RETRIES - 1: raise e + logger.exception( + "Confluence Client raised an AttributeError. Retrying..." + ) + time.sleep(5) + return cast(F, wrapped_call) diff --git a/backend/danswer/db/connector.py b/backend/danswer/db/connector.py index 0f777d30e..835f74d43 100644 --- a/backend/danswer/db/connector.py +++ b/backend/danswer/db/connector.py @@ -248,7 +248,7 @@ def create_initial_default_connector(db_session: Session) -> None: logger.warning( "Default connector does not have expected values. Updating to proper state." 
) - # Ensure default connector has correct valuesg + # Ensure default connector has correct values default_connector.source = DocumentSource.INGESTION_API default_connector.input_type = InputType.LOAD_STATE default_connector.refresh_freq = None diff --git a/backend/ee/danswer/external_permissions/confluence/sync_utils.py b/backend/ee/danswer/external_permissions/confluence/sync_utils.py index 183e39059..d6eb22500 100644 --- a/backend/ee/danswer/external_permissions/confluence/sync_utils.py +++ b/backend/ee/danswer/external_permissions/confluence/sync_utils.py @@ -20,6 +20,9 @@ def build_confluence_client( username=credentials_json["confluence_username"] if is_cloud else None, password=credentials_json["confluence_access_token"] if is_cloud else None, token=credentials_json["confluence_access_token"] if not is_cloud else None, + backoff_and_retry=True, + max_backoff_retries=60, + max_backoff_seconds=60, ) diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt index bc6c667cc..354103f20 100644 --- a/backend/requirements/default.txt +++ b/backend/requirements/default.txt @@ -1,7 +1,7 @@ aiohttp==3.10.2 alembic==1.10.4 asyncpg==0.27.0 -atlassian-python-api==3.37.0 +atlassian-python-api==3.41.16 beautifulsoup4==4.12.3 boto3==1.34.84 celery==5.5.0b4 @@ -81,5 +81,6 @@ dropbox==11.36.2 boto3-stubs[s3]==1.34.133 ultimate_sitemap_parser==0.5 stripe==10.12.0 +urllib3==2.2.3 mistune==0.8.4 sentry-sdk==2.14.0