diff --git a/backend/onyx/connectors/highspot/connector.py b/backend/onyx/connectors/highspot/connector.py
index f0e3a1adb..99de7fe06 100644
--- a/backend/onyx/connectors/highspot/connector.py
+++ b/backend/onyx/connectors/highspot/connector.py
@@ -20,7 +20,8 @@ from onyx.connectors.models import ConnectorMissingCredentialError
 from onyx.connectors.models import Document
 from onyx.connectors.models import SlimDocument
 from onyx.connectors.models import TextSection
-from onyx.file_processing.extract_file_text import ALL_ACCEPTED_FILE_EXTENSIONS
+from onyx.file_processing.extract_file_text import ACCEPTED_DOCUMENT_FILE_EXTENSIONS
+from onyx.file_processing.extract_file_text import ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS
 from onyx.file_processing.extract_file_text import extract_file_text
 from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
 from onyx.utils.logger import setup_logger
@@ -84,14 +85,21 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
         Populate the spot ID map with all available spots.
         Keys are stored as lowercase for case-insensitive lookups.
         """
-        spots = self.client.get_spots()
-        for spot in spots:
-            if "title" in spot and "id" in spot:
-                spot_name = spot["title"]
-                self._spot_id_map[spot_name.lower()] = spot["id"]
+        try:
+            spots = self.client.get_spots()
+            for spot in spots:
+                if "title" in spot and "id" in spot:
+                    spot_name = spot["title"]
+                    self._spot_id_map[spot_name.lower()] = spot["id"]
 
-        self._all_spots_fetched = True
-        logger.info(f"Retrieved {len(self._spot_id_map)} spots from Highspot")
+            self._all_spots_fetched = True
+            logger.info(f"Retrieved {len(self._spot_id_map)} spots from Highspot")
+        except HighspotClientError as e:
+            logger.error(f"Error retrieving spots from Highspot: {str(e)}")
+            raise
+        except Exception as e:
+            logger.error(f"Unexpected error retrieving spots from Highspot: {str(e)}")
+            raise
 
     def _get_all_spot_names(self) -> List[str]:
         """
@@ -151,116 +159,142 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
             Batches of Document objects
         """
         doc_batch: list[Document] = []
+        try:
+            # If no spots specified, get all spots
+            spot_names_to_process = self.spot_names
+            if not spot_names_to_process:
+                spot_names_to_process = self._get_all_spot_names()
+                if not spot_names_to_process:
+                    logger.warning("No spots found in Highspot")
+                    raise ValueError("No spots found in Highspot")
+                logger.info(
+                    f"No spots specified, using all {len(spot_names_to_process)} available spots"
+                )
 
-        # If no spots specified, get all spots
-        spot_names_to_process = self.spot_names
-        if not spot_names_to_process:
-            spot_names_to_process = self._get_all_spot_names()
-            logger.info(
-                f"No spots specified, using all {len(spot_names_to_process)} available spots"
-            )
-
-        for spot_name in spot_names_to_process:
-            try:
-                spot_id = self._get_spot_id_from_name(spot_name)
-                if spot_id is None:
-                    logger.warning(f"Spot ID not found for spot {spot_name}")
-                    continue
-                offset = 0
-                has_more = True
-
-                while has_more:
-                    logger.info(
-                        f"Retrieving items from spot {spot_name}, offset {offset}"
-                    )
-                    response = self.client.get_spot_items(
-                        spot_id=spot_id, offset=offset, page_size=self.batch_size
-                    )
-                    items = response.get("collection", [])
-                    logger.info(f"Received Items: {items}")
-                    if not items:
-                        has_more = False
+            for spot_name in spot_names_to_process:
+                try:
+                    spot_id = self._get_spot_id_from_name(spot_name)
+                    if spot_id is None:
+                        logger.warning(f"Spot ID not found for spot {spot_name}")
                         continue
+                    offset = 0
+                    has_more = True
 
-                    for item in items:
-                        try:
-                            item_id = item.get("id")
-                            if not item_id:
-                                logger.warning("Item without ID found, skipping")
-                                continue
+                    while has_more:
+                        logger.info(
+                            f"Retrieving items from spot {spot_name}, offset {offset}"
+                        )
+                        response = self.client.get_spot_items(
+                            spot_id=spot_id, offset=offset, page_size=self.batch_size
+                        )
+                        items = response.get("collection", [])
+                        logger.info(f"Received Items: {items}")
+                        if not items:
+                            has_more = False
+                            continue
 
-                            item_details = self.client.get_item(item_id)
-                            if not item_details:
-                                logger.warning(
-                                    f"Item {item_id} details not found, skipping"
-                                )
-                                continue
-                            # Apply time filter if specified
-                            if start or end:
-                                updated_at = item_details.get("date_updated")
-                                if updated_at:
-                                    # Convert to datetime for comparison
-                                    try:
-                                        updated_time = datetime.fromisoformat(
-                                            updated_at.replace("Z", "+00:00")
-                                        )
-                                        if (
-                                            start and updated_time.timestamp() < start
-                                        ) or (end and updated_time.timestamp() > end):
+                        for item in items:
+                            try:
+                                item_id = item.get("id")
+                                if not item_id:
+                                    logger.warning("Item without ID found, skipping")
+                                    continue
+
+                                item_details = self.client.get_item(item_id)
+                                if not item_details:
+                                    logger.warning(
+                                        f"Item {item_id} details not found, skipping"
+                                    )
+                                    continue
+                                # Apply time filter if specified
+                                if start or end:
+                                    updated_at = item_details.get("date_updated")
+                                    if updated_at:
+                                        # Convert to datetime for comparison
+                                        try:
+                                            updated_time = datetime.fromisoformat(
+                                                updated_at.replace("Z", "+00:00")
+                                            )
+                                            if (
+                                                start
+                                                and updated_time.timestamp() < start
+                                            ) or (
+                                                end and updated_time.timestamp() > end
+                                            ):
+                                                continue
+                                        except (ValueError, TypeError):
+                                            # Skip if date cannot be parsed
+                                            logger.warning(
+                                                f"Invalid date format for item {item_id}: {updated_at}"
+                                            )
                                             continue
-                                    except (ValueError, TypeError):
-                                        # Skip if date cannot be parsed
-                                        logger.warning(
-                                            f"Invalid date format for item {item_id}: {updated_at}"
-                                        )
-                                        continue
 
-                            content = self._get_item_content(item_details)
-                            title = item_details.get("title", "")
+                                content = self._get_item_content(item_details)
 
-                            doc_batch.append(
-                                Document(
-                                    id=f"HIGHSPOT_{item_id}",
-                                    sections=[
-                                        TextSection(
-                                            link=item_details.get(
-                                                "url",
-                                                f"https://www.highspot.com/items/{item_id}",
+                                title = item_details.get("title", "")
+
+                                doc_batch.append(
+                                    Document(
+                                        id=f"HIGHSPOT_{item_id}",
+                                        sections=[
+                                            TextSection(
+                                                link=item_details.get(
+                                                    "url",
+                                                    f"https://www.highspot.com/items/{item_id}",
+                                                ),
+                                                text=content,
+                                            )
+                                        ],
+                                        source=DocumentSource.HIGHSPOT,
+                                        semantic_identifier=title,
+                                        metadata={
+                                            "spot_name": spot_name,
+                                            "type": item_details.get(
+                                                "content_type", ""
                                             ),
-                                            text=content,
-                                        )
-                                    ],
-                                    source=DocumentSource.HIGHSPOT,
-                                    semantic_identifier=title,
-                                    metadata={
-                                        "spot_name": spot_name,
-                                        "type": item_details.get("content_type", ""),
-                                        "created_at": item_details.get(
-                                            "date_added", ""
-                                        ),
-                                        "author": item_details.get("author", ""),
-                                        "language": item_details.get("language", ""),
-                                        "can_download": str(
-                                            item_details.get("can_download", False)
-                                        ),
-                                    },
-                                    doc_updated_at=item_details.get("date_updated"),
+                                            "created_at": item_details.get(
+                                                "date_added", ""
+                                            ),
+                                            "author": item_details.get("author", ""),
+                                            "language": item_details.get(
+                                                "language", ""
+                                            ),
+                                            "can_download": str(
+                                                item_details.get("can_download", False)
+                                            ),
+                                        },
+                                        doc_updated_at=item_details.get("date_updated"),
+                                    )
                                 )
-                            )
-                            if len(doc_batch) >= self.batch_size:
-                                yield doc_batch
-                                doc_batch = []
+                                if len(doc_batch) >= self.batch_size:
+                                    yield doc_batch
+                                    doc_batch = []
 
-                        except HighspotClientError as e:
-                            item_id = "ID" if not item_id else item_id
-                            logger.error(f"Error retrieving item {item_id}: {str(e)}")
+                            except HighspotClientError as e:
+                                item_id = "ID" if not item_id else item_id
+                                logger.error(
+                                    f"Error retrieving item {item_id}: {str(e)}"
+                                )
+                            except Exception as e:
+                                item_id = "ID" if not item_id else item_id
+                                logger.error(
+                                    f"Unexpected error for item {item_id}: {str(e)}"
+                                )
 
-                    has_more = len(items) >= self.batch_size
-                    offset += self.batch_size
+                        has_more = len(items) >= self.batch_size
+                        offset += self.batch_size
 
-            except (HighspotClientError, ValueError) as e:
-                logger.error(f"Error processing spot {spot_name}: {str(e)}")
+                except (HighspotClientError, ValueError) as e:
+                    logger.error(f"Error processing spot {spot_name}: {str(e)}")
+                except Exception as e:
+                    logger.error(
+                        f"Unexpected error processing spot {spot_name}: {str(e)}"
+                    )
+
+        except Exception as e:
+            logger.error(f"Error in Highspot connector: {str(e)}")
+            raise
 
         if doc_batch:
             yield doc_batch
@@ -286,7 +320,9 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
         # Extract title and description once at the beginning
         title, description = self._extract_title_and_description(item_details)
         default_content = f"{title}\n{description}"
-        logger.info(f"Processing item {item_id} with extension {file_extension}")
+        logger.info(
+            f"Processing item {item_id} with extension {file_extension} and file name {content_name}"
+        )
 
         try:
             if content_type == "WebLink":
@@ -298,30 +334,39 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
 
             elif (
                 is_valid_format
-                and file_extension in ALL_ACCEPTED_FILE_EXTENSIONS
+                and (
+                    file_extension in ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS
+                    or file_extension in ACCEPTED_DOCUMENT_FILE_EXTENSIONS
+                )
                 and can_download
             ):
-                # For documents, try to get the text content
-                if not item_id:  # Ensure item_id is defined
-                    return default_content
-
                 content_response = self.client.get_item_content(item_id)
 
                 # Process and extract text from binary content based on type
                 if content_response:
                     text_content = extract_file_text(
-                        BytesIO(content_response), content_name
+                        BytesIO(content_response), content_name, False
                     )
-                    return text_content
+                    return text_content if text_content else default_content
                 return default_content
             else:
                 return default_content
 
         except HighspotClientError as e:
-            # Use item_id safely in the warning message
-            error_context = f"item {item_id}" if item_id else "item"
+            error_context = f"item {item_id}" if item_id else "(item id not found)"
             logger.warning(f"Could not retrieve content for {error_context}: {str(e)}")
-            return ""
+            return default_content
+        except ValueError as e:
+            error_context = f"item {item_id}" if item_id else "(item id not found)"
+            logger.error(f"Value error for {error_context}: {str(e)}")
+            return default_content
+
+        except Exception as e:
+            error_context = f"item {item_id}" if item_id else "(item id not found)"
+            logger.error(
+                f"Unexpected error retrieving content for {error_context}: {str(e)}"
+            )
+            return default_content
 
     def _extract_title_and_description(
         self, item_details: Dict[str, Any]
@@ -358,55 +403,63 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
             Batches of SlimDocument objects
         """
         slim_doc_batch: list[SlimDocument] = []
-
-        # If no spots specified, get all spots
-        spot_names_to_process = self.spot_names
-        if not spot_names_to_process:
-            spot_names_to_process = self._get_all_spot_names()
-            logger.info(
-                f"No spots specified, using all {len(spot_names_to_process)} available spots for slim documents"
-            )
-
-        for spot_name in spot_names_to_process:
-            try:
-                spot_id = self._get_spot_id_from_name(spot_name)
-                offset = 0
-                has_more = True
-
-                while has_more:
-                    logger.info(
-                        f"Retrieving slim documents from spot {spot_name}, offset {offset}"
-                    )
-                    response = self.client.get_spot_items(
-                        spot_id=spot_id, offset=offset, page_size=self.batch_size
-                    )
-
-                    items = response.get("collection", [])
-                    if not items:
-                        has_more = False
-                        continue
-
-                    for item in items:
-                        item_id = item.get("id")
-                        if not item_id:
-                            continue
-
-                        slim_doc_batch.append(SlimDocument(id=f"HIGHSPOT_{item_id}"))
-
-                        if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
-                            yield slim_doc_batch
-                            slim_doc_batch = []
-
-                    has_more = len(items) >= self.batch_size
-                    offset += self.batch_size
-
-            except (HighspotClientError, ValueError) as e:
-                logger.error(
-                    f"Error retrieving slim documents from spot {spot_name}: {str(e)}"
+        try:
+            # If no spots specified, get all spots
+            spot_names_to_process = self.spot_names
+            if not spot_names_to_process:
+                spot_names_to_process = self._get_all_spot_names()
+                if not spot_names_to_process:
+                    logger.warning("No spots found in Highspot")
+                    raise ValueError("No spots found in Highspot")
+                logger.info(
+                    f"No spots specified, using all {len(spot_names_to_process)} available spots for slim documents"
                 )
 
-        if slim_doc_batch:
-            yield slim_doc_batch
+            for spot_name in spot_names_to_process:
+                try:
+                    spot_id = self._get_spot_id_from_name(spot_name)
+                    offset = 0
+                    has_more = True
+
+                    while has_more:
+                        logger.info(
+                            f"Retrieving slim documents from spot {spot_name}, offset {offset}"
+                        )
+                        response = self.client.get_spot_items(
+                            spot_id=spot_id, offset=offset, page_size=self.batch_size
+                        )
+
+                        items = response.get("collection", [])
+                        if not items:
+                            has_more = False
+                            continue
+
+                        for item in items:
+                            item_id = item.get("id")
+                            if not item_id:
+                                continue
+
+                            slim_doc_batch.append(
+                                SlimDocument(id=f"HIGHSPOT_{item_id}")
+                            )
+
+                            if len(slim_doc_batch) >= _SLIM_BATCH_SIZE:
+                                yield slim_doc_batch
+                                slim_doc_batch = []
+
+                        has_more = len(items) >= self.batch_size
+                        offset += self.batch_size
+
+                except (HighspotClientError, ValueError) as e:
+                    logger.error(
+                        f"Error retrieving slim documents from spot {spot_name}: {str(e)}"
+                    )
+
+            if slim_doc_batch:
+                yield slim_doc_batch
+        except Exception as e:
+            logger.error(f"Error in Highspot Slim Connector: {str(e)}")
+            raise
 
     def validate_credentials(self) -> bool:
         """
diff --git a/backend/tests/daily/connectors/highspot/test_highspot_connector.py b/backend/tests/daily/connectors/highspot/test_highspot_connector.py
index a09ff5c1f..37a9eaea1 100644
--- a/backend/tests/daily/connectors/highspot/test_highspot_connector.py
+++ b/backend/tests/daily/connectors/highspot/test_highspot_connector.py
@@ -1,6 +1,7 @@
 import json
 import os
 import time
+from datetime import datetime
 from pathlib import Path
 from unittest.mock import MagicMock
 from unittest.mock import patch
@@ -105,6 +106,54 @@ def test_highspot_connector_slim(
     assert len(all_slim_doc_ids) > 0
 
 
+@patch(
+    "onyx.file_processing.extract_file_text.get_unstructured_api_key",
+    return_value=None,
+)
+def test_highspot_connector_poll_source(
+    mock_get_api_key: MagicMock, highspot_connector: HighspotConnector
+) -> None:
+    """Test poll_source functionality with date range filtering."""
+    # Define date range: April 3, 2025 to April 4, 2025
+    start_date = datetime(2025, 4, 3, 0, 0, 0)
+    end_date = datetime(2025, 4, 4, 23, 59, 59)
+
+    # Convert to seconds since Unix epoch
+    start_time = int(time.mktime(start_date.timetuple()))
+    end_time = int(time.mktime(end_date.timetuple()))
+
+    # Load test data for assertions
+    test_data = load_test_data()
+    poll_source_data = test_data.get("poll_source", {})
+    target_doc_id = poll_source_data.get("target_doc_id")
+
+    # Call poll_source with date range
+    all_docs: list[Document] = []
+    target_doc: Document | None = None
+
+    for doc_batch in highspot_connector.poll_source(start_time, end_time):
+        for doc in doc_batch:
+            all_docs.append(doc)
+            if doc.id == f"HIGHSPOT_{target_doc_id}":
+                target_doc = doc
+
+    # Verify documents were loaded
+    assert len(all_docs) > 0
+
+    # Verify the specific test document was found and has correct properties
+    assert target_doc is not None
+    assert target_doc.semantic_identifier == poll_source_data.get("semantic_identifier")
+    assert target_doc.source == DocumentSource.HIGHSPOT
+    assert target_doc.metadata is not None
+
+    # Verify sections
+    assert len(target_doc.sections) == 1
+    section = target_doc.sections[0]
+    assert section.link == poll_source_data.get("link")
+    assert section.text is not None
+    assert len(section.text) > 0
+
+
 def test_highspot_connector_validate_credentials(
     highspot_connector: HighspotConnector,
 ) -> None:
     """
diff --git a/backend/tests/daily/connectors/highspot/test_highspot_data.json b/backend/tests/daily/connectors/highspot/test_highspot_data.json
index d796b3d60..2ebd09919 100644
--- a/backend/tests/daily/connectors/highspot/test_highspot_data.json
+++ b/backend/tests/daily/connectors/highspot/test_highspot_data.json
@@ -1,5 +1,10 @@
 {
     "target_doc_id": "67cd8eb35d3ee0487de2e704",
     "semantic_identifier": "Highspot in Action _ Salesforce Integration",
-    "link": "https://www.highspot.com/items/67cd8eb35d3ee0487de2e704"
+    "link": "https://www.highspot.com/items/67cd8eb35d3ee0487de2e704",
+    "poll_source": {
+        "target_doc_id": "67ef9edcc3f40b2bf3d816a8",
+        "semantic_identifier": "A Brief Introduction To AI",
+        "link": "https://www.highspot.com/items/67ef9edcc3f40b2bf3d816a8"
+    }
 }