diff --git a/backend/danswer/connectors/freshdesk/connector.py b/backend/danswer/connectors/freshdesk/connector.py
index e25010efd..bd5df1012 100644
--- a/backend/danswer/connectors/freshdesk/connector.py
+++ b/backend/danswer/connectors/freshdesk/connector.py
@@ -12,97 +12,166 @@ from danswer.utils.logger import setup_logger
 logger = setup_logger()


-def _create_doc_from_ticket(ticket: dict, domain: str) -> Document:
-    # Process date fields
-    for date_field in ["created_at", "updated_at", "due_by"]:
-        ticket[date_field] = datetime.fromisoformat(ticket[date_field].replace('Z', '+00:00'))
-    # Convert all other values to strings
-    ticket = {
-        key: str(value) if not isinstance(value, (str, datetime)) else value
-        for key, value in ticket.items()
+def _create_metadata_from_ticket(ticket: dict) -> dict:
+    included_fields = {
+        "fr_escalated",
+        "spam",
+        "priority",
+        "source",
+        "status",
+        "type",
+        "is_escalated",
+        "tags",
+        "nr_due_by",
+        "nr_escalated",
+        "cc_emails",
+        "fwd_emails",
+        "reply_cc_emails",
+        "ticket_cc_emails",
+        "support_email",
+        "to_emails",
     }

-    # Checking for overdue tickets
-    ticket["overdue"] = datetime.now(timezone.utc) > ticket["due_by"]
+    metadata = {}
+    email_data = {}

-    # Map ticket status codes to human readable values
-    status_mapping = {2: "open", 3: "pending", 4: "resolved", 5: "closed"}
-    if status_string := status_mapping.get(ticket.get("status")):
-        ticket["status"] = status_string
+    for key, value in ticket.items():
+        if (
+            key in included_fields
+            and value is not None
+            and value != []
+            and value != {}
+            and value != "[]"
+            and value != ""
+        ):
+            value_to_str = (
+                [str(item) for item in value] if isinstance(value, List) else str(value)
+            )
+            if "email" in key:
+                email_data[key] = value_to_str
+            else:
+                metadata[key] = value_to_str

-    # Parse HTML from the description field
-    ticket["description"] = parse_html_page_basic(ticket["description"])
+    if email_data:
+        metadata["email_data"] = str(email_data)

+    # Convert source to human-parsable string
+    source_types = {
+        "1": "Email",
+        "2": "Portal",
+        "3": "Phone",
+        "7": "Chat",
+        "9": "Feedback Widget",
+        "10": "Outbound Email",
+    }
+    if ticket.get("source"):
+        metadata["source"] = source_types.get(
+            str(ticket.get("source")), "Unknown Source Type"
+        )
+
+    # Convert priority to human-parsable string
+    priority_types = {"1": "low", "2": "medium", "3": "high", "4": "urgent"}
+    if ticket.get("priority"):
+        metadata["priority"] = priority_types.get(
+            str(ticket.get("priority")), "Unknown Priority"
+        )
+
+    # Convert status to human-parsable string
+    status_types = {"2": "open", "3": "pending", "4": "resolved", "5": "closed"}
+    if ticket.get("status"):
+        metadata["status"] = status_types.get(
+            str(ticket.get("status")), "Unknown Status"
+        )
+
+    # "due_by" comes back from the API as an ISO 8601 string, so parse it before comparing
+    due_by = datetime.fromisoformat(ticket["due_by"].replace("Z", "+00:00"))
+    metadata["overdue"] = str(datetime.now(timezone.utc) > due_by)
+
+    return metadata
+
+
+def _create_doc_from_ticket(ticket: dict, domain: str) -> Document:
     return Document(
         id=ticket["id"],
-        sections=[Section(
-            link=f"https://{domain}.freshdesk.com/helpdesk/tickets/{int(ticket['id'])}",
-            text=json.dumps({
-                key: value
-                for key, value in ticket.items()
-                if isinstance(value, str)
-            }, default=str),
-        )],
+        sections=[
+            Section(
+                link=f"https://{domain}.freshdesk.com/helpdesk/tickets/{int(ticket['id'])}",
+                text=f"description: {parse_html_page_basic(ticket.get('description_text', ''))}",
+            )
+        ],
         source=DocumentSource.FRESHDESK,
         semantic_identifier=ticket["subject"],
-        metadata={
-            key: value.isoformat() if isinstance(value, datetime) else str(value)
-            for key, value in ticket.items()
-            if isinstance(value, (str, datetime)) and key not in ["description", "description_text"]
-        },
+        metadata=_create_metadata_from_ticket(ticket),
+        doc_updated_at=ticket["updated_at"],
     )

+
 class FreshdeskConnector(PollConnector, LoadConnector):
     def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
         self.batch_size = batch_size

-    def load_credentials(self, credentials: dict[str, Any]) -> Optional[dict[str, Any]]:
-        self.api_key = credentials.get("freshdesk_api_key")
-        self.domain = credentials.get("freshdesk_domain")
-        self.password = credentials.get("freshdesk_password")
-        return None
-
-    def _fetch_tickets(self, start: datetime | None = None, end: datetime | None = None) -> Iterator[List[dict]]:
-        #"end" is not currently used, so we may double fetch tickets created after the indexing starts but before the actual call is made.
-        #To use "end" would require us to use the search endpoint but it has limitations,
-        #namely having to fetch all IDs and then individually fetch each ticket because there is no "include" field available for this endpoint:
-        #https://developers.freshdesk.com/api/#filter_tickets
-        if any([self.api_key, self.domain, self.password]) is None:
+    def load_credentials(self, credentials: dict[str, str | int]) -> None:
+        api_key = credentials.get("freshdesk_api_key")
+        domain = credentials.get("freshdesk_domain")
+        password = credentials.get("freshdesk_password")
+
+        if not all(isinstance(cred, str) for cred in [domain, api_key, password]):
+            raise ConnectorMissingCredentialError(
+                "All Freshdesk credentials must be strings"
+            )
+
+        self.api_key = str(api_key)
+        self.domain = str(domain)
+        self.password = str(password)
+
+    def _fetch_tickets(
+        self, start: datetime | None = None, end: datetime | None = None
+    ) -> Iterator[List[dict]]:
+        # "end" is not currently used, so we may double fetch tickets created after the indexing starts but before the actual call is made.
+        # To use "end" would require us to use the search endpoint but it has limitations,
+        # namely having to fetch all IDs and then individually fetch each ticket because there is no "include" field available for this endpoint:
+        # https://developers.freshdesk.com/api/#filter_tickets
+        if any(attr is None for attr in [self.api_key, self.domain, self.password]):
             raise ConnectorMissingCredentialError("freshdesk")
-
+
         base_url = f"https://{self.domain}.freshdesk.com/api/v2/tickets"
-        params = {
+        params: dict[str, int | str] = {
             "include": "description",
             "per_page": 50,
-            "page": 1
+            "page": 1,
         }
-
+
         if start:
             params["updated_since"] = start.isoformat()

         while True:
-            response = requests.get(base_url, auth=(self.api_key, self.password), params=params)
+            response = requests.get(
+                base_url, auth=(self.api_key, self.password), params=params
+            )
             response.raise_for_status()
-
+
             if response.status_code == 204:
                 break
-
+
             tickets = json.loads(response.content)
-            logger.info(f"Fetched {len(tickets)} tickets from Freshdesk API (Page {params['page']})")
+            logger.info(
+                f"Fetched {len(tickets)} tickets from Freshdesk API (Page {params['page']})"
+            )

             yield tickets

-            if len(tickets) < params["per_page"]:
+            if len(tickets) < int(params["per_page"]):
                 break
-
-            params["page"] += 1
-    def _process_tickets(self, start: datetime | None = None, end: datetime | None = None) -> GenerateDocumentsOutput:
+            params["page"] = int(params["page"]) + 1
+
+    def _process_tickets(
+        self, start: datetime | None = None, end: datetime | None = None
+    ) -> GenerateDocumentsOutput:
         doc_batch: List[Document] = []

         for ticket_batch in self._fetch_tickets(start, end):
             for ticket in ticket_batch:
+                logger.info(_create_doc_from_ticket(ticket, self.domain))
                 doc_batch.append(_create_doc_from_ticket(ticket, self.domain))

                 if len(doc_batch) >= self.batch_size:
@@ -115,7 +184,9 @@ class FreshdeskConnector(PollConnector, LoadConnector):
     def load_from_state(self) -> GenerateDocumentsOutput:
         return self._process_tickets()

-    def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
+    def poll_source(
+        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
+    ) -> GenerateDocumentsOutput:
         start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
         end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
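
A minimal smoke-test sketch for exercising the refactored connector end to end. It is not part of the patch: the environment variable names are chosen here for illustration, the import path follows the file touched above, and it assumes poll_source yields batches of Document objects (as GenerateDocumentsOutput implies). The credential dict keys match the ones read by load_credentials in the diff.

# Hypothetical local smoke test for FreshdeskConnector; env var names are illustrative.
import os
from datetime import datetime, timedelta, timezone

from danswer.connectors.freshdesk.connector import FreshdeskConnector

connector = FreshdeskConnector(batch_size=10)
connector.load_credentials(
    {
        "freshdesk_domain": os.environ["FRESHDESK_DOMAIN"],
        "freshdesk_api_key": os.environ["FRESHDESK_API_KEY"],
        "freshdesk_password": os.environ["FRESHDESK_PASSWORD"],
    }
)

# Poll tickets updated in the last 24 hours; poll_source takes Unix epoch seconds.
end = datetime.now(timezone.utc)
start = end - timedelta(days=1)
for batch in connector.poll_source(start.timestamp(), end.timestamp()):
    for doc in batch:
        print(doc.id, doc.semantic_identifier, doc.metadata.get("status"))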