2025-02-15 16:35:15 -08:00

217 lines
8.2 KiB
Python

import os
import tempfile
import urllib.parse
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
from zulip import Client
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.connectors.zulip.schemas import GetMessagesResponse
from onyx.connectors.zulip.schemas import Message
from onyx.connectors.zulip.utils import build_search_narrow
from onyx.connectors.zulip.utils import call_api
from onyx.connectors.zulip.utils import encode_zulip_narrow_operand
from onyx.utils.logger import setup_logger
# Potential improvements
# 1. Group documents messages into topics, make 1 document per topic per week
# 2. Add end date support once https://github.com/zulip/zulip/issues/25436 is solved
logger = setup_logger()
class ZulipConnector(LoadConnector, PollConnector):
def __init__(
self, realm_name: str, realm_url: str, batch_size: int = INDEX_BATCH_SIZE
) -> None:
self.batch_size = batch_size
self.realm_name = realm_name
# Clean and normalize the URL
realm_url = realm_url.strip().lower()
# Remove any trailing slashes
realm_url = realm_url.rstrip("/")
# Ensure the URL has a scheme
if not realm_url.startswith(("http://", "https://")):
realm_url = f"https://{realm_url}"
try:
parsed = urllib.parse.urlparse(realm_url)
# Extract the base domain without any paths or ports
netloc = parsed.netloc.split(":")[0] # Remove port if present
if not netloc:
raise ValueError(
f"Invalid realm URL format: {realm_url}. "
f"URL must include a valid domain name."
)
# Always use HTTPS for security
self.base_url = f"https://{netloc}"
self.client: Client | None = None
except Exception as e:
raise ValueError(
f"Failed to parse Zulip realm URL: {realm_url}. "
f"Please provide a URL in the format: domain.com or https://domain.com. "
f"Error: {str(e)}"
)
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
contents = credentials["zuliprc_content"]
# The input field converts newlines to spaces in the provided
# zuliprc file. This reverts them back to newlines.
contents_spaces_to_newlines = contents.replace(" ", "\n")
# create a temporary zuliprc file
tempdir = tempfile.tempdir
if tempdir is None:
raise Exception("Could not determine tempfile directory")
config_file = os.path.join(tempdir, f"zuliprc-{self.realm_name}")
with open(config_file, "w") as f:
f.write(contents_spaces_to_newlines)
self.client = Client(config_file=config_file)
return None
def _message_to_narrow_link(self, m: Message) -> str:
try:
stream_name = m.display_recipient # assume str
stream_operand = encode_zulip_narrow_operand(f"{m.stream_id}-{stream_name}")
topic_operand = encode_zulip_narrow_operand(m.subject)
narrow_link = f"{self.base_url}#narrow/stream/{stream_operand}/topic/{topic_operand}/near/{m.id}"
return narrow_link
except Exception as e:
logger.error(f"Error generating Zulip message link: {e}")
# Fallback to a basic link that at least includes the base URL
return f"{self.base_url}#narrow/id/{m.id}"
def _get_message_batch(self, anchor: str) -> Tuple[bool, List[Message]]:
if self.client is None:
raise ConnectorMissingCredentialError("Zulip")
logger.info(f"Fetching messages starting with anchor={anchor}")
request = build_search_narrow(
limit=INDEX_BATCH_SIZE, anchor=anchor, apply_md=False
)
response = GetMessagesResponse(**call_api(self.client.get_messages, request))
end = False
if len(response.messages) == 0 or response.found_oldest:
end = True
# reverse, so that the last message is the new anchor
# and the order is from newest to oldest
return end, response.messages[::-1]
def _message_to_doc(self, message: Message) -> Document:
text = f"{message.sender_full_name}: {message.content}"
try:
# Convert timestamps to UTC datetime objects
post_time = datetime.fromtimestamp(message.timestamp, tz=timezone.utc)
edit_time = (
datetime.fromtimestamp(message.last_edit_timestamp, tz=timezone.utc)
if message.last_edit_timestamp is not None
else None
)
# Use the most recent edit time if available, otherwise use post time
doc_time = edit_time if edit_time is not None else post_time
except (ValueError, TypeError) as e:
logger.warning(f"Failed to parse timestamp for message {message.id}: {e}")
post_time = None
edit_time = None
doc_time = None
metadata: Dict[str, Union[str, List[str]]] = {
"stream_name": str(message.display_recipient),
"topic": str(message.subject),
"sender_name": str(message.sender_full_name),
"sender_email": str(message.sender_email),
"message_timestamp": str(message.timestamp),
"message_id": str(message.id),
"stream_id": str(message.stream_id),
"has_reactions": str(len(message.reactions) > 0),
"content_type": str(message.content_type or "text"),
}
# Always include edit timestamp in metadata when available
if edit_time is not None:
metadata["edit_timestamp"] = str(message.last_edit_timestamp)
return Document(
id=f"{message.stream_id}__{message.id}",
sections=[
Section(
link=self._message_to_narrow_link(message),
text=text,
)
],
source=DocumentSource.ZULIP,
semantic_identifier=f"{message.display_recipient} > {message.subject}",
metadata=metadata,
doc_updated_at=doc_time, # Use most recent edit time or post time
)
def _get_docs(
self, anchor: str, start: SecondsSinceUnixEpoch | None = None
) -> Generator[Document, None, None]:
message: Message | None = None
while True:
end, message_batch = self._get_message_batch(anchor)
for message in message_batch:
if start is not None and float(message.timestamp) < start:
return
yield self._message_to_doc(message)
if end or message is None:
return
# Last message is oldest, use as next anchor
anchor = str(message.id)
def _poll_source(
self, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
) -> GenerateDocumentsOutput:
# Since Zulip doesn't support searching by timestamp,
# we have to always start from the newest message
# and go backwards.
anchor = "newest"
docs = []
for doc in self._get_docs(anchor=anchor, start=start):
docs.append(doc)
if len(docs) == self.batch_size:
yield docs
docs = []
if docs:
yield docs
def load_from_state(self) -> GenerateDocumentsOutput:
return self._poll_source(start=None, end=None)
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
return self._poll_source(start, end)