danswer/backend/onyx/connectors/gong/connector.py

import base64
from collections.abc import Generator
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import cast

import requests

from onyx.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from onyx.configs.app_configs import GONG_CONNECTOR_START_TIME
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.utils.logger import setup_logger

logger = setup_logger()
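
# NOTE: Gong API base URLs appear to be account-specific subdomains; this one is
# presumably the instance this connector was built against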
GONG_BASE_URL = "https://us-34014.api.gong.io"


class GongConnector(LoadConnector, PollConnector):
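    """Indexes call transcripts from the Gong API.

    Implements both a full load (LoadConnector) and incremental time-window
    polling (PollConnector).
    """
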
    def __init__(
        self,
        workspaces: list[str] | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
        continue_on_fail: bool = CONTINUE_ON_CONNECTOR_FAILURE,
        hide_user_info: bool = False,
    ) -> None:
        self.workspaces = workspaces
        self.batch_size: int = batch_size
        self.continue_on_fail = continue_on_fail
        self.auth_token_basic: str | None = None
        self.hide_user_info = hide_user_info

    def _get_auth_header(self) -> dict[str, str]:
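        """Return the Basic auth header; requires credentials to be loaded."""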
        if self.auth_token_basic is None:
            raise ConnectorMissingCredentialError("Gong")

        return {"Authorization": f"Basic {self.auth_token_basic}"}

    def _get_workspace_id_map(self) -> dict[str, str]:
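        """Map both workspace names and workspace ids to workspace ids, so the
        user-configured workspace list may contain either."""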
url = f"{GONG_BASE_URL}/v2/workspaces"
response = requests.get(url, headers=self._get_auth_header())
response.raise_for_status()
workspaces_details = response.json().get("workspaces")
name_id_map = {
workspace["name"]: workspace["id"] for workspace in workspaces_details
}
id_id_map = {
workspace["id"]: workspace["id"] for workspace in workspaces_details
}
# In very rare case, if a workspace is given a name which is the id of another workspace,
# Then the user input is treated as the name
return {**id_id_map, **name_id_map}

    def _get_transcript_batches(
        self, start_datetime: str | None = None, end_datetime: str | None = None
    ) -> Generator[list[dict[str, Any]], None, None]:
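        """Yield transcripts from Gong's /v2/calls/transcript endpoint.

        Follows the response cursor to page through results (per workspace if
        workspaces were configured) and re-chunks them into lists of at most
        self.batch_size transcripts.
        """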
url = f"{GONG_BASE_URL}/v2/calls/transcript"
body: dict[str, dict] = {"filter": {}}
if start_datetime:
body["filter"]["fromDateTime"] = start_datetime
if end_datetime:
body["filter"]["toDateTime"] = end_datetime
# The batch_ids in the previous method appears to be batches of call_ids to process
# In this method, we will retrieve transcripts for them in batches.
transcripts: list[dict[str, Any]] = []
workspace_list = self.workspaces or [None] # type: ignore
workspace_map = self._get_workspace_id_map() if self.workspaces else {}
for workspace in workspace_list:
if workspace:
logger.info(f"Updating Gong workspace: {workspace}")
workspace_id = workspace_map.get(workspace)
if not workspace_id:
logger.error(f"Invalid Gong workspace: {workspace}")
if not self.continue_on_fail:
raise ValueError(f"Invalid workspace: {workspace}")
continue
body["filter"]["workspaceId"] = workspace_id
else:
if "workspaceId" in body["filter"]:
del body["filter"]["workspaceId"]
while True:
response = requests.post(
url, headers=self._get_auth_header(), json=body
)
# If no calls in the range, just break out
if response.status_code == 404:
break
try:
response.raise_for_status()
except Exception:
logger.error(f"Error fetching transcripts: {response.text}")
raise
data = response.json()
call_transcripts = data.get("callTranscripts", [])
transcripts.extend(call_transcripts)
while len(transcripts) >= self.batch_size:
yield transcripts[: self.batch_size]
transcripts = transcripts[self.batch_size :]
cursor = data.get("records", {}).get("cursor")
if cursor:
body["cursor"] = cursor
else:
break
if transcripts:
yield transcripts

    def _get_call_details_by_ids(self, call_ids: list[str]) -> dict:
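        """Fetch extensive call details (metadata plus parties) for the given
        call ids, keyed by call id."""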
url = f"{GONG_BASE_URL}/v2/calls/extensive"
body = {
"filter": {"callIds": call_ids},
"contentSelector": {"exposedFields": {"parties": True}},
}
response = requests.post(url, headers=self._get_auth_header(), json=body)
response.raise_for_status()
calls = response.json().get("calls")
call_to_metadata = {}
for call in calls:
call_to_metadata[call["metaData"]["id"]] = call
return call_to_metadata

    @staticmethod
    def _parse_parties(parties: list[dict]) -> dict[str, str]:
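        """Map each party's speakerId to a human-readable identifier, preferring
        "Name (email)" and degrading gracefully when fields are missing."""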
        id_mapping = {}
        for party in parties:
            name = party.get("name")
            email = party.get("emailAddress")
            if name and email:
                full_identifier = f"{name} ({email})"
            elif name:
                full_identifier = name
            elif email:
                full_identifier = email
            else:
                full_identifier = "Unknown"
            id_mapping[party["speakerId"]] = full_identifier

        return id_mapping

    def _fetch_calls(
        self, start_datetime: str | None = None, end_datetime: str | None = None
    ) -> GenerateDocumentsOutput:
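        """Join transcript batches with call details and yield Document batches.

        Each transcript is rendered as speaker-labeled text, one paragraph per
        monologue segment.
        """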
        for transcript_batch in self._get_transcript_batches(
            start_datetime, end_datetime
        ):
            doc_batch: list[Document] = []

            call_ids = cast(
                list[str],
                [t.get("callId") for t in transcript_batch if t.get("callId")],
            )
            call_details_map = self._get_call_details_by_ids(call_ids)

            for transcript in transcript_batch:
                call_id = transcript.get("callId")
                if not call_id or call_id not in call_details_map:
                    logger.error(
                        f"Couldn't get call information for Call ID: {call_id}"
                    )
                    if not self.continue_on_fail:
                        raise RuntimeError(
                            f"Couldn't get call information for Call ID: {call_id}"
                        )
                    continue

                call_details = call_details_map[call_id]
                call_metadata = call_details["metaData"]

                call_time_str = call_metadata["started"]
                call_title = call_metadata["title"]
                logger.info(
                    f"Indexing Gong call from {call_time_str.split('T', 1)[0]}: {call_title}"
                )

                call_parties = cast(list[dict] | None, call_details.get("parties"))
                if call_parties is None:
                    logger.error(f"Couldn't get parties for Call ID: {call_id}")
                    call_parties = []

                id_to_name_map = self._parse_parties(call_parties)
                # Keeping a separate dict here in case the parties info is incomplete
                speaker_to_name: dict[str, str] = {}

                transcript_text = ""
                call_purpose = call_metadata["purpose"]
                if call_purpose:
                    transcript_text += f"Call Description: {call_purpose}\n\n"

                contents = transcript["transcript"]
                for segment in contents:
                    speaker_id = segment.get("speakerId", "")
                    if speaker_id not in speaker_to_name:
                        if self.hide_user_info:
                            speaker_to_name[
                                speaker_id
                            ] = f"User {len(speaker_to_name) + 1}"
                        else:
                            speaker_to_name[speaker_id] = id_to_name_map.get(
                                speaker_id, "Unknown"
                            )

                    speaker_name = speaker_to_name[speaker_id]
                    sentences = segment.get("sentences", {})
                    monolog = " ".join(
                        [sentence.get("text", "") for sentence in sentences]
                    )
                    transcript_text += f"{speaker_name}: {monolog}\n\n"

                metadata = {}
                if call_metadata.get("system"):
                    metadata["client"] = call_metadata.get("system")
                # TODO calls have a clientUniqueId field, can pull that in later

                doc_batch.append(
                    Document(
                        id=call_id,
                        sections=[
                            Section(link=call_metadata["url"], text=transcript_text)
                        ],
                        source=DocumentSource.GONG,
                        # Should never be "Untitled"; a call cannot be created without a title
                        semantic_identifier=call_title or "Untitled",
                        doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
                            timezone.utc
                        ),
                        # Use the metadata dict built above so "client" is omitted
                        # entirely (instead of set to None) when "system" is absent
                        metadata=metadata,
                    )
                )

            yield doc_batch

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
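        """Store the base64-encoded access_key:secret pair used for Basic auth."""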
        combined = (
            f'{credentials["gong_access_key"]}:{credentials["gong_access_key_secret"]}'
        )
        self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
            "utf-8"
        )
        return None

    def load_from_state(self) -> GenerateDocumentsOutput:
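        """Full load: fetch all calls with no time filter."""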
        return self._fetch_calls()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
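        """Incremental load: fetch calls in the given window, padded back one
        day because the Gong filter applies to meeting start times."""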
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)

        # if this env variable is set, don't start from a timestamp before the
        # specified start time
        # TODO: remove this once this is globally available
        if GONG_CONNECTOR_START_TIME:
            special_start_datetime = datetime.fromisoformat(GONG_CONNECTOR_START_TIME)
            special_start_datetime = special_start_datetime.replace(tzinfo=timezone.utc)
        else:
            special_start_datetime = datetime.fromtimestamp(0, tz=timezone.utc)

        # don't let the special start dt be past the end time; otherwise the Gong
        # API errors out (`filter.fromDateTime: must be before toDateTime`)
        special_start_datetime = min(special_start_datetime, end_datetime)

        start_datetime = max(
            datetime.fromtimestamp(start, tz=timezone.utc), special_start_datetime
        )

        # Because the filter is on meeting start times and a meeting must end and
        # be processed before its transcript is available, subtract a 1-day buffer
        # from the start and fetch up to the given end time
        start_one_day_offset = start_datetime - timedelta(days=1)
        start_time = start_one_day_offset.isoformat()
        end_time = end_datetime.isoformat()

        logger.info(f"Fetching Gong calls between {start_time} and {end_time}")
        return self._fetch_calls(start_time, end_time)


if __name__ == "__main__":
    import os

    connector = GongConnector()
    connector.load_credentials(
        {
            "gong_access_key": os.environ["GONG_ACCESS_KEY"],
            "gong_access_key_secret": os.environ["GONG_ACCESS_KEY_SECRET"],
        }
    )

    latest_docs = connector.load_from_state()
    print(next(latest_docs))