slightly better slack logging (#4554)

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
This commit is contained in:
rkuo-danswer 2025-04-17 18:45:48 -07:00 committed by GitHub
parent 953a4e3793
commit a8a5a82251
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@@ -108,14 +108,13 @@ def get_channels(
channel_types=channel_types, channel_types=channel_types,
) )
except SlackApiError as e: except SlackApiError as e:
logger.info( msg = f"Unable to fetch private channels due to: {e}."
f"Unable to fetch private channels due to: {e}. Trying again without private channels." if not get_public:
) logger.warning(msg + " Public channels are not enabled.")
if get_public:
channel_types = ["public_channel"]
else:
logger.warning("No channels to fetch.")
return [] return []
logger.warning(msg + " Trying again with public channels only.")
channel_types = ["public_channel"]
channels = _collect_paginated_channels( channels = _collect_paginated_channels(
client=client, client=client,
exclude_archived=exclude_archived, exclude_archived=exclude_archived,
@@ -615,6 +614,10 @@ class SlackConnector(
filtered_channels = filter_channels( filtered_channels = filter_channels(
raw_channels, self.channels, self.channel_regex_enabled raw_channels, self.channels, self.channel_regex_enabled
) )
logger.info(
f"Channels: all={len(raw_channels)} post_filtering={len(filtered_channels)}"
)
checkpoint.channel_ids = [c["id"] for c in filtered_channels] checkpoint.channel_ids = [c["id"] for c in filtered_channels]
if len(filtered_channels) == 0: if len(filtered_channels) == 0:
checkpoint.has_more = False checkpoint.has_more = False
@@ -645,6 +648,7 @@ class SlackConnector(
) )
new_latest = message_batch[-1]["ts"] if message_batch else latest new_latest = message_batch[-1]["ts"] if message_batch else latest
num_threads_start = len(seen_thread_ts)
# Process messages in parallel using ThreadPoolExecutor # Process messages in parallel using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=self.num_threads) as executor: with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
futures: list[Future[ProcessedSlackMessage]] = [] futures: list[Future[ProcessedSlackMessage]] = []
@@ -678,13 +682,13 @@ class SlackConnector(
if thread_or_message_ts not in seen_thread_ts: if thread_or_message_ts not in seen_thread_ts:
yield doc yield doc
assert (
thread_or_message_ts
), "found non-None doc with None thread_or_message_ts"
seen_thread_ts.add(thread_or_message_ts) seen_thread_ts.add(thread_or_message_ts)
elif failure: elif failure:
yield failure yield failure
num_threads_processed = len(seen_thread_ts) - num_threads_start
logger.info(f"Processed {num_threads_processed} threads.")
checkpoint.seen_thread_ts = list(seen_thread_ts) checkpoint.seen_thread_ts = list(seen_thread_ts)
checkpoint.channel_completion_map[channel["id"]] = new_latest checkpoint.channel_completion_map[channel["id"]] = new_latest
if has_more_in_channel: if has_more_in_channel: