From f371efc9165eb14d4d9e3d2098293d8a8eb39a82 Mon Sep 17 00:00:00 2001 From: Adam Siemiginowski <1068543+ATSiem@users.noreply.github.com> Date: Sat, 15 Feb 2025 14:49:41 -0500 Subject: [PATCH] Fix Zulip connector schema + links and enable temporal metadata (#4005) --- backend/onyx/connectors/zulip/connector.py | 92 +++++++++++++++++++--- backend/onyx/connectors/zulip/schemas.py | 5 +- 2 files changed, 86 insertions(+), 11 deletions(-) diff --git a/backend/onyx/connectors/zulip/connector.py b/backend/onyx/connectors/zulip/connector.py index 8c586fc64..48eccdc25 100644 --- a/backend/onyx/connectors/zulip/connector.py +++ b/backend/onyx/connectors/zulip/connector.py @@ -1,9 +1,12 @@ import os import tempfile +import urllib.parse from collections.abc import Generator from typing import Any from typing import List from typing import Tuple +from typing import Dict, Union +from datetime import datetime, timezone from zulip import Client @@ -36,8 +39,39 @@ class ZulipConnector(LoadConnector, PollConnector): ) -> None: self.batch_size = batch_size self.realm_name = realm_name - self.realm_url = realm_url if realm_url.endswith("/") else realm_url + "/" - self.client: Client | None = None + + # Clean and normalize the URL + realm_url = realm_url.strip().lower() + + # Remove any trailing slashes + realm_url = realm_url.rstrip('/') + + # Ensure the URL has a scheme + if not realm_url.startswith(('http://', 'https://')): + realm_url = f'https://{realm_url}' + + try: + parsed = urllib.parse.urlparse(realm_url) + + # Extract the base domain without any paths or ports + netloc = parsed.netloc.split(':')[0] # Remove port if present + + if not netloc: + raise ValueError( + f"Invalid realm URL format: {realm_url}. " + f"URL must include a valid domain name." + ) + + # Always use HTTPS for security + self.base_url = f"https://{netloc}" + self.client: Client | None = None + + except Exception as e: + raise ValueError( + f"Failed to parse Zulip realm URL: {realm_url}. " + f"Please provide a URL in the format: domain.com or https://domain.com. " + f"Error: {str(e)}" + ) def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: contents = credentials["zuliprc_content"] @@ -55,12 +89,17 @@ class ZulipConnector(LoadConnector, PollConnector): return None def _message_to_narrow_link(self, m: Message) -> str: - stream_name = m.display_recipient # assume str - stream_operand = encode_zulip_narrow_operand(f"{m.stream_id}-{stream_name}") - topic_operand = encode_zulip_narrow_operand(m.subject) + try: + stream_name = m.display_recipient # assume str + stream_operand = encode_zulip_narrow_operand(f"{m.stream_id}-{stream_name}") + topic_operand = encode_zulip_narrow_operand(m.subject) - narrow_link = f"{self.realm_url}#narrow/stream/{stream_operand}/topic/{topic_operand}/near/{m.id}" - return narrow_link + narrow_link = f"{self.base_url}#narrow/stream/{stream_operand}/topic/{topic_operand}/near/{m.id}" + return narrow_link + except Exception as e: + logger.error(f"Error generating Zulip message link: {e}") + # Fallback to a basic link that at least includes the base URL + return f"{self.base_url}#narrow/id/{m.id}" def _get_message_batch(self, anchor: str) -> Tuple[bool, List[Message]]: if self.client is None: @@ -83,6 +122,40 @@ class ZulipConnector(LoadConnector, PollConnector): def _message_to_doc(self, message: Message) -> Document: text = f"{message.sender_full_name}: {message.content}" + try: + # Convert timestamps to UTC datetime objects + post_time = datetime.fromtimestamp(message.timestamp, tz=timezone.utc) + edit_time = ( + datetime.fromtimestamp(message.last_edit_timestamp, tz=timezone.utc) + if message.last_edit_timestamp is not None + else None + ) + + # Use the most recent edit time if available, otherwise use post time + doc_time = edit_time if edit_time is not None else post_time + + except (ValueError, TypeError) as e: + logger.warning(f"Failed to parse timestamp for message {message.id}: {e}") + post_time = None + edit_time = None + doc_time = None + + metadata: Dict[str, Union[str, List[str]]] = { + "stream_name": str(message.display_recipient), + "topic": str(message.subject), + "sender_name": str(message.sender_full_name), + "sender_email": str(message.sender_email), + "message_timestamp": str(message.timestamp), + "message_id": str(message.id), + "stream_id": str(message.stream_id), + "has_reactions": str(len(message.reactions) > 0), + "content_type": str(message.content_type or "text"), + } + + # Always include edit timestamp in metadata when available + if edit_time is not None: + metadata["edit_timestamp"] = str(message.last_edit_timestamp) + return Document( id=f"{message.stream_id}__{message.id}", sections=[ @@ -92,8 +165,9 @@ class ZulipConnector(LoadConnector, PollConnector): ) ], source=DocumentSource.ZULIP, - semantic_identifier=message.display_recipient or message.subject, - metadata={}, + semantic_identifier=f"{message.display_recipient} > {message.subject}", + metadata=metadata, + doc_updated_at=doc_time, # Use most recent edit time or post time ) def _get_docs( diff --git a/backend/onyx/connectors/zulip/schemas.py b/backend/onyx/connectors/zulip/schemas.py index 385272cb4..e999aace5 100644 --- a/backend/onyx/connectors/zulip/schemas.py +++ b/backend/onyx/connectors/zulip/schemas.py @@ -1,6 +1,7 @@ from typing import Any from typing import List from typing import Optional +from typing import Union from pydantic import BaseModel from pydantic import Field @@ -19,7 +20,7 @@ class Message(BaseModel): sender_realm_str: str subject: str topic_links: Optional[List[Any]] = None - last_edit_timestamp: Optional[int] + last_edit_timestamp: Optional[int] = None edit_history: Any = None reactions: List[Any] submessages: List[Any] @@ -39,5 +40,5 @@ class GetMessagesResponse(BaseModel): found_oldest: Optional[bool] = None found_newest: Optional[bool] = None history_limited: Optional[bool] = None - anchor: Optional[str] = None + anchor: Optional[Union[str, int]] = None messages: List[Message] = Field(default_factory=list)