Refactor metadata

Skylar Kesselring 2024-10-30 17:23:20 -04:00
parent 195e2c335d
commit 53d2d333ab


@@ -12,97 +12,166 @@ from danswer.utils.logger import setup_logger
logger = setup_logger()


def _create_metadata_from_ticket(ticket: dict) -> dict:
    included_fields = {
        "fr_escalated",
        "spam",
        "priority",
        "source",
        "status",
        "type",
        "is_escalated",
        "tags",
        "nr_due_by",
        "nr_escalated",
        "cc_emails",
        "fwd_emails",
        "reply_cc_emails",
        "ticket_cc_emails",
        "support_email",
        "to_emails",
    }

    metadata = {}
    email_data = {}

    for key, value in ticket.items():
        if (
            key in included_fields
            and value is not None
            and value != []
            and value != {}
            and value != "[]"
            and value != ""
        ):
            value_to_str = (
                [str(item) for item in value] if isinstance(value, List) else str(value)
            )
            if "email" in key:
                email_data[key] = value_to_str
            else:
                metadata[key] = value_to_str

    if email_data:
        metadata["email_data"] = str(email_data)

    # Convert source to a human-readable string
    source_types = {
        "1": "Email",
        "2": "Portal",
        "3": "Phone",
        "7": "Chat",
        "9": "Feedback Widget",
        "10": "Outbound Email",
    }
    if ticket.get("source"):
        metadata["source"] = source_types.get(
            str(ticket.get("source")), "Unknown Source Type"
        )

    # Convert priority to a human-readable string
    priority_types = {"1": "low", "2": "medium", "3": "high", "4": "urgent"}
    if ticket.get("priority"):
        metadata["priority"] = priority_types.get(
            str(ticket.get("priority")), "Unknown Priority"
        )

    # Convert status to a human-readable string
    status_types = {"2": "open", "3": "pending", "4": "resolved", "5": "closed"}
    if ticket.get("status"):
        metadata["status"] = status_types.get(
            str(ticket.get("status")), "Unknown Status"
        )
metadata["overdue"] = datetime.now(timezone.utc) > ticket["due_by"]
return metadata
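
# A quick illustration (hypothetical ticket payload, values made up) of what the
# helper above produces -- numeric codes become readable labels and "email"-keyed
# fields are folded into one stringified "email_data" entry:
#
#     ticket = {
#         "status": 2,
#         "priority": 2,
#         "source": 1,
#         "spam": False,
#         "tags": ["billing", "vip"],
#         "cc_emails": ["ops@example.com"],
#         "due_by": "2024-10-31T17:00:00Z",
#     }
#     _create_metadata_from_ticket(ticket)
#     # -> {"status": "open", "priority": "medium", "source": "Email",
#     #     "spam": "False", "tags": ["billing", "vip"],
#     #     "email_data": "{'cc_emails': ['ops@example.com']}",
#     #     "overdue": <True or False, relative to the current time>}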


def _create_doc_from_ticket(ticket: dict, domain: str) -> Document:
    return Document(
        id=ticket["id"],
        sections=[
            Section(
                link=f"https://{domain}.freshdesk.com/helpdesk/tickets/{int(ticket['id'])}",
                text=f"description: {parse_html_page_basic(ticket.get('description_text', ''))}",
            )
        ],
        source=DocumentSource.FRESHDESK,
        semantic_identifier=ticket["subject"],
        metadata=_create_metadata_from_ticket(ticket),
        # "updated_at" is likewise an ISO-8601 string, so parse it into a datetime
        doc_updated_at=datetime.fromisoformat(
            ticket["updated_at"].replace("Z", "+00:00")
        ),
    )


class FreshdeskConnector(PollConnector, LoadConnector):
    def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
        self.batch_size = batch_size

    def load_credentials(self, credentials: dict[str, str | int]) -> None:
        api_key = credentials.get("freshdesk_api_key")
        domain = credentials.get("freshdesk_domain")
        password = credentials.get("freshdesk_password")

        if not all(isinstance(cred, str) for cred in [domain, api_key, password]):
            raise ConnectorMissingCredentialError(
                "All Freshdesk credentials must be strings"
            )

        self.api_key = str(api_key)
        self.domain = str(domain)
        self.password = str(password)

    def _fetch_tickets(
        self, start: datetime | None = None, end: datetime | None = None
    ) -> Iterator[List[dict]]:
        # "end" is not currently used, so we may double-fetch tickets created after
        # indexing starts but before the actual call is made. Honoring "end" would
        # require the search endpoint, which is limited: it has no "include" field,
        # so we would have to fetch all matching IDs and then fetch each ticket
        # individually (see the sketch after this method):
        # https://developers.freshdesk.com/api/#filter_tickets
        if any(attr is None for attr in [self.api_key, self.domain, self.password]):
            raise ConnectorMissingCredentialError("freshdesk")
base_url = f"https://{self.domain}.freshdesk.com/api/v2/tickets"
params = {
params: dict[str, int | str] = {
"include": "description",
"per_page": 50,
"page": 1
"page": 1,
}
if start:
params["updated_since"] = start.isoformat()

        while True:
            response = requests.get(
                base_url, auth=(self.api_key, self.password), params=params
            )
            response.raise_for_status()

            if response.status_code == 204:
                break

            tickets = json.loads(response.content)
logger.info(f"Fetched {len(tickets)} tickets from Freshdesk API (Page {params['page']})")
            logger.info(
                f"Fetched {len(tickets)} tickets from Freshdesk API (Page {params['page']})"
            )
            yield tickets

            if len(tickets) < int(params["per_page"]):
                break

params["page"] += 1
def _process_tickets(self, start: datetime | None = None, end: datetime | None = None) -> GenerateDocumentsOutput:
params["page"] = int(params["page"]) + 1

    def _process_tickets(
        self, start: datetime | None = None, end: datetime | None = None
    ) -> GenerateDocumentsOutput:
        doc_batch: List[Document] = []

        for ticket_batch in self._fetch_tickets(start, end):
            for ticket in ticket_batch:
                doc_batch.append(_create_doc_from_ticket(ticket, self.domain))

                if len(doc_batch) >= self.batch_size:
@@ -115,7 +184,9 @@ class FreshdeskConnector(PollConnector, LoadConnector):
    def load_from_state(self) -> GenerateDocumentsOutput:
        return self._process_tickets()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
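
# End-to-end usage sketch (hypothetical credential values; assumes the remainder
# of poll_source, not shown in this hunk, forwards the datetimes to
# _process_tickets, and "index" stands in for whatever consumes the batches):
#
#     connector = FreshdeskConnector(batch_size=INDEX_BATCH_SIZE)
#     connector.load_credentials({
#         "freshdesk_api_key": "your-api-key",
#         "freshdesk_domain": "yourcompany",
#         "freshdesk_password": "your-password",
#     })
#     for doc_batch in connector.poll_source(start=0.0, end=time.time()):
#         index(doc_batch)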