Highspot cleanup

This commit is contained in:
Weves
2025-04-30 13:47:52 -07:00
committed by Chris Weaver
parent f68b74ff4a
commit 8515f4b57a

View File

@@ -5,6 +5,8 @@ from typing import Dict
from typing import List
from typing import Optional
from pydantic import BaseModel
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.highspot.client import HighspotClient
@@ -30,6 +32,11 @@ logger = setup_logger()
_SLIM_BATCH_SIZE = 1000
class HighspotSpot(BaseModel):
id: str
name: str
class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
"""
Connector for loading data from Highspot.
@@ -40,7 +47,7 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
def __init__(
self,
spot_names: List[str] = [],
spot_names: list[str] | None = None,
batch_size: int = INDEX_BATCH_SIZE,
):
"""
@@ -50,11 +57,10 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
spot_names: List of spot names to retrieve content from (if empty, gets all spots)
batch_size: Number of items to retrieve in each batch
"""
self.spot_names = spot_names
self.spot_names = spot_names or []
self.batch_size = batch_size
self._client: Optional[HighspotClient] = None
self._spot_id_map: Dict[str, str] = {} # Maps spot names to spot IDs
self._all_spots_fetched = False
self.highspot_url: Optional[str] = None
self.key: Optional[str] = None
self.secret: Optional[str] = None
@@ -80,60 +86,37 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
self.secret = credentials.get("highspot_secret")
return None
def _populate_spot_id_map(self) -> None:
def _fetch_spots(self) -> list[HighspotSpot]:
"""
Populate the spot ID map with all available spots.
Keys are stored as lowercase for case-insensitive lookups.
"""
try:
spots = self.client.get_spots()
for spot in spots:
if "title" in spot and "id" in spot:
spot_name = spot["title"]
self._spot_id_map[spot_name.lower()] = spot["id"]
return [
HighspotSpot(id=spot["id"], name=spot["title"])
for spot in self.client.get_spots()
]
self._all_spots_fetched = True
logger.info(f"Retrieved {len(self._spot_id_map)} spots from Highspot")
except HighspotClientError as e:
logger.error(f"Error retrieving spots from Highspot: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error retrieving spots from Highspot: {str(e)}")
raise
def _get_all_spot_names(self) -> List[str]:
def _fetch_spots_to_process(self) -> list[HighspotSpot]:
"""
Retrieve all available spot names.
Returns:
List of all spot names
Fetch spots to process based on the configured spot names.
"""
if not self._all_spots_fetched:
self._populate_spot_id_map()
spots = self._fetch_spots()
if not spots:
raise ValueError("No spots found in Highspot.")
return [spot_name for spot_name in self._spot_id_map.keys()]
if self.spot_names:
lower_spot_names = [name.lower() for name in self.spot_names]
spots_to_process = [
spot for spot in spots if spot.name.lower() in lower_spot_names
]
if not spots_to_process:
raise ValueError(
f"No valid spots found in Highspot. Found {spots} "
f"but {self.spot_names} were requested."
)
return spots_to_process
def _get_spot_id_from_name(self, spot_name: str) -> str:
"""
Get spot ID from a spot name.
Args:
spot_name: Name of the spot
Returns:
ID of the spot
Raises:
ValueError: If spot name is not found
"""
if not self._all_spots_fetched:
self._populate_spot_id_map()
spot_name_lower = spot_name.lower()
if spot_name_lower not in self._spot_id_map:
raise ValueError(f"Spot '{spot_name}' not found")
return self._spot_id_map[spot_name_lower]
return spots
def load_from_state(self) -> GenerateDocumentsOutput:
"""
@@ -158,37 +141,26 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
Yields:
Batches of Document objects
"""
spots_to_process = self._fetch_spots_to_process()
doc_batch: list[Document] = []
try:
# If no spots specified, get all spots
spot_names_to_process = self.spot_names
if not spot_names_to_process:
spot_names_to_process = self._get_all_spot_names()
if not spot_names_to_process:
logger.warning("No spots found in Highspot")
raise ValueError("No spots found in Highspot")
logger.info(
f"No spots specified, using all {len(spot_names_to_process)} available spots"
)
for spot_name in spot_names_to_process:
for spot in spots_to_process:
try:
spot_id = self._get_spot_id_from_name(spot_name)
if spot_id is None:
logger.warning(f"Spot ID not found for spot {spot_name}")
continue
offset = 0
has_more = True
while has_more:
logger.info(
f"Retrieving items from spot {spot_name}, offset {offset}"
f"Retrieving items from spot {spot.name}, offset {offset}"
)
response = self.client.get_spot_items(
spot_id=spot_id, offset=offset, page_size=self.batch_size
spot_id=spot.id, offset=offset, page_size=self.batch_size
)
items = response.get("collection", [])
logger.info(f"Received Items: {items}")
logger.info(
f"Received {len(items)} items from spot {spot.name}"
)
if not items:
has_more = False
continue
@@ -248,7 +220,7 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
source=DocumentSource.HIGHSPOT,
semantic_identifier=title,
metadata={
"spot_name": spot_name,
"spot_name": spot.name,
"type": item_details.get(
"content_type", ""
),
@@ -286,11 +258,13 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
offset += self.batch_size
except (HighspotClientError, ValueError) as e:
logger.error(f"Error processing spot {spot_name}: {str(e)}")
logger.error(f"Error processing spot {spot.name}: {str(e)}")
raise
except Exception as e:
logger.error(
f"Unexpected error processing spot {spot_name}: {str(e)}"
f"Unexpected error processing spot {spot.name}: {str(e)}"
)
raise
except Exception as e:
logger.error(f"Error in Highspot connector: {str(e)}")
@@ -405,31 +379,21 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
Yields:
Batches of SlimDocument objects
"""
spots_to_process = self._fetch_spots_to_process()
slim_doc_batch: list[SlimDocument] = []
try:
# If no spots specified, get all spots
spot_names_to_process = self.spot_names
if not spot_names_to_process:
spot_names_to_process = self._get_all_spot_names()
if not spot_names_to_process:
logger.warning("No spots found in Highspot")
raise ValueError("No spots found in Highspot")
logger.info(
f"No spots specified, using all {len(spot_names_to_process)} available spots for slim documents"
)
for spot_name in spot_names_to_process:
for spot in spots_to_process:
try:
spot_id = self._get_spot_id_from_name(spot_name)
offset = 0
has_more = True
while has_more:
logger.info(
f"Retrieving slim documents from spot {spot_name}, offset {offset}"
f"Retrieving slim documents from spot {spot.name}, offset {offset}"
)
response = self.client.get_spot_items(
spot_id=spot_id, offset=offset, page_size=self.batch_size
spot_id=spot.id, offset=offset, page_size=self.batch_size
)
items = response.get("collection", [])
@@ -440,6 +404,7 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
for item in items:
item_id = item.get("id")
if not item_id:
logger.warning("Item without ID found, skipping")
continue
slim_doc_batch.append(
@@ -453,15 +418,16 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
has_more = len(items) >= self.batch_size
offset += self.batch_size
except (HighspotClientError, ValueError) as e:
logger.error(
f"Error retrieving slim documents from spot {spot_name}: {str(e)}"
except (HighspotClientError, ValueError):
logger.exception(
f"Error retrieving slim documents from spot {spot.name}"
)
raise
if slim_doc_batch:
yield slim_doc_batch
except Exception as e:
logger.error(f"Error in Highspot Slim Connector: {str(e)}")
except Exception:
logger.exception("Error in Highspot Slim Connector")
raise
def validate_credentials(self) -> bool: