Reformat Slack Message Display (#1056)

Authored by Yuhong Sun on 2024-02-08 14:37:46 -08:00, committed by GitHub
parent 5a056f1c0c
commit cd8d8def1e
7 changed files with 250 additions and 129 deletions

View File

@@ -119,6 +119,11 @@ POSTGRES_DB = os.environ.get("POSTGRES_DB") or "postgres"
#####
POLL_CONNECTOR_OFFSET = 30 # Minutes overlap between poll windows
# Some calls to fetch expert-user information are quite costly, especially with rate limiting.
# Since experts are not used in the actual user experience, this is currently turned off
# for some connectors.
ENABLE_EXPENSIVE_EXPERT_CALLS = False
GOOGLE_DRIVE_INCLUDE_SHARED = False
GOOGLE_DRIVE_FOLLOW_SHORTCUTS = False
GOOGLE_DRIVE_ONLY_ORG_PUBLIC = False
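
The new flag is a plain constant defaulting to off. A minimal sketch, assuming you wanted to drive it from the environment the way other settings in this file are (see the POSTGRES_DB line in the hunk header above); the env var name here is hypothetical:

import os

# Hypothetical: mirrors the env-driven pattern used elsewhere in this config module
ENABLE_EXPENSIVE_EXPERT_CALLS = (
    os.environ.get("ENABLE_EXPENSIVE_EXPERT_CALLS", "").lower() == "true"
)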

View File

@@ -27,8 +27,8 @@ from danswer.connectors.productboard.connector import ProductboardConnector
from danswer.connectors.requesttracker.connector import RequestTrackerConnector
from danswer.connectors.sharepoint.connector import SharepointConnector
from danswer.connectors.slab.connector import SlabConnector
from danswer.connectors.slack.connector import SlackLoadConnector
from danswer.connectors.slack.connector import SlackPollConnector
from danswer.connectors.slack.load_connector import SlackLoadConnector
from danswer.connectors.web.connector import WebConnector
from danswer.connectors.zendesk.connector import ZendeskConnector
from danswer.connectors.zulip.connector import ZulipConnector
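
The only change here is the import path: SlackLoadConnector moves out of the shared Slack connector module into its own load_connector module. The factory's dispatch table is outside this hunk; a hypothetical sketch of the kind of mapping these imports feed, with made-up keys:

# Hypothetical sketch -- the real dispatch table is not shown in this diff.
# The import swap above matters because entries like these must now pull
# SlackLoadConnector from danswer.connectors.slack.load_connector:
_SLACK_CONNECTORS = {
    "load_state": SlackLoadConnector,  # deprecated file-export loader
    "poll": SlackPollConnector,        # live Slack API connector
}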

View File

@@ -42,6 +42,25 @@ class BasicExpertInfo(BaseModel):
last_name: str | None = None
email: str | None = None
def get_semantic_name(self) -> str:
if self.first_name and self.last_name:
name_parts = [self.first_name]
if self.middle_initial:
name_parts.append(self.middle_initial + ".")
name_parts.append(self.last_name)
return " ".join([name_part.capitalize() for name_part in name_parts])
if self.display_name:
return self.display_name
if self.email:
return self.email
if self.first_name:
return self.first_name.capitalize()
return "Unknown"
class DocumentBase(BaseModel):
"""Used for Danswer ingestion api, the ID is inferred before use if not provided"""

View File

@@ -1,10 +1,8 @@
import json
import re
from collections.abc import Callable
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import Any
from typing import cast
@@ -12,16 +10,18 @@ from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.web import SlackResponse
from danswer.configs.app_configs import ENABLE_EXPENSIVE_EXPERT_CALLS
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.cross_connector_utils.retry_wrapper import retry_builder
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.models import BasicExpertInfo
from danswer.connectors.models import ConnectorMissingCredentialError
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.connectors.slack.utils import expert_info_from_slack_id
from danswer.connectors.slack.utils import get_message_link
from danswer.connectors.slack.utils import make_slack_api_call_logged
from danswer.connectors.slack.utils import make_slack_api_call_paginated
@@ -142,20 +142,46 @@ def get_latest_message_time(thread: ThreadType) -> datetime:
return datetime.fromtimestamp(max_ts, tz=timezone.utc)
def get_event_time(event: dict[str, Any]) -> datetime | None:
ts = event.get("ts")
if not ts:
return None
return datetime.fromtimestamp(float(ts), tz=timezone.utc)
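
Slack event timestamps ("ts") are epoch seconds serialized as strings, so the helper parses them as floats and pins them to UTC. For example, with a made-up event:

event = {"type": "message", "ts": "1707430666.123456"}
get_event_time(event)  # datetime for 2024-02-08 22:17:46 UTC (sub-second part subject to float rounding)
get_event_time({"type": "message"})  # None -- no "ts" key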
def thread_to_doc(
workspace: str,
channel: ChannelType,
thread: ThreadType,
slack_cleaner: SlackTextCleaner,
client: WebClient,
user_cache: dict[str, BasicExpertInfo | None],
) -> Document:
channel_id = channel["id"]
initial_sender_expert_info = expert_info_from_slack_id(
user_id=thread[0].get("user"), client=client, user_cache=user_cache
)
initial_sender_name = (
initial_sender_expert_info.get_semantic_name()
if initial_sender_expert_info
else "Unknown"
)
valid_experts = None
if ENABLE_EXPENSIVE_EXPERT_CALLS:
all_sender_ids = [m.get("user") for m in thread]
experts = [
expert_info_from_slack_id(
user_id=sender_id, client=client, user_cache=user_cache
)
for sender_id in all_sender_ids
if sender_id
]
valid_experts = [expert for expert in experts if expert]
first_message = slack_cleaner.index_clean(cast(str, thread[0]["text"]))
snippet = (
first_message[:50].rstrip() + "..."
if len(first_message) > 50
else first_message
)
doc_sem_id = f"{initial_sender_name} in #{channel['name']}: {snippet}"
return Document(
id=f"{channel_id}__{thread[0]['ts']}",
sections=[
@@ -168,10 +194,11 @@ def thread_to_doc(
for m in thread
],
source=DocumentSource.SLACK,
semantic_identifier=channel["name"],
semantic_identifier=doc_sem_id,
doc_updated_at=get_latest_message_time(thread),
title="", # slack docs don't really have a "title"
metadata={},
primary_owners=valid_experts,
metadata={"Channel": channel["name"]},
)
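
This is the display reformat the commit title refers to: instead of the bare channel name, each Slack document is now identified by sender, channel, and a 50-character snippet of the thread's first message, with the channel name preserved in metadata. An illustrative stand-alone run of the truncation logic above, with made-up message and names:

first_message = "Our index rebuild keeps failing with a timeout after thirty seconds"
snippet = first_message[:50].rstrip() + "..." if len(first_message) > 50 else first_message
doc_sem_id = f"Ada Lovelace in #support: {snippet}"
# -> "Ada Lovelace in #support: Our index rebuild keeps failing with a timeout aft..."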
@@ -204,7 +231,7 @@ def _default_msg_filter(message: MessageType) -> bool:
return False
def _filter_channels(
def filter_channels(
all_channels: list[dict[str, Any]],
channels_to_connect: list[str] | None,
regex_enabled: bool,
@@ -250,8 +277,11 @@ def get_all_docs(
"""Get all documents in the workspace, channel by channel"""
slack_cleaner = SlackTextCleaner(client=client)
# Cache to avoid refetching the same users via the API
user_cache: dict[str, BasicExpertInfo | None] = {}
all_channels = get_channels(client)
filtered_channels = _filter_channels(
filtered_channels = filter_channels(
all_channels, channels, channel_name_regex_enabled
)
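
filter_channels is now public (the leading underscore is dropped) so the new load_connector module can reuse it. Its body sits outside this hunk; a hypothetical sketch of the behavior described by the connector's own docstring comment ("will only index channels that fully match the regexes"), under the assumption that non-regex mode matches names exactly:

import re
from typing import Any

# Hypothetical sketch -- the real filter_channels body is not shown here:
def _sketch_filter_channels(
    all_channels: list[dict[str, Any]],
    channels_to_connect: list[str] | None,
    regex_enabled: bool,
) -> list[dict[str, Any]]:
    if not channels_to_connect:
        return all_channels
    if regex_enabled:
        return [
            c for c in all_channels
            if any(re.fullmatch(p, c["name"]) for p in channels_to_connect)
        ]
    return [c for c in all_channels if c["name"] in channels_to_connect]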
@@ -288,6 +318,8 @@
channel=channel,
thread=filtered_thread,
slack_cleaner=slack_cleaner,
client=client,
user_cache=user_cache,
)
logger.info(
@@ -295,118 +327,6 @@
)
class SlackLoadConnector(LoadConnector):
def __init__(
self,
workspace: str,
export_path_str: str,
channels: list[str] | None = None,
# if specified, will treat the specified channel strings as
# regexes, and will only index channels that fully match the regexes
channel_regex_enabled: bool = False,
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
self.workspace = workspace
self.channels = channels
self.channel_regex_enabled = channel_regex_enabled
self.export_path_str = export_path_str
self.batch_size = batch_size
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
if credentials:
logger.warning("Unexpected credentials provided for Slack Load Connector")
return None
@staticmethod
def _process_batch_event(
slack_event: dict[str, Any],
channel: dict[str, Any],
matching_doc: Document | None,
workspace: str,
) -> Document | None:
if (
slack_event["type"] == "message"
and slack_event.get("subtype") != "channel_join"
):
if matching_doc:
return Document(
id=matching_doc.id,
sections=matching_doc.sections
+ [
Section(
link=get_message_link(
event=slack_event,
workspace=workspace,
channel_id=channel["id"],
),
text=slack_event["text"],
)
],
source=matching_doc.source,
semantic_identifier=matching_doc.semantic_identifier,
title="", # slack docs don't really have a "title"
doc_updated_at=get_event_time(slack_event),
metadata=matching_doc.metadata,
)
return Document(
id=slack_event["ts"],
sections=[
Section(
link=get_message_link(
event=slack_event,
workspace=workspace,
channel_id=channel["id"],
),
text=slack_event["text"],
)
],
source=DocumentSource.SLACK,
semantic_identifier=channel["name"],
title="", # slack docs don't really have a "title"
doc_updated_at=get_event_time(slack_event),
metadata={},
)
return None
def load_from_state(self) -> GenerateDocumentsOutput:
export_path = Path(self.export_path_str)
with open(export_path / "channels.json") as f:
all_channels = json.load(f)
filtered_channels = _filter_channels(
all_channels, self.channels, self.channel_regex_enabled
)
document_batch: dict[str, Document] = {}
for channel_info in filtered_channels:
channel_dir_path = export_path / cast(str, channel_info["name"])
channel_file_paths = [
channel_dir_path / file_name
for file_name in os.listdir(channel_dir_path)
]
for path in channel_file_paths:
with open(path) as f:
events = cast(list[dict[str, Any]], json.load(f))
for slack_event in events:
doc = self._process_batch_event(
slack_event=slack_event,
channel=channel_info,
matching_doc=document_batch.get(
slack_event.get("thread_ts", "")
),
workspace=self.workspace,
)
if doc:
document_batch[doc.id] = doc
if len(document_batch) >= self.batch_size:
yield list(document_batch.values())
yield list(document_batch.values())
class SlackPollConnector(PollConnector):
def __init__(
self,

View File

@@ -0,0 +1,139 @@
import json
import os
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import Any
from typing import cast
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.connectors.slack.connector import filter_channels
from danswer.connectors.slack.utils import get_message_link
from danswer.utils.logger import setup_logger
logger = setup_logger()
def get_event_time(event: dict[str, Any]) -> datetime | None:
ts = event.get("ts")
if not ts:
return None
return datetime.fromtimestamp(float(ts), tz=timezone.utc)
class SlackLoadConnector(LoadConnector):
# WARNING: DEPRECATED, DO NOT USE
def __init__(
self,
workspace: str,
export_path_str: str,
channels: list[str] | None = None,
# if specified, will treat the specified channel strings as
# regexes, and will only index channels that fully match the regexes
channel_regex_enabled: bool = False,
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
self.workspace = workspace
self.channels = channels
self.channel_regex_enabled = channel_regex_enabled
self.export_path_str = export_path_str
self.batch_size = batch_size
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
if credentials:
logger.warning("Unexpected credentials provided for Slack Load Connector")
return None
@staticmethod
def _process_batch_event(
slack_event: dict[str, Any],
channel: dict[str, Any],
matching_doc: Document | None,
workspace: str,
) -> Document | None:
if (
slack_event["type"] == "message"
and slack_event.get("subtype") != "channel_join"
):
if matching_doc:
return Document(
id=matching_doc.id,
sections=matching_doc.sections
+ [
Section(
link=get_message_link(
event=slack_event,
workspace=workspace,
channel_id=channel["id"],
),
text=slack_event["text"],
)
],
source=matching_doc.source,
semantic_identifier=matching_doc.semantic_identifier,
title="", # slack docs don't really have a "title"
doc_updated_at=get_event_time(slack_event),
metadata=matching_doc.metadata,
)
return Document(
id=slack_event["ts"],
sections=[
Section(
link=get_message_link(
event=slack_event,
workspace=workspace,
channel_id=channel["id"],
),
text=slack_event["text"],
)
],
source=DocumentSource.SLACK,
semantic_identifier=channel["name"],
title="", # slack docs don't really have a "title"
doc_updated_at=get_event_time(slack_event),
metadata={},
)
return None
def load_from_state(self) -> GenerateDocumentsOutput:
export_path = Path(self.export_path_str)
with open(export_path / "channels.json") as f:
all_channels = json.load(f)
filtered_channels = filter_channels(
all_channels, self.channels, self.channel_regex_enabled
)
document_batch: dict[str, Document] = {}
for channel_info in filtered_channels:
channel_dir_path = export_path / cast(str, channel_info["name"])
channel_file_paths = [
channel_dir_path / file_name
for file_name in os.listdir(channel_dir_path)
]
for path in channel_file_paths:
with open(path) as f:
events = cast(list[dict[str, Any]], json.load(f))
for slack_event in events:
doc = self._process_batch_event(
slack_event=slack_event,
channel=channel_info,
matching_doc=document_batch.get(
slack_event.get("thread_ts", "")
),
workspace=self.workspace,
)
if doc:
document_batch[doc.id] = doc
if len(document_batch) >= self.batch_size:
yield list(document_batch.values())
yield list(document_batch.values())
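
The class is moved here essentially verbatim from connector.py and flagged deprecated. For reference, a usage sketch against a standard unzipped Slack export; the workspace name and path are made up:

# Hypothetical usage of the deprecated export loader:
connector = SlackLoadConnector(
    workspace="acme",
    export_path_str="/tmp/slack_export",  # must contain channels.json plus per-channel dirs
    channels=["general"],
)
connector.load_credentials({})  # export loading needs no credentials
for batch in connector.load_from_state():
    print(f"got a batch of {len(batch)} Slack documents")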

View File

@@ -10,6 +10,7 @@ from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.web import SlackResponse
from danswer.connectors.models import BasicExpertInfo
from danswer.utils.logger import setup_logger
logger = setup_logger()
@@ -104,6 +105,38 @@
return rate_limited_call
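
The body of make_slack_api_rate_limited is elided in this hunk. As a sketch only, a Slack rate limiter of this shape typically retries on "ratelimited" errors and honors the Retry-After header; the helper name and retry count below are assumptions:

import time
from collections.abc import Callable
from typing import Any

from slack_sdk.errors import SlackApiError

# Hypothetical sketch -- not the implementation elided above:
def _sketch_rate_limited(call: Callable[..., Any], max_retries: int = 7) -> Callable[..., Any]:
    def rate_limited_call(**kwargs: Any) -> Any:
        for _ in range(max_retries):
            try:
                return call(**kwargs)
            except SlackApiError as e:
                if e.response["error"] != "ratelimited":
                    raise
                # Slack reports the cool-off period in the Retry-After header
                time.sleep(int(e.response.headers.get("Retry-After", 1)))
        raise RuntimeError("Slack API rate limit retries exhausted")
    return rate_limited_call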
def expert_info_from_slack_id(
user_id: str | None,
client: WebClient,
user_cache: dict[str, BasicExpertInfo | None],
) -> BasicExpertInfo | None:
if not user_id:
return None
if user_id in user_cache:
return user_cache[user_id]
response = make_slack_api_rate_limited(client.users_info)(user=user_id)
if not response["ok"]:
user_cache[user_id] = None
return None
user: dict = cast(dict[Any, dict], response.data).get("user", {})
profile = user.get("profile", {})
expert = BasicExpertInfo(
display_name=user.get("real_name") or profile.get("display_name"),
first_name=profile.get("first_name"),
last_name=profile.get("last_name"),
email=profile.get("email"),
)
user_cache[user_id] = expert
return expert
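
A usage sketch of the new helper; the token and user ID are placeholders. Note that failed lookups are cached as None too, so repeated misses also skip the API:

client = WebClient(token="xoxb-***")  # placeholder token
user_cache: dict[str, BasicExpertInfo | None] = {}

expert = expert_info_from_slack_id("U012AB3CD", client, user_cache)  # one users_info call
name = expert.get_semantic_name() if expert else "Unknown"
expert_info_from_slack_id("U012AB3CD", client, user_cache)  # served from the cache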
class SlackTextCleaner:
"""Utility class to replace user IDs with usernames in a message.
Handles caching, so the same request is not made multiple times

View File

@@ -305,8 +305,13 @@ def fetch_groupids_from_names(
return list(group_ids), failed_to_find
def fetch_user_semantic_id_from_id(user_id: str, client: WebClient) -> str | None:
response = client.users_info(user=user_id)
def fetch_user_semantic_id_from_id(
user_id: str | None, client: WebClient
) -> str | None:
if not user_id:
return None
response = make_slack_api_rate_limited(client.users_info)(user=user_id)
if not response["ok"]:
return None
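
The rest of the function sits outside this hunk. The change is that a missing user ID (common for bot and system messages) now short-circuits to None instead of erroring, and the users_info call goes through the rate-limited wrapper. A usage sketch with placeholder values:

client = WebClient(token="xoxb-***")  # placeholder token
message = {"type": "message", "text": "hello"}  # system message with no "user" key
# Safe with the new None-tolerant signature -- returns None rather than raising:
semantic_id = fetch_user_semantic_id_from_id(message.get("user"), client) or "Unknown user"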