From 2b07c102f98d75db68b9bf005191c745745fc3b1 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Wed, 17 Jul 2024 14:57:40 -0700 Subject: [PATCH] fix discourse connector rate limiting + topic fetching (#1820) --- .../danswer/connectors/discourse/connector.py | 143 ++++++++++-------- 1 file changed, 81 insertions(+), 62 deletions(-) diff --git a/backend/danswer/connectors/discourse/connector.py b/backend/danswer/connectors/discourse/connector.py index a637ff78c..f21bcc198 100644 --- a/backend/danswer/connectors/discourse/connector.py +++ b/backend/danswer/connectors/discourse/connector.py @@ -11,6 +11,9 @@ from requests import Response from danswer.configs.app_configs import INDEX_BATCH_SIZE from danswer.configs.constants import DocumentSource from danswer.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc +from danswer.connectors.cross_connector_utils.rate_limit_wrapper import ( + rate_limit_builder, +) from danswer.connectors.cross_connector_utils.retry_wrapper import retry_builder from danswer.connectors.interfaces import GenerateDocumentsOutput from danswer.connectors.interfaces import PollConnector @@ -58,67 +61,36 @@ class DiscourseConnector(PollConnector): self.category_id_map: dict[int, str] = {} self.batch_size = batch_size - self.permissions: DiscoursePerms | None = None + self.active_categories: set | None = None + + @rate_limit_builder(max_calls=100, period=60) + def _make_request(self, endpoint: str, params: dict | None = None) -> Response: + if not self.permissions: + raise ConnectorMissingCredentialError("Discourse") + return discourse_request(endpoint, self.permissions, params) def _get_categories_map( self, ) -> None: assert self.permissions is not None categories_endpoint = urllib.parse.urljoin(self.base_url, "categories.json") - response = discourse_request( + response = self._make_request( endpoint=categories_endpoint, - perms=self.permissions, params={"include_subcategories": True}, ) categories = response.json()["category_list"]["categories"] - self.category_id_map = { - category["id"]: category["name"] - for category in categories - if not self.categories or category["name"].lower() in self.categories + cat["id"]: cat["name"] + for cat in categories + if not self.categories or cat["name"].lower() in self.categories } - - def _get_latest_topics( - self, start: datetime | None, end: datetime | None - ) -> list[int]: - assert self.permissions is not None - topic_ids = [] - - valid_categories = set(self.category_id_map.keys()) - - latest_endpoint = urllib.parse.urljoin(self.base_url, "latest.json") - response = discourse_request(endpoint=latest_endpoint, perms=self.permissions) - topics = response.json()["topic_list"]["topics"] - for topic in topics: - last_time = topic.get("last_posted_at") - if not last_time: - continue - last_time_dt = time_str_to_utc(last_time) - - if start and start > last_time_dt: - continue - if end and end < last_time_dt: - continue - - if ( - self.categories - and valid_categories - and topic.get("category_id") not in valid_categories - ): - continue - - topic_ids.append(topic["id"]) - - return topic_ids + self.active_categories = set(self.category_id_map) def _get_doc_from_topic(self, topic_id: int) -> Document: assert self.permissions is not None topic_endpoint = urllib.parse.urljoin(self.base_url, f"t/{topic_id}.json") - response = discourse_request( - endpoint=topic_endpoint, - perms=self.permissions, - ) + response = self._make_request(endpoint=topic_endpoint) topic = response.json() topic_url = urllib.parse.urljoin(self.base_url, f"t/{topic['slug']}") @@ -167,26 +139,78 @@ class DiscourseConnector(PollConnector): ) return doc + def _get_latest_topics( + self, start: datetime | None, end: datetime | None, page: int + ) -> list[int]: + assert self.permissions is not None + topic_ids = [] + + if not self.categories: + latest_endpoint = urllib.parse.urljoin( + self.base_url, f"latest.json?page={page}" + ) + response = self._make_request(endpoint=latest_endpoint) + topics = response.json()["topic_list"]["topics"] + + else: + topics = [] + empty_categories = [] + + for category_id in self.category_id_map.keys(): + category_endpoint = urllib.parse.urljoin( + self.base_url, f"c/{category_id}.json?page={page}&sys=latest" + ) + response = self._make_request(endpoint=category_endpoint) + new_topics = response.json()["topic_list"]["topics"] + + if len(new_topics) == 0: + empty_categories.append(category_id) + topics.extend(new_topics) + + for empty_category in empty_categories: + self.category_id_map.pop(empty_category) + + for topic in topics: + last_time = topic.get("last_posted_at") + if not last_time: + continue + + last_time_dt = time_str_to_utc(last_time) + if (start and start > last_time_dt) or (end and end < last_time_dt): + continue + + topic_ids.append(topic["id"]) + if len(topic_ids) >= self.batch_size: + break + + return topic_ids + def _yield_discourse_documents( - self, topic_ids: list[int] + self, + start: datetime, + end: datetime, ) -> GenerateDocumentsOutput: - doc_batch: list[Document] = [] - for topic_id in topic_ids: - doc_batch.append(self._get_doc_from_topic(topic_id)) + page = 1 + while topic_ids := self._get_latest_topics(start, end, page): + doc_batch: list[Document] = [] + for topic_id in topic_ids: + doc_batch.append(self._get_doc_from_topic(topic_id)) + if len(doc_batch) >= self.batch_size: + yield doc_batch + doc_batch = [] - if len(doc_batch) >= self.batch_size: + if doc_batch: yield doc_batch - doc_batch = [] + page += 1 - if doc_batch: - yield doc_batch - - def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + def load_credentials( + self, + credentials: dict[str, Any], + ) -> dict[str, Any] | None: self.permissions = DiscoursePerms( api_key=credentials["discourse_api_key"], api_username=credentials["discourse_api_username"], ) - return None def poll_source( @@ -194,16 +218,13 @@ class DiscourseConnector(PollConnector): ) -> GenerateDocumentsOutput: if self.permissions is None: raise ConnectorMissingCredentialError("Discourse") + start_datetime = datetime.utcfromtimestamp(start).replace(tzinfo=timezone.utc) end_datetime = datetime.utcfromtimestamp(end).replace(tzinfo=timezone.utc) self._get_categories_map() - latest_topic_ids = self._get_latest_topics( - start=start_datetime, end=end_datetime - ) - - yield from self._yield_discourse_documents(latest_topic_ids) + yield from self._yield_discourse_documents(start_datetime, end_datetime) if __name__ == "__main__": @@ -219,7 +240,5 @@ if __name__ == "__main__": current = time.time() one_year_ago = current - 24 * 60 * 60 * 360 - latest_docs = connector.poll_source(one_year_ago, current) - print(next(latest_docs))