From 7277a3db0745386867da8fb67b14e3dcb8384259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Wed, 4 Mar 2026 17:29:40 +0100 Subject: [PATCH] feat: improve event-log reliability, add ERROR type and per-relay timing Service improvements: - Fix notice$ duplicate logging with per-relay dedup tracking - Remove dead Array.isArray code path (notice$ emits strings) - Increase relay poll interval from 1s to 5s - Clean publishIdToEntryId map on terminal state, not just overflow - Immutable entry updates (spread instead of in-place mutation) - Extract NewEntry/AddEntryInput helper types for clean addEntry signature - Clear lastNoticePerRelay on log clear New capabilities: - ERROR log type: subscribes to relay.error$ for connection failure reasons - RelayStatusEntry with updatedAt timestamp for per-relay response timing Co-Authored-By: Claude Opus 4.6 --- src/services/event-log.ts | 164 ++++++++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 49 deletions(-) diff --git a/src/services/event-log.ts b/src/services/event-log.ts index c88103b..ac280dd 100644 --- a/src/services/event-log.ts +++ b/src/services/event-log.ts @@ -2,8 +2,9 @@ * Event Log Service * * Provides an ephemeral log of relay operations for introspection: - * - PUBLISH events with per-relay status + * - PUBLISH events with per-relay status and timing * - CONNECT/DISCONNECT events + * - ERROR events for connection failures * - AUTH events * - NOTICE events * @@ -30,9 +31,18 @@ export type EventLogType = | "PUBLISH" | "CONNECT" | "DISCONNECT" + | "ERROR" | "AUTH" | "NOTICE"; +/** Per-relay status with timing */ +export interface RelayStatusEntry { + status: string; + error?: string; + /** Timestamp of the last status transition */ + updatedAt: number; +} + /** Base interface for all log entries */ interface BaseLogEntry { /** Unique ID for this log entry */ @@ -52,8 +62,8 @@ export interface PublishLogEntry extends BaseLogEntry { event: NostrEvent; /** Target relays */ relays: string[]; - /** Per-relay status */ - relayStatus: Map; + /** Per-relay status with timing */ + relayStatus: Map; /** Overall status: pending, partial, success, failed */ status: "pending" | "partial" | "success" | "failed"; /** Publish ID from PublishService */ @@ -66,6 +76,14 @@ export interface ConnectLogEntry extends BaseLogEntry { relay: string; } +/** Connection error log entry */ +export interface ErrorLogEntry extends BaseLogEntry { + type: "ERROR"; + relay: string; + /** Error message */ + message: string; +} + /** Auth event log entry */ export interface AuthLogEntry extends BaseLogEntry { type: "AUTH"; @@ -88,13 +106,30 @@ export interface NoticeLogEntry extends BaseLogEntry { export type LogEntry = | PublishLogEntry | ConnectLogEntry + | ErrorLogEntry | AuthLogEntry | NoticeLogEntry; +/** Helper type for creating new entries (id/timestamp auto-generated) */ +type NewEntry = Omit & { + id?: string; + timestamp?: number; +}; + +type AddEntryInput = + | NewEntry + | NewEntry + | NewEntry + | NewEntry + | NewEntry; + // ============================================================================ // EventLogService Class // ============================================================================ +/** Interval for polling new relays (ms) */ +const RELAY_POLL_INTERVAL = 5000; + class EventLogService { /** Maximum number of entries to keep in the log */ private maxEntries: number; @@ -120,6 +155,9 @@ class EventLogService { /** Map of publish IDs to log entry IDs */ private publishIdToEntryId = new Map(); + /** Track last seen notice per relay to prevent duplicates */ + private lastNoticePerRelay = new Map(); + /** Polling interval for new relays */ private pollingIntervalId?: NodeJS.Timeout; @@ -163,14 +201,14 @@ class EventLogService { // Monitor existing relays pool.relays.forEach((relay) => this.monitorRelay(relay)); - // Poll for new relays + // Poll for new relays (infrequent — new relays don't appear often) this.pollingIntervalId = setInterval(() => { pool.relays.forEach((relay) => { if (!this.relaySubscriptions.has(relay.url)) { this.monitorRelay(relay); } }); - }, 1000); + }, RELAY_POLL_INTERVAL); } /** @@ -194,7 +232,7 @@ class EventLogService { // -------------------------------------------------------------------------- /** - * Monitor a relay for connection, auth, and notice events + * Monitor a relay for connection, error, auth, and notice events */ private monitorRelay(relay: IRelay): void { const url = relay.url; @@ -219,6 +257,19 @@ class EventLogService { }), ); + // Track connection errors + subscription.add( + relay.error$ + .pipe(filter((error): error is Error => error !== null)) + .subscribe((error) => { + this.addEntry({ + type: "ERROR", + relay: url, + message: error.message || "Unknown connection error", + }); + }), + ); + // Track authentication events subscription.add( relay.authenticated$ @@ -250,22 +301,19 @@ class EventLogService { }), ); - // Track notices + // Track notices — deduplicate per relay subscription.add( - relay.notice$.subscribe((notices) => { - // notices can be a single string or array - const noticeArray = Array.isArray(notices) - ? notices - : notices - ? [notices] - : []; - // Only log new notices (last one) - if (noticeArray.length > 0) { - const latestNotice = noticeArray[noticeArray.length - 1]; + relay.notice$.subscribe((notice) => { + if ( + typeof notice === "string" && + notice && + notice !== this.lastNoticePerRelay.get(url) + ) { + this.lastNoticePerRelay.set(url, notice); this.addEntry({ type: "NOTICE", relay: url, - message: latestNotice, + message: notice, }); } }), @@ -285,30 +333,48 @@ class EventLogService { // Check if we already have an entry for this publish (avoid duplicates) const existingEntryId = this.publishIdToEntryId.get(event.id); if (existingEntryId) { - // Update existing entry instead of creating a new one + // Update existing entry immutably const entryIndex = this.entries.findIndex( (e) => e.id === existingEntryId && e.type === "PUBLISH", ); if (entryIndex !== -1) { const entry = this.entries[entryIndex] as PublishLogEntry; - entry.relayStatus = new Map(event.results); - entry.status = this.calculatePublishStatus(event.results); + const newRelayStatus = new Map(); + // Preserve timing from existing entries, add timing for new ones + for (const [relay, status] of event.results) { + const existing = entry.relayStatus.get(relay); + newRelayStatus.set(relay, { + ...status, + updatedAt: existing?.updatedAt ?? Date.now(), + }); + } + this.entries[entryIndex] = { + ...entry, + relayStatus: newRelayStatus, + status: this.calculatePublishStatus(newRelayStatus), + }; this.entriesSubject.next([...this.entries]); } return; } const entryId = this.generateId(); + const now = Date.now(); + + // Create initial publish entry with timing + const relayStatus = new Map(); + for (const [relay, status] of event.results) { + relayStatus.set(relay, { ...status, updatedAt: now }); + } - // Create initial publish entry const entry: PublishLogEntry = { id: entryId, type: "PUBLISH", timestamp: event.startedAt, event: event.event, relays: event.relays, - relayStatus: new Map(event.results), - status: this.calculatePublishStatus(event.results), + relayStatus, + status: this.calculatePublishStatus(relayStatus), publishId: event.id, }; @@ -325,7 +391,7 @@ class EventLogService { const entryId = this.publishIdToEntryId.get(update.publishId); if (!entryId) return; - // Find and update the publish entry + // Find the publish entry const entryIndex = this.entries.findIndex( (e) => e.id === entryId && e.type === "PUBLISH", ); @@ -333,14 +399,31 @@ class EventLogService { const entry = this.entries[entryIndex] as PublishLogEntry; - // Update relay status - entry.relayStatus.set(update.relay, { + // Update immutably with timing + const newRelayStatus = new Map(entry.relayStatus); + newRelayStatus.set(update.relay, { status: update.status, error: update.error, + updatedAt: update.timestamp, }); - // Recalculate overall status - entry.status = this.calculatePublishStatus(entry.relayStatus); + const newStatus = this.calculatePublishStatus(newRelayStatus); + + this.entries[entryIndex] = { + ...entry, + relayStatus: newRelayStatus, + status: newStatus, + }; + + // Clean up publish ID mapping when publish reaches terminal state + if (newStatus !== "pending") { + const allTerminal = Array.from(newRelayStatus.values()).every( + (r) => r.status === "success" || r.status === "error", + ); + if (allTerminal) { + this.publishIdToEntryId.delete(update.publishId); + } + } // Notify subscribers this.entriesSubject.next([...this.entries]); @@ -350,7 +433,7 @@ class EventLogService { * Calculate overall publish status from relay results */ private calculatePublishStatus( - results: Map, + results: Map, ): "pending" | "partial" | "success" | "failed" { const statuses = Array.from(results.values()).map((r) => r.status); @@ -387,25 +470,7 @@ class EventLogService { * Add an entry to the log * Accepts partial entries without id/timestamp (they will be generated) */ - private addEntry( - entry: - | (Omit & { - id?: string; - timestamp?: number; - }) - | (Omit & { - id?: string; - timestamp?: number; - }) - | (Omit & { - id?: string; - timestamp?: number; - }) - | (Omit & { - id?: string; - timestamp?: number; - }), - ): void { + private addEntry(entry: AddEntryInput): void { const fullEntry = { id: entry.id || this.generateId(), timestamp: entry.timestamp || Date.now(), @@ -448,6 +513,7 @@ class EventLogService { clear(): void { this.entries = []; this.publishIdToEntryId.clear(); + this.lastNoticePerRelay.clear(); this.entriesSubject.next([]); }