zendesk checkpointed connector (#4311)

* zendesk v1

* logic fix

* zendesk testing

* add unit tests

* zendesk caching

* CW comments

* fix unit tests
This commit is contained in:
evan-danswer
2025-03-23 13:43:13 -07:00
committed by GitHub
parent c25d56f4a5
commit 99546e4a4d
4 changed files with 739 additions and 92 deletions

View File

@@ -8,7 +8,6 @@ from typing import TypeAlias
from typing import TypeVar
from pydantic import BaseModel
from typing_extensions import override
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import ConnectorCheckpoint
@@ -231,7 +230,7 @@ class CheckpointConnector(BaseConnector[CT]):
"""
raise NotImplementedError
@override
@abc.abstractmethod
def build_dummy_checkpoint(self) -> CT:
raise NotImplementedError

View File

@@ -1,23 +1,32 @@
import copy
import time
from collections.abc import Iterator
from typing import Any
from typing import cast
import requests
from pydantic import BaseModel
from requests.exceptions import HTTPError
from typing_extensions import override
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import ZENDESK_CONNECTOR_SKIP_ARTICLE_LABELS
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
time_str_to_utc,
)
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.interfaces import CheckpointConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import ConnectorFailure
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TextSection
from onyx.file_processing.html_utils import parse_html_page_basic
@@ -26,6 +35,7 @@ from onyx.utils.retry_wrapper import retry_builder
MAX_PAGE_SIZE = 30 # Zendesk API maximum
MAX_AUTHOR_MAP_SIZE = 50_000 # Reset author map cache if it gets too large
_SLIM_BATCH_SIZE = 1000
@@ -53,10 +63,22 @@ class ZendeskClient:
# Sleep for the duration indicated by the Retry-After header
time.sleep(int(retry_after))
elif (
response.status_code == 403
and response.json().get("error") == "SupportProductInactive"
):
return response.json()
response.raise_for_status()
return response.json()
class ZendeskPageResponse(BaseModel):
data: list[dict[str, Any]]
meta: dict[str, Any]
has_more: bool
def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]:
content_tags: dict[str, str] = {}
params = {"page[size]": MAX_PAGE_SIZE}
@@ -82,11 +104,9 @@ def _get_content_tag_mapping(client: ZendeskClient) -> dict[str, str]:
def _get_articles(
client: ZendeskClient, start_time: int | None = None, page_size: int = MAX_PAGE_SIZE
) -> Iterator[dict[str, Any]]:
params = (
{"start_time": start_time, "page[size]": page_size}
if start_time
else {"page[size]": page_size}
)
params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
if start_time is not None:
params["start_time"] = start_time
while True:
data = client.make_request("help_center/articles", params)
@@ -98,10 +118,30 @@ def _get_articles(
params["page[after]"] = data["meta"]["after_cursor"]
def _get_article_page(
client: ZendeskClient,
start_time: int | None = None,
after_cursor: str | None = None,
page_size: int = MAX_PAGE_SIZE,
) -> ZendeskPageResponse:
params = {"page[size]": page_size, "sort_by": "updated_at", "sort_order": "asc"}
if start_time is not None:
params["start_time"] = start_time
if after_cursor is not None:
params["page[after]"] = after_cursor
data = client.make_request("help_center/articles", params)
return ZendeskPageResponse(
data=data["articles"],
meta=data["meta"],
has_more=bool(data["meta"].get("has_more", False)),
)
def _get_tickets(
client: ZendeskClient, start_time: int | None = None
) -> Iterator[dict[str, Any]]:
params = {"start_time": start_time} if start_time else {"start_time": 0}
params = {"start_time": start_time or 0}
while True:
data = client.make_request("incremental/tickets.json", params)
@@ -114,6 +154,27 @@ def _get_tickets(
break
# TODO: maybe these don't need to be their own functions?
def _get_tickets_page(
client: ZendeskClient, start_time: int | None = None
) -> ZendeskPageResponse:
params = {"start_time": start_time or 0}
# NOTE: for some reason zendesk doesn't seem to be respecting the start_time param
# in my local testing with very few tickets. We'll look into it if this becomes an
# issue in larger deployments
data = client.make_request("incremental/tickets.json", params)
if data.get("error") == "SupportProductInactive":
raise ValueError(
"Zendesk Support Product is not active for this account, No tickets to index"
)
return ZendeskPageResponse(
data=data["tickets"],
meta={"end_time": data["end_time"]},
has_more=not bool(data.get("end_of_stream", False)),
)
def _fetch_author(client: ZendeskClient, author_id: str) -> BasicExpertInfo | None:
# Skip fetching if author_id is invalid
if not author_id or author_id == "-1":
@@ -278,13 +339,22 @@ def _ticket_to_document(
)
class ZendeskConnector(LoadConnector, PollConnector, SlimConnector):
class ZendeskConnectorCheckpoint(ConnectorCheckpoint):
# We use cursor-based paginated retrieval for articles
after_cursor_articles: str | None
# We use timestamp-based paginated retrieval for tickets
next_start_time_tickets: int | None
cached_author_map: dict[str, BasicExpertInfo] | None
cached_content_tags: dict[str, str] | None
class ZendeskConnector(SlimConnector, CheckpointConnector[ZendeskConnectorCheckpoint]):
def __init__(
self,
batch_size: int = INDEX_BATCH_SIZE,
content_type: str = "articles",
) -> None:
self.batch_size = batch_size
self.content_type = content_type
self.subdomain = ""
# Fetch all tags ahead of time
@@ -304,33 +374,50 @@ class ZendeskConnector(LoadConnector, PollConnector, SlimConnector):
)
return None
def load_from_state(self) -> GenerateDocumentsOutput:
return self.poll_source(None, None)
def poll_source(
self, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
) -> GenerateDocumentsOutput:
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: ZendeskConnectorCheckpoint,
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
if self.client is None:
raise ZendeskCredentialsNotSetUpError()
self.content_tags = _get_content_tag_mapping(self.client)
if checkpoint.cached_content_tags is None:
checkpoint.cached_content_tags = _get_content_tag_mapping(self.client)
return checkpoint # save the content tags to the checkpoint
self.content_tags = checkpoint.cached_content_tags
if self.content_type == "articles":
yield from self._poll_articles(start)
checkpoint = yield from self._retrieve_articles(start, end, checkpoint)
return checkpoint
elif self.content_type == "tickets":
yield from self._poll_tickets(start)
checkpoint = yield from self._retrieve_tickets(start, end, checkpoint)
return checkpoint
else:
raise ValueError(f"Unsupported content_type: {self.content_type}")
def _poll_articles(
self, start: SecondsSinceUnixEpoch | None
) -> GenerateDocumentsOutput:
articles = _get_articles(self.client, start_time=int(start) if start else None)
def _retrieve_articles(
self,
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
checkpoint: ZendeskConnectorCheckpoint,
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
checkpoint = copy.deepcopy(checkpoint)
# This one is built on the fly as there may be more many more authors than tags
author_map: dict[str, BasicExpertInfo] = {}
author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
after_cursor = checkpoint.after_cursor_articles
doc_batch: list[Document] = []
doc_batch = []
response = _get_article_page(
self.client,
start_time=int(start) if start else None,
after_cursor=after_cursor,
)
articles = response.data
has_more = response.has_more
after_cursor = response.meta.get("after_cursor")
for article in articles:
if (
article.get("body") is None
@@ -342,66 +429,109 @@ class ZendeskConnector(LoadConnector, PollConnector, SlimConnector):
):
continue
new_author_map, documents = _article_to_document(
article, self.content_tags, author_map, self.client
)
try:
new_author_map, document = _article_to_document(
article, self.content_tags, author_map, self.client
)
except Exception as e:
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=f"{article.get('id')}",
document_link=article.get("html_url", ""),
),
failure_message=str(e),
exception=e,
)
continue
if new_author_map:
author_map.update(new_author_map)
doc_batch.append(documents)
if len(doc_batch) >= self.batch_size:
yield doc_batch
doc_batch.clear()
doc_batch.append(document)
if doc_batch:
yield doc_batch
if not has_more:
yield from doc_batch
checkpoint.has_more = False
return checkpoint
def _poll_tickets(
self, start: SecondsSinceUnixEpoch | None
) -> GenerateDocumentsOutput:
# Sometimes no documents are retrieved, but the cursor
# is still updated so the connector makes progress.
yield from doc_batch
checkpoint.after_cursor_articles = after_cursor
last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
checkpoint.has_more = bool(
end is None
or last_doc_updated_at is None
or last_doc_updated_at.timestamp() <= end
)
checkpoint.cached_author_map = (
author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
)
return checkpoint
def _retrieve_tickets(
self,
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
checkpoint: ZendeskConnectorCheckpoint,
) -> CheckpointOutput[ZendeskConnectorCheckpoint]:
checkpoint = copy.deepcopy(checkpoint)
if self.client is None:
raise ZendeskCredentialsNotSetUpError()
author_map: dict[str, BasicExpertInfo] = {}
author_map: dict[str, BasicExpertInfo] = checkpoint.cached_author_map or {}
ticket_generator = _get_tickets(
self.client, start_time=int(start) if start else None
doc_batch: list[Document] = []
next_start_time = int(checkpoint.next_start_time_tickets or start or 0)
ticket_response = _get_tickets_page(self.client, start_time=next_start_time)
tickets = ticket_response.data
has_more = ticket_response.has_more
next_start_time = ticket_response.meta["end_time"]
for ticket in tickets:
if ticket.get("status") == "deleted":
continue
try:
new_author_map, document = _ticket_to_document(
ticket=ticket,
author_map=author_map,
client=self.client,
default_subdomain=self.subdomain,
)
except Exception as e:
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=f"{ticket.get('id')}",
document_link=ticket.get("url", ""),
),
failure_message=str(e),
exception=e,
)
continue
if new_author_map:
author_map.update(new_author_map)
doc_batch.append(document)
if not has_more:
yield from doc_batch
checkpoint.has_more = False
return checkpoint
yield from doc_batch
checkpoint.next_start_time_tickets = next_start_time
last_doc_updated_at = doc_batch[-1].doc_updated_at if doc_batch else None
checkpoint.has_more = bool(
end is None
or last_doc_updated_at is None
or last_doc_updated_at.timestamp() <= end
)
while True:
doc_batch = []
for _ in range(self.batch_size):
try:
ticket = next(ticket_generator)
# Check if the ticket status is deleted and skip it if so
if ticket.get("status") == "deleted":
continue
new_author_map, documents = _ticket_to_document(
ticket=ticket,
author_map=author_map,
client=self.client,
default_subdomain=self.subdomain,
)
if new_author_map:
author_map.update(new_author_map)
doc_batch.append(documents)
if len(doc_batch) >= self.batch_size:
yield doc_batch
doc_batch.clear()
except StopIteration:
# No more tickets to process
if doc_batch:
yield doc_batch
return
if doc_batch:
yield doc_batch
checkpoint.cached_author_map = (
author_map if len(author_map) <= MAX_AUTHOR_MAP_SIZE else None
)
return checkpoint
def retrieve_all_slim_documents(
self,
@@ -441,10 +571,51 @@ class ZendeskConnector(LoadConnector, PollConnector, SlimConnector):
if slim_doc_batch:
yield slim_doc_batch
@override
def validate_connector_settings(self) -> None:
if self.client is None:
raise ZendeskCredentialsNotSetUpError()
try:
_get_article_page(self.client, start_time=0)
except HTTPError as e:
# Check for HTTP status codes
if e.response.status_code == 401:
raise CredentialExpiredError(
"Your Zendesk credentials appear to be invalid or expired (HTTP 401)."
) from e
elif e.response.status_code == 403:
raise InsufficientPermissionsError(
"Your Zendesk token does not have sufficient permissions (HTTP 403)."
) from e
elif e.response.status_code == 404:
raise ConnectorValidationError(
"Zendesk resource not found (HTTP 404)."
) from e
else:
raise ConnectorValidationError(
f"Unexpected Zendesk error (status={e.response.status_code}): {e}"
) from e
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> ZendeskConnectorCheckpoint:
return ZendeskConnectorCheckpoint.model_validate_json(checkpoint_json)
@override
def build_dummy_checkpoint(self) -> ZendeskConnectorCheckpoint:
return ZendeskConnectorCheckpoint(
after_cursor_articles=None,
next_start_time_tickets=None,
cached_author_map=None,
cached_content_tags=None,
has_more=True,
)
if __name__ == "__main__":
import os
import time
connector = ZendeskConnector()
connector.load_credentials(
@@ -457,6 +628,8 @@ if __name__ == "__main__":
current = time.time()
one_day_ago = current - 24 * 60 * 60 # 1 day
document_batches = connector.poll_source(one_day_ago, current)
document_batches = connector.load_from_checkpoint(
one_day_ago, current, connector.build_dummy_checkpoint()
)
print(next(document_batches))

View File

@@ -2,12 +2,14 @@ import json
import os
import time
from pathlib import Path
from typing import cast
import pytest
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.connectors.zendesk.connector import ZendeskConnector
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
def load_test_data(file_name: str = "test_zendesk_data.json") -> dict[str, dict]:
@@ -50,7 +52,7 @@ def get_credentials() -> dict[str, str]:
def test_zendesk_connector_basic(
request: pytest.FixtureRequest, connector_fixture: str
) -> None:
connector = request.getfixturevalue(connector_fixture)
connector = cast(ZendeskConnector, request.getfixturevalue(connector_fixture))
test_data = load_test_data()
all_docs: list[Document] = []
target_test_doc_id: str
@@ -61,12 +63,11 @@ def test_zendesk_connector_basic(
target_doc: Document | None = None
for doc_batch in connector.poll_source(0, time.time()):
for doc in doc_batch:
all_docs.append(doc)
if doc.id == target_test_doc_id:
target_doc = doc
print(f"target_doc {target_doc}")
for doc in load_all_docs_from_checkpoint_connector(connector, 0, time.time()):
all_docs.append(doc)
if doc.id == target_test_doc_id:
target_doc = doc
print(f"target_doc {target_doc}")
assert len(all_docs) > 0, "No documents were retrieved from the connector"
assert (
@@ -111,8 +112,10 @@ def test_zendesk_connector_basic(
def test_zendesk_connector_slim(zendesk_article_connector: ZendeskConnector) -> None:
# Get full doc IDs
all_full_doc_ids = set()
for doc_batch in zendesk_article_connector.load_from_state():
all_full_doc_ids.update([doc.id for doc in doc_batch])
for doc in load_all_docs_from_checkpoint_connector(
zendesk_article_connector, 0, time.time()
):
all_full_doc_ids.add(doc.id)
# Get slim doc IDs
all_slim_doc_ids = set()

View File

@@ -0,0 +1,472 @@
import time
from collections.abc import Callable
from collections.abc import Generator
from typing import Any
from typing import cast
from unittest.mock import call
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from requests.exceptions import HTTPError
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.models import Document
from onyx.connectors.zendesk.connector import ZendeskClient
from onyx.connectors.zendesk.connector import ZendeskConnector
from tests.unit.onyx.connectors.utils import load_everything_from_checkpoint_connector
@pytest.fixture
def mock_zendesk_client() -> MagicMock:
"""Create a mock Zendesk client"""
mock = MagicMock(spec=ZendeskClient)
mock.base_url = "https://test.zendesk.com/api/v2"
mock.auth = ("test@example.com/token", "test_token")
return mock
@pytest.fixture
def zendesk_connector(
mock_zendesk_client: MagicMock,
) -> Generator[ZendeskConnector, None, None]:
"""Create a Zendesk connector with mocked client"""
connector = ZendeskConnector(content_type="articles")
connector.client = mock_zendesk_client
yield connector
@pytest.fixture
def unmocked_zendesk_connector() -> Generator[ZendeskConnector, None, None]:
"""Create a Zendesk connector with unmocked client"""
zendesk_connector = ZendeskConnector(content_type="articles")
zendesk_connector.client = ZendeskClient(
"test", "test@example.com/token", "test_token"
)
yield zendesk_connector
@pytest.fixture
def create_mock_article() -> Callable[..., dict[str, Any]]:
def _create_mock_article(
id: int = 1,
title: str = "Test Article",
body: str = "Test Content",
updated_at: str = "2023-01-01T12:00:00Z",
author_id: str = "123",
label_names: list[str] | None = None,
draft: bool = False,
) -> dict[str, Any]:
"""Helper to create a mock article"""
return {
"id": id,
"title": title,
"body": body,
"updated_at": updated_at,
"author_id": author_id,
"label_names": label_names or [],
"draft": draft,
"html_url": f"https://test.zendesk.com/hc/en-us/articles/{id}",
}
return _create_mock_article
@pytest.fixture
def create_mock_ticket() -> Callable[..., dict[str, Any]]:
def _create_mock_ticket(
id: int = 1,
subject: str = "Test Ticket",
description: str = "Test Description",
updated_at: str = "2023-01-01T12:00:00Z",
submitter_id: str = "123",
status: str = "open",
priority: str = "normal",
tags: list[str] | None = None,
ticket_type: str = "question",
) -> dict[str, Any]:
"""Helper to create a mock ticket"""
return {
"id": id,
"subject": subject,
"description": description,
"updated_at": updated_at,
"submitter": submitter_id,
"status": status,
"priority": priority,
"tags": tags or [],
"type": ticket_type,
"url": f"https://test.zendesk.com/agent/tickets/{id}",
}
return _create_mock_ticket
@pytest.fixture
def create_mock_author() -> Callable[..., dict[str, Any]]:
def _create_mock_author(
id: str = "123",
name: str = "Test User",
email: str = "test@example.com",
) -> dict[str, Any]:
"""Helper to create a mock author"""
return {
"user": {
"id": id,
"name": name,
"email": email,
}
}
return _create_mock_author
def test_load_from_checkpoint_articles_happy_path(
zendesk_connector: ZendeskConnector,
mock_zendesk_client: MagicMock,
create_mock_article: Callable[..., dict[str, Any]],
create_mock_author: Callable[..., dict[str, Any]],
) -> None:
"""Test loading articles from checkpoint - happy path"""
# Set up mock responses
mock_article1 = create_mock_article(id=1, title="Article 1")
mock_article2 = create_mock_article(id=2, title="Article 2")
mock_author = create_mock_author()
# Mock API responses
mock_zendesk_client.make_request.side_effect = [
# First call: content tags
{"records": []},
# Second call: articles page
{
"articles": [mock_article1, mock_article2],
"meta": {
"has_more": False,
"after_cursor": None,
},
},
# Third call: author info
mock_author,
]
# Call load_from_checkpoint
end_time = time.time()
outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time)
# Check that we got the documents
assert len(outputs) == 2
assert outputs[0].next_checkpoint.cached_content_tags is not None
assert len(outputs[1].items) == 2
# Check first document
doc1 = outputs[1].items[0]
assert isinstance(doc1, Document)
assert doc1.id == "article:1"
assert doc1.semantic_identifier == "Article 1"
assert doc1.source == DocumentSource.ZENDESK
# Check second document
doc2 = outputs[1].items[1]
assert isinstance(doc2, Document)
assert doc2.id == "article:2"
assert doc2.semantic_identifier == "Article 2"
assert doc2.source == DocumentSource.ZENDESK
# Check checkpoint state
assert not outputs[1].next_checkpoint.has_more
def test_load_from_checkpoint_tickets_happy_path(
zendesk_connector: ZendeskConnector,
mock_zendesk_client: MagicMock,
create_mock_ticket: Callable[..., dict[str, Any]],
create_mock_author: Callable[..., dict[str, Any]],
) -> None:
"""Test loading tickets from checkpoint - happy path"""
# Configure connector for tickets
zendesk_connector.content_type = "tickets"
# Set up mock responses
mock_ticket1 = create_mock_ticket(id=1, subject="Ticket 1")
mock_ticket2 = create_mock_ticket(id=2, subject="Ticket 2")
mock_author = create_mock_author()
# Mock API responses
mock_zendesk_client.make_request.side_effect = [
# First call: content tags
{"records": []},
# Second call: tickets page
{
"tickets": [mock_ticket1, mock_ticket2],
"end_of_stream": True,
"end_time": int(time.time()),
},
# Third call: author info
mock_author,
# Fourth call: comments page
{"comments": []},
# Fifth call: comments page
{"comments": []},
]
zendesk_connector.client = mock_zendesk_client
# Call load_from_checkpoint
end_time = time.time()
outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time)
# Check that we got the documents
assert len(outputs) == 2
assert outputs[0].next_checkpoint.cached_content_tags is not None
assert len(outputs[1].items) == 2
# Check first document
doc1 = outputs[1].items[0]
print(doc1, type(doc1))
assert isinstance(doc1, Document)
assert doc1.id == "zendesk_ticket_1"
assert doc1.semantic_identifier == "Ticket #1: Ticket 1"
assert doc1.source == DocumentSource.ZENDESK
# Check second document
doc2 = outputs[1].items[1]
assert isinstance(doc2, Document)
assert doc2.id == "zendesk_ticket_2"
assert doc2.semantic_identifier == "Ticket #2: Ticket 2"
assert doc2.source == DocumentSource.ZENDESK
# Check checkpoint state
assert not outputs[1].next_checkpoint.has_more
def test_load_from_checkpoint_with_rate_limit(
unmocked_zendesk_connector: ZendeskConnector,
create_mock_article: Callable[..., dict[str, Any]],
create_mock_author: Callable[..., dict[str, Any]],
) -> None:
"""Test loading from checkpoint with rate limit handling"""
zendesk_connector = unmocked_zendesk_connector
# Set up mock responses
mock_article = create_mock_article()
mock_author = create_mock_author()
author_response = MagicMock()
author_response.status_code = 200
author_response.json.return_value = mock_author
# Create mock responses for requests.get
rate_limit_response = MagicMock()
rate_limit_response.status_code = 429
rate_limit_response.headers = {"Retry-After": "60"}
rate_limit_response.raise_for_status.side_effect = HTTPError(
response=rate_limit_response
)
success_response = MagicMock()
success_response.status_code = 200
success_response.json.return_value = {
"articles": [mock_article],
"meta": {
"has_more": False,
"after_cursor": None,
},
}
# Mock requests.get to simulate rate limit then success
with patch("onyx.connectors.zendesk.connector.requests.get") as mock_get:
mock_get.side_effect = [
# First call: content tags
MagicMock(
status_code=200,
json=lambda: {"records": [], "meta": {"has_more": False}},
),
# Second call: articles page (rate limited)
rate_limit_response,
# Third call: articles page (after rate limit)
success_response,
# Fourth call: author info
author_response,
]
# Call load_from_checkpoint
end_time = time.time()
with patch("onyx.connectors.zendesk.connector.time.sleep") as mock_sleep:
outputs = load_everything_from_checkpoint_connector(
zendesk_connector, 0, end_time
)
mock_sleep.assert_has_calls([call(60), call(0.1)])
# Check that we got the document after rate limit was handled
assert len(outputs) == 2
assert outputs[0].next_checkpoint.cached_content_tags is not None
assert len(outputs[1].items) == 1
assert isinstance(outputs[1].items[0], Document)
assert outputs[1].items[0].id == "article:1"
# Verify the requests were made with correct parameters
assert mock_get.call_count == 4
# First call should be for content tags
args, kwargs = mock_get.call_args_list[0]
assert "guide/content_tags" in args[0]
# Second call should be for articles (rate limited)
args, kwargs = mock_get.call_args_list[1]
assert "help_center/articles" in args[0]
# Third call should be for articles (success)
args, kwargs = mock_get.call_args_list[2]
assert "help_center/articles" in args[0]
# Fourth call should be for author info
args, kwargs = mock_get.call_args_list[3]
assert "users/123" in args[0]
def test_load_from_checkpoint_with_empty_response(
zendesk_connector: ZendeskConnector,
mock_zendesk_client: MagicMock,
) -> None:
"""Test loading from checkpoint with empty response"""
# Mock API responses
mock_zendesk_client.make_request.side_effect = [
# First call: content tags
{"records": []},
# Second call: empty articles page
{
"articles": [],
"meta": {
"has_more": False,
"after_cursor": None,
},
},
]
# Call load_from_checkpoint
end_time = time.time()
outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time)
# Check that we got no documents
assert len(outputs) == 2
assert outputs[0].next_checkpoint.cached_content_tags is not None
assert len(outputs[1].items) == 0
assert not outputs[1].next_checkpoint.has_more
def test_load_from_checkpoint_with_skipped_article(
zendesk_connector: ZendeskConnector,
mock_zendesk_client: MagicMock,
create_mock_article: Callable[..., dict[str, Any]],
) -> None:
"""Test loading from checkpoint with an article that should be skipped"""
# Set up mock responses with a draft article
mock_article = create_mock_article(draft=True)
mock_zendesk_client.make_request.side_effect = [
# First call: content tags
{"records": []},
# Second call: articles page with draft article
{
"articles": [mock_article],
"meta": {
"has_more": False,
"after_cursor": None,
},
},
]
# Call load_from_checkpoint
end_time = time.time()
outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time)
# Check that no documents were returned
assert len(outputs) == 2
assert outputs[0].next_checkpoint.cached_content_tags is not None
assert len(outputs[1].items) == 0
assert not outputs[1].next_checkpoint.has_more
def test_load_from_checkpoint_with_skipped_ticket(
zendesk_connector: ZendeskConnector,
mock_zendesk_client: MagicMock,
create_mock_ticket: Callable[..., dict[str, Any]],
) -> None:
"""Test loading from checkpoint with a deleted ticket"""
# Configure connector for tickets
zendesk_connector.content_type = "tickets"
# Set up mock responses with a deleted ticket
mock_ticket = create_mock_ticket(status="deleted")
mock_zendesk_client.make_request.side_effect = [
# First call: content tags
{"records": []},
# Second call: tickets page with deleted ticket
{
"tickets": [mock_ticket],
"end_of_stream": True,
"end_time": int(time.time()),
},
]
# Call load_from_checkpoint
end_time = time.time()
outputs = load_everything_from_checkpoint_connector(zendesk_connector, 0, end_time)
# Check that no documents were returned
assert len(outputs) == 2
assert outputs[0].next_checkpoint.cached_content_tags is not None
assert len(outputs[1].items) == 0
assert not outputs[1].next_checkpoint.has_more
@pytest.mark.parametrize(
"status_code,expected_exception,expected_message",
[
(
401,
CredentialExpiredError,
"Your Zendesk credentials appear to be invalid or expired",
),
(
403,
InsufficientPermissionsError,
"Your Zendesk token does not have sufficient permissions",
),
(
404,
ConnectorValidationError,
"Zendesk resource not found",
),
],
)
def test_validate_connector_settings_errors(
zendesk_connector: ZendeskConnector,
status_code: int,
expected_exception: type[Exception],
expected_message: str,
) -> None:
"""Test validation with various error scenarios"""
mock_response = MagicMock()
mock_response.status_code = status_code
error = HTTPError(response=mock_response)
mock_zendesk_client = cast(MagicMock, zendesk_connector.client)
mock_zendesk_client.make_request.side_effect = error
with pytest.raises(expected_exception) as excinfo:
print("excinfo", excinfo)
zendesk_connector.validate_connector_settings()
assert expected_message in str(excinfo.value)
def test_validate_connector_settings_success(
zendesk_connector: ZendeskConnector,
mock_zendesk_client: MagicMock,
) -> None:
"""Test successful validation"""
# Mock successful API response
mock_zendesk_client.make_request.return_value = {
"articles": [],
"meta": {"has_more": False},
}
zendesk_connector.validate_connector_settings()