mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-29 09:20:01 +02:00
Add more logging + retries to teams connector (#3369)
This commit is contained in:
@ -33,7 +33,7 @@ def get_created_datetime(chat_message: ChatMessage) -> datetime:
|
|||||||
|
|
||||||
def _extract_channel_members(channel: Channel) -> list[BasicExpertInfo]:
|
def _extract_channel_members(channel: Channel) -> list[BasicExpertInfo]:
|
||||||
channel_members_list: list[BasicExpertInfo] = []
|
channel_members_list: list[BasicExpertInfo] = []
|
||||||
members = channel.members.get().execute_query()
|
members = channel.members.get().execute_query_retry()
|
||||||
for member in members:
|
for member in members:
|
||||||
channel_members_list.append(BasicExpertInfo(display_name=member.display_name))
|
channel_members_list.append(BasicExpertInfo(display_name=member.display_name))
|
||||||
return channel_members_list
|
return channel_members_list
|
||||||
@ -51,7 +51,7 @@ def _get_threads_from_channel(
|
|||||||
end = end.replace(tzinfo=timezone.utc)
|
end = end.replace(tzinfo=timezone.utc)
|
||||||
|
|
||||||
query = channel.messages.get()
|
query = channel.messages.get()
|
||||||
base_messages: list[ChatMessage] = query.execute_query()
|
base_messages: list[ChatMessage] = query.execute_query_retry()
|
||||||
|
|
||||||
threads: list[list[ChatMessage]] = []
|
threads: list[list[ChatMessage]] = []
|
||||||
for base_message in base_messages:
|
for base_message in base_messages:
|
||||||
@ -65,7 +65,7 @@ def _get_threads_from_channel(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
reply_query = base_message.replies.get_all()
|
reply_query = base_message.replies.get_all()
|
||||||
replies = reply_query.execute_query()
|
replies = reply_query.execute_query_retry()
|
||||||
|
|
||||||
# start a list containing the base message and its replies
|
# start a list containing the base message and its replies
|
||||||
thread: list[ChatMessage] = [base_message]
|
thread: list[ChatMessage] = [base_message]
|
||||||
@ -82,7 +82,7 @@ def _get_channels_from_teams(
|
|||||||
channels_list: list[Channel] = []
|
channels_list: list[Channel] = []
|
||||||
for team in teams:
|
for team in teams:
|
||||||
query = team.channels.get()
|
query = team.channels.get()
|
||||||
channels = query.execute_query()
|
channels = query.execute_query_retry()
|
||||||
channels_list.extend(channels)
|
channels_list.extend(channels)
|
||||||
|
|
||||||
return channels_list
|
return channels_list
|
||||||
@ -210,7 +210,7 @@ class TeamsConnector(LoadConnector, PollConnector):
|
|||||||
|
|
||||||
teams_list: list[Team] = []
|
teams_list: list[Team] = []
|
||||||
|
|
||||||
teams = self.graph_client.teams.get().execute_query()
|
teams = self.graph_client.teams.get().execute_query_retry()
|
||||||
|
|
||||||
if len(self.requested_team_list) > 0:
|
if len(self.requested_team_list) > 0:
|
||||||
adjusted_request_strings = [
|
adjusted_request_strings = [
|
||||||
@ -234,14 +234,25 @@ class TeamsConnector(LoadConnector, PollConnector):
|
|||||||
raise ConnectorMissingCredentialError("Teams")
|
raise ConnectorMissingCredentialError("Teams")
|
||||||
|
|
||||||
teams = self._get_all_teams()
|
teams = self._get_all_teams()
|
||||||
|
logger.debug(f"Found available teams: {[str(t) for t in teams]}")
|
||||||
|
if not teams:
|
||||||
|
msg = "No teams found."
|
||||||
|
logger.error(msg)
|
||||||
|
raise ValueError(msg)
|
||||||
|
|
||||||
channels = _get_channels_from_teams(
|
channels = _get_channels_from_teams(
|
||||||
teams=teams,
|
teams=teams,
|
||||||
)
|
)
|
||||||
|
logger.debug(f"Found available channels: {[c.id for c in channels]}")
|
||||||
|
if not channels:
|
||||||
|
msg = "No channels found."
|
||||||
|
logger.error(msg)
|
||||||
|
raise ValueError(msg)
|
||||||
|
|
||||||
# goes over channels, converts them into Document objects and then yields them in batches
|
# goes over channels, converts them into Document objects and then yields them in batches
|
||||||
doc_batch: list[Document] = []
|
doc_batch: list[Document] = []
|
||||||
for channel in channels:
|
for channel in channels:
|
||||||
|
logger.debug(f"Fetching threads from channel: {channel.id}")
|
||||||
thread_list = _get_threads_from_channel(channel, start=start, end=end)
|
thread_list = _get_threads_from_channel(channel, start=start, end=end)
|
||||||
for thread in thread_list:
|
for thread in thread_list:
|
||||||
converted_doc = _convert_thread_to_document(channel, thread)
|
converted_doc = _convert_thread_to_document(channel, thread)
|
||||||
@ -259,8 +270,8 @@ class TeamsConnector(LoadConnector, PollConnector):
|
|||||||
def poll_source(
|
def poll_source(
|
||||||
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
|
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
|
||||||
) -> GenerateDocumentsOutput:
|
) -> GenerateDocumentsOutput:
|
||||||
start_datetime = datetime.utcfromtimestamp(start)
|
start_datetime = datetime.fromtimestamp(start, timezone.utc)
|
||||||
end_datetime = datetime.utcfromtimestamp(end)
|
end_datetime = datetime.fromtimestamp(end, timezone.utc)
|
||||||
return self._fetch_from_teams(start=start_datetime, end=end_datetime)
|
return self._fetch_from_teams(start=start_datetime, end=end_datetime)
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user