mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-24 23:11:01 +02:00
200 lines
6.4 KiB
Python
200 lines
6.4 KiB
Python
from collections.abc import Iterator
|
|
from datetime import datetime
|
|
from datetime import timezone
|
|
from typing import List
|
|
|
|
import requests
|
|
|
|
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
|
from onyx.configs.constants import DocumentSource
|
|
from onyx.connectors.interfaces import GenerateDocumentsOutput
|
|
from onyx.connectors.interfaces import LoadConnector
|
|
from onyx.connectors.interfaces import PollConnector
|
|
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
|
from onyx.connectors.models import BasicExpertInfo
|
|
from onyx.connectors.models import ConnectorMissingCredentialError
|
|
from onyx.connectors.models import Document
|
|
from onyx.connectors.models import Section
|
|
from onyx.utils.logger import setup_logger
|
|
|
|
logger = setup_logger()

# Prefix namespacing Fireflies document IDs so they cannot collide with
# documents from other connectors.
_FIREFLIES_ID_PREFIX = "FIREFLIES_"

# Fireflies exposes a single GraphQL endpoint for all queries.
_FIREFLIES_API_URL = "https://api.fireflies.ai/graphql"

_FIREFLIES_TRANSCRIPT_QUERY_SIZE = 50  # Max page size is 50

# GraphQL query fetching one page of transcripts. Pagination is driven by
# $limit/$skip; $fromDate/$toDate are optional time bounds (set only when
# polling a window).
_FIREFLIES_API_QUERY = """
query Transcripts($fromDate: DateTime, $toDate: DateTime, $limit: Int!, $skip: Int!) {
transcripts(fromDate: $fromDate, toDate: $toDate, limit: $limit, skip: $skip) {
id
title
organizer_email
participants
date
transcript_url
sentences {
text
speaker_name
start_time
}
}
}
"""
|
|
|
|
|
|
def _create_doc_from_transcript(transcript: dict) -> Document | None:
    """Convert one raw Fireflies transcript dict into a Document.

    Sentences are grouped into Sections by consecutive speaker; each section
    links to the transcript URL at the timestamp of its first sentence.

    Args:
        transcript: one entry from the GraphQL ``transcripts`` response.

    Returns:
        The built Document, or None when the transcript has no sentences
        to index.
    """
    sentences = transcript["sentences"]
    # Fireflies may return null for sentences; an empty list would otherwise
    # produce a document containing a single empty section, so skip both.
    if not sentences:
        return None

    sections: List[Section] = []
    current_speaker_name: str | None = None
    current_link = ""
    current_text = ""

    for sentence in sentences:
        # Normalize the speaker BEFORE comparing. The previous code compared
        # the raw (possibly None) speaker_name against the normalized current
        # speaker, which split every "Unknown Speaker" sentence into its own
        # section.
        speaker_name = sentence.get("speaker_name") or "Unknown Speaker"

        if speaker_name != current_speaker_name:
            # Speaker changed: flush the accumulated section (if any).
            if current_speaker_name is not None:
                sections.append(
                    Section(
                        link=current_link,
                        text=current_text.strip(),
                    )
                )
            current_speaker_name = speaker_name
            # Sometimes these links (links with a timestamp) do not work,
            # it is a bug with Fireflies.
            current_link = f"{transcript['transcript_url']}?t={sentence['start_time']}"
            current_text = f"{current_speaker_name}: "

        # \xa0 (non-breaking space) shows up in Fireflies text; normalize it.
        cleaned_text = sentence["text"].replace("\xa0", " ")
        current_text += f"{cleaned_text} "

    # Flush the final speaker's section.
    sections.append(
        Section(
            link=current_link,
            text=current_text.strip(),
        )
    )

    fireflies_id = _FIREFLIES_ID_PREFIX + transcript["id"]

    meeting_title = transcript["title"] or "No Title"

    # Fireflies reports the meeting date in epoch milliseconds.
    meeting_date_unix = transcript["date"]
    meeting_date = datetime.fromtimestamp(meeting_date_unix / 1000, tz=timezone.utc)

    meeting_organizer_email = transcript["organizer_email"]
    organizer_email_user_info = [BasicExpertInfo(email=meeting_organizer_email)]

    # `or []` also guards against an explicit null participants value, which
    # dict.get's default would not catch.
    meeting_participants_email_list = [
        BasicExpertInfo(email=participant)
        for participant in (transcript.get("participants") or [])
        if participant and participant != meeting_organizer_email
    ]

    return Document(
        id=fireflies_id,
        sections=sections,
        source=DocumentSource.FIREFLIES,
        semantic_identifier=meeting_title,
        metadata={},
        doc_updated_at=meeting_date,
        primary_owners=organizer_email_user_info,
        secondary_owners=meeting_participants_email_list,
    )
|
|
|
|
|
|
class FirefliesConnector(PollConnector, LoadConnector):
    """Connector pulling meeting transcripts from the Fireflies GraphQL API.

    Supports both a full load (``load_from_state``) and incremental polling
    over a time window (``poll_source``).
    """

    def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
        self.batch_size = batch_size
        # Set by load_credentials(). Initialized here so that calling the
        # connector before credentials are loaded raises the intended
        # ConnectorMissingCredentialError instead of an AttributeError.
        self.api_key: str | None = None

    def load_credentials(self, credentials: dict[str, str]) -> None:
        """Store the Fireflies API key from the credentials dict.

        Raises:
            ConnectorMissingCredentialError: if the key is absent or not a str.
        """
        api_key = credentials.get("fireflies_api_key")

        if not isinstance(api_key, str):
            raise ConnectorMissingCredentialError(
                "The Fireflies API key must be a string"
            )

        self.api_key = api_key

        return None

    def _fetch_transcripts(
        self, start_datetime: str | None = None, end_datetime: str | None = None
    ) -> Iterator[List[dict]]:
        """Yield pages of raw transcript dicts, paginating until exhausted.

        Args:
            start_datetime: optional ISO-8601 lower bound for the query.
            end_datetime: optional ISO-8601 upper bound for the query.

        Raises:
            ConnectorMissingCredentialError: if credentials were never loaded.
            requests.HTTPError: on a non-2xx API response.
        """
        if self.api_key is None:
            raise ConnectorMissingCredentialError("Missing API key")

        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + self.api_key,
        }

        skip = 0
        variables: dict[str, int | str] = {
            "limit": _FIREFLIES_TRANSCRIPT_QUERY_SIZE,
        }

        # Only include the date bounds when supplied so a full load queries
        # the complete history.
        if start_datetime:
            variables["fromDate"] = start_datetime
        if end_datetime:
            variables["toDate"] = end_datetime

        while True:
            variables["skip"] = skip
            response = requests.post(
                _FIREFLIES_API_URL,
                headers=headers,
                json={"query": _FIREFLIES_API_QUERY, "variables": variables},
            )

            response.raise_for_status()

            # 204 has no body, so bail out before attempting to parse JSON.
            if response.status_code == 204:
                break

            received_transcripts = response.json()
            parsed_transcripts = received_transcripts.get("data", {}).get(
                "transcripts", []
            )

            yield parsed_transcripts

            # A short page means this was the final page.
            if len(parsed_transcripts) < _FIREFLIES_TRANSCRIPT_QUERY_SIZE:
                break

            skip += _FIREFLIES_TRANSCRIPT_QUERY_SIZE

    def _process_transcripts(
        self, start: str | None = None, end: str | None = None
    ) -> GenerateDocumentsOutput:
        """Convert fetched transcripts into Documents, yielded in batches."""
        doc_batch: List[Document] = []

        for transcript_batch in self._fetch_transcripts(start, end):
            for transcript in transcript_batch:
                # The helper returns None for transcripts with no sentences.
                if doc := _create_doc_from_transcript(transcript):
                    doc_batch.append(doc)

                if len(doc_batch) >= self.batch_size:
                    yield doc_batch
                    doc_batch = []

        if doc_batch:
            yield doc_batch

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Full load: fetch every transcript with no time bounds."""
        return self._process_transcripts()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Incremental load of transcripts within [start, end].

        Args:
            start: window start, seconds since the Unix epoch (UTC).
            end: window end, seconds since the Unix epoch (UTC).
        """
        # Fireflies expects ISO-8601 UTC timestamps with milliseconds.
        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S.000Z"
        )
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S.000Z"
        )

        yield from self._process_transcripts(start_datetime, end_datetime)
|