mirror of
https://github.com/purrgrammer/grimoire.git
synced 2026-04-09 15:07:10 +02:00
fix: implement per-relay EOSE detection by subscribing to relays individually
**CRITICAL FIX for EOSE detection:** **The Problem:** - Used pool.subscription(relays, filters) which creates a RelayGroup - RelayGroup tracks per-relay EOSE internally but only emits ONE "EOSE" when ALL relays finish - This caused: 1. EOSE indicators taking forever to appear (waiting for slowest relay) 2. REQ stuck in LOADING state when fast relays finish but slow relays never do 3. No way to show per-relay EOSE status accurately **The Solution:** Subscribe to each relay individually using pool.relay(url).subscription(): - Each relay subscription emits its own EOSE immediately when that relay finishes - We track per-relay EOSE in relayStates map with accurate timing - Overall EOSE is derived when ALL relays reach terminal state (eose/error/disconnected) - EOSE indicators now appear immediately as each relay finishes **Technical Details:** - Changed from: pool.subscription(relays, filters) - Changed to: relays.map(url => pool.relay(url).subscription(filters)) - Added eoseReceivedRef to track overall EOSE in closures - Mark specific relay as EOSE when that relay emits "EOSE" - Calculate overall EOSE when all relays in terminal states - Use url from subscription context (more reliable than event._relay) **Benefits:** ✅ Instant per-relay EOSE indicators (no waiting for slowest relay) ✅ Accurate relay state tracking (each relay independent) ✅ REQ transitions to LIVE/CLOSED as soon as all relays finish ✅ Better user feedback (see which relays are done vs still loading) All 639 tests passing.
This commit is contained in:
@@ -64,6 +64,12 @@ export function useReqTimelineEnhanced(
|
||||
new Map(),
|
||||
);
|
||||
const queryStartedAt = useRef<number>(Date.now());
|
||||
const eoseReceivedRef = useRef<boolean>(false);
|
||||
|
||||
// Keep ref in sync with state
|
||||
useEffect(() => {
|
||||
eoseReceivedRef.current = eoseReceived;
|
||||
}, [eoseReceived]);
|
||||
|
||||
// Get global relay connection states from RelayStateManager
|
||||
const { relays: globalRelayStates } = useRelayState();
|
||||
@@ -179,114 +185,137 @@ export function useReqTimelineEnhanced(
|
||||
limit: limit || f.limit,
|
||||
}));
|
||||
|
||||
const observable = pool.subscription(relays, filtersWithLimit, {
|
||||
retries: 5,
|
||||
reconnect: 5,
|
||||
resubscribe: true,
|
||||
eventStore,
|
||||
});
|
||||
// CRITICAL FIX: Subscribe to each relay INDIVIDUALLY to get per-relay EOSE
|
||||
// Previously used pool.subscription() which only emits EOSE when ALL relays finish
|
||||
// Now we track each relay separately for accurate per-relay EOSE detection
|
||||
const subscriptions = relays.map((url) => {
|
||||
const relay = pool.relay(url);
|
||||
|
||||
const subscription = observable.subscribe(
|
||||
(response) => {
|
||||
// Response can be an event or 'EOSE' string
|
||||
if (typeof response === "string") {
|
||||
console.log("REQ Enhanced: EOSE received");
|
||||
setEoseReceived(true);
|
||||
if (!stream) {
|
||||
setLoading(false);
|
||||
}
|
||||
return relay
|
||||
.subscription(filtersWithLimit, {
|
||||
retries: 5,
|
||||
reconnect: 5,
|
||||
resubscribe: true,
|
||||
})
|
||||
.subscribe(
|
||||
(response) => {
|
||||
// Response can be an event or 'EOSE' string
|
||||
if (typeof response === "string" && response === "EOSE") {
|
||||
console.log("REQ Enhanced: EOSE received from", url);
|
||||
|
||||
// Mark all connected relays as having received EOSE
|
||||
// Note: We can't tell which specific relay sent EOSE due to
|
||||
// applesauce-relay's catchError bug that converts errors to EOSE.
|
||||
// We mark all connected relays as a best-effort approximation.
|
||||
setRelayStates((prev) => {
|
||||
const next = new Map(prev);
|
||||
let changed = false;
|
||||
// Mark THIS specific relay as having received EOSE
|
||||
setRelayStates((prev) => {
|
||||
const state = prev.get(url);
|
||||
if (!state || state.subscriptionState === "eose") {
|
||||
return prev; // No change needed
|
||||
}
|
||||
|
||||
for (const [url, state] of prev) {
|
||||
if (
|
||||
state.connectionState === "connected" &&
|
||||
state.subscriptionState !== "eose"
|
||||
) {
|
||||
const next = new Map(prev);
|
||||
next.set(url, {
|
||||
...state,
|
||||
subscriptionState: "eose",
|
||||
eoseAt: Date.now(),
|
||||
});
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
return changed ? next : prev;
|
||||
});
|
||||
} else if (isNostrEvent(response)) {
|
||||
// Event received - store and track per relay
|
||||
const event = response as NostrEvent & { _relay?: string };
|
||||
const relayUrl = event._relay;
|
||||
|
||||
// Store in EventStore and local map
|
||||
eventStore.add(event);
|
||||
setEventsMap((prev) => {
|
||||
const next = new Map(prev);
|
||||
next.set(event.id, event);
|
||||
return next;
|
||||
});
|
||||
|
||||
// Update relay state for this specific relay
|
||||
if (relayUrl) {
|
||||
setRelayStates((prev) => {
|
||||
const state = prev.get(relayUrl);
|
||||
const now = Date.now();
|
||||
const next = new Map(prev);
|
||||
|
||||
if (!state) {
|
||||
// Relay not in map - initialize it (defensive)
|
||||
console.warn(
|
||||
"REQ Enhanced: Event from unknown relay, initializing",
|
||||
relayUrl,
|
||||
// Check if ALL relays have reached EOSE
|
||||
const allEose = Array.from(next.values()).every(
|
||||
(s) =>
|
||||
s.subscriptionState === "eose" ||
|
||||
s.connectionState === "error" ||
|
||||
s.connectionState === "disconnected",
|
||||
);
|
||||
next.set(relayUrl, {
|
||||
url: relayUrl,
|
||||
connectionState: "connected",
|
||||
subscriptionState: "receiving",
|
||||
eventCount: 1,
|
||||
firstEventAt: now,
|
||||
lastEventAt: now,
|
||||
});
|
||||
} else {
|
||||
// Update existing relay state
|
||||
next.set(relayUrl, {
|
||||
...state,
|
||||
subscriptionState: "receiving",
|
||||
eventCount: state.eventCount + 1,
|
||||
firstEventAt: state.firstEventAt ?? now,
|
||||
lastEventAt: now,
|
||||
});
|
||||
}
|
||||
|
||||
if (allEose && !eoseReceivedRef.current) {
|
||||
console.log("REQ Enhanced: All relays finished");
|
||||
setEoseReceived(true);
|
||||
if (!stream) {
|
||||
setLoading(false);
|
||||
}
|
||||
}
|
||||
|
||||
return next;
|
||||
});
|
||||
} else if (isNostrEvent(response)) {
|
||||
// Event received - store and track per relay
|
||||
const event = response as NostrEvent & { _relay?: string };
|
||||
|
||||
// Store in EventStore and local map
|
||||
eventStore.add(event);
|
||||
setEventsMap((prev) => {
|
||||
const next = new Map(prev);
|
||||
next.set(event.id, event);
|
||||
return next;
|
||||
});
|
||||
|
||||
// Update relay state for this specific relay
|
||||
// Use url from subscription, not event._relay (which might be wrong)
|
||||
setRelayStates((prev) => {
|
||||
const state = prev.get(url);
|
||||
const now = Date.now();
|
||||
const next = new Map(prev);
|
||||
|
||||
if (!state) {
|
||||
// Relay not in map - initialize it (defensive)
|
||||
console.warn(
|
||||
"REQ Enhanced: Event from unknown relay, initializing",
|
||||
url,
|
||||
);
|
||||
next.set(url, {
|
||||
url,
|
||||
connectionState: "connected",
|
||||
subscriptionState: "receiving",
|
||||
eventCount: 1,
|
||||
firstEventAt: now,
|
||||
lastEventAt: now,
|
||||
});
|
||||
} else {
|
||||
// Update existing relay state
|
||||
next.set(url, {
|
||||
...state,
|
||||
subscriptionState: "receiving",
|
||||
eventCount: state.eventCount + 1,
|
||||
firstEventAt: state.firstEventAt ?? now,
|
||||
lastEventAt: now,
|
||||
});
|
||||
}
|
||||
|
||||
return next;
|
||||
});
|
||||
} else {
|
||||
console.warn(
|
||||
"REQ Enhanced: Unexpected response type from",
|
||||
url,
|
||||
response,
|
||||
);
|
||||
}
|
||||
},
|
||||
(err: Error) => {
|
||||
console.error("REQ Enhanced: Error from", url, err);
|
||||
// Mark this relay as errored
|
||||
setRelayStates((prev) => {
|
||||
const state = prev.get(url);
|
||||
if (!state) return prev;
|
||||
|
||||
const next = new Map(prev);
|
||||
next.set(url, {
|
||||
...state,
|
||||
subscriptionState: "error",
|
||||
errorMessage: err.message,
|
||||
errorType: "connection",
|
||||
});
|
||||
return next;
|
||||
});
|
||||
}
|
||||
} else {
|
||||
console.warn("REQ Enhanced: Unexpected response type:", response);
|
||||
}
|
||||
},
|
||||
(err: Error) => {
|
||||
console.error("REQ Enhanced: Error", err);
|
||||
setError(err);
|
||||
setLoading(false);
|
||||
},
|
||||
() => {
|
||||
// Observable completed
|
||||
if (!stream) {
|
||||
setLoading(false);
|
||||
}
|
||||
},
|
||||
);
|
||||
},
|
||||
() => {
|
||||
// This relay's observable completed
|
||||
console.log("REQ Enhanced: Relay completed", url);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
// Cleanup: unsubscribe from all relays
|
||||
return () => {
|
||||
subscription.unsubscribe();
|
||||
subscriptions.forEach((sub) => sub.unsubscribe());
|
||||
};
|
||||
}, [id, stableFilters, stableRelays, limit, stream, eventStore]);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user