handle gong api race condition (#4457)

* working around a gong race condition in their api

* add back gong basic test

* formatting

* add the call index

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
This commit is contained in:
rkuo-danswer 2025-04-04 12:33:47 -07:00 committed by GitHub
parent 839c8611b7
commit 15ab0586df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 59 additions and 8 deletions

View File

@ -1,4 +1,5 @@
import base64
import time
from collections.abc import Generator
from datetime import datetime
from datetime import timedelta
@ -28,6 +29,8 @@ logger = setup_logger()
class GongConnector(LoadConnector, PollConnector):
BASE_URL = "https://api.gong.io"
MAX_CALL_DETAILS_ATTEMPTS = 6
CALL_DETAILS_DELAY = 30 # in seconds
def __init__(
self,
@ -173,17 +176,59 @@ class GongConnector(LoadConnector, PollConnector):
def _fetch_calls(
self, start_datetime: str | None = None, end_datetime: str | None = None
) -> GenerateDocumentsOutput:
num_calls = 0
for transcript_batch in self._get_transcript_batches(
start_datetime, end_datetime
):
doc_batch: list[Document] = []
call_ids = cast(
transcript_call_ids = cast(
list[str],
[t.get("callId") for t in transcript_batch if t.get("callId")],
)
call_details_map = self._get_call_details_by_ids(call_ids)
call_details_map: dict[str, Any] = {}
# There's a likely race condition in the API where a transcript will have a
# call id but the call to v2/calls/extensive will not return all of the id's
# retry with exponential backoff has been observed to mitigate this
# in ~2 minutes
current_attempt = 0
while True:
current_attempt += 1
call_details_map = self._get_call_details_by_ids(transcript_call_ids)
if set(transcript_call_ids) == set(call_details_map.keys()):
# we got all the id's we were expecting ... break and continue
break
# we are missing some id's. Log and retry with exponential backoff
missing_call_ids = set(transcript_call_ids) - set(
call_details_map.keys()
)
logger.warning(
f"_get_call_details_by_ids is missing call id's: "
f"current_attempt={current_attempt} "
f"missing_call_ids={missing_call_ids}"
)
if current_attempt >= self.MAX_CALL_DETAILS_ATTEMPTS:
raise RuntimeError(
f"Attempt count exceeded for _get_call_details_by_ids: "
f"missing_call_ids={missing_call_ids} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
wait_seconds = self.CALL_DETAILS_DELAY * pow(2, current_attempt - 1)
logger.warning(
f"_get_call_details_by_ids waiting to retry: "
f"wait={wait_seconds}s "
f"current_attempt={current_attempt} "
f"next_attempt={current_attempt+1} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
time.sleep(wait_seconds)
# now we can iterate per call/transcript
for transcript in transcript_batch:
call_id = transcript.get("callId")
@ -196,7 +241,7 @@ class GongConnector(LoadConnector, PollConnector):
if call_id:
logger.error(
f"Call debug info: call_id={call_id} "
f"call_ids={call_ids} "
f"call_ids={transcript_call_ids} "
f"call_details_map={call_details_map.keys()}"
)
if not self.continue_on_fail:
@ -211,7 +256,8 @@ class GongConnector(LoadConnector, PollConnector):
call_time_str = call_metadata["started"]
call_title = call_metadata["title"]
logger.info(
f"Indexing Gong call from {call_time_str.split('T', 1)[0]}: {call_title}"
f"{num_calls+1}: Indexing Gong call id {call_id} "
f"from {call_time_str.split('T', 1)[0]}: {call_title}"
)
call_parties = cast(list[dict] | None, call_details.get("parties"))
@ -270,8 +316,13 @@ class GongConnector(LoadConnector, PollConnector):
metadata={"client": call_metadata.get("system")},
)
)
num_calls += 1
yield doc_batch
logger.info(f"_fetch_calls finished: num_calls={num_calls}")
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
combined = (
f'{credentials["gong_access_key"]}:{credentials["gong_access_key_secret"]}'

View File

@ -175,7 +175,7 @@ class EmbeddingModel:
embeddings: list[Embedding] = []
def process_batch(
batch_idx: int, text_batch: list[str]
batch_idx: int, batch_len: int, text_batch: list[str]
) -> tuple[int, list[Embedding]]:
if self.callback:
if self.callback.should_stop():
@ -203,7 +203,7 @@ class EmbeddingModel:
processing_time = end_time - start_time
logger.debug(
f"EmbeddingModel.process_batch: Batch {batch_idx} processing time: {processing_time:.2f} seconds"
f"EmbeddingModel.process_batch: Batch {batch_idx}/{batch_len} processing time: {processing_time:.2f} seconds"
)
return batch_idx, response.embeddings
@ -215,7 +215,7 @@ class EmbeddingModel:
if num_threads >= 1 and self.provider_type and len(text_batches) > 1:
with ThreadPoolExecutor(max_workers=num_threads) as executor:
future_to_batch = {
executor.submit(process_batch, idx, batch): idx
executor.submit(process_batch, idx, len(text_batches), batch): idx
for idx, batch in enumerate(text_batches, start=1)
}
@ -238,7 +238,7 @@ class EmbeddingModel:
else:
# Original sequential processing
for idx, text_batch in enumerate(text_batches, start=1):
_, batch_embeddings = process_batch(idx, text_batch)
_, batch_embeddings = process_batch(idx, len(text_batches), text_batch)
embeddings.extend(batch_embeddings)
if self.callback:
self.callback.progress("_batch_encode_texts", 1)