fix infinite loop with multi-subscription

This commit is contained in:
hzrd149
2024-05-09 11:35:48 -05:00
parent 25510a61b9
commit cc759228cf
8 changed files with 68 additions and 80 deletions

View File

@@ -75,8 +75,8 @@
"react-diff-viewer-continued": "^3.4.0", "react-diff-viewer-continued": "^3.4.0",
"react-dom": "^18.2.0", "react-dom": "^18.2.0",
"react-error-boundary": "^4.0.11", "react-error-boundary": "^4.0.11",
"react-force-graph-2d": "^1.25.1", "react-force-graph-2d": "^1.25.4",
"react-force-graph-3d": "^1.23.1", "react-force-graph-3d": "^1.24.2",
"react-hook-form": "^7.45.4", "react-hook-form": "^7.45.4",
"react-markdown": "^9.0.1", "react-markdown": "^9.0.1",
"react-mosaic-component": "^6.1.0", "react-mosaic-component": "^6.1.0",
@@ -90,8 +90,8 @@
"remark-gfm": "^4.0.0", "remark-gfm": "^4.0.0",
"remark-wiki-link": "^2.0.1", "remark-wiki-link": "^2.0.1",
"three": "^0.160.0", "three": "^0.160.0",
"three-spritetext": "^1.8.1", "three-spritetext": "^1.8.2",
"three-stdlib": "^2.29.4", "three-stdlib": "^2.29.11",
"webln": "^0.3.2", "webln": "^0.3.2",
"workbox-core": "7.0.0", "workbox-core": "7.0.0",
"workbox-precaching": "7.0.0", "workbox-precaching": "7.0.0",

View File

@@ -33,6 +33,7 @@ export default class ChunkedRequest {
/** set to true when the next chunk produces 0 events */ /** set to true when the next chunk produces 0 events */
complete = false; complete = false;
private lastChunkIdx = 0;
onChunkFinish = new Subject<number>(); onChunkFinish = new Subject<number>();
constructor(relay: SimpleRelay | AbstractRelay, filters: Filter[], log?: Debugger) { constructor(relay: SimpleRelay | AbstractRelay, filters: Filter[], log?: Debugger) {
@@ -72,6 +73,8 @@ export default class ChunkedRequest {
this.process.active = true; this.process.active = true;
await new Promise<number>((res) => { await new Promise<number>((res) => {
const sub = this.relay.subscribe(filters, { const sub = this.relay.subscribe(filters, {
// @ts-expect-error
id: this.id + "-" + this.lastChunkIdx++,
onevent: (event) => { onevent: (event) => {
this.handleEvent(event); this.handleEvent(event);
gotEvents++; gotEvents++;

View File

@@ -23,7 +23,10 @@ export default class MultiSubscription {
relays = new Set<AbstractRelay>(); relays = new Set<AbstractRelay>();
subscriptions = new Map<AbstractRelay, PersistentSubscription>(); subscriptions = new Map<AbstractRelay, PersistentSubscription>();
useCache = true;
cacheSubscription: PersistentSubscription | null = null; cacheSubscription: PersistentSubscription | null = null;
onCacheEvent = new ControlledObservable<NostrEvent>();
state = MultiSubscription.CLOSED; state = MultiSubscription.CLOSED;
onEvent = new ControlledObservable<NostrEvent>(); onEvent = new ControlledObservable<NostrEvent>();
@@ -38,9 +41,12 @@ export default class MultiSubscription {
processManager.registerProcess(this.process); processManager.registerProcess(this.process);
} }
private handleEvent(event: NostrEvent) { private handleEvent(event: NostrEvent, fromCache = false) {
if (this.seenEvents.has(event.id)) return; if (this.seenEvents.has(event.id)) return;
this.onEvent.next(event);
if (fromCache) this.onCacheEvent.next(event);
else this.onEvent.next(event);
this.seenEvents.add(event.id); this.seenEvents.add(event.id);
} }
@@ -102,18 +108,22 @@ export default class MultiSubscription {
} }
} }
// create cache sub if it does not exist if (this.useCache) {
if (!this.cacheSubscription && localRelay) { // create cache sub if it does not exist
this.cacheSubscription = new PersistentSubscription(localRelay as AbstractRelay, { if (!this.cacheSubscription && localRelay) {
onevent: (event) => this.handleEvent(event), this.cacheSubscription = new PersistentSubscription(localRelay as AbstractRelay, {
}); onevent: (event) => this.handleEvent(event, true),
this.process.addChild(this.cacheSubscription.process); });
} this.process.addChild(this.cacheSubscription.process);
}
// update cache sub filters if they changed // update cache sub filters if they changed
if (this.cacheSubscription && !isFilterEqual(this.cacheSubscription.filters, this.filters)) { if (this.cacheSubscription && !isFilterEqual(this.cacheSubscription.filters, this.filters)) {
this.cacheSubscription.filters = this.filters; this.cacheSubscription.filters = this.filters;
this.cacheSubscription.update(); this.cacheSubscription.update();
}
} else if (this.cacheSubscription?.closed === false) {
this.cacheSubscription.close();
} }
} }

View File

@@ -55,6 +55,7 @@ export default class TimelineLoader {
this.subscription = new MultiSubscription(name); this.subscription = new MultiSubscription(name);
this.subscription.onEvent.subscribe(this.handleEvent.bind(this)); this.subscription.onEvent.subscribe(this.handleEvent.bind(this));
this.subscription.onCacheEvent.subscribe((event) => this.handleEvent(event, true));
this.process.addChild(this.subscription.process); this.process.addChild(this.subscription.process);
// update the timeline when there are new events // update the timeline when there are new events
@@ -72,12 +73,16 @@ export default class TimelineLoader {
this.timeline.next(this.events.getSortedEvents().filter((e) => filter(e, this.events))); this.timeline.next(this.events.getSortedEvents().filter((e) => filter(e, this.events)));
} else this.timeline.next(this.events.getSortedEvents()); } else this.timeline.next(this.events.getSortedEvents());
} }
private handleEvent(event: NostrEvent, cache = true) {
private seenInCache = new Set<string>();
private handleEvent(event: NostrEvent, fromCache = false) {
// if this is a replaceable event, mirror it over to the replaceable event service // if this is a replaceable event, mirror it over to the replaceable event service
if (isReplaceable(event.kind)) replaceableEventsService.handleEvent(event); if (isReplaceable(event.kind)) replaceableEventsService.handleEvent(event);
this.events.addEvent(event); this.events.addEvent(event);
if (cache && localRelay) localRelay.publish(event); if (!fromCache && localRelay && !this.seenInCache.has(event.id)) localRelay.publish(event);
if (fromCache) this.seenInCache.add(event.id);
} }
private handleChunkFinished() { private handleChunkFinished() {
this.updateLoading(); this.updateLoading();

View File

@@ -26,8 +26,6 @@ export function getHumanReadableCoordinate(kind: number, pubkey: string, d?: str
return `${kind}:${truncateId(pubkey)}${d ? ":" + d : ""}`; return `${kind}:${truncateId(pubkey)}${d ? ":" + d : ""}`;
} }
const WRITE_CACHE_BATCH_TIME = 250;
class ReplaceableEventsService { class ReplaceableEventsService {
process: Process; process: Process;
@@ -53,13 +51,14 @@ class ReplaceableEventsService {
if (localRelay) { if (localRelay) {
this.cacheLoader = new BatchKindLoader(localRelay as AbstractRelay, this.log.extend("cache-relay")); this.cacheLoader = new BatchKindLoader(localRelay as AbstractRelay, this.log.extend("cache-relay"));
this.cacheLoader.events.onEvent.subscribe((e) => this.handleEvent(e, false)); this.cacheLoader.events.onEvent.subscribe((e) => this.handleEvent(e, true));
this.process.addChild(this.cacheLoader.process); this.process.addChild(this.cacheLoader.process);
} }
} }
handleEvent(event: NostrEvent, saveToCache = true) { private seenInCache = new Set<string>();
if (!alwaysVerify(event)) return; handleEvent(event: NostrEvent, fromCache = false) {
if (!fromCache && !alwaysVerify(event)) return;
const cord = getEventCoordinate(event); const cord = getEventCoordinate(event);
const subject = this.subjects.get(cord); const subject = this.subjects.get(cord);
@@ -67,30 +66,17 @@ class ReplaceableEventsService {
if (!current || event.created_at > current.created_at) { if (!current || event.created_at > current.created_at) {
subject.next(event); subject.next(event);
this.events.addEvent(event); this.events.addEvent(event);
if (saveToCache) this.saveToCache(cord, event);
if (!fromCache && localRelay && !this.seenInCache.has(event.id)) localRelay.publish(event);
} }
if (fromCache) this.seenInCache.add(event.id);
} }
getEvent(kind: number, pubkey: string, d?: string) { getEvent(kind: number, pubkey: string, d?: string) {
return this.subjects.get(createCoordinate(kind, pubkey, d)); return this.subjects.get(createCoordinate(kind, pubkey, d));
} }
private writeCacheQueue = new Map<string, NostrEvent>();
private writeToCacheThrottle = _throttle(this.writeToCache, WRITE_CACHE_BATCH_TIME);
private async writeToCache() {
if (this.writeCacheQueue.size === 0) return;
if (localRelay) {
this.log(`Sending ${this.writeCacheQueue.size} events to cache relay`);
for (const [_, event] of this.writeCacheQueue) localRelay.publish(event);
}
this.writeCacheQueue.clear();
}
private async saveToCache(cord: string, event: NostrEvent) {
this.writeCacheQueue.set(cord, event);
this.writeToCacheThrottle();
}
private requestEventFromRelays( private requestEventFromRelays(
urls: Iterable<string | URL | AbstractRelay>, urls: Iterable<string | URL | AbstractRelay>,
kind: number, kind: number,

View File

@@ -73,7 +73,7 @@ class SingleEventService {
.requestEvent(id) .requestEvent(id)
.then((cached) => { .then((cached) => {
this.loadingFromCache.delete(id); this.loadingFromCache.delete(id);
if (cached) this.handleEvent(cached, false); if (cached) this.handleEvent(cached, true);
else this.loadEventFromRelays(id); else this.loadEventFromRelays(id);
}); });
} }
@@ -82,11 +82,11 @@ class SingleEventService {
return subject; return subject;
} }
handleEvent(event: NostrEvent, cache = true) { handleEvent(event: NostrEvent, fromCache = false) {
this.events.addEvent(event); this.events.addEvent(event);
this.pendingRelays.delete(event.id); this.pendingRelays.delete(event.id);
if (cache && localRelay) localRelay.publish(event); if (!fromCache && localRelay) localRelay.publish(event);
} }
} }

View File

@@ -1,17 +1,9 @@
import { useMemo } from "react"; import { useMemo } from "react";
import { Box, Button, Flex, Text } from "@chakra-ui/react"; import { Box, Button, Flex, Text } from "@chakra-ui/react";
import AutoSizer from "react-virtualized-auto-sizer"; import AutoSizer from "react-virtualized-auto-sizer";
import { useNavigate } from "react-router-dom";
import ForceGraph, { LinkObject, NodeObject } from "react-force-graph-3d"; import ForceGraph, { LinkObject, NodeObject } from "react-force-graph-3d";
import { import { Mesh, MeshBasicMaterial, SRGBColorSpace, SphereGeometry, Sprite, SpriteMaterial, TextureLoader } from "three";
Group,
Mesh,
MeshBasicMaterial,
SRGBColorSpace,
SphereGeometry,
Sprite,
SpriteMaterial,
TextureLoader,
} from "three";
import useCurrentAccount from "../../hooks/use-current-account"; import useCurrentAccount from "../../hooks/use-current-account";
import RequireCurrentAccount from "../../providers/route/require-current-account"; import RequireCurrentAccount from "../../providers/route/require-current-account";
@@ -22,7 +14,6 @@ import { useReadRelays } from "../../hooks/use-client-relays";
import replaceableEventsService from "../../services/replaceable-events"; import replaceableEventsService from "../../services/replaceable-events";
import useSubjects from "../../hooks/use-subjects"; import useSubjects from "../../hooks/use-subjects";
import useUserMetadata from "../../hooks/use-user-metadata"; import useUserMetadata from "../../hooks/use-user-metadata";
import { useNavigate } from "react-router-dom";
import { ChevronLeftIcon } from "../../components/icons"; import { ChevronLeftIcon } from "../../components/icons";
export function useUsersMuteLists(pubkeys: string[], additionalRelays?: Iterable<string>) { export function useUsersMuteLists(pubkeys: string[], additionalRelays?: Iterable<string>) {
@@ -62,7 +53,7 @@ function NetworkGraphPage() {
const metadata = usersMetadata[pubkey]; const metadata = usersMetadata[pubkey];
if (metadata) { if (metadata) {
node.image = metadata.picture; node.image = metadata.picture || metadata.image;
node.name = metadata.name; node.name = metadata.name;
} }
@@ -105,25 +96,18 @@ function NetworkGraphPage() {
linkCurvature={0.25} linkCurvature={0.25}
nodeThreeObject={(node: NodeType) => { nodeThreeObject={(node: NodeType) => {
if (!node.image) { if (!node.image) {
return new Mesh(new SphereGeometry(5, 12, 6), new MeshBasicMaterial({ color: 0xaa0f0f })); return new Mesh(
new SphereGeometry(5, 12, 6),
new MeshBasicMaterial({ color: parseInt(node.id.slice(0, 6), 16) }),
);
} }
const group = new Group();
const imgTexture = new TextureLoader().load(node.image); const imgTexture = new TextureLoader().load(node.image);
imgTexture.colorSpace = SRGBColorSpace; imgTexture.colorSpace = SRGBColorSpace;
const material = new SpriteMaterial({ map: imgTexture }); const material = new SpriteMaterial({ map: imgTexture });
const sprite = new Sprite(material); const sprite = new Sprite(material);
sprite.scale.set(10, 10, 10); sprite.scale.set(10, 10, 10);
group.children.push(sprite);
// if (node.name) {
// const text = new SpriteText(node.name, 8, "ffffff");
// text.position.set(0, 0, 16);
// group.children.push(text);
// }
return sprite; return sprite;
}} }}
/> />

View File

@@ -6688,19 +6688,19 @@ react-focus-lock@^2.9.4:
use-callback-ref "^1.3.0" use-callback-ref "^1.3.0"
use-sidecar "^1.1.2" use-sidecar "^1.1.2"
react-force-graph-2d@^1.25.1: react-force-graph-2d@^1.25.4:
version "1.25.3" version "1.25.4"
resolved "https://registry.yarnpkg.com/react-force-graph-2d/-/react-force-graph-2d-1.25.3.tgz#9ed1f4bafd0a9bc3e8ed98d5b1c584b728ac856f" resolved "https://registry.yarnpkg.com/react-force-graph-2d/-/react-force-graph-2d-1.25.4.tgz#91f9e8169d0eeb6a7e36c36dd99da5128702b776"
integrity sha512-BFgZ8Mbq03fEO3Xss9hi1LAReu0LatEVpqJlrm9023p6rJMaUIrGWfVPQUx7ObjMfVBfuq0JXvCcCEpo+FmO1Q== integrity sha512-Y1xwa79PHVZUedfa/TO+Cboq2hIc1flA1z4o1oraOu6qMS0r421vNpfjWhJPR6qJonNme3tzeVt5boEA7Ue8sg==
dependencies: dependencies:
force-graph "1" force-graph "1"
prop-types "15" prop-types "15"
react-kapsule "2" react-kapsule "2"
react-force-graph-3d@^1.23.1: react-force-graph-3d@^1.24.2:
version "1.24.1" version "1.24.2"
resolved "https://registry.yarnpkg.com/react-force-graph-3d/-/react-force-graph-3d-1.24.1.tgz#5a4d39578b7253a96efbdc3a02c98a26901ca751" resolved "https://registry.yarnpkg.com/react-force-graph-3d/-/react-force-graph-3d-1.24.2.tgz#e94d14981e06f3a0b5b4a5f5d69888a2ff88ee35"
integrity sha512-Qx3z397VueK2KMjxETx7HRBqFFjEejjsFLgzCbne4Op+0+SlfGjwqxOCopAdAqcK6D+SQ0mhzTUCWw74Sr5wGA== integrity sha512-/tZ0BywYuj35Q84AH2WN+Cx0RIygnN5F1+EvsdAqsAMoIJ0xl4L/9aD/pwjCoWfFqi3w5wR2DQuitDXeTayZnQ==
dependencies: dependencies:
"3d-force-graph" "^1.73" "3d-force-graph" "^1.73"
prop-types "15" prop-types "15"
@@ -7592,15 +7592,15 @@ three-render-objects@^1.29:
kapsule "1" kapsule "1"
polished "4" polished "4"
three-spritetext@^1.8.1: three-spritetext@^1.8.2:
version "1.8.2" version "1.8.2"
resolved "https://registry.yarnpkg.com/three-spritetext/-/three-spritetext-1.8.2.tgz#62d8106e579274c65d86df2b56dda46b13fa97cc" resolved "https://registry.yarnpkg.com/three-spritetext/-/three-spritetext-1.8.2.tgz#62d8106e579274c65d86df2b56dda46b13fa97cc"
integrity sha512-OYjyAhWnQ6+7CPKjnpq3JQM+Lpr6cSOppCtbOOzF1IbCauGkoDFvbAnxYd0LVxEsIO2RALXXScg2eX+R6CAfyA== integrity sha512-OYjyAhWnQ6+7CPKjnpq3JQM+Lpr6cSOppCtbOOzF1IbCauGkoDFvbAnxYd0LVxEsIO2RALXXScg2eX+R6CAfyA==
three-stdlib@^2.29.4: three-stdlib@^2.29.11:
version "2.29.4" version "2.29.11"
resolved "https://registry.yarnpkg.com/three-stdlib/-/three-stdlib-2.29.4.tgz#6e8741f6a2d435d15ed73f3a14dd149660d4ce51" resolved "https://registry.yarnpkg.com/three-stdlib/-/three-stdlib-2.29.11.tgz#9bec17887464e8f4fe36d41e879655b16411c686"
integrity sha512-XNzGCrz/uAk9XoLwd35eN7dQyI4ggXZTeqjcN034YdYBpBlNO9kmLHehl/0Nw9jCelblB7jla+unHAOIyLyV6Q== integrity sha512-vSyUavKCwOJQd2ch9IHhyJVx6eNG3y+z3/LulqHM7OLMy81OqnmIrvc2b4phKr/c1aVjHRNG2X7JOklfrDDcmQ==
dependencies: dependencies:
"@types/draco3d" "^1.4.0" "@types/draco3d" "^1.4.0"
"@types/offscreencanvas" "^2019.6.4" "@types/offscreencanvas" "^2019.6.4"