use native rate limiting in the confluence client (#2837)

* use native rate limiting in the confluence client

* upgrade urllib3 to v2.2.3 to support retries in confluence client

* improve logging so that progress is visible.
This commit is contained in:
rkuo-danswer
2024-10-18 11:15:43 -07:00
committed by GitHub
parent 59364aadd7
commit 5b78299880
5 changed files with 230 additions and 110 deletions

View File

@@ -105,6 +105,7 @@ def _get_user(user_id: str, confluence_client: DanswerConfluence) -> str:
confluence_client.get_user_details_by_accountid confluence_client.get_user_details_by_accountid
) )
try: try:
logger.info(f"_get_user - get_user_details_by_accountid: id={user_id}")
return get_user_details_by_accountid(user_id).get("displayName", user_not_found) return get_user_details_by_accountid(user_id).get("displayName", user_not_found)
except Exception as e: except Exception as e:
logger.warning( logger.warning(
@@ -156,6 +157,9 @@ def _comment_dfs(
comment_html, confluence_client comment_html, confluence_client
) )
try: try:
logger.info(
f"_comment_dfs - get_page_by_child_type: id={comment_page['id']}"
)
child_comment_pages = get_page_child_by_type( child_comment_pages = get_page_child_by_type(
comment_page["id"], comment_page["id"],
type="comment", type="comment",
@@ -212,13 +216,16 @@ class RecursiveIndexer:
self.confluence_client.get_page_by_id self.confluence_client.get_page_by_id
) )
try: try:
logger.info(
f"_fetch_origin_page - get_page_by_id: id={self.origin_page_id}"
)
origin_page = get_page_by_id( origin_page = get_page_by_id(
self.origin_page_id, expand="body.storage.value,version,space" self.origin_page_id, expand="body.storage.value,version,space"
) )
return origin_page return origin_page
except Exception as e: except Exception:
logger.warning( logger.exception(
f"Appending origin page with id {self.origin_page_id} failed: {e}" f"Appending origin page with id {self.origin_page_id} failed."
) )
return {} return {}
@@ -230,6 +237,10 @@ class RecursiveIndexer:
queue: list[str] = [page_id] queue: list[str] = [page_id]
visited_pages: set[str] = set() visited_pages: set[str] = set()
get_page_by_id = make_confluence_call_handle_rate_limit(
self.confluence_client.get_page_by_id
)
get_page_child_by_type = make_confluence_call_handle_rate_limit( get_page_child_by_type = make_confluence_call_handle_rate_limit(
self.confluence_client.get_page_child_by_type self.confluence_client.get_page_child_by_type
) )
@@ -242,12 +253,15 @@ class RecursiveIndexer:
try: try:
# Fetch the page itself # Fetch the page itself
page = self.confluence_client.get_page_by_id( logger.info(
f"recurse_children_pages - get_page_by_id: id={current_page_id}"
)
page = get_page_by_id(
current_page_id, expand="body.storage.value,version,space" current_page_id, expand="body.storage.value,version,space"
) )
pages.append(page) pages.append(page)
except Exception as e: except Exception:
logger.warning(f"Failed to fetch page {current_page_id}: {e}") logger.exception(f"Failed to fetch page {current_page_id}.")
continue continue
if not self.index_recursively: if not self.index_recursively:
@@ -256,6 +270,9 @@ class RecursiveIndexer:
# Fetch child pages # Fetch child pages
start = 0 start = 0
while True: while True:
logger.info(
f"recurse_children_pages - get_page_by_child_type: id={current_page_id}"
)
child_pages_response = get_page_child_by_type( child_pages_response = get_page_child_by_type(
current_page_id, current_page_id,
type="page", type="page",
@@ -323,11 +340,17 @@ class ConfluenceConnector(LoadConnector, PollConnector):
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
username = credentials["confluence_username"] username = credentials["confluence_username"]
access_token = credentials["confluence_access_token"] access_token = credentials["confluence_access_token"]
# see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py
# for a list of other hidden constructor args
self.confluence_client = DanswerConfluence( self.confluence_client = DanswerConfluence(
url=self.wiki_base, url=self.wiki_base,
username=username if self.is_cloud else None, username=username if self.is_cloud else None,
password=access_token if self.is_cloud else None, password=access_token if self.is_cloud else None,
token=access_token if not self.is_cloud else None, token=access_token if not self.is_cloud else None,
backoff_and_retry=True,
max_backoff_retries=60,
max_backoff_seconds=60,
) )
return None return None
@@ -354,6 +377,9 @@ class ConfluenceConnector(LoadConnector, PollConnector):
) )
try: try:
logger.info(
f"_fetch_space - get_all_pages: cursor={cursor} limit={batch_size}"
)
response = get_all_pages( response = get_all_pages(
cql=self.cql_query, cql=self.cql_query,
cursor=cursor, cursor=cursor,
@@ -380,6 +406,9 @@ class ConfluenceConnector(LoadConnector, PollConnector):
view_pages: list[dict[str, Any]] = [] view_pages: list[dict[str, Any]] = []
for _ in range(self.batch_size): for _ in range(self.batch_size):
try: try:
logger.info(
f"_fetch_space - get_all_pages: cursor={cursor} limit=1"
)
response = get_all_pages( response = get_all_pages(
cql=self.cql_query, cql=self.cql_query,
cursor=cursor, cursor=cursor,
@@ -406,6 +435,9 @@ class ConfluenceConnector(LoadConnector, PollConnector):
f"Page failed with cql {self.cql_query} with cursor {cursor}, " f"Page failed with cql {self.cql_query} with cursor {cursor}, "
f"trying alternative expand option: {e}" f"trying alternative expand option: {e}"
) )
logger.info(
f"_fetch_space - get_all_pages - trying alternative expand: cursor={cursor} limit=1"
)
response = get_all_pages( response = get_all_pages(
cql=self.cql_query, cql=self.cql_query,
cursor=cursor, cursor=cursor,
@@ -464,6 +496,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
) )
try: try:
logger.info(f"_fetch_comments - get_page_child_by_type: id={page_id}")
comment_pages = list( comment_pages = list(
get_page_child_by_type( get_page_child_by_type(
page_id, page_id,
@@ -478,9 +511,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
if not self.continue_on_failure: if not self.continue_on_failure:
raise e raise e
logger.exception( logger.exception("Fetching comments from Confluence exceptioned")
"Ran into exception when fetching comments from Confluence"
)
return "" return ""
def _fetch_labels(self, confluence_client: Confluence, page_id: str) -> list[str]: def _fetch_labels(self, confluence_client: Confluence, page_id: str) -> list[str]:
@@ -488,13 +519,14 @@ class ConfluenceConnector(LoadConnector, PollConnector):
confluence_client.get_page_labels confluence_client.get_page_labels
) )
try: try:
logger.info(f"_fetch_labels - get_page_labels: id={page_id}")
labels_response = get_page_labels(page_id) labels_response = get_page_labels(page_id)
return [label["name"] for label in labels_response["results"]] return [label["name"] for label in labels_response["results"]]
except Exception as e: except Exception as e:
if not self.continue_on_failure: if not self.continue_on_failure:
raise e raise e
logger.exception("Ran into exception when fetching labels from Confluence") logger.exception("Fetching labels from Confluence exceptioned")
return [] return []
@classmethod @classmethod
@@ -531,6 +563,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
) )
return None return None
logger.info(f"_attachment_to_content - _session.get: link={download_link}")
response = confluence_client._session.get(download_link) response = confluence_client._session.get(download_link)
if response.status_code != 200: if response.status_code != 200:
logger.warning( logger.warning(
@@ -589,9 +622,7 @@ class ConfluenceConnector(LoadConnector, PollConnector):
return "", [] return "", []
if not self.continue_on_failure: if not self.continue_on_failure:
raise e raise e
logger.exception( logger.exception("Fetching attachments from Confluence exceptioned.")
f"Ran into exception when fetching attachments from Confluence: {e}"
)
return "\n".join(files_attachment_content), unused_attachments return "\n".join(files_attachment_content), unused_attachments

View File

@@ -5,11 +5,8 @@ from typing import Any
from typing import cast from typing import cast
from typing import TypeVar from typing import TypeVar
from redis.exceptions import ConnectionError
from requests import HTTPError from requests import HTTPError
from danswer.connectors.interfaces import BaseConnector
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
logger = setup_logger() logger = setup_logger()
@@ -25,110 +22,198 @@ class ConfluenceRateLimitError(Exception):
pass pass
# https://developer.atlassian.com/cloud/confluence/rate-limiting/ # commenting out while we try using confluence's rate limiter instead
def make_confluence_call_handle_rate_limit(confluence_call: F) -> F: # # https://developer.atlassian.com/cloud/confluence/rate-limiting/
def wrapped_call(*args: list[Any], **kwargs: Any) -> Any: # def make_confluence_call_handle_rate_limit(confluence_call: F) -> F:
max_retries = 5 # def wrapped_call(*args: list[Any], **kwargs: Any) -> Any:
starting_delay = 5 # max_retries = 5
backoff = 2 # starting_delay = 5
# backoff = 2
# max_delay is used when the server doesn't hand back "Retry-After" # # max_delay is used when the server doesn't hand back "Retry-After"
# and we have to decide the retry delay ourselves # # and we have to decide the retry delay ourselves
max_delay = 30 # Atlassian uses max_delay = 30 in their examples # max_delay = 30 # Atlassian uses max_delay = 30 in their examples
# max_retry_after is used when we do get a "Retry-After" header # # max_retry_after is used when we do get a "Retry-After" header
max_retry_after = 300 # should we really cap the maximum retry delay? # max_retry_after = 300 # should we really cap the maximum retry delay?
NEXT_RETRY_KEY = BaseConnector.REDIS_KEY_PREFIX + "confluence_next_retry" # NEXT_RETRY_KEY = BaseConnector.REDIS_KEY_PREFIX + "confluence_next_retry"
# for testing purposes, rate limiting is written to fall back to a simpler # # for testing purposes, rate limiting is written to fall back to a simpler
# rate limiting approach when redis is not available # # rate limiting approach when redis is not available
r = get_redis_client() # r = get_redis_client()
for attempt in range(max_retries): # for attempt in range(max_retries):
try: # try:
# if multiple connectors are waiting for the next attempt, there could be an issue # # if multiple connectors are waiting for the next attempt, there could be an issue
# where many connectors are "released" onto the server at the same time. # # where many connectors are "released" onto the server at the same time.
# That's not ideal ... but coming up with a mechanism for queueing # # That's not ideal ... but coming up with a mechanism for queueing
# all of these connectors is a bigger problem that we want to take on # # all of these connectors is a bigger problem that we want to take on
# right now # # right now
try: # try:
next_attempt = r.get(NEXT_RETRY_KEY) # next_attempt = r.get(NEXT_RETRY_KEY)
if next_attempt is None: # if next_attempt is None:
next_attempt = 0 # next_attempt = 0
else: # else:
next_attempt = int(cast(int, next_attempt)) # next_attempt = int(cast(int, next_attempt))
# TODO: all connectors need to be interruptible moving forward # # TODO: all connectors need to be interruptible moving forward
while time.monotonic() < next_attempt: # while time.monotonic() < next_attempt:
time.sleep(1) # time.sleep(1)
except ConnectionError: # except ConnectionError:
pass # pass
# return confluence_call(*args, **kwargs)
# except HTTPError as e:
# # Check if the response or headers are None to avoid potential AttributeError
# if e.response is None or e.response.headers is None:
# logger.warning("HTTPError with `None` as response or as headers")
# raise e
# retry_after_header = e.response.headers.get("Retry-After")
# if (
# e.response.status_code == 429
# or RATE_LIMIT_MESSAGE_LOWERCASE in e.response.text.lower()
# ):
# retry_after = None
# if retry_after_header is not None:
# try:
# retry_after = int(retry_after_header)
# except ValueError:
# pass
# if retry_after is not None:
# if retry_after > max_retry_after:
# logger.warning(
# f"Clamping retry_after from {retry_after} to {max_delay} seconds..."
# )
# retry_after = max_delay
# logger.warning(
# f"Rate limit hit. Retrying after {retry_after} seconds..."
# )
# try:
# r.set(
# NEXT_RETRY_KEY,
# math.ceil(time.monotonic() + retry_after),
# )
# except ConnectionError:
# pass
# else:
# logger.warning(
# "Rate limit hit. Retrying with exponential backoff..."
# )
# delay = min(starting_delay * (backoff**attempt), max_delay)
# delay_until = math.ceil(time.monotonic() + delay)
# try:
# r.set(NEXT_RETRY_KEY, delay_until)
# except ConnectionError:
# while time.monotonic() < delay_until:
# time.sleep(1)
# else:
# # re-raise, let caller handle
# raise
# except AttributeError as e:
# # Some error within the Confluence library, unclear why it fails.
# # Users reported it to be intermittent, so just retry
# logger.warning(f"Confluence Internal Error, retrying... {e}")
# delay = min(starting_delay * (backoff**attempt), max_delay)
# delay_until = math.ceil(time.monotonic() + delay)
# try:
# r.set(NEXT_RETRY_KEY, delay_until)
# except ConnectionError:
# while time.monotonic() < delay_until:
# time.sleep(1)
# if attempt == max_retries - 1:
# raise e
# return cast(F, wrapped_call)
def _handle_http_error(e: HTTPError, attempt: int) -> int:
MIN_DELAY = 2
MAX_DELAY = 60
STARTING_DELAY = 5
BACKOFF = 2
return confluence_call(*args, **kwargs)
except HTTPError as e:
# Check if the response or headers are None to avoid potential AttributeError # Check if the response or headers are None to avoid potential AttributeError
if e.response is None or e.response.headers is None: if e.response is None or e.response.headers is None:
logger.warning("HTTPError with `None` as response or as headers") logger.warning("HTTPError with `None` as response or as headers")
raise e raise e
retry_after_header = e.response.headers.get("Retry-After")
if ( if (
e.response.status_code == 429 e.response.status_code != 429
or RATE_LIMIT_MESSAGE_LOWERCASE in e.response.text.lower() and RATE_LIMIT_MESSAGE_LOWERCASE not in e.response.text.lower()
): ):
raise e
retry_after = None retry_after = None
retry_after_header = e.response.headers.get("Retry-After")
if retry_after_header is not None: if retry_after_header is not None:
try: try:
retry_after = int(retry_after_header) retry_after = int(retry_after_header)
if retry_after > MAX_DELAY:
logger.warning(
f"Clamping retry_after from {retry_after} to {MAX_DELAY} seconds..."
)
retry_after = MAX_DELAY
if retry_after < MIN_DELAY:
retry_after = MIN_DELAY
except ValueError: except ValueError:
pass pass
if retry_after is not None: if retry_after is not None:
if retry_after > max_retry_after:
logger.warning( logger.warning(
f"Clamping retry_after from {retry_after} to {max_delay} seconds..." f"Rate limiting with retry header. Retrying after {retry_after} seconds..."
) )
retry_after = max_delay delay = retry_after
logger.warning(
f"Rate limit hit. Retrying after {retry_after} seconds..."
)
try:
r.set(
NEXT_RETRY_KEY,
math.ceil(time.monotonic() + retry_after),
)
except ConnectionError:
pass
else: else:
logger.warning( logger.warning(
"Rate limit hit. Retrying with exponential backoff..." "Rate limiting without retry header. Retrying with exponential backoff..."
) )
delay = min(starting_delay * (backoff**attempt), max_delay) delay = min(STARTING_DELAY * (BACKOFF**attempt), MAX_DELAY)
delay_until = math.ceil(time.monotonic() + delay) delay_until = math.ceil(time.monotonic() + delay)
return delay_until
# https://developer.atlassian.com/cloud/confluence/rate-limiting/
# this uses the native rate limiting option provided by the
# confluence client and otherwise applies a simpler set of error handling
def make_confluence_call_handle_rate_limit(confluence_call: F) -> F:
def wrapped_call(*args: list[Any], **kwargs: Any) -> Any:
MAX_RETRIES = 5
TIMEOUT = 3600
timeout_at = time.monotonic() + TIMEOUT
for attempt in range(MAX_RETRIES):
if time.monotonic() > timeout_at:
raise TimeoutError(
f"Confluence call attempts took longer than {TIMEOUT} seconds."
)
try: try:
r.set(NEXT_RETRY_KEY, delay_until) # we're relying more on the client to rate limit itself
except ConnectionError: # and applying our own retries in a more specific set of circumstances
return confluence_call(*args, **kwargs)
except HTTPError as e:
delay_until = _handle_http_error(e, attempt)
while time.monotonic() < delay_until: while time.monotonic() < delay_until:
# in the future, check a signal here to exit
time.sleep(1) time.sleep(1)
else:
# re-raise, let caller handle
raise
except AttributeError as e: except AttributeError as e:
# Some error within the Confluence library, unclear why it fails. # Some error within the Confluence library, unclear why it fails.
# Users reported it to be intermittent, so just retry # Users reported it to be intermittent, so just retry
logger.warning(f"Confluence Internal Error, retrying... {e}") if attempt == MAX_RETRIES - 1:
delay = min(starting_delay * (backoff**attempt), max_delay)
delay_until = math.ceil(time.monotonic() + delay)
try:
r.set(NEXT_RETRY_KEY, delay_until)
except ConnectionError:
while time.monotonic() < delay_until:
time.sleep(1)
if attempt == max_retries - 1:
raise e raise e
logger.exception(
"Confluence Client raised an AttributeError. Retrying..."
)
time.sleep(5)
return cast(F, wrapped_call) return cast(F, wrapped_call)

View File

@@ -248,7 +248,7 @@ def create_initial_default_connector(db_session: Session) -> None:
logger.warning( logger.warning(
"Default connector does not have expected values. Updating to proper state." "Default connector does not have expected values. Updating to proper state."
) )
# Ensure default connector has correct valuesg # Ensure default connector has correct values
default_connector.source = DocumentSource.INGESTION_API default_connector.source = DocumentSource.INGESTION_API
default_connector.input_type = InputType.LOAD_STATE default_connector.input_type = InputType.LOAD_STATE
default_connector.refresh_freq = None default_connector.refresh_freq = None

View File

@@ -20,6 +20,9 @@ def build_confluence_client(
username=credentials_json["confluence_username"] if is_cloud else None, username=credentials_json["confluence_username"] if is_cloud else None,
password=credentials_json["confluence_access_token"] if is_cloud else None, password=credentials_json["confluence_access_token"] if is_cloud else None,
token=credentials_json["confluence_access_token"] if not is_cloud else None, token=credentials_json["confluence_access_token"] if not is_cloud else None,
backoff_and_retry=True,
max_backoff_retries=60,
max_backoff_seconds=60,
) )

View File

@@ -1,7 +1,7 @@
aiohttp==3.10.2 aiohttp==3.10.2
alembic==1.10.4 alembic==1.10.4
asyncpg==0.27.0 asyncpg==0.27.0
atlassian-python-api==3.37.0 atlassian-python-api==3.41.16
beautifulsoup4==4.12.3 beautifulsoup4==4.12.3
boto3==1.34.84 boto3==1.34.84
celery==5.5.0b4 celery==5.5.0b4
@@ -81,5 +81,6 @@ dropbox==11.36.2
boto3-stubs[s3]==1.34.133 boto3-stubs[s3]==1.34.133
ultimate_sitemap_parser==0.5 ultimate_sitemap_parser==0.5
stripe==10.12.0 stripe==10.12.0
urllib3==2.2.3
mistune==0.8.4 mistune==0.8.4
sentry-sdk==2.14.0 sentry-sdk==2.14.0