Zulip connector (#247)

Co-authored-by: Yuhong Sun <yuhongsun96@gmail.com>
This commit is contained in:
Michał Flak 2023-08-15 00:29:34 +02:00 committed by GitHub
parent 848e5653a9
commit 286445f9ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 555 additions and 4 deletions

View File

@ -30,6 +30,7 @@ class DocumentSource(str, Enum):
PRODUCTBOARD = "productboard"
FILE = "file"
NOTION = "notion"
ZULIP = "zulip"
class DanswerGenAIModel(str, Enum):

View File

@ -20,6 +20,7 @@ from danswer.connectors.slab.connector import SlabConnector
from danswer.connectors.slack.connector import SlackLoadConnector
from danswer.connectors.slack.connector import SlackPollConnector
from danswer.connectors.web.connector import WebConnector
from danswer.connectors.zulip.connector import ZulipConnector
_NUM_SECONDS_IN_DAY = 86400
@ -47,6 +48,7 @@ def identify_connector_class(
DocumentSource.PRODUCTBOARD: ProductboardConnector,
DocumentSource.SLAB: SlabConnector,
DocumentSource.NOTION: NotionConnector,
DocumentSource.ZULIP: ZulipConnector,
DocumentSource.GURU: GuruConnector,
}
connector_by_source = connector_map.get(source, {})

View File

@ -3,8 +3,6 @@ from enum import Enum
from typing import Any
from uuid import UUID
from pydantic import BaseModel
from danswer.configs.constants import DocumentSource

View File

@ -0,0 +1,140 @@
import os
import tempfile
from collections.abc import Generator
from typing import Any
from typing import List
from typing import Tuple
from zulip import Client
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.models import ConnectorMissingCredentialError
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.connectors.zulip.schemas import GetMessagesResponse
from danswer.connectors.zulip.schemas import Message
from danswer.connectors.zulip.utils import build_search_narrow
from danswer.connectors.zulip.utils import call_api
from danswer.connectors.zulip.utils import encode_zulip_narrow_operand
from danswer.utils.logger import setup_logger
# Potential improvements
# 1. Group documents messages into topics, make 1 document per topic per week
# 2. Add end date support once https://github.com/zulip/zulip/issues/25436 is solved
logger = setup_logger()
class ZulipConnector(LoadConnector, PollConnector):
def __init__(
self, realm_name: str, realm_url: str, batch_size: int = INDEX_BATCH_SIZE
) -> None:
self.batch_size = batch_size
self.realm_name = realm_name
self.realm_url = realm_url if realm_url.endswith("/") else realm_url + "/"
self.client: Client | None = None
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
contents = credentials["zuliprc_content"]
# The input field converts newlines to spaces in the provided
# zuliprc file. This reverts them back to newlines.
contents_spaces_to_newlines = contents.replace(" ", "\n")
# create a temporary zuliprc file
tempdir = tempfile.tempdir
if tempdir is None:
raise Exception("Could not determine tempfile directory")
config_file = os.path.join(tempdir, f"zuliprc-{self.realm_name}")
with open(config_file, "w") as f:
f.write(contents_spaces_to_newlines)
self.client = Client(config_file=config_file)
return None
def _message_to_narrow_link(self, m: Message) -> str:
stream_name = m.display_recipient # assume str
stream_operand = encode_zulip_narrow_operand(f"{m.stream_id}-{stream_name}")
topic_operand = encode_zulip_narrow_operand(m.subject)
narrow_link = f"{self.realm_url}#narrow/stream/{stream_operand}/topic/{topic_operand}/near/{m.id}"
return narrow_link
def _get_message_batch(self, anchor: str) -> Tuple[bool, List[Message]]:
if self.client is None:
raise ConnectorMissingCredentialError("Zulip")
logger.info(f"Fetching messages starting with anchor={anchor}")
request = build_search_narrow(
limit=INDEX_BATCH_SIZE, anchor=anchor, apply_md=False
)
response = GetMessagesResponse(**call_api(self.client.get_messages, request))
end = False
if len(response.messages) == 0 or response.found_oldest:
end = True
# reverse, so that the last message is the new anchor
# and the order is from newest to oldest
return end, response.messages[::-1]
def _message_to_doc(self, message: Message) -> Document:
text = f"{message.sender_full_name}: {message.content}"
return Document(
id=f"{message.stream_id}__{message.id}",
sections=[
Section(
link=self._message_to_narrow_link(message),
text=text,
)
],
source=DocumentSource.ZULIP,
semantic_identifier=message.display_recipient or message.subject,
metadata={},
)
def _get_docs(
self, anchor: str, start: SecondsSinceUnixEpoch | None = None
) -> Generator[Document, None, None]:
message: Message | None = None
while True:
end, message_batch = self._get_message_batch(anchor)
for message in message_batch:
if start is not None and float(message.timestamp) < start:
return
yield self._message_to_doc(message)
if end or message is None:
return
# Last message is oldest, use as next anchor
anchor = str(message.id)
def _poll_source(
self, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
) -> GenerateDocumentsOutput:
# Since Zulip doesn't support searching by timestamp,
# we have to always start from the newest message
# and go backwards.
anchor = "newest"
docs = []
for doc in self._get_docs(anchor=anchor, start=start):
docs.append(doc)
if len(docs) == self.batch_size:
yield docs
docs = []
if docs:
yield docs
def load_from_state(self) -> GenerateDocumentsOutput:
return self._poll_source(start=None, end=None)
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
return self._poll_source(start, end)

View File

@ -0,0 +1,42 @@
from typing import Any
from typing import List
from typing import Optional
from pydantic import BaseModel
class Message(BaseModel):
id: int
sender_id: int
content: str
recipient_id: int
timestamp: int
client: str
is_me_message: bool
sender_full_name: str
sender_email: str
sender_realm_str: str
subject: str
topic_links: Optional[List[Any]] = None
last_edit_timestamp: Optional[int] = None
edit_history: Any
reactions: List[Any]
submessages: List[Any]
flags: List[str] = []
display_recipient: Optional[str] = None
type: Optional[str] = None
stream_id: int
avatar_url: Optional[str]
content_type: Optional[str]
rendered_content: Optional[str] = None
class GetMessagesResponse(BaseModel):
result: str
msg: str
found_anchor: Optional[bool] = None
found_oldest: Optional[bool] = None
found_newest: Optional[bool] = None
history_limited: Optional[bool] = None
anchor: Optional[str] = None
messages: List[Message] = []

View File

@ -0,0 +1,102 @@
import time
from collections.abc import Callable
from typing import Any
from typing import Dict
from typing import Optional
from urllib.parse import quote
from danswer.utils.logger import setup_logger
logger = setup_logger()
class ZulipAPIError(Exception):
def __init__(self, code: Any = None, msg: str | None = None) -> None:
self.code = code
self.msg = msg
def __str__(self) -> str:
return (
f"Error occurred during Zulip API call: {self.msg}" + ""
if self.code is None
else f" ({self.code})"
)
class ZulipHTTPError(ZulipAPIError):
def __init__(self, msg: str | None = None, status_code: Any = None) -> None:
super().__init__(code=None, msg=msg)
self.status_code = status_code
def __str__(self) -> str:
return f"HTTP error {self.status_code} occurred during Zulip API call"
def __call_with_retry(fun: Callable, *args: Any, **kwargs: Any) -> Dict[str, Any]:
result = fun(*args, **kwargs)
if result.get("result") == "error":
if result.get("code") == "RATE_LIMIT_HIT":
retry_after = float(result["retry-after"]) + 1
logger.warn(f"Rate limit hit, retrying after {retry_after} seconds")
time.sleep(retry_after)
return __call_with_retry(fun, *args)
return result
def __raise_if_error(response: dict[str, Any]) -> None:
if response.get("result") == "error":
raise ZulipAPIError(
code=response.get("code"),
msg=response.get("msg"),
)
elif response.get("result") == "http-error":
raise ZulipHTTPError(
msg=response.get("msg"), status_code=response.get("status_code")
)
def call_api(fun: Callable, *args: Any, **kwargs: Any) -> Dict[str, Any]:
response = __call_with_retry(fun, *args, **kwargs)
__raise_if_error(response)
return response
def build_search_narrow(
*,
stream: Optional[str] = None,
topic: Optional[str] = None,
limit: int = 100,
content: Optional[str] = None,
apply_md: bool = False,
anchor: str = "newest",
) -> Dict[str, Any]:
narrow_filters = []
if stream:
narrow_filters.append({"operator": "stream", "operand": stream})
if topic:
narrow_filters.append({"operator": "topic", "operand": topic})
if content:
narrow_filters.append({"operator": "has", "operand": content})
if not stream and not topic and not content:
narrow_filters.append({"operator": "streams", "operand": "public"})
narrow = {
"anchor": anchor,
"num_before": limit,
"num_after": 0,
"narrow": narrow_filters,
}
narrow["apply_markdown"] = apply_md
return narrow
def encode_zulip_narrow_operand(value: str) -> str:
# like https://github.com/zulip/zulip/blob/1577662a6/static/js/hash_util.js#L18-L25
# safe characters necessary to make Python match Javascript's escaping behaviour,
# see: https://stackoverflow.com/a/74439601
return quote(value, safe="!~*'()").replace(".", "%2E").replace("%", ".")

View File

@ -44,4 +44,4 @@ tiktoken==0.4.0
transformers==4.30.1
typesense==0.15.1
uvicorn==0.21.1
zulip==0.8.2

View File

@ -0,0 +1,222 @@
"use client";
import * as Yup from "yup";
import { ZulipIcon, TrashIcon } from "@/components/icons/icons";
import { fetcher } from "@/lib/fetcher";
import useSWR, { useSWRConfig } from "swr";
import { LoadingAnimation } from "@/components/Loading";
import { HealthCheckBanner } from "@/components/health/healthcheck";
import {
ZulipConfig,
Credential,
ZulipCredentialJson,
ConnectorIndexingStatus,
} from "@/lib/types";
import { deleteCredential, linkCredential } from "@/lib/credential";
import { CredentialForm } from "@/components/admin/connectors/CredentialForm";
import { TextFormField } from "@/components/admin/connectors/Field";
import { ConnectorsTable } from "@/components/admin/connectors/table/ConnectorsTable";
import { ConnectorForm } from "@/components/admin/connectors/ConnectorForm";
const MainSection = () => {
const { mutate } = useSWRConfig();
const {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
const {
data: credentialsData,
isLoading: isCredentialsLoading,
error: isCredentialsError,
} = useSWR<Credential<ZulipCredentialJson>[]>(
"/api/manage/credential",
fetcher
);
if (
(!connectorIndexingStatuses && isConnectorIndexingStatusesLoading) ||
(!credentialsData && isCredentialsLoading)
) {
return <LoadingAnimation text="Loading" />;
}
if (isConnectorIndexingStatusesError || !connectorIndexingStatuses) {
return <div>Failed to load connectors</div>;
}
if (isCredentialsError || !credentialsData) {
return <div>Failed to load credentials</div>;
}
const zulipConnectorIndexingStatuses: ConnectorIndexingStatus<ZulipConfig>[] =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "zulip"
);
const zulipCredential = credentialsData.filter(
(credential) => credential.credential_json?.zuliprc_content
)[0];
return (
<>
<h2 className="font-bold mb-2 mt-6 ml-auto mr-auto">
Step 1: Provide Credentials
</h2>
{zulipCredential ? (
<>
<div className="flex mb-1 text-sm">
<p className="my-auto">Existing zuliprc file content: </p>
<p className="ml-1 italic my-auto">
{zulipCredential.credential_json.zuliprc_content}
</p>{" "}
<button
className="ml-1 hover:bg-gray-700 rounded-full p-1"
onClick={async () => {
await deleteCredential(zulipCredential.id);
mutate("/api/manage/credential");
}}
>
<TrashIcon />
</button>
</div>
</>
) : (
<>
<p className="text-sm mb-4">
To use the Zulip connector, you must first provide content of the
zuliprc config file. For more details on setting up the Danswer
Zulip connector, see the{" "}
<a
className="text-blue-500"
href="https://docs.danswer.dev/connectors/zulip#setting-up"
>
docs
</a>
.
</p>
<div className="border-solid border-gray-600 border rounded-md p-6 mt-2">
<CredentialForm<ZulipCredentialJson>
formBody={
<>
<TextFormField
name="zuliprc_content"
label="Content of the zuliprc file:"
type="text"
/>
</>
}
validationSchema={Yup.object().shape({
zuliprc_content: Yup.string().required(
"Please enter content of the zuliprc file"
),
})}
initialValues={{
zuliprc_content: "",
}}
onSubmit={(isSuccess) => {
if (isSuccess) {
mutate("/api/manage/credential");
}
}}
/>
</div>
</>
)}
<h2 className="font-bold mb-2 mt-6 ml-auto mr-auto">
Step 2: Which workspaces do you want to make searchable?
</h2>
{zulipConnectorIndexingStatuses.length > 0 && (
<>
<p className="text-sm mb-2">
We pull the latest messages from each workspace listed below every{" "}
<b>10</b> minutes.
</p>
<div className="mb-2">
<ConnectorsTable
connectorIndexingStatuses={zulipConnectorIndexingStatuses}
liveCredential={zulipCredential}
getCredential={(credential) =>
credential.credential_json.zuliprc_content
}
specialColumns={[
{
header: "Realm name",
key: "realm_name",
getValue: (connector) =>
connector.connector_specific_config.realm_name,
},
{
header: "Realm url",
key: "realm_url",
getValue: (connector) =>
connector.connector_specific_config.realm_url,
},
]}
onUpdate={() =>
mutate("/api/manage/admin/connector/indexing-status")
}
onCredentialLink={async (connectorId) => {
if (Credential) {
await linkCredential(connectorId, zulipCredential.id);
mutate("/api/manage/admin/connector/indexing-status");
}
}}
/>
</div>
</>
)}
<div className="border-solid border-gray-600 border rounded-md p-6 mt-4">
<h2 className="font-bold mb-3">Connect to a New Realm</h2>
<ConnectorForm<ZulipConfig>
nameBuilder={(values) => `ZulipConnector-${values.realm_name}`}
source="zulip"
inputType="poll"
formBody={
<>
<TextFormField name="realm_name" label="Realm name:" />
<TextFormField name="realm_url" label="Realm url:" />
</>
}
validationSchema={Yup.object().shape({
realm_name: Yup.string().required("Please enter the realm name"),
realm_url: Yup.string().required("Please enter the realm url"),
})}
initialValues={{
realm_name: "",
realm_url: "",
}}
refreshFreq={10 * 60} // 10 minutes
onSubmit={async (isSuccess, responseJson) => {
if (isSuccess && responseJson) {
await linkCredential(responseJson.id, zulipCredential.id);
mutate("/api/manage/admin/connector/indexing-status");
}
}}
/>
</div>
</>
);
};
export default function Page() {
return (
<div className="mx-auto container">
<div className="mb-4">
<HealthCheckBanner />
</div>
<div className="border-solid border-gray-600 border-b mb-4 pb-2 flex">
<ZulipIcon size={32} />
<h1 className="text-3xl font-bold pl-2">Zulip</h1>
</div>
<MainSection />
</div>
);
}

View File

@ -23,6 +23,7 @@ import {
JiraConfig,
SlackConfig,
WebConfig,
ZulipConfig,
} from "@/lib/types";
import { useState } from "react";
import { getDocsProcessedPerMinute } from "@/lib/indexAttempt";
@ -87,6 +88,13 @@ const ConnectorTitle = ({ connectorIndexingStatus }: ConnectorTitleProps) => {
);
}
}
else if (connector.source === "zulip") {
const typedConnector = connector as Connector<ZulipConfig>;
additionalMetadata.set(
"Realm",
typedConnector.connector_specific_config.realm_name
);
}
return (
<>

View File

@ -14,6 +14,7 @@ import {
JiraIcon,
SlabIcon,
NotionIcon,
ZulipIcon,
ProductboardIcon,
} from "@/components/icons/icons";
import { DISABLE_AUTH } from "@/lib/constants";
@ -168,6 +169,15 @@ export default async function AdminLayout({
),
link: "/admin/connectors/file",
},
{
name: (
<div className="flex">
<ZulipIcon size={16} />
<div className="ml-1">Zulip</div>
</div>
),
link: "/admin/connectors/zulip",
},
],
},
{

View File

@ -14,7 +14,7 @@ import {
X,
Question,
} from "@phosphor-icons/react";
import { SiBookstack } from "react-icons/si";
import { SiBookstack, SiZulip } from "react-icons/si";
import { FaFile, FaGlobe } from "react-icons/fa";
import Image from "next/image";
import jiraSVG from "../../../public/Jira.svg";
@ -112,6 +112,13 @@ export const BrainIcon = ({
return <Brain size={size} className={className} />;
};
export const ZulipIcon = ({
size = 16,
className = defaultTailwindCSS,
}: IconProps) => {
return <SiZulip size={size} className={className} />;
};
export const PencilIcon = ({
size = 16,
className = defaultTailwindCSS,

View File

@ -17,6 +17,7 @@ const sources: Source[] = [
{ displayName: "Guru", internalName: "guru" },
{ displayName: "File", internalName: "file" },
{ displayName: "Notion", internalName: "notion" },
{ displayName: "Zulip", internalName: "zulip" },
];
interface SourceSelectorProps {

View File

@ -12,6 +12,7 @@ import {
ProductboardIcon,
SlabIcon,
SlackIcon,
ZulipIcon,
} from "./icons/icons";
interface SourceMetadata {
@ -88,6 +89,12 @@ export const getSourceMetadata = (sourceType: ValidSources): SourceMetadata => {
displayName: "Notion",
adminPageLink: "/admin/connectors/notion",
};
case "zulip":
return {
icon: ZulipIcon,
displayName: "Zulip",
adminPageLink: "/admin/connectors/zulip",
};
case "guru":
return {
icon: GuruIcon,

View File

@ -19,6 +19,7 @@ export type ValidSources =
| "slab"
| "notion"
| "guru"
| "zulip"
| "file";
export type ValidInputTypes = "load_state" | "poll" | "event";
export type ValidStatuses =
@ -85,6 +86,11 @@ export interface FileConfig {
file_locations: string[];
}
export interface ZulipConfig {
realm_name: string;
realm_url: string;
}
export interface NotionConfig {}
export interface IndexAttemptSnapshot {
@ -164,6 +170,11 @@ export interface SlabCredentialJson {
export interface NotionCredentialJson {
notion_integration_token: string;
}
export interface ZulipCredentialJson {
zuliprc_content: string;
}
export interface GuruCredentialJson {
guru_user: string;
guru_user_token: string;