fix discourse connector rate limiting + topic fetching (#1820)

This commit is contained in:
pablodanswer 2024-07-17 14:57:40 -07:00 committed by GitHub
parent e93de602c3
commit 2b07c102f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@@ -11,6 +9,9 @@ from requests import Response
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from danswer.connectors.cross_connector_utils.rate_limit_wrapper import (
rate_limit_builder,
)
from danswer.connectors.cross_connector_utils.retry_wrapper import retry_builder
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import PollConnector
@@ -58,67 +61,36 @@ class DiscourseConnector(PollConnector):
self.category_id_map: dict[int, str] = {}
self.batch_size = batch_size
self.permissions: DiscoursePerms | None = None
self.active_categories: set | None = None
@rate_limit_builder(max_calls=100, period=60)
def _make_request(self, endpoint: str, params: dict | None = None) -> Response:
    """Issue a request to the Discourse API, rate-limited to 100 calls/minute.

    Raises ConnectorMissingCredentialError if credentials were never loaded.
    """
    perms = self.permissions
    if not perms:
        raise ConnectorMissingCredentialError("Discourse")
    return discourse_request(endpoint, perms, params)
def _get_categories_map(
    self,
) -> None:
    """Populate self.category_id_map (category id -> name) from /categories.json.

    When the connector was configured with category names, only matching
    categories (case-insensitive) are kept. Also records the initial set of
    active category ids in self.active_categories.
    """
    assert self.permissions is not None
    categories_endpoint = urllib.parse.urljoin(self.base_url, "categories.json")
    # Rate-limited request via _make_request; include_subcategories so nested
    # categories are indexable too.
    response = self._make_request(
        endpoint=categories_endpoint,
        params={"include_subcategories": True},
    )
    categories = response.json()["category_list"]["categories"]
    self.category_id_map = {
        cat["id"]: cat["name"]
        for cat in categories
        if not self.categories or cat["name"].lower() in self.categories
    }
    self.active_categories = set(self.category_id_map)
def _get_doc_from_topic(self, topic_id: int) -> Document:
assert self.permissions is not None
topic_endpoint = urllib.parse.urljoin(self.base_url, f"t/{topic_id}.json")
response = discourse_request(
endpoint=topic_endpoint,
perms=self.permissions,
)
response = self._make_request(endpoint=topic_endpoint)
topic = response.json()
topic_url = urllib.parse.urljoin(self.base_url, f"t/{topic['slug']}")
@@ -167,26 +139,78 @@
)
return doc
def _get_latest_topics(
    self, start: datetime | None, end: datetime | None, page: int
) -> list[int]:
    """Return ids of topics on the given page whose last post is within [start, end].

    With no category filter configured, one page of /latest.json is read.
    Otherwise one page is read per category still present in
    self.category_id_map; categories that return no topics for this page are
    removed from the map so later pages stop querying them.
    Collection stops once batch_size ids have been gathered.
    """
    assert self.permissions is not None
    topic_ids: list[int] = []
    if not self.categories:
        latest_endpoint = urllib.parse.urljoin(
            self.base_url, f"latest.json?page={page}"
        )
        response = self._make_request(endpoint=latest_endpoint)
        topics = response.json()["topic_list"]["topics"]
    else:
        topics = []
        empty_categories = []
        for category_id in self.category_id_map.keys():
            # NOTE(review): "sys=latest" query param — confirm this is the
            # intended Discourse parameter for sort order.
            category_endpoint = urllib.parse.urljoin(
                self.base_url, f"c/{category_id}.json?page={page}&sys=latest"
            )
            response = self._make_request(endpoint=category_endpoint)
            new_topics = response.json()["topic_list"]["topics"]
            if len(new_topics) == 0:
                empty_categories.append(category_id)
            topics.extend(new_topics)
        # Prune exhausted categories only after the loop, so the dict is not
        # mutated while being iterated.
        for empty_category in empty_categories:
            self.category_id_map.pop(empty_category)
    for topic in topics:
        last_time = topic.get("last_posted_at")
        if not last_time:
            continue
        last_time_dt = time_str_to_utc(last_time)
        # Skip topics whose last activity falls outside the requested window.
        if (start and start > last_time_dt) or (end and end < last_time_dt):
            continue
        topic_ids.append(topic["id"])
        if len(topic_ids) >= self.batch_size:
            break
    return topic_ids
def _yield_discourse_documents(
    self,
    start: datetime,
    end: datetime,
) -> GenerateDocumentsOutput:
    """Yield batches of Documents for topics active within [start, end].

    Pages through _get_latest_topics until a page yields no topic ids,
    emitting batches of at most self.batch_size documents.
    """
    page = 1
    # Walrus loop: stop as soon as a page produces no topics in range.
    while topic_ids := self._get_latest_topics(start, end, page):
        doc_batch: list[Document] = []
        for topic_id in topic_ids:
            doc_batch.append(self._get_doc_from_topic(topic_id))
            if len(doc_batch) >= self.batch_size:
                yield doc_batch
                doc_batch = []
        # Flush any partial batch before moving to the next page.
        if doc_batch:
            yield doc_batch
        page += 1
def load_credentials(
    self,
    credentials: dict[str, Any],
) -> dict[str, Any] | None:
    """Store Discourse API credentials on the connector.

    Expects "discourse_api_key" and "discourse_api_username" keys; raises
    KeyError if either is missing. Returns None (credentials are not
    transformed).
    """
    self.permissions = DiscoursePerms(
        api_key=credentials["discourse_api_key"],
        api_username=credentials["discourse_api_username"],
    )
    return None
def poll_source(
@@ -194,16 +218,13 @@ class DiscourseConnector(PollConnector):
) -> GenerateDocumentsOutput:
if self.permissions is None:
raise ConnectorMissingCredentialError("Discourse")
start_datetime = datetime.utcfromtimestamp(start).replace(tzinfo=timezone.utc)
end_datetime = datetime.utcfromtimestamp(end).replace(tzinfo=timezone.utc)
self._get_categories_map()
latest_topic_ids = self._get_latest_topics(
start=start_datetime, end=end_datetime
)
yield from self._yield_discourse_documents(latest_topic_ids)
yield from self._yield_discourse_documents(start_datetime, end_datetime)
if __name__ == "__main__":
@@ -219,7 +240,5 @@ if __name__ == "__main__":
current = time.time()
one_year_ago = current - 24 * 60 * 60 * 360
latest_docs = connector.poll_source(one_year_ago, current)
print(next(latest_docs))