diff --git a/src/hooks/useReqTimelineEnhanced.ts b/src/hooks/useReqTimelineEnhanced.ts index c6fe80e..9da1414 100644 --- a/src/hooks/useReqTimelineEnhanced.ts +++ b/src/hooks/useReqTimelineEnhanced.ts @@ -64,6 +64,12 @@ export function useReqTimelineEnhanced( new Map(), ); const queryStartedAt = useRef(Date.now()); + const eoseReceivedRef = useRef(false); + + // Keep ref in sync with state + useEffect(() => { + eoseReceivedRef.current = eoseReceived; + }, [eoseReceived]); // Get global relay connection states from RelayStateManager const { relays: globalRelayStates } = useRelayState(); @@ -179,114 +185,137 @@ export function useReqTimelineEnhanced( limit: limit || f.limit, })); - const observable = pool.subscription(relays, filtersWithLimit, { - retries: 5, - reconnect: 5, - resubscribe: true, - eventStore, - }); + // CRITICAL FIX: Subscribe to each relay INDIVIDUALLY to get per-relay EOSE + // Previously used pool.subscription() which only emits EOSE when ALL relays finish + // Now we track each relay separately for accurate per-relay EOSE detection + const subscriptions = relays.map((url) => { + const relay = pool.relay(url); - const subscription = observable.subscribe( - (response) => { - // Response can be an event or 'EOSE' string - if (typeof response === "string") { - console.log("REQ Enhanced: EOSE received"); - setEoseReceived(true); - if (!stream) { - setLoading(false); - } + return relay + .subscription(filtersWithLimit, { + retries: 5, + reconnect: 5, + resubscribe: true, + }) + .subscribe( + (response) => { + // Response can be an event or 'EOSE' string + if (typeof response === "string" && response === "EOSE") { + console.log("REQ Enhanced: EOSE received from", url); - // Mark all connected relays as having received EOSE - // Note: We can't tell which specific relay sent EOSE due to - // applesauce-relay's catchError bug that converts errors to EOSE. - // We mark all connected relays as a best-effort approximation. - setRelayStates((prev) => { - const next = new Map(prev); - let changed = false; + // Mark THIS specific relay as having received EOSE + setRelayStates((prev) => { + const state = prev.get(url); + if (!state || state.subscriptionState === "eose") { + return prev; // No change needed + } - for (const [url, state] of prev) { - if ( - state.connectionState === "connected" && - state.subscriptionState !== "eose" - ) { + const next = new Map(prev); next.set(url, { ...state, subscriptionState: "eose", eoseAt: Date.now(), }); - changed = true; - } - } - return changed ? next : prev; - }); - } else if (isNostrEvent(response)) { - // Event received - store and track per relay - const event = response as NostrEvent & { _relay?: string }; - const relayUrl = event._relay; - - // Store in EventStore and local map - eventStore.add(event); - setEventsMap((prev) => { - const next = new Map(prev); - next.set(event.id, event); - return next; - }); - - // Update relay state for this specific relay - if (relayUrl) { - setRelayStates((prev) => { - const state = prev.get(relayUrl); - const now = Date.now(); - const next = new Map(prev); - - if (!state) { - // Relay not in map - initialize it (defensive) - console.warn( - "REQ Enhanced: Event from unknown relay, initializing", - relayUrl, + // Check if ALL relays have reached EOSE + const allEose = Array.from(next.values()).every( + (s) => + s.subscriptionState === "eose" || + s.connectionState === "error" || + s.connectionState === "disconnected", ); - next.set(relayUrl, { - url: relayUrl, - connectionState: "connected", - subscriptionState: "receiving", - eventCount: 1, - firstEventAt: now, - lastEventAt: now, - }); - } else { - // Update existing relay state - next.set(relayUrl, { - ...state, - subscriptionState: "receiving", - eventCount: state.eventCount + 1, - firstEventAt: state.firstEventAt ?? now, - lastEventAt: now, - }); - } + if (allEose && !eoseReceivedRef.current) { + console.log("REQ Enhanced: All relays finished"); + setEoseReceived(true); + if (!stream) { + setLoading(false); + } + } + + return next; + }); + } else if (isNostrEvent(response)) { + // Event received - store and track per relay + const event = response as NostrEvent & { _relay?: string }; + + // Store in EventStore and local map + eventStore.add(event); + setEventsMap((prev) => { + const next = new Map(prev); + next.set(event.id, event); + return next; + }); + + // Update relay state for this specific relay + // Use url from subscription, not event._relay (which might be wrong) + setRelayStates((prev) => { + const state = prev.get(url); + const now = Date.now(); + const next = new Map(prev); + + if (!state) { + // Relay not in map - initialize it (defensive) + console.warn( + "REQ Enhanced: Event from unknown relay, initializing", + url, + ); + next.set(url, { + url, + connectionState: "connected", + subscriptionState: "receiving", + eventCount: 1, + firstEventAt: now, + lastEventAt: now, + }); + } else { + // Update existing relay state + next.set(url, { + ...state, + subscriptionState: "receiving", + eventCount: state.eventCount + 1, + firstEventAt: state.firstEventAt ?? now, + lastEventAt: now, + }); + } + + return next; + }); + } else { + console.warn( + "REQ Enhanced: Unexpected response type from", + url, + response, + ); + } + }, + (err: Error) => { + console.error("REQ Enhanced: Error from", url, err); + // Mark this relay as errored + setRelayStates((prev) => { + const state = prev.get(url); + if (!state) return prev; + + const next = new Map(prev); + next.set(url, { + ...state, + subscriptionState: "error", + errorMessage: err.message, + errorType: "connection", + }); return next; }); - } - } else { - console.warn("REQ Enhanced: Unexpected response type:", response); - } - }, - (err: Error) => { - console.error("REQ Enhanced: Error", err); - setError(err); - setLoading(false); - }, - () => { - // Observable completed - if (!stream) { - setLoading(false); - } - }, - ); + }, + () => { + // This relay's observable completed + console.log("REQ Enhanced: Relay completed", url); + }, + ); + }); + // Cleanup: unsubscribe from all relays return () => { - subscription.unsubscribe(); + subscriptions.forEach((sub) => sub.unsubscribe()); }; }, [id, stableFilters, stableRelays, limit, stream, eventStore]);