mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-08-02 13:12:50 +02:00
317 lines
12 KiB
Python
import base64
|
|
from collections.abc import Generator
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
from datetime import timezone
|
|
from typing import Any
|
|
from typing import cast
|
|
|
|
import requests
|
|
|
|
from onyx.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
|
|
from onyx.configs.app_configs import GONG_CONNECTOR_START_TIME
|
|
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
|
from onyx.configs.constants import DocumentSource
|
|
from onyx.connectors.interfaces import GenerateDocumentsOutput
|
|
from onyx.connectors.interfaces import LoadConnector
|
|
from onyx.connectors.interfaces import PollConnector
|
|
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
|
from onyx.connectors.models import ConnectorMissingCredentialError
|
|
from onyx.connectors.models import Document
|
|
from onyx.connectors.models import Section
|
|
from onyx.utils.logger import setup_logger
|
|
|
|
|
|
# Module-level logger shared by every method of the connector.
logger = setup_logger()

# Root of every Gong REST call made by this module (``/v2/...`` paths are
# appended to it).
# NOTE(review): the host looks account/region-specific ("us-34014") — confirm
# it is correct for every deployment that uses this connector.
GONG_BASE_URL: str = "https://us-34014.api.gong.io"
|
|
|
|
|
|
class GongConnector(LoadConnector, PollConnector):
    """Connector that indexes Gong call transcripts.

    Each call that has a transcript becomes one ``Document``. Its single
    section links to the call URL. The section text is the call description
    (when present) followed by one ``"<speaker>: <text>"`` paragraph per
    transcript segment.
    """

    def __init__(
        self,
        workspaces: list[str] | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
        continue_on_fail: bool = CONTINUE_ON_CONNECTOR_FAILURE,
        hide_user_info: bool = False,
    ) -> None:
        """Configure the connector.

        Args:
            workspaces: Gong workspace names or ids to index. ``None`` or an
                empty list means no workspace filter is sent.
            batch_size: Maximum number of transcripts per yielded batch.
            continue_on_fail: When True, invalid workspaces and calls whose
                details cannot be fetched are logged and skipped instead of
                raising.
            hide_user_info: When True, speakers are labelled ``"User N"``
                instead of by name/email.
        """
        self.workspaces = workspaces
        self.batch_size: int = batch_size
        self.continue_on_fail = continue_on_fail
        self.auth_token_basic: str | None = None
        self.hide_user_info = hide_user_info

    def _get_auth_header(self) -> dict[str, str]:
        """Return the HTTP Basic ``Authorization`` header for Gong requests.

        Raises:
            ConnectorMissingCredentialError: if ``load_credentials`` has not
                been called yet.
        """
        if self.auth_token_basic is None:
            raise ConnectorMissingCredentialError("Gong")

        return {"Authorization": f"Basic {self.auth_token_basic}"}

    def _get_workspace_id_map(self) -> dict[str, str]:
        """Map every workspace name *and* every workspace id to its id.

        This lets users configure workspaces by either name or id.
        """
        url = f"{GONG_BASE_URL}/v2/workspaces"
        response = requests.get(url, headers=self._get_auth_header())
        response.raise_for_status()

        # Tolerate a missing/null "workspaces" field. Every configured
        # workspace is then reported as invalid by the caller, instead of
        # this method crashing with a TypeError.
        workspaces_details = response.json().get("workspaces") or []
        name_id_map = {
            workspace["name"]: workspace["id"] for workspace in workspaces_details
        }
        id_id_map = {
            workspace["id"]: workspace["id"] for workspace in workspaces_details
        }
        # In the very rare case that a workspace is named with another
        # workspace's id, the user input is treated as the name: name
        # entries are merged last, so they win.
        return {**id_id_map, **name_id_map}

    def _get_transcript_batches(
        self, start_datetime: str | None = None, end_datetime: str | None = None
    ) -> Generator[list[dict[str, Any]], None, None]:
        """Yield call transcripts in lists of at most ``self.batch_size``.

        Pages through ``/v2/calls/transcript`` once per configured workspace,
        or once without a workspace filter when none are configured.
        Transcripts accumulate across pages and workspaces. Full batches are
        yielded as soon as they are available, and any remainder is yielded
        at the end.

        Args:
            start_datetime: Optional ISO-8601 string sent as
                ``filter.fromDateTime``.
            end_datetime: Optional ISO-8601 string sent as
                ``filter.toDateTime``.

        Raises:
            ValueError: for an unknown workspace when ``continue_on_fail`` is
                False.
            requests.HTTPError: for any error response other than 404.
        """
        url = f"{GONG_BASE_URL}/v2/calls/transcript"
        # Holds the nested "filter" dict and, while paginating, a str cursor.
        body: dict[str, Any] = {"filter": {}}
        if start_datetime:
            body["filter"]["fromDateTime"] = start_datetime
        if end_datetime:
            body["filter"]["toDateTime"] = end_datetime

        transcripts: list[dict[str, Any]] = []
        # ``None`` stands for "no workspace filter".
        workspace_list: list[str | None] = (
            list(self.workspaces) if self.workspaces else [None]
        )
        workspace_map = self._get_workspace_id_map() if self.workspaces else {}

        for workspace in workspace_list:
            # A pagination cursor belongs to the previous workspace's result
            # set and must not be reused under a different workspace filter.
            # Always start each workspace from its first page.
            body.pop("cursor", None)

            if workspace:
                logger.info(f"Updating Gong workspace: {workspace}")
                workspace_id = workspace_map.get(workspace)
                if not workspace_id:
                    logger.error(f"Invalid Gong workspace: {workspace}")
                    if not self.continue_on_fail:
                        raise ValueError(f"Invalid workspace: {workspace}")
                    continue
                body["filter"]["workspaceId"] = workspace_id
            else:
                body["filter"].pop("workspaceId", None)

            while True:
                response = requests.post(
                    url, headers=self._get_auth_header(), json=body
                )
                # If no calls in the range, just break out
                if response.status_code == 404:
                    break

                try:
                    response.raise_for_status()
                except requests.HTTPError:
                    logger.error(f"Error fetching transcripts: {response.text}")
                    raise

                data = response.json()
                transcripts.extend(data.get("callTranscripts", []))

                while len(transcripts) >= self.batch_size:
                    yield transcripts[: self.batch_size]
                    transcripts = transcripts[self.batch_size :]

                cursor = data.get("records", {}).get("cursor")
                if cursor:
                    body["cursor"] = cursor
                else:
                    break

        if transcripts:
            yield transcripts

    def _get_call_details_by_ids(self, call_ids: list[str]) -> dict:
        """Fetch extended call details (including parties), keyed by call id.

        Follows ``records.cursor`` when the response provides one, so that
        calls split across several result pages are not dropped.

        Raises:
            requests.HTTPError: if any page request fails.
        """
        if not call_ids:
            # Nothing to look up. Avoid sending a request with an empty
            # ``callIds`` filter.
            return {}

        url = f"{GONG_BASE_URL}/v2/calls/extensive"
        body: dict[str, Any] = {
            "filter": {"callIds": call_ids},
            "contentSelector": {"exposedFields": {"parties": True}},
        }

        call_to_metadata: dict[str, dict] = {}
        while True:
            response = requests.post(url, headers=self._get_auth_header(), json=body)
            response.raise_for_status()

            data = response.json()
            # A null/missing "calls" field is treated as "no calls".
            for call in data.get("calls") or []:
                call_to_metadata[call["metaData"]["id"]] = call

            cursor = (data.get("records") or {}).get("cursor")
            if not cursor:
                break
            body["cursor"] = cursor

        return call_to_metadata

    @staticmethod
    def _parse_parties(parties: list[dict]) -> dict[str, str]:
        """Map each party's ``speakerId`` to a display label.

        The label is ``"name (email)"``, ``name``, ``email``, or
        ``"Unknown"``, depending on which fields are present. Parties with
        no ``speakerId`` can never match a transcript segment, so they are
        skipped rather than raising ``KeyError``.
        """
        id_mapping: dict[str, str] = {}
        for party in parties:
            speaker_id = party.get("speakerId")
            if not speaker_id:
                continue

            name = party.get("name")
            email = party.get("emailAddress")

            if name and email:
                full_identifier = f"{name} ({email})"
            elif name:
                full_identifier = name
            elif email:
                full_identifier = email
            else:
                full_identifier = "Unknown"

            id_mapping[speaker_id] = full_identifier

        return id_mapping

    def _fetch_calls(
        self, start_datetime: str | None = None, end_datetime: str | None = None
    ) -> GenerateDocumentsOutput:
        """Yield batches of ``Document``s for calls in the optional time range.

        Each transcript batch yields one document batch (possibly empty if
        every call in it was skipped).

        Raises:
            RuntimeError: when a transcript's call details cannot be found and
                ``continue_on_fail`` is False.
        """
        for transcript_batch in self._get_transcript_batches(
            start_datetime, end_datetime
        ):
            doc_batch: list[Document] = []

            call_ids = cast(
                list[str],
                [t.get("callId") for t in transcript_batch if t.get("callId")],
            )
            call_details_map = self._get_call_details_by_ids(call_ids)

            for transcript in transcript_batch:
                call_id = transcript.get("callId")

                if not call_id or call_id not in call_details_map:
                    logger.error(
                        f"Couldn't get call information for Call ID: {call_id}"
                    )
                    if not self.continue_on_fail:
                        raise RuntimeError(
                            f"Couldn't get call information for Call ID: {call_id}"
                        )
                    continue

                call_details = call_details_map[call_id]
                call_metadata = call_details["metaData"]

                call_time_str = call_metadata["started"]
                call_title = call_metadata["title"]
                logger.info(
                    f"Indexing Gong call from {call_time_str.split('T', 1)[0]}: {call_title}"
                )

                call_parties = cast(list[dict] | None, call_details.get("parties"))
                if call_parties is None:
                    logger.error(f"Couldn't get parties for Call ID: {call_id}")
                    call_parties = []

                id_to_name_map = self._parse_parties(call_parties)

                # Keeping a separate dict here in case the parties info is
                # incomplete; it also assigns stable "User N" labels when
                # user info is hidden.
                speaker_to_name: dict[str, str] = {}

                text_parts: list[str] = []
                call_purpose = call_metadata.get("purpose")
                if call_purpose:
                    text_parts.append(f"Call Description: {call_purpose}\n\n")

                for segment in transcript["transcript"]:
                    speaker_id = segment.get("speakerId", "")
                    if speaker_id not in speaker_to_name:
                        if self.hide_user_info:
                            speaker_to_name[
                                speaker_id
                            ] = f"User {len(speaker_to_name) + 1}"
                        else:
                            speaker_to_name[speaker_id] = id_to_name_map.get(
                                speaker_id, "Unknown"
                            )

                    speaker_name = speaker_to_name[speaker_id]

                    sentences = segment.get("sentences", [])
                    monolog = " ".join(
                        sentence.get("text", "") for sentence in sentences
                    )
                    text_parts.append(f"{speaker_name}: {monolog}\n\n")

                transcript_text = "".join(text_parts)

                # Only include "client" when the call reports a system. This
                # avoids storing a None value in the document metadata.
                metadata: dict[str, str] = {}
                if call_metadata.get("system"):
                    metadata["client"] = call_metadata["system"]
                # TODO calls have a clientUniqueId field, can pull that in later

                doc_batch.append(
                    Document(
                        id=call_id,
                        sections=[
                            Section(link=call_metadata["url"], text=transcript_text)
                        ],
                        source=DocumentSource.GONG,
                        # Should not ever be Untitled as a call cannot be made without a Title
                        semantic_identifier=call_title or "Untitled",
                        doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
                            timezone.utc
                        ),
                        metadata=metadata,
                    )
                )
            yield doc_batch

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Store a Basic-auth token built from the Gong access key pair.

        Expects the ``gong_access_key`` and ``gong_access_key_secret`` keys.
        The token is ``base64("key:secret")``. Always returns None.
        """
        combined = (
            f'{credentials["gong_access_key"]}:{credentials["gong_access_key_secret"]}'
        )
        self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
            "utf-8"
        )
        return None

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Index every available call, with no time filter."""
        return self._fetch_calls()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Index calls whose start time falls in a window around ``[start, end]``.

        The lower bound is moved back by one day and is never earlier than
        ``GONG_CONNECTOR_START_TIME`` (when that is set).
        """
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)

        # if this env variable is set, don't start from a timestamp before the
        # specified start time
        # TODO: remove this once this is globally available
        if GONG_CONNECTOR_START_TIME:
            special_start_datetime = datetime.fromisoformat(GONG_CONNECTOR_START_TIME)
            if special_start_datetime.tzinfo is None:
                # Naive values are interpreted as UTC.
                special_start_datetime = special_start_datetime.replace(
                    tzinfo=timezone.utc
                )
            else:
                # Convert rather than overwrite an explicit offset.
                special_start_datetime = special_start_datetime.astimezone(
                    timezone.utc
                )
        else:
            special_start_datetime = datetime.fromtimestamp(0, tz=timezone.utc)

        # don't let the special start dt be past the end time, this causes issues
        # with the Gong API (`filter.fromDateTime: must be before toDateTime`)
        special_start_datetime = min(special_start_datetime, end_datetime)

        start_datetime = max(
            datetime.fromtimestamp(start, tz=timezone.utc), special_start_datetime
        )

        # Because these are meeting start times, the meeting needs to end and be
        # processed, so a 1 day buffer is subtracted from the start.
        start_time = (start_datetime - timedelta(days=1)).isoformat()
        end_time = end_datetime.isoformat()

        logger.info(f"Fetching Gong calls between {start_time} and {end_time}")
        return self._fetch_calls(start_time, end_time)
|
|
|
|
|
|
if __name__ == "__main__":
    import os

    # Manual smoke test: read the Gong key pair from the environment, run a
    # full (unfiltered) load, and print the first batch of documents.
    connector = GongConnector()
    gong_credentials = {
        "gong_access_key": os.environ["GONG_ACCESS_KEY"],
        "gong_access_key_secret": os.environ["GONG_ACCESS_KEY_SECRET"],
    }
    connector.load_credentials(gong_credentials)

    first_batch = next(connector.load_from_state())
    print(first_batch)
|