import json
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from typing import Any

from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session  # type: ignore

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.file_processing.html_utils import parse_html_page_basic
from onyx.file_processing.html_utils import strip_excessive_newlines_and_spaces
from onyx.utils.logger import setup_logger

LOOPIO_API_BASE = "https://api.loopio.com/"
LOOPIO_AUTH_URL = LOOPIO_API_BASE + "oauth2/access_token"
LOOPIO_DATA_URL = LOOPIO_API_BASE + "data/"

logger = setup_logger()


class LoopioConnector(LoadConnector, PollConnector):
    def __init__(
        self,
        loopio_stack_name: str | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        self.batch_size = batch_size
        self.loopio_client_id: str | None = None
        self.loopio_client_token: str | None = None
        self.loopio_subdomain: str | None = None
        self.loopio_stack_name = loopio_stack_name

    def _fetch_data(
        self, resource: str, params: dict[str, str | int]
    ) -> Generator[dict[str, Any], None, None]:
        # Loopio uses the OAuth2 client-credentials flow; fetch a token once
        # per call before paging through the data endpoint.
        client = BackendApplicationClient(
            client_id=self.loopio_client_id, scope=["library:read"]
        )
        session = OAuth2Session(client=client)
        session.fetch_token(
            token_url=LOOPIO_AUTH_URL,
            client_id=self.loopio_client_id,
            client_secret=self.loopio_client_token,
        )

        # Pages are 1-indexed; each response reports totalPages, which tells
        # us when to stop paginating.
        page = 0
        stop_at_page = 1
        while (page := page + 1) <= stop_at_page:
            params["page"] = page
            response = session.request(
                "GET",
                LOOPIO_DATA_URL + resource,
                headers={"Accept": "application/json"},
                params=params,
            )

            if response.status_code == 400:
                logger.error(
                    f"Loopio API returned 400 for {resource} with params {params}"
                )
                logger.error(response.text)
            response.raise_for_status()

            response_data = json.loads(response.text)
            stop_at_page = response_data.get("totalPages", 1)
            yield response_data

    def _build_search_filter(
        self, stack_name: str | None, start: str | None, end: str | None
    ) -> dict[str, Any]:
        filter: dict[str, Any] = {}
        if start is not None and end is not None:
            filter["lastUpdatedDate"] = {"gte": start, "lt": end}

        if stack_name is not None:
            # Right now this fetches the stacks on every call, which is not
            # ideal. We should update this later to store the ID when we
            # create the connector; a possible caching approach is sketched
            # in _get_stack_id below.
            for stack in self._fetch_data(resource="v2/stacks", params={}):
                for item in stack["items"]:
                    if item["name"] == stack_name:
                        filter["locations"] = [{"stackID": item["id"]}]
                        break
            if "locations" not in filter:
                raise ValueError(f"Stack {stack_name} not found in Loopio")
        return filter
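    # A minimal sketch of the caching mentioned in the TODO above, assuming
    # the stack list is stable for the connector's lifetime. `_get_stack_id`
    # and the `_stack_id_cache` attribute are hypothetical additions, not
    # part of the original connector or the Loopio API.
    def _get_stack_id(self, stack_name: str) -> Any:
        cache: dict[str, Any] = getattr(self, "_stack_id_cache", {})
        if not cache:
            # Populate the cache once from v2/stacks, then reuse it.
            for stack_page in self._fetch_data(resource="v2/stacks", params={}):
                for item in stack_page["items"]:
                    cache[item["name"]] = item["id"]
            self._stack_id_cache = cache
        if stack_name not in cache:
            raise ValueError(f"Stack {stack_name} not found in Loopio")
        return cache[stack_name]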
    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        self.loopio_subdomain = credentials["loopio_subdomain"]
        self.loopio_client_id = credentials["loopio_client_id"]
        self.loopio_client_token = credentials["loopio_client_token"]
        return None

    def _process_entries(
        self, start: str | None = None, end: str | None = None
    ) -> GenerateDocumentsOutput:
        if self.loopio_client_id is None or self.loopio_client_token is None:
            raise ConnectorMissingCredentialError("Loopio")

        filter = self._build_search_filter(
            stack_name=self.loopio_stack_name, start=start, end=end
        )
        params: dict[str, str | int] = {"pageSize": self.batch_size}
        params["filter"] = json.dumps(filter)

        doc_batch: list[Document] = []
        for library_entries in self._fetch_data(
            resource="v2/libraryEntries", params=params
        ):
            for entry in library_entries.get("items", []):
                link = f"https://{self.loopio_subdomain}.loopio.com/library?entry={entry['id']}"
                # Build a topic path from the entry's location hierarchy.
                topic = "/".join(
                    part["name"] for part in entry["location"].values() if part
                )

                answer = parse_html_page_basic(entry.get("answer", {}).get("text", ""))
                questions = [
                    question["text"].replace("\xa0", " ")
                    for question in entry["questions"]
                    if question.get("text")
                ]
                questions_string = strip_excessive_newlines_and_spaces(
                    "\n".join(questions)
                )
                content_text = f"{answer}\n\nRelated Questions: {questions_string}"
                content_text = strip_excessive_newlines_and_spaces(
                    content_text.replace("\xa0", " ")
                )

                last_updated = time_str_to_utc(entry["lastUpdatedDate"])
                last_reviewed = (
                    time_str_to_utc(entry["lastReviewedDate"])
                    if entry.get("lastReviewedDate")
                    else None
                )
                # Onyx decays document scores over time, so either
                # last_updated or last_reviewed is a good enough signal for
                # the document's recency.
                latest_time = (
                    max(last_reviewed, last_updated) if last_reviewed else last_updated
                )

                creator = entry.get("creator")
                last_updated_by = entry.get("lastUpdatedBy")
                last_reviewed_by = entry.get("lastReviewedBy")
                primary_owners: list[BasicExpertInfo] = [
                    BasicExpertInfo(display_name=owner.get("name"))
                    for owner in [creator, last_updated_by]
                    if owner is not None
                ]
                secondary_owners: list[BasicExpertInfo] = [
                    BasicExpertInfo(display_name=owner.get("name"))
                    for owner in [last_reviewed_by]
                    if owner is not None
                ]

                doc_batch.append(
                    Document(
                        id=str(entry["id"]),
                        sections=[Section(link=link, text=content_text)],
                        source=DocumentSource.LOOPIO,
                        # Guard against entries with no question text, which
                        # would otherwise raise an IndexError here.
                        semantic_identifier=questions[0]
                        if questions
                        else str(entry["id"]),
                        doc_updated_at=latest_time,
                        primary_owners=primary_owners,
                        secondary_owners=secondary_owners,
                        metadata={
                            "topic": topic,
                            "questions": "\n".join(questions),
                            "creator": creator.get("name") if creator else "",
                        },
                    )
                )
                if len(doc_batch) >= self.batch_size:
                    yield doc_batch
                    doc_batch = []

        if len(doc_batch) > 0:
            yield doc_batch

    def load_from_state(self) -> GenerateDocumentsOutput:
        return self._process_entries()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        start_time = datetime.fromtimestamp(start, tz=timezone.utc).isoformat(
            timespec="seconds"
        )
        end_time = datetime.fromtimestamp(end, tz=timezone.utc).isoformat(
            timespec="seconds"
        )
        return self._process_entries(start_time, end_time)
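
# A hedged hardening sketch, not part of the original connector: OAuth2Session
# subclasses requests.Session, so urllib3's standard Retry policy can be
# mounted on it to absorb transient 429/5xx responses from the paginated API.
# The retry values below are illustrative assumptions, not Loopio guidance.
def _with_retries(session: OAuth2Session, total: int = 3) -> OAuth2Session:
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    retry = Retry(
        total=total,
        backoff_factor=1.0,
        status_forcelist=(429, 500, 502, 503, 504),
    )
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session
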
if __name__ == "__main__":
    import os

    connector = LoopioConnector(
        loopio_stack_name=os.environ.get("LOOPIO_STACK_NAME")
    )
    connector.load_credentials(
        {
            "loopio_client_id": os.environ["LOOPIO_CLIENT_ID"],
            "loopio_client_token": os.environ["LOOPIO_CLIENT_TOKEN"],
            "loopio_subdomain": os.environ["LOOPIO_SUBDOMAIN"],
        }
    )
    latest_docs = connector.load_from_state()
    print(next(latest_docs))
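    # A hedged polling sketch: fetch only entries updated in roughly the last
    # day via poll_source. The one-day window and the fallback string below
    # are illustrative choices, not anything the connector requires.
    import time

    now = time.time()
    recent_docs = connector.poll_source(start=now - 86400, end=now)
    print(next(recent_docs, "No recently updated entries"))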