diff --git a/backend/onyx/connectors/interfaces.py b/backend/onyx/connectors/interfaces.py index daa4b07c63e3..f36a520d5f52 100644 --- a/backend/onyx/connectors/interfaces.py +++ b/backend/onyx/connectors/interfaces.py @@ -8,7 +8,6 @@ from typing import TypeAlias from typing import TypeVar from pydantic import BaseModel -from typing_extensions import override from onyx.configs.constants import DocumentSource from onyx.connectors.models import ConnectorCheckpoint @@ -231,7 +230,7 @@ class CheckpointConnector(BaseConnector[CT]): """ raise NotImplementedError - @override + @abc.abstractmethod def build_dummy_checkpoint(self) -> CT: raise NotImplementedError diff --git a/backend/onyx/connectors/zendesk/connector.py b/backend/onyx/connectors/zendesk/connector.py index 9c37c242cfdf..5f1bb181f8e3 100644 --- a/backend/onyx/connectors/zendesk/connector.py +++ b/backend/onyx/connectors/zendesk/connector.py @@ -1,23 +1,32 @@ +import copy +import time from collections.abc import Iterator from typing import Any from typing import cast import requests +from pydantic import BaseModel +from requests.exceptions import HTTPError +from typing_extensions import override -from onyx.configs.app_configs import INDEX_BATCH_SIZE from onyx.configs.app_configs import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS from onyx.configs.constants import DocumentSource from onyx.connectors.cross_connector_utils.miscellaneous_utils import ( time_str_to_utc, ) -from onyx.connectors.interfaces import GenerateDocumentsOutput +from onyx.connectors.exceptions import ConnectorValidationError +from onyx.connectors.exceptions import CredentialExpiredError +from onyx.connectors.exceptions import InsufficientPermissionsError +from onyx.connectors.interfaces import CheckpointConnector +from onyx.connectors.interfaces import CheckpointOutput +from onyx.connectors.interfaces import ConnectorFailure from onyx.connectors.interfaces import GenerateSlimDocumentOutput -from onyx.connectors.interfaces import LoadConnector -from onyx.connectors.interfaces import PollConnector from onyx.connectors.interfaces import SecondsSinceUnixEpoch from onyx.connectors.interfaces import SlimConnector from onyx.connectors.models import BasicExpertInfo +from onyx.connectors.models import ConnectorCheckpoint from onyx.connectors.models import Document +from onyx.connectors.models import DocumentFailure from onyx.connectors.models import SlimDocument from onyx.connectors.models import TextSection from onyx.file_processing.html_utils import parse_html_page_basic @@ -26,6 +35,7 @@ from onyx.utils.retry_wrapper import retry_builder MAX_PAGE_SIZE = 30 # Zendesk API maximum +MAX_AUTHOR_MAP_SIZE = 50_000 # Reset author map cache if it gets too large _SLIM_BATCH_SIZE = 1000 @@ -53,10 +63,22 @@ class ZendeskClient: # Sleep for the duration indicated by the Retry-After header time.sleep(int(retry_after)) + elif ( + response.status_code == 403 + and response.json().get("error") == "SupportProductInactive" + ): + return response.json() + response.raise_for_status() return response.json() +class ZendeskPageResponse(BaseModel): + data: list[dict[str, Any]] + meta: dict[str, Any] + has_more: bool + + def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]: content_tags: dict[str, str] = {} params = {"page[size]": MAX_PAGE_SIZE} @@ -82,11 +104,9 @@ def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]: def _get_articles( client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE ) -> Iterator[dict[str, Any]]: - params = ( - {"start_time": start_time, "page[size]": page_size} - if start_time - else {"page[size]": page_size} - ) + params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"} + if start_time is not None: + params["start_time"] = start_time while True: data = client.make_request("help_center/articles", params) @@ -98,10 +118,30 @@ def _get_articles( params["page[after]"] = data["meta"]["after_cursor"] +def _get_article_page( + client: ZendeskClient, + start_time: int | None = None, + after_cursor: str | None = None, + page_size: int = MAX_PAGE_SIZE, +) -> ZendeskPageResponse: + params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"} + if start_time is not None: + params["start_time"] = start_time + if after_cursor is not None: + params["page[after]"] = after_cursor + + data = client.make_request("help_center/articles", params) + return ZendeskPageResponse( + data=data["articles"], + meta=data["meta"], + has_more=bool(data["meta"].get("has_more", False)), + ) + + def _get_tickets( client: ZendeskClient, start_time: int | None = None ) -> Iterator[dict[str, Any]]: - params = {"start_time": start_time} if start_time else {"start_time": 0} + params = {"start_time": start_time or 0} while True: data = client.make_request("incremental/tickets.json", params) @@ -114,6 +154,27 @@ def _get_tickets( break +# TODO: maybe these don't need to be their own functions? +def _get_tickets_page( + client: ZendeskClient, start_time: int | None = None +) -> ZendeskPageResponse: + params = {"start_time": start_time or 0} + + # NOTE: for some reason zendesk doesn't seem to be respecting the start_time param + # in my local testing with very few tickets. We'll look into it if this becomes an + # issue in larger deployments + data = client.make_request("incremental/tickets.json", params) + if data.get("error") == "SupportProductInactive": + raise ValueError( + "Zendesk Support Product is not active for this account, No tickets to index" + ) + return ZendeskPageResponse( + data=data["tickets"], + meta={"end_time": data["end_time"]}, + has_more=not bool(data.get("end_of_stream", False)), + ) + + def _fetch_author(client: ZendeskClient, author_id: str) -> BasicExpertInfo | None: # Skip fetching if author_id is invalid if not author_id or author_id == "-1": @@ -278,13 +339,22 @@ def _ticket_to_document( ) -class ZendeskConnector(LoadConnector, PollConnector, SlimConnector): +class ZendeskConnectorCheckpoint(ConnectorCheckpoint): + # We use cursor-based paginated retrieval for articles + after_cursor_articles: str | None + + # We use timestamp-based paginated retrieval for tickets + next_start_time_tickets: int | None + + cached_author_map: dict[str, BasicExpertInfo] | None + cached_content_tags: dict[str, str] | None + + +class ZendeskConnector(SlimConnector, CheckpointConnector[ZendeskConnectorCheckpoint]): def __init__( self, - batch_size: int = INDEX_BATCH_SIZE, content_type: str = "articles", ) -> None: - self.batch_size = batch_size self.content_type = content_type self.subdomain = "" # Fetch all tags ahead of time @@ -304,33 +374,50 @@ class ZendeskConnector(LoadConnector, PollConnector, SlimConnector): ) return None - def load_from_state(self) -> GenerateDocumentsOutput: - return self.poll_source(None, None) - - def poll_source( - self, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None - ) -> GenerateDocumentsOutput: + @override + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ZendeskConnectorCheckpoint, + ) -> CheckpointOutput[ZendeskConnectorCheckpoint]: if self.client is None: raise ZendeskCredentialsNotSetUpError() - self.content_tags = _get_content_tag_mapping(self.client) + if checkpoint.cached_content_tags is None: + checkpoint.cached_content_tags = _get_content_tag_mapping(self.client) + return checkpoint # save the content tags to the checkpoint + self.content_tags = checkpoint.cached_content_tags if self.content_type == "articles": - yield from self._poll_articles(start) + checkpoint = yield from self._retrieve_articles(start, end, checkpoint) + return checkpoint elif self.content_type == "tickets": - yield from self._poll_tickets(start) + checkpoint = yield from self._retrieve_tickets(start, end, checkpoint) + return checkpoint else: raise ValueError(f"Unsupported content_type: {self.content_type}") - def _poll_articles( - self, start: SecondsSinceUnixEpoch | None - ) -> GenerateDocumentsOutput: - articles = _get_articles(self.client, start_time=int(start) if start else None) - + def _retrieve_articles( + self, + start: SecondsSinceUnixEpoch | None, + end: SecondsSinceUnixEpoch | None, + checkpoint: ZendeskConnectorCheckpoint, + ) -> CheckpointOutput[ZendeskConnectorCheckpoint]: + checkpoint = copy.deepcopy(checkpoint) # This one is built on the fly as there may be more many more authors than tags - author_map: dict[str, BasicExpertInfo] = {} + author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {} + after_cursor = checkpoint.after_cursor_articles + doc_batch: list[Document] = [] - doc_batch = [] + response = _get_article_page( + self.client, + start_time=int(start) if start else None, + after_cursor=after_cursor, + ) + articles = response.data + has_more = response.has_more + after_cursor = response.meta.get("after_cursor") for article in articles: if ( article.get("body") is None @@ -342,66 +429,109 @@ class ZendeskConnector(LoadConnector, PollConnector, SlimConnector): ): continue - new_author_map, documents = _article_to_document( - article, self.content_tags, author_map, self.client - ) + try: + new_author_map, document = _article_to_document( + article, self.content_tags, author_map, self.client + ) + except Exception as e: + yield ConnectorFailure( + failed_document=DocumentFailure( + document_id=f"{article.get('id')}", + document_link=article.get("html_url", ""), + ), + failure_message=str(e), + exception=e, + ) + continue + if new_author_map: author_map.update(new_author_map) - doc_batch.append(documents) - if len(doc_batch) >= self.batch_size: - yield doc_batch - doc_batch.clear() + doc_batch.append(document) - if doc_batch: - yield doc_batch + if not has_more: + yield from doc_batch + checkpoint.has_more = False + return checkpoint - def _poll_tickets( - self, start: SecondsSinceUnixEpoch | None - ) -> GenerateDocumentsOutput: + # Sometimes no documents are retrieved, but the cursor + # is still updated so the connector makes progress. + yield from doc_batch + checkpoint.after_cursor_articles = after_cursor + + last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None + checkpoint.has_more = bool( + end is None + or last_doc_updated_at is None + or last_doc_updated_at.timestamp() <= end + ) + checkpoint.cached_author_map = ( + author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None + ) + return checkpoint + + def _retrieve_tickets( + self, + start: SecondsSinceUnixEpoch | None, + end: SecondsSinceUnixEpoch | None, + checkpoint: ZendeskConnectorCheckpoint, + ) -> CheckpointOutput[ZendeskConnectorCheckpoint]: + checkpoint = copy.deepcopy(checkpoint) if self.client is None: raise ZendeskCredentialsNotSetUpError() - author_map: dict[str, BasicExpertInfo] = {} + author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {} - ticket_generator = _get_tickets( - self.client, start_time=int(start) if start else None + doc_batch: list[Document] = [] + next_start_time = int(checkpoint.next_start_time_tickets or start or 0) + ticket_response = _get_tickets_page(self.client, start_time=next_start_time) + tickets = ticket_response.data + has_more = ticket_response.has_more + next_start_time = ticket_response.meta["end_time"] + for ticket in tickets: + if ticket.get("status") == "deleted": + continue + + try: + new_author_map, document = _ticket_to_document( + ticket=ticket, + author_map=author_map, + client=self.client, + default_subdomain=self.subdomain, + ) + except Exception as e: + yield ConnectorFailure( + failed_document=DocumentFailure( + document_id=f"{ticket.get('id')}", + document_link=ticket.get("url", ""), + ), + failure_message=str(e), + exception=e, + ) + continue + + if new_author_map: + author_map.update(new_author_map) + + doc_batch.append(document) + + if not has_more: + yield from doc_batch + checkpoint.has_more = False + return checkpoint + + yield from doc_batch + checkpoint.next_start_time_tickets = next_start_time + last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None + checkpoint.has_more = bool( + end is None + or last_doc_updated_at is None + or last_doc_updated_at.timestamp() <= end ) - - while True: - doc_batch = [] - for _ in range(self.batch_size): - try: - ticket = next(ticket_generator) - - # Check if the ticket status is deleted and skip it if so - if ticket.get("status") == "deleted": - continue - - new_author_map, documents = _ticket_to_document( - ticket=ticket, - author_map=author_map, - client=self.client, - default_subdomain=self.subdomain, - ) - - if new_author_map: - author_map.update(new_author_map) - - doc_batch.append(documents) - - if len(doc_batch) >= self.batch_size: - yield doc_batch - doc_batch.clear() - - except StopIteration: - # No more tickets to process - if doc_batch: - yield doc_batch - return - - if doc_batch: - yield doc_batch + checkpoint.cached_author_map = ( + author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None + ) + return checkpoint def retrieve_all_slim_documents( self, @@ -441,10 +571,51 @@ class ZendeskConnector(LoadConnector, PollConnector, SlimConnector): if slim_doc_batch: yield slim_doc_batch + @override + def validate_connector_settings(self) -> None: + if self.client is None: + raise ZendeskCredentialsNotSetUpError() + + try: + _get_article_page(self.client, start_time=0) + except HTTPError as e: + # Check for HTTP status codes + if e.response.status_code == 401: + raise CredentialExpiredError( + "Your Zendesk credentials appear to be invalid or expired (HTTP 401)." + ) from e + elif e.response.status_code == 403: + raise InsufficientPermissionsError( + "Your Zendesk token does not have sufficient permissions (HTTP 403)." + ) from e + elif e.response.status_code == 404: + raise ConnectorValidationError( + "Zendesk resource not found (HTTP 404)." + ) from e + else: + raise ConnectorValidationError( + f"Unexpected Zendesk error (status={e.response.status_code}): {e}" + ) from e + + @override + def validate_checkpoint_json( + self, checkpoint_json: str + ) -> ZendeskConnectorCheckpoint: + return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json) + + @override + def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint: + return ZendeskConnectorCheckpoint( + after_cursor_articles=None, + next_start_time_tickets=None, + cached_author_map=None, + cached_content_tags=None, + has_more=True, + ) + if __name__ == "__main__": import os - import time connector = ZendeskConnector() connector.load_credentials( @@ -457,6 +628,8 @@ if __name__ == "__main__": current = time.time() one_day_ago = current - 24 * 60 * 60 # 1 day - document_batches = connector.poll_source(one_day_ago, current) + document_batches = connector.load_from_checkpoint( + one_day_ago, current, connector.build_dummy_checkpoint() + ) print(next(document_batches)) diff --git a/backend/tests/daily/connectors/zendesk/test_zendesk_connector.py b/backend/tests/daily/connectors/zendesk/test_zendesk_connector.py index 0d757ecbfb61..149ec1a84dd1 100644 --- a/backend/tests/daily/connectors/zendesk/test_zendesk_connector.py +++ b/backend/tests/daily/connectors/zendesk/test_zendesk_connector.py @@ -2,12 +2,14 @@ import json import os import time from pathlib import Path +from typing import cast import pytest from onyx.configs.constants import DocumentSource from onyx.connectors.models import Document from onyx.connectors.zendesk.connector import ZendeskConnector +from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector def load_test_data(file_name: str = "test_zendesk_data.json") -> dict[str, dict]: @@ -50,7 +52,7 @@ def get_credentials() -> dict[str, str]: def test_zendesk_connector_basic( request: pytest.FixtureRequest, connector_fixture: str ) -> None: - connector = request.getfixturevalue(connector_fixture) + connector = cast(ZendeskConnector, request.getfixturevalue(connector_fixture)) test_data = load_test_data() all_docs: list[Document] = [] target_test_doc_id: str @@ -61,12 +63,11 @@ def test_zendesk_connector_basic( target_doc: Document | None = None - for doc_batch in connector.poll_source(0, time.time()): - for doc in doc_batch: - all_docs.append(doc) - if doc.id == target_test_doc_id: - target_doc = doc - print(f"target_doc {target_doc}") + for doc in load_all_docs_from_checkpoint_connector(connector, 0, time.time()): + all_docs.append(doc) + if doc.id == target_test_doc_id: + target_doc = doc + print(f"target_doc {target_doc}") assert len(all_docs) > 0, "No documents were retrieved from the connector" assert ( @@ -111,8 +112,10 @@ def test_zendesk_connector_basic( def test_zendesk_connector_slim(zendesk_article_connector: ZendeskConnector) -> None: # Get full doc IDs all_full_doc_ids = set() - for doc_batch in zendesk_article_connector.load_from_state(): - all_full_doc_ids.update([doc.id for doc in doc_batch]) + for doc in load_all_docs_from_checkpoint_connector( + zendesk_article_connector, 0, time.time() + ): + all_full_doc_ids.add(doc.id) # Get slim doc IDs all_slim_doc_ids = set() diff --git a/backend/tests/unit/onyx/connectors/zendesk/test_zendesk_checkpointing.py b/backend/tests/unit/onyx/connectors/zendesk/test_zendesk_checkpointing.py new file mode 100644 index 000000000000..be41d357eb2a --- /dev/null +++ b/backend/tests/unit/onyx/connectors/zendesk/test_zendesk_checkpointing.py @@ -0,0 +1,472 @@ +import time +from collections.abc import Callable +from collections.abc import Generator +from typing import Any +from typing import cast +from unittest.mock import call +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from requests.exceptions import HTTPError + +from onyx.configs.constants import DocumentSource +from onyx.connectors.exceptions import ConnectorValidationError +from onyx.connectors.exceptions import CredentialExpiredError +from onyx.connectors.exceptions import InsufficientPermissionsError +from onyx.connectors.models import Document +from onyx.connectors.zendesk.connector import ZendeskClient +from onyx.connectors.zendesk.connector import ZendeskConnector +from tests.unit.onyx.connectors.utils import load_everything_from_checkpoint_connector + + +@pytest.fixture +def mock_zendesk_client() -> MagicMock: + """Create a mock Zendesk client""" + mock = MagicMock(spec=ZendeskClient) + mock.base_url = "https://test.zendesk.com/api/v2" + mock.auth = ("test@example.com/token", "test_token") + return mock + + +@pytest.fixture +def zendesk_connector( + mock_zendesk_client: MagicMock, +) -> Generator[ZendeskConnector, None, None]: + """Create a Zendesk connector with mocked client""" + connector = ZendeskConnector(content_type="articles") + connector.client = mock_zendesk_client + yield connector + + +@pytest.fixture +def unmocked_zendesk_connector() -> Generator[ZendeskConnector, None, None]: + """Create a Zendesk connector with unmocked client""" + zendesk_connector = ZendeskConnector(content_type="articles") + zendesk_connector.client = ZendeskClient( + "test", "test@example.com/token", "test_token" + ) + yield zendesk_connector + + +@pytest.fixture +def create_mock_article() -> Callable[..., dict[str, Any]]: + def _create_mock_article( + id: int = 1, + title: str = "Test Article", + body: str = "Test Content", + updated_at: str = "2023-01-01T12:00:00Z", + author_id: str = "123", + label_names: list[str] | None = None, + draft: bool = False, + ) -> dict[str, Any]: + """Helper to create a mock article""" + return { + "id": id, + "title": title, + "body": body, + "updated_at": updated_at, + "author_id": author_id, + "label_names": label_names or [], + "draft": draft, + "html_url": f"https://test.zendesk.com/hc/en-us/articles/{id}", + } + + return _create_mock_article + + +@pytest.fixture +def create_mock_ticket() -> Callable[..., dict[str, Any]]: + def _create_mock_ticket( + id: int = 1, + subject: str = "Test Ticket", + description: str = "Test Description", + updated_at: str = "2023-01-01T12:00:00Z", + submitter_id: str = "123", + status: str = "open", + priority: str = "normal", + tags: list[str] | None = None, + ticket_type: str = "question", + ) -> dict[str, Any]: + """Helper to create a mock ticket""" + return { + "id": id, + "subject": subject, + "description": description, + "updated_at": updated_at, + "submitter": submitter_id, + "status": status, + "priority": priority, + "tags": tags or [], + "type": ticket_type, + "url": f"https://test.zendesk.com/agent/tickets/{id}", + } + + return _create_mock_ticket + + +@pytest.fixture +def create_mock_author() -> Callable[..., dict[str, Any]]: + def _create_mock_author( + id: str = "123", + name: str = "Test User", + email: str = "test@example.com", + ) -> dict[str, Any]: + """Helper to create a mock author""" + return { + "user": { + "id": id, + "name": name, + "email": email, + } + } + + return _create_mock_author + + +def test_load_from_checkpoint_articles_happy_path( + zendesk_connector: ZendeskConnector, + mock_zendesk_client: MagicMock, + create_mock_article: Callable[..., dict[str, Any]], + create_mock_author: Callable[..., dict[str, Any]], +) -> None: + """Test loading articles from checkpoint - happy path""" + # Set up mock responses + mock_article1 = create_mock_article(id=1, title="Article 1") + mock_article2 = create_mock_article(id=2, title="Article 2") + mock_author = create_mock_author() + + # Mock API responses + mock_zendesk_client.make_request.side_effect = [ + # First call: content tags + {"records": []}, + # Second call: articles page + { + "articles": [mock_article1, mock_article2], + "meta": { + "has_more": False, + "after_cursor": None, + }, + }, + # Third call: author info + mock_author, + ] + + # Call load_from_checkpoint + end_time = time.time() + outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time) + + # Check that we got the documents + assert len(outputs) == 2 + assert outputs[0].next_checkpoint.cached_content_tags is not None + + assert len(outputs[1].items) == 2 + + # Check first document + doc1 = outputs[1].items[0] + assert isinstance(doc1, Document) + assert doc1.id == "article:1" + assert doc1.semantic_identifier == "Article 1" + assert doc1.source == DocumentSource.ZENDESK + + # Check second document + doc2 = outputs[1].items[1] + assert isinstance(doc2, Document) + assert doc2.id == "article:2" + assert doc2.semantic_identifier == "Article 2" + assert doc2.source == DocumentSource.ZENDESK + + # Check checkpoint state + assert not outputs[1].next_checkpoint.has_more + + +def test_load_from_checkpoint_tickets_happy_path( + zendesk_connector: ZendeskConnector, + mock_zendesk_client: MagicMock, + create_mock_ticket: Callable[..., dict[str, Any]], + create_mock_author: Callable[..., dict[str, Any]], +) -> None: + """Test loading tickets from checkpoint - happy path""" + # Configure connector for tickets + zendesk_connector.content_type = "tickets" + + # Set up mock responses + mock_ticket1 = create_mock_ticket(id=1, subject="Ticket 1") + mock_ticket2 = create_mock_ticket(id=2, subject="Ticket 2") + mock_author = create_mock_author() + + # Mock API responses + mock_zendesk_client.make_request.side_effect = [ + # First call: content tags + {"records": []}, + # Second call: tickets page + { + "tickets": [mock_ticket1, mock_ticket2], + "end_of_stream": True, + "end_time": int(time.time()), + }, + # Third call: author info + mock_author, + # Fourth call: comments page + {"comments": []}, + # Fifth call: comments page + {"comments": []}, + ] + + zendesk_connector.client = mock_zendesk_client + + # Call load_from_checkpoint + end_time = time.time() + outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time) + + # Check that we got the documents + assert len(outputs) == 2 + assert outputs[0].next_checkpoint.cached_content_tags is not None + assert len(outputs[1].items) == 2 + + # Check first document + doc1 = outputs[1].items[0] + print(doc1, type(doc1)) + assert isinstance(doc1, Document) + assert doc1.id == "zendesk_ticket_1" + assert doc1.semantic_identifier == "Ticket #1: Ticket 1" + assert doc1.source == DocumentSource.ZENDESK + + # Check second document + doc2 = outputs[1].items[1] + assert isinstance(doc2, Document) + assert doc2.id == "zendesk_ticket_2" + assert doc2.semantic_identifier == "Ticket #2: Ticket 2" + assert doc2.source == DocumentSource.ZENDESK + + # Check checkpoint state + assert not outputs[1].next_checkpoint.has_more + + +def test_load_from_checkpoint_with_rate_limit( + unmocked_zendesk_connector: ZendeskConnector, + create_mock_article: Callable[..., dict[str, Any]], + create_mock_author: Callable[..., dict[str, Any]], +) -> None: + """Test loading from checkpoint with rate limit handling""" + zendesk_connector = unmocked_zendesk_connector + # Set up mock responses + mock_article = create_mock_article() + mock_author = create_mock_author() + author_response = MagicMock() + author_response.status_code = 200 + author_response.json.return_value = mock_author + + # Create mock responses for requests.get + rate_limit_response = MagicMock() + rate_limit_response.status_code = 429 + rate_limit_response.headers = {"Retry-After": "60"} + rate_limit_response.raise_for_status.side_effect = HTTPError( + response=rate_limit_response + ) + + success_response = MagicMock() + success_response.status_code = 200 + success_response.json.return_value = { + "articles": [mock_article], + "meta": { + "has_more": False, + "after_cursor": None, + }, + } + + # Mock requests.get to simulate rate limit then success + with patch("onyx.connectors.zendesk.connector.requests.get") as mock_get: + mock_get.side_effect = [ + # First call: content tags + MagicMock( + status_code=200, + json=lambda: {"records": [], "meta": {"has_more": False}}, + ), + # Second call: articles page (rate limited) + rate_limit_response, + # Third call: articles page (after rate limit) + success_response, + # Fourth call: author info + author_response, + ] + + # Call load_from_checkpoint + end_time = time.time() + with patch("onyx.connectors.zendesk.connector.time.sleep") as mock_sleep: + outputs = load_everything_from_checkpoint_connector( + zendesk_connector, 0, end_time + ) + mock_sleep.assert_has_calls([call(60), call(0.1)]) + + # Check that we got the document after rate limit was handled + assert len(outputs) == 2 + assert outputs[0].next_checkpoint.cached_content_tags is not None + assert len(outputs[1].items) == 1 + assert isinstance(outputs[1].items[0], Document) + assert outputs[1].items[0].id == "article:1" + + # Verify the requests were made with correct parameters + assert mock_get.call_count == 4 + # First call should be for content tags + args, kwargs = mock_get.call_args_list[0] + assert "guide/content_tags" in args[0] + # Second call should be for articles (rate limited) + args, kwargs = mock_get.call_args_list[1] + assert "help_center/articles" in args[0] + # Third call should be for articles (success) + args, kwargs = mock_get.call_args_list[2] + assert "help_center/articles" in args[0] + # Fourth call should be for author info + args, kwargs = mock_get.call_args_list[3] + assert "users/123" in args[0] + + +def test_load_from_checkpoint_with_empty_response( + zendesk_connector: ZendeskConnector, + mock_zendesk_client: MagicMock, +) -> None: + """Test loading from checkpoint with empty response""" + # Mock API responses + mock_zendesk_client.make_request.side_effect = [ + # First call: content tags + {"records": []}, + # Second call: empty articles page + { + "articles": [], + "meta": { + "has_more": False, + "after_cursor": None, + }, + }, + ] + + # Call load_from_checkpoint + end_time = time.time() + outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time) + + # Check that we got no documents + assert len(outputs) == 2 + assert outputs[0].next_checkpoint.cached_content_tags is not None + assert len(outputs[1].items) == 0 + assert not outputs[1].next_checkpoint.has_more + + +def test_load_from_checkpoint_with_skipped_article( + zendesk_connector: ZendeskConnector, + mock_zendesk_client: MagicMock, + create_mock_article: Callable[..., dict[str, Any]], +) -> None: + """Test loading from checkpoint with an article that should be skipped""" + # Set up mock responses with a draft article + mock_article = create_mock_article(draft=True) + mock_zendesk_client.make_request.side_effect = [ + # First call: content tags + {"records": []}, + # Second call: articles page with draft article + { + "articles": [mock_article], + "meta": { + "has_more": False, + "after_cursor": None, + }, + }, + ] + + # Call load_from_checkpoint + end_time = time.time() + outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time) + + # Check that no documents were returned + assert len(outputs) == 2 + assert outputs[0].next_checkpoint.cached_content_tags is not None + assert len(outputs[1].items) == 0 + assert not outputs[1].next_checkpoint.has_more + + +def test_load_from_checkpoint_with_skipped_ticket( + zendesk_connector: ZendeskConnector, + mock_zendesk_client: MagicMock, + create_mock_ticket: Callable[..., dict[str, Any]], +) -> None: + """Test loading from checkpoint with a deleted ticket""" + # Configure connector for tickets + zendesk_connector.content_type = "tickets" + + # Set up mock responses with a deleted ticket + mock_ticket = create_mock_ticket(status="deleted") + mock_zendesk_client.make_request.side_effect = [ + # First call: content tags + {"records": []}, + # Second call: tickets page with deleted ticket + { + "tickets": [mock_ticket], + "end_of_stream": True, + "end_time": int(time.time()), + }, + ] + + # Call load_from_checkpoint + end_time = time.time() + outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time) + + # Check that no documents were returned + assert len(outputs) == 2 + assert outputs[0].next_checkpoint.cached_content_tags is not None + assert len(outputs[1].items) == 0 + assert not outputs[1].next_checkpoint.has_more + + +@pytest.mark.parametrize( + "status_code,expected_exception,expected_message", + [ + ( + 401, + CredentialExpiredError, + "Your Zendesk credentials appear to be invalid or expired", + ), + ( + 403, + InsufficientPermissionsError, + "Your Zendesk token does not have sufficient permissions", + ), + ( + 404, + ConnectorValidationError, + "Zendesk resource not found", + ), + ], +) +def test_validate_connector_settings_errors( + zendesk_connector: ZendeskConnector, + status_code: int, + expected_exception: type[Exception], + expected_message: str, +) -> None: + """Test validation with various error scenarios""" + mock_response = MagicMock() + mock_response.status_code = status_code + error = HTTPError(response=mock_response) + + mock_zendesk_client = cast(MagicMock, zendesk_connector.client) + mock_zendesk_client.make_request.side_effect = error + + with pytest.raises(expected_exception) as excinfo: + print("excinfo", excinfo) + zendesk_connector.validate_connector_settings() + + assert expected_message in str(excinfo.value) + + +def test_validate_connector_settings_success( + zendesk_connector: ZendeskConnector, + mock_zendesk_client: MagicMock, +) -> None: + """Test successful validation""" + # Mock successful API response + mock_zendesk_client.make_request.return_value = { + "articles": [], + "meta": {"has_more": False}, + } + + zendesk_connector.validate_connector_settings()