Fix slack pagination

This commit is contained in:
Weves 2023-08-15 17:17:55 -07:00 committed by Chris Weaver
parent 78b49f546c
commit 8fc74a4313
2 changed files with 16 additions and 12 deletions

View File

@ -36,7 +36,7 @@ ThreadType = list[MessageType]
def _make_paginated_slack_api_call(
call: Callable[..., SlackResponse], **kwargs: Any
) -> list[dict[str, Any]]:
) -> Generator[dict[str, Any], None, None]:
return make_slack_api_call_paginated(
make_slack_api_rate_limited(make_slack_api_call_logged(call))
)(**kwargs)
@ -50,9 +50,9 @@ def _make_slack_api_call(
def get_channel_info(client: WebClient, channel_id: str) -> ChannelType:
"""Get information about a channel. Needed to convert channel ID to channel name"""
return _make_paginated_slack_api_call(
client.conversations_info, channel=channel_id
)[0]["channel"]
return _make_slack_api_call(client.conversations_info, channel=channel_id)[0][
"channel"
]
def get_channels(

View File

@ -1,6 +1,7 @@
import re
import time
from collections.abc import Callable
from collections.abc import Generator
from functools import wraps
from typing import Any
from typing import cast
@ -45,20 +46,20 @@ def make_slack_api_call_logged(
def make_slack_api_call_paginated(
call: Callable[..., SlackResponse],
) -> Callable[..., list[dict[str, Any]]]:
) -> Callable[..., Generator[dict[str, Any], None, None]]:
"""Wraps calls to slack API so that they automatically handle pagination"""
@wraps(call)
def paginated_call(**kwargs: Any) -> list[dict[str, Any]]:
results: list[dict[str, Any]] = []
def paginated_call(**kwargs: Any) -> Generator[dict[str, Any], None, None]:
cursor: str | None = None
has_more = True
while has_more:
for result in call(cursor=cursor, limit=_SLACK_LIMIT, **kwargs):
has_more = result.get("has_more", False)
cursor = result.get("response_metadata", {}).get("next_cursor", "")
results.append(cast(dict[str, Any], result))
return results
response = call(cursor=cursor, limit=_SLACK_LIMIT, **kwargs)
yield cast(dict[str, Any], response.validate())
cursor = cast(dict[str, Any], response.get("response_metadata", {})).get(
"next_cursor", ""
)
has_more = bool(cursor)
return paginated_call
@ -84,6 +85,9 @@ def make_slack_api_rate_limited(
if e.response["error"] == "ratelimited":
# Handle rate limiting: get the 'Retry-After' header value and sleep for that duration
retry_after = int(e.response.headers.get("Retry-After", 1))
logger.info(
f"Slack call rate limited, retrying after {retry_after} seconds. Exception: {e}"
)
time.sleep(retry_after)
else:
# Raise the error for non-transient errors