From df094b2c4fb59fe88681d8211de41c1063aa9906 Mon Sep 17 00:00:00 2001 From: hzrd149 Date: Thu, 15 Feb 2024 22:27:39 +0000 Subject: [PATCH] Rebuild observable class --- .changeset/clever-swans-walk.md | 5 + .prettierrc | 2 + .vscode/settings.json | 3 +- package.json | 4 +- src/classes/controlled-observable.ts | 64 ++++++++ src/classes/event-store.ts | 72 ++++----- src/classes/nostr-multi-subscription.ts | 19 ++- src/classes/nostr-publish-action.ts | 12 +- src/classes/nostr-request.ts | 28 ++-- src/classes/nostr-subscription.ts | 31 ++-- src/classes/relay-set.ts | 1 - src/classes/relay.ts | 17 +- src/classes/subject.ts | 147 ++++++------------ src/classes/timeline-loader.ts | 38 ++--- .../generic-note-timeline/index.tsx | 4 +- src/hooks/use-events-reactions.ts | 15 +- src/hooks/use-subject.ts | 12 +- src/hooks/use-subjects.ts | 10 +- src/providers/local/intersection-observer.tsx | 2 +- src/services/channel-metadata.ts | 4 +- src/services/db/index.ts | 1 + src/services/delete-events.ts | 4 +- src/services/event-reactions.ts | 2 +- src/services/event-zaps.ts | 2 +- src/services/local-relay.ts | 3 +- src/services/nostr-connect.ts | 4 +- src/services/relay-stats.ts | 4 +- src/services/replaceable-event-requester.ts | 62 +++----- src/services/settings/app-settings.ts | 7 +- src/services/settings/user-app-settings.ts | 2 +- src/services/signing.tsx | 2 +- src/services/single-event.ts | 16 +- src/services/user-mailboxes.ts | 15 +- src/services/user-metadata.ts | 41 +---- src/services/username-search.ts | 33 ++++ src/views/relays/relay/relay-details.tsx | 6 +- src/views/streams/dashboard/users-card.tsx | 5 +- src/views/streams/dashboard/zaps-card.tsx | 4 +- src/views/tools/network-dm-graph.tsx | 7 +- yarn.lock | 10 ++ 40 files changed, 375 insertions(+), 345 deletions(-) create mode 100644 .changeset/clever-swans-walk.md create mode 100644 src/classes/controlled-observable.ts create mode 100644 src/services/username-search.ts diff --git a/.changeset/clever-swans-walk.md b/.changeset/clever-swans-walk.md new file mode 100644 index 000000000..6bcaeb946 --- /dev/null +++ b/.changeset/clever-swans-walk.md @@ -0,0 +1,5 @@ +--- +"nostrudel": patch +--- + +Rebuild observable class diff --git a/.prettierrc b/.prettierrc index 963354f23..054d599cf 100644 --- a/.prettierrc +++ b/.prettierrc @@ -1,3 +1,5 @@ { + "tabWidth": 2, + "useTabs": false, "printWidth": 120 } diff --git a/.vscode/settings.json b/.vscode/settings.json index 491303bc7..85c632814 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -13,5 +13,6 @@ "webln" ], "typescript.enablePromptUseWorkspaceTsdk": true, - "typescript.tsdk": "node_modules/typescript/lib" + "typescript.tsdk": "node_modules/typescript/lib", + "deno.enable": false } diff --git a/package.json b/package.json index 338364ac2..c8ab1fd2e 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,8 @@ "three-spritetext": "^1.8.1", "three-stdlib": "^2.29.4", "webln": "^0.3.2", - "yet-another-react-lightbox": "^3.15.6" + "yet-another-react-lightbox": "^3.15.6", + "zen-observable": "^0.10.0" }, "devDependencies": { "@changesets/cli": "^2.27.1", @@ -95,6 +96,7 @@ "@types/react-dom": "^18.2.18", "@types/three": "^0.160.0", "@types/webscopeio__react-textarea-autocomplete": "^4.7.5", + "@types/zen-observable": "^0.8.7", "@vitejs/plugin-react": "^4.2.1", "camelcase": "^8.0.0", "prettier": "^3.1.1", diff --git a/src/classes/controlled-observable.ts b/src/classes/controlled-observable.ts new file mode 100644 index 000000000..91da6853e --- /dev/null +++ b/src/classes/controlled-observable.ts @@ -0,0 +1,64 @@ +import Observable from "zen-observable"; + +export default class ControlledObservable implements Observable { + private observable: Observable; + private subscriptions = new Set>(); + private _complete = false; + get closed() { + return this._complete; + } + get used() { + return this.subscriptions.size > 0; + } + + constructor(subscriber?: ZenObservable.Subscriber) { + this.observable = new Observable((observer) => { + this.subscriptions.add(observer); + const cleanup = subscriber && subscriber(observer); + return () => { + this.subscriptions.delete(observer); + if (typeof cleanup === "function") cleanup(); + else if (cleanup?.unsubscribe) cleanup.unsubscribe(); + }; + }); + + this.subscribe = this.observable.subscribe.bind(this.observable); + this.map = this.observable.map.bind(this.observable); + this.flatMap = this.observable.flatMap.bind(this.observable); + this.forEach = this.observable.forEach.bind(this.observable); + this.reduce = this.observable.reduce.bind(this.observable); + this.filter = this.observable.filter.bind(this.observable); + this.concat = this.observable.concat.bind(this.observable); + } + + next(v: T) { + if (this._complete) return; + for (const observer of this.subscriptions) { + observer.next(v); + } + } + error(err: any) { + if (this._complete) return; + for (const observer of this.subscriptions) { + observer.error(err); + } + } + complete() { + if (this._complete) return; + this._complete = true; + for (const observer of this.subscriptions) { + observer.complete(); + } + } + + [Symbol.observable]() { + return this.observable; + } + subscribe: Observable["subscribe"]; + map: Observable["map"]; + flatMap: Observable["flatMap"]; + forEach: Observable["forEach"]; + reduce: Observable["reduce"]; + filter: Observable["filter"]; + concat: Observable["concat"]; +} diff --git a/src/classes/event-store.ts b/src/classes/event-store.ts index 54c710f11..e53302094 100644 --- a/src/classes/event-store.ts +++ b/src/classes/event-store.ts @@ -1,7 +1,8 @@ -import { getEventUID, isReplaceable, sortByDate } from "../helpers/nostr/events"; -import replaceableEventLoaderService from "../services/replaceable-event-requester"; -import { NostrEvent, isDTag } from "../types/nostr-event"; -import Subject from "./subject"; +import { NostrEvent } from "nostr-tools"; +import { getEventUID, sortByDate } from "../helpers/nostr/events"; +import ControlledObservable from "./controlled-observable"; +import SuperMap from "./super-map"; +import deleteEventService from "../services/delete-events"; export type EventFilter = (event: NostrEvent, store: EventStore) => boolean; @@ -11,20 +12,27 @@ export default class EventStore { customSort?: typeof sortByDate; + private deleteSub: ZenObservable.Subscription; + constructor(name?: string, customSort?: typeof sortByDate) { this.name = name; this.customSort = customSort; + + this.deleteSub = deleteEventService.stream.subscribe((event) => { + const uid = getEventUID(event); + this.deleteEvent(uid); + if (uid !== event.id) this.deleteEvent(event.id); + }); } getSortedEvents() { return Array.from(this.events.values()).sort(this.customSort || sortByDate); } - onEvent = new Subject(undefined, false); - onDelete = new Subject(undefined, false); - onClear = new Subject(undefined, false); + onEvent = new ControlledObservable(); + onDelete = new ControlledObservable(); + onClear = new ControlledObservable(); - private replaceableEventSubs = new Map>(); private handleEvent(event: NostrEvent) { const id = getEventUID(event); const existing = this.events.get(id); @@ -37,16 +45,6 @@ export default class EventStore { addEvent(event: NostrEvent) { const id = getEventUID(event); this.handleEvent(event); - - if (isReplaceable(event.kind)) { - // pass the event on - replaceableEventLoaderService.handleEvent(event); - - // subscribe to any future changes - const sub = replaceableEventLoaderService.getEvent(event.kind, event.pubkey, event.tags.find(isDTag)?.[1]); - sub.subscribe(this.handleEvent, this); - this.replaceableEventSubs.set(id, sub); - } } getEvent(id: string) { return this.events.get(id); @@ -56,32 +54,36 @@ export default class EventStore { this.events.delete(id); this.onDelete.next(id); } - - if (this.replaceableEventSubs.has(id)) { - this.replaceableEventSubs.get(id)?.unsubscribe(this.handleEvent, this); - this.replaceableEventSubs.delete(id); - } } clear() { this.events.clear(); this.onClear.next(undefined); - - for (const [_, sub] of this.replaceableEventSubs) { - sub.unsubscribe(this.handleEvent, this); - } - } - cleanup() { - this.clear(); } - connect(other: EventStore) { - other.onEvent.subscribe(this.addEvent, this); - other.onDelete.subscribe(this.deleteEvent, this); + private storeSubs = new SuperMap(() => []); + connect(other: EventStore, fullSync = true) { + const subs = this.storeSubs.get(other); + subs.push( + other.onEvent.subscribe((e) => { + if (fullSync || this.events.has(getEventUID(e))) this.addEvent(e); + }), + ); + subs.push(other.onDelete.subscribe(this.deleteEvent.bind(this))); } disconnect(other: EventStore) { - other.onEvent.unsubscribe(this.addEvent, this); - other.onDelete.unsubscribe(this.deleteEvent, this); + const subs = this.storeSubs.get(other); + for (const sub of subs) sub.unsubscribe(); + this.storeSubs.delete(other); + } + + cleanup() { + this.clear(); + for (const [_, subs] of this.storeSubs) { + for (const sub of subs) sub.unsubscribe(); + } + this.storeSubs.clear(); + this.deleteSub.unsubscribe(); } getFirstEvent(nth = 0, filter?: EventFilter) { diff --git a/src/classes/nostr-multi-subscription.ts b/src/classes/nostr-multi-subscription.ts index 2ac063658..1da4555bf 100644 --- a/src/classes/nostr-multi-subscription.ts +++ b/src/classes/nostr-multi-subscription.ts @@ -1,11 +1,12 @@ import { nanoid } from "nanoid"; -import { Subject } from "./subject"; import { NostrEvent } from "../types/nostr-event"; import { NostrOutgoingRequest, NostrRequestFilter, RelayQueryMap } from "../types/nostr-query"; import Relay, { IncomingEvent } from "./relay"; import relayPoolService from "../services/relay-pool"; import { isFilterEqual, isQueryMapEqual } from "../helpers/nostr/filter"; +import ControlledObservable from "./controlled-observable"; +import SuperMap from "./super-map"; export default class NostrMultiSubscription { static INIT = "initial"; @@ -18,7 +19,7 @@ export default class NostrMultiSubscription { relays: Relay[] = []; state = NostrMultiSubscription.INIT; - onEvent = new Subject(); + onEvent = new ControlledObservable(); seenEvents = new Set(); constructor(name?: string) { @@ -36,18 +37,20 @@ export default class NostrMultiSubscription { } } + private relaySubs = new SuperMap(() => []); /** listen for event and open events from relays */ private connectToRelay(relay: Relay) { - relay.onEvent.subscribe(this.handleEvent, this); - relay.onOpen.subscribe(this.handleRelayConnect, this); - relay.onClose.subscribe(this.handleRelayDisconnect, this); + const subs = this.relaySubs.get(relay); + subs.push(relay.onEvent.subscribe(this.handleEvent.bind(this))); + subs.push(relay.onOpen.subscribe(this.handleRelayConnect.bind(this))); + subs.push(relay.onClose.subscribe(this.handleRelayDisconnect.bind(this))); relayPoolService.addClaim(relay.url, this); } /** stop listing to events from relays */ private disconnectFromRelay(relay: Relay) { - relay.onEvent.unsubscribe(this.handleEvent, this); - relay.onOpen.unsubscribe(this.handleRelayConnect, this); - relay.onClose.unsubscribe(this.handleRelayDisconnect, this); + const subs = this.relaySubs.get(relay); + for (const sub of subs) sub.unsubscribe(); + this.relaySubs.delete(relay); relayPoolService.removeClaim(relay.url, this); // if the subscription is open and had sent a request to the relay diff --git a/src/classes/nostr-publish-action.ts b/src/classes/nostr-publish-action.ts index 4e2bb8f55..dbc098622 100644 --- a/src/classes/nostr-publish-action.ts +++ b/src/classes/nostr-publish-action.ts @@ -4,7 +4,9 @@ import { NostrEvent } from "nostr-tools"; import relayPoolService from "../services/relay-pool"; import createDefer from "./deferred"; import Relay, { IncomingCommandResult } from "./relay"; -import Subject, { PersistentSubject } from "./subject"; +import { PersistentSubject } from "./subject"; +import ControlledObservable from "./controlled-observable"; +import SuperMap from "./super-map"; export default class NostrPublishAction { id = nanoid(); @@ -13,10 +15,11 @@ export default class NostrPublishAction { event: NostrEvent; results = new PersistentSubject([]); - onResult = new Subject(undefined, false); + onResult = new ControlledObservable(); onComplete = createDefer(); private remaining = new Set(); + private relayResultSubs = new SuperMap(() => []); constructor(label: string, relays: Iterable, event: NostrEvent, timeout: number = 5000) { this.label = label; @@ -26,7 +29,7 @@ export default class NostrPublishAction { for (const url of relays) { const relay = relayPoolService.requestRelay(url); this.remaining.add(relay); - relay.onCommandResult.subscribe(this.handleResult, this); + this.relayResultSubs.get(relay).push(relay.onCommandResult.subscribe(this.handleResult.bind(this))); // send event relay.send(["EVENT", event]); @@ -42,7 +45,8 @@ export default class NostrPublishAction { this.onResult.next(result); - relay.onCommandResult.unsubscribe(this.handleResult, this); + this.relayResultSubs.get(relay).forEach((s) => s.unsubscribe()); + this.relayResultSubs.delete(relay); this.remaining.delete(relay); if (this.remaining.size === 0) this.onComplete.resolve(this.results.value); } diff --git a/src/classes/nostr-request.ts b/src/classes/nostr-request.ts index 164c9baee..22b3a9c82 100644 --- a/src/classes/nostr-request.ts +++ b/src/classes/nostr-request.ts @@ -3,8 +3,9 @@ import { CountResponse, NostrEvent } from "../types/nostr-event"; import { NostrRequestFilter } from "../types/nostr-query"; import relayPoolService from "../services/relay-pool"; import Relay, { IncomingCount, IncomingEOSE, IncomingEvent } from "./relay"; -import Subject from "./subject"; import createDefer from "./deferred"; +import ControlledObservable from "./controlled-observable"; +import SuperMap from "./super-map"; const REQUEST_DEFAULT_TIMEOUT = 1000 * 5; export default class NostrRequest { @@ -12,23 +13,26 @@ export default class NostrRequest { static RUNNING = "running"; static COMPLETE = "complete"; - id: string; + id = nanoid(); timeout: number; relays: Set; state = NostrRequest.IDLE; - onEvent = new Subject(undefined, false); - onCount = new Subject(undefined, false); + onEvent = new ControlledObservable(); + onCount = new ControlledObservable(); + /** @deprecated */ onComplete = createDefer(); seenEvents = new Set(); + private relaySubs: SuperMap = new SuperMap(() => []); + constructor(relayUrls: Iterable, timeout?: number) { - this.id = nanoid(); this.relays = new Set(Array.from(relayUrls).map((url) => relayPoolService.requestRelay(url))); for (const relay of this.relays) { - relay.onEOSE.subscribe(this.handleEOSE, this); - relay.onEvent.subscribe(this.handleEvent, this); - relay.onCount.subscribe(this.handleCount, this); + const subs = this.relaySubs.get(relay); + subs.push(relay.onEOSE.subscribe(this.handleEOSE.bind(this))); + subs.push(relay.onEvent.subscribe(this.handleEvent.bind(this))); + subs.push(relay.onCount.subscribe(this.handleCount.bind(this))); } this.timeout = timeout ?? REQUEST_DEFAULT_TIMEOUT; @@ -40,8 +44,8 @@ export default class NostrRequest { this.relays.delete(relay); relay.send(["CLOSE", this.id]); - relay.onEOSE.unsubscribe(this.handleEOSE, this); - relay.onEvent.unsubscribe(this.handleEvent, this); + this.relaySubs.get(relay).forEach((sub) => sub.unsubscribe()); + this.relaySubs.delete(relay); if (this.relays.size === 0) { this.state = NostrRequest.COMPLETE; @@ -87,9 +91,9 @@ export default class NostrRequest { this.state = NostrRequest.COMPLETE; for (const relay of this.relays) { relay.send(["CLOSE", this.id]); - relay.onEOSE.unsubscribe(this.handleEOSE, this); - relay.onEvent.unsubscribe(this.handleEvent, this); + this.relaySubs.get(relay).forEach((sub) => sub.unsubscribe()); } + this.relaySubs.clear(); this.onComplete.resolve(); return this; diff --git a/src/classes/nostr-subscription.ts b/src/classes/nostr-subscription.ts index fbd9831e5..d3211dcb7 100644 --- a/src/classes/nostr-subscription.ts +++ b/src/classes/nostr-subscription.ts @@ -4,7 +4,7 @@ import { NostrEvent } from "../types/nostr-event"; import { NostrOutgoingMessage, NostrRequestFilter } from "../types/nostr-query"; import Relay, { IncomingEOSE } from "./relay"; import relayPoolService from "../services/relay-pool"; -import { Subject } from "./subject"; +import ControlledObservable from "./controlled-observable"; export default class NostrSubscription { static INIT = "initial"; @@ -16,8 +16,10 @@ export default class NostrSubscription { query?: NostrRequestFilter; relay: Relay; state = NostrSubscription.INIT; - onEvent = new Subject(); - onEOSE = new Subject(); + onEvent = new ControlledObservable(); + onEOSE = new ControlledObservable(); + + private subs: ZenObservable.Subscription[] = []; constructor(relayUrl: string | URL, query?: NostrRequestFilter, name?: string) { this.id = nanoid(); @@ -26,14 +28,18 @@ export default class NostrSubscription { this.relay = relayPoolService.requestRelay(relayUrl); - this.onEvent.connectWithHandler(this.relay.onEvent, (event, next) => { - if (this.state === NostrSubscription.OPEN) { - next(event.body); - } - }); - this.onEOSE.connectWithHandler(this.relay.onEOSE, (eose, next) => { - if (this.state === NostrSubscription.OPEN) next(eose); - }); + this.subs.push( + this.relay.onEvent.subscribe((message) => { + if (this.state === NostrSubscription.OPEN && message.subId === this.id) { + this.onEvent.next(message.body); + } + }), + ); + this.subs.push( + this.relay.onEOSE.subscribe((eose) => { + if (this.state === NostrSubscription.OPEN && eose.subId === this.id) this.onEOSE.next(eose); + }), + ); } send(message: NostrOutgoingMessage) { @@ -72,6 +78,9 @@ export default class NostrSubscription { // unsubscribe from relay messages relayPoolService.removeClaim(this.relay.url, this); + for (const sub of this.subs) sub.unsubscribe(); + this.subs = []; + return this; } } diff --git a/src/classes/relay-set.ts b/src/classes/relay-set.ts index 0f1d9914f..f0720dd2d 100644 --- a/src/classes/relay-set.ts +++ b/src/classes/relay-set.ts @@ -1,6 +1,5 @@ import { relaysFromContactsEvent } from "../helpers/nostr/contacts"; import { getRelaysFromMailbox } from "../helpers/nostr/mailbox"; -import { safeJson } from "../helpers/parse"; import { safeRelayUrl } from "../helpers/relay"; import relayPoolService from "../services/relay-pool"; import { NostrEvent } from "../types/nostr-event"; diff --git a/src/classes/relay.ts b/src/classes/relay.ts index ac6abe17b..a2e39718d 100644 --- a/src/classes/relay.ts +++ b/src/classes/relay.ts @@ -2,8 +2,9 @@ import { offlineMode } from "../services/offline-mode"; import relayScoreboardService from "../services/relay-scoreboard"; import { RawIncomingNostrEvent, NostrEvent, CountResponse } from "../types/nostr-event"; import { NostrOutgoingMessage } from "../types/nostr-query"; +import ControlledObservable from "./controlled-observable"; import createDefer, { Deferred } from "./deferred"; -import { PersistentSubject, Subject } from "./subject"; +import { PersistentSubject } from "./subject"; export type IncomingEvent = { type: "EVENT"; @@ -46,13 +47,13 @@ const CONNECTION_TIMEOUT = 1000 * 30; export default class Relay { url: string; status = new PersistentSubject(WebSocket.CLOSED); - onOpen = new Subject(undefined, false); - onClose = new Subject(undefined, false); - onEvent = new Subject(undefined, false); - onNotice = new Subject(undefined, false); - onCount = new Subject(undefined, false); - onEOSE = new Subject(undefined, false); - onCommandResult = new Subject(undefined, false); + onOpen = new ControlledObservable(); + onClose = new ControlledObservable(); + onEvent = new ControlledObservable(); + onNotice = new ControlledObservable(); + onCount = new ControlledObservable(); + onEOSE = new ControlledObservable(); + onCommandResult = new ControlledObservable(); ws?: WebSocket; private connectionPromises: Deferred[] = []; diff --git a/src/classes/subject.ts b/src/classes/subject.ts index eeebd9f25..6bb593eaa 100644 --- a/src/classes/subject.ts +++ b/src/classes/subject.ts @@ -1,111 +1,62 @@ -export type ListenerFn = (value: T) => void; -interface Connectable { - value?: Value; - subscribe(listener: ListenerFn, ctx?: Object): this; - unsubscribe(listener: ListenerFn, ctx?: Object): this; -} -interface ConnectableApi { - connect(connectable: Connectable): this; - disconnect(connectable: Connectable): this; -} -export type Connection = (value: From, next: (value: To) => any, prevValue: Prev) => void; +import Observable from "zen-observable"; +import { nanoid } from "nanoid"; +import ControlledObservable from "./controlled-observable"; -export class Subject implements Connectable { - listeners: [ListenerFn, Object | undefined][] = []; +/** An observable that is always open and stores the last value */ +export default class Subject { + private observable: ControlledObservable; + id = nanoid(8); + value: T | undefined; - value?: Value; - cacheValue: boolean; - constructor(value?: Value, cacheValue = true) { - this.cacheValue = cacheValue; - if (this.cacheValue) this.value = value; + constructor(value?: T) { + this.observable = new ControlledObservable(); + + this.value = value; + this.subscribe = this.observable.subscribe.bind(this.observable); } - next(value: Value) { - if (this.value === value) return; - - if (this.cacheValue) this.value = value; - for (const [listener, ctx] of this.listeners) { - if (ctx) listener.call(ctx, value); - else listener(value); - } - return this; + next(v: T) { + this.value = v; + this.observable.next(v); + } + error(err: any) { + this.observable.error(err); } - private findListener(callback: ListenerFn, ctx?: Object) { - return this.listeners.find((l) => { - return l[0] === callback && l[1] === ctx; + [Symbol.observable]() { + return this.observable; + } + subscribe: Observable["subscribe"]; + + map(callback: (value: T) => R, defaultValue?: R): Subject { + const child = new Subject(defaultValue); + + this.subscribe((value) => { + try { + child.next(callback(value)); + } catch (e) { + child.error(e); + } + }); + + return child; + } + + /** @deprecated */ + connectWithMapper( + subject: Subject, + map: (value: R, next: (value: T) => void, current: T | undefined) => void, + ): ZenObservable.Subscription { + return subject.subscribe((value) => { + map(value, (v) => this.next(v), this.value); }); } - - subscribe(listener: ListenerFn, ctx?: Object, initCall = true) { - if (!this.findListener(listener, ctx)) { - this.listeners.push([listener, ctx]); - - if (initCall) { - if (this.value !== undefined) { - if (ctx) listener.call(ctx, this.value); - else listener(this.value); - } - } - } - return this; - } - unsubscribe(listener: ListenerFn, ctx?: Object) { - const entry = this.findListener(listener, ctx); - if (entry) { - this.listeners = this.listeners.filter((l) => l !== entry); - } - return this; - } - get hasListeners() { - return this.listeners.length > 0; - } - - upstream = new Map, ListenerFn>(); - - connect(connectable: Connectable) { - if (!this.upstream.has(connectable)) { - const handler = this.next.bind(this); - this.upstream.set(connectable, handler); - connectable.subscribe(handler, this); - - if (connectable.value !== undefined) { - handler(connectable.value); - } - } - return this; - } - connectWithHandler(connectable: Connectable, connection: Connection) { - if (!this.upstream.has(connectable)) { - const handler = (value: From) => { - connection(value, this.next.bind(this), this.value); - }; - this.upstream.set(connectable, handler); - connectable.subscribe(handler, this); - } - return this; - } - disconnect(connectable: Connectable) { - const handler = this.upstream.get(connectable); - if (handler) { - this.upstream.delete(connectable); - connectable.unsubscribe(handler, this); - } - return this; - } - disconnectAll() { - for (const [connectable, listener] of this.upstream) { - this.disconnect(connectable); - } - } } -export class PersistentSubject extends Subject implements ConnectableApi { - value: Value; - constructor(value: Value) { - super(value, true); +export class PersistentSubject extends Subject { + value: T; + constructor(value: T) { + super(); this.value = value; } } - -export default Subject; diff --git a/src/classes/timeline-loader.ts b/src/classes/timeline-loader.ts index 4a8000635..a715aae53 100644 --- a/src/classes/timeline-loader.ts +++ b/src/classes/timeline-loader.ts @@ -22,6 +22,7 @@ import { } from "../helpers/nostr/filter"; import { localRelay } from "../services/local-relay"; import { relayRequest } from "../helpers/relay"; +import SuperMap from "./super-map"; const BLOCK_SIZE = 100; @@ -32,6 +33,7 @@ export class RelayBlockLoader { filter: NostrRequestFilter; blockSize = BLOCK_SIZE; private log: Debugger; + private subs: ZenObservable.Subscription[] = []; loading = false; events: EventStore; @@ -47,7 +49,7 @@ export class RelayBlockLoader { this.log = log || logger.extend(relay); this.events = new EventStore(relay); - deleteEventService.stream.subscribe(this.handleDeleteEvent, this); + this.subs.push(deleteEventService.stream.subscribe((e) => this.handleDeleteEvent(e))); } loadNextBlock() { @@ -91,7 +93,8 @@ export class RelayBlockLoader { } cleanup() { - deleteEventService.stream.unsubscribe(this.handleDeleteEvent, this); + for (const sub of this.subs) sub.unsubscribe(); + this.subs = []; } getFirstEvent(nth = 0, eventFilter?: EventFilter) { @@ -124,16 +127,15 @@ export default class TimelineLoader { this.name = name; this.log = logger.extend("TimelineLoader:" + name); this.events = new EventStore(name); + this.events.connect(replaceableEventLoaderService.events, false); this.subscription = new NostrMultiSubscription(name); - this.subscription.onEvent.subscribe(this.handleEvent, this); + this.subscription.onEvent.subscribe(this.handleEvent.bind(this)); // update the timeline when there are new events - this.events.onEvent.subscribe(this.throttleUpdateTimeline, this); - this.events.onDelete.subscribe(this.throttleUpdateTimeline, this); - this.events.onClear.subscribe(this.throttleUpdateTimeline, this); - - deleteEventService.stream.subscribe(this.handleDeleteEvent, this); + this.events.onEvent.subscribe(this.throttleUpdateTimeline.bind(this)); + this.events.onDelete.subscribe(this.throttleUpdateTimeline.bind(this)); + this.events.onClear.subscribe(this.throttleUpdateTimeline.bind(this)); } private throttleUpdateTimeline = _throttle(this.updateTimeline, 10); @@ -150,24 +152,20 @@ export default class TimelineLoader { this.events.addEvent(event); if (cache) localRelay.publish(event); } - private handleDeleteEvent(deleteEvent: NostrEvent) { - const cord = deleteEvent.tags.find(isATag)?.[1]; - const eventId = deleteEvent.tags.find(isETag)?.[1]; - - if (cord) this.events.deleteEvent(cord); - if (eventId) this.events.deleteEvent(eventId); - } + private blockLoaderSubs = new SuperMap(() => []); private connectToBlockLoader(loader: RelayBlockLoader) { this.events.connect(loader.events); - loader.onBlockFinish.subscribe(this.updateLoading, this); - loader.onBlockFinish.subscribe(this.updateComplete, this); + const subs = this.blockLoaderSubs.get(loader); + subs.push(loader.onBlockFinish.subscribe(this.updateLoading.bind(this))); + subs.push(loader.onBlockFinish.subscribe(this.updateComplete.bind(this))); } private disconnectToBlockLoader(loader: RelayBlockLoader) { loader.cleanup(); this.events.disconnect(loader.events); - loader.onBlockFinish.unsubscribe(this.updateLoading, this); - loader.onBlockFinish.unsubscribe(this.updateComplete, this); + const subs = this.blockLoaderSubs.get(loader); + for (const sub of subs) sub.unsubscribe(); + this.blockLoaderSubs.delete(loader); } private loadQueriesFromCache(queryMap: RelayQueryMap) { @@ -305,7 +303,5 @@ export default class TimelineLoader { this.blockLoaders.clear(); this.events.cleanup(); - - deleteEventService.stream.unsubscribe(this.handleDeleteEvent, this); } } diff --git a/src/components/timeline-page/generic-note-timeline/index.tsx b/src/components/timeline-page/generic-note-timeline/index.tsx index d76b3de78..e8144b033 100644 --- a/src/components/timeline-page/generic-note-timeline/index.tsx +++ b/src/components/timeline-page/generic-note-timeline/index.tsx @@ -132,9 +132,9 @@ function GenericNoteTimeline({ timeline }: { timeline: TimelineLoader }) { } }; - intersectionSubject.subscribe(listener); + const sub = intersectionSubject.subscribe(listener); return () => { - intersectionSubject.unsubscribe(listener); + sub.unsubscribe(); }; }, [ setPinDate, diff --git a/src/hooks/use-events-reactions.ts b/src/hooks/use-events-reactions.ts index 18f4fb096..2637f6eb5 100644 --- a/src/hooks/use-events-reactions.ts +++ b/src/hooks/use-events-reactions.ts @@ -1,8 +1,9 @@ -import { useEffect, useMemo, useState } from "react"; +import { useMemo, useState } from "react"; import eventReactionsService from "../services/event-reactions"; import { useReadRelays } from "./use-client-relays"; import { NostrEvent } from "../types/nostr-event"; import Subject from "../classes/subject"; +import useSubjects from "./use-subjects"; export default function useEventsReactions( eventIds: string[], @@ -29,17 +30,7 @@ export default function useEventsReactions( const [_, update] = useState(0); // subscribe to subjects - useEffect(() => { - const listener = () => update((v) => v + 1); - for (const [_, sub] of Object.entries(subjects)) { - sub?.subscribe(listener, undefined, false); - } - return () => { - for (const [_, sub] of Object.entries(subjects)) { - sub?.unsubscribe(listener, undefined); - } - }; - }, [subjects, update]); + useSubjects(Object.values(subjects)); return reactions; } diff --git a/src/hooks/use-subject.ts b/src/hooks/use-subject.ts index 06a062dd9..a5e8bf98e 100644 --- a/src/hooks/use-subject.ts +++ b/src/hooks/use-subject.ts @@ -1,16 +1,16 @@ -import { useEffect, useState } from "react"; -import { PersistentSubject, Subject } from "../classes/subject"; +import { useEffect, useRef, useState } from "react"; +import Subject, { PersistentSubject } from "../classes/subject"; function useSubject(subject: PersistentSubject): Value; function useSubject(subject?: PersistentSubject): Value | undefined; function useSubject(subject?: Subject): Value | undefined; function useSubject(subject?: Subject) { const [_, setValue] = useState(subject?.value); + const subRef = useRef(subject); useEffect(() => { - subject?.subscribe(setValue, undefined, false); - return () => { - subject?.unsubscribe(setValue, undefined); - }; + if (subject?.value !== undefined) setValue(subject?.value); + const sub = subject?.subscribe((v) => setValue(v)); + return () => sub?.unsubscribe(); }, [subject, setValue]); return subject?.value; diff --git a/src/hooks/use-subjects.ts b/src/hooks/use-subjects.ts index a005817c7..3e5a0a2c4 100644 --- a/src/hooks/use-subjects.ts +++ b/src/hooks/use-subjects.ts @@ -1,5 +1,5 @@ import { useEffect, useState } from "react"; -import { PersistentSubject, Subject } from "../classes/subject"; +import Subject, { PersistentSubject } from "../classes/subject"; function useSubjects( subjects: (Subject | PersistentSubject | undefined)[] = [], @@ -9,13 +9,9 @@ function useSubjects( useEffect(() => { const listener = () => update((v) => v + 1); - for (const sub of subjects) { - sub?.subscribe(listener, undefined, false); - } + const subs = subjects.map((s) => s?.subscribe(listener)); return () => { - for (const sub of subjects) { - sub?.unsubscribe(listener, undefined); - } + for (const sub of subs) sub?.unsubscribe(); }; }, [subjects, update]); diff --git a/src/providers/local/intersection-observer.tsx b/src/providers/local/intersection-observer.tsx index 3f76d5ed0..da6bda8e2 100644 --- a/src/providers/local/intersection-observer.tsx +++ b/src/providers/local/intersection-observer.tsx @@ -75,7 +75,7 @@ export default function IntersectionObserverProvider({ callback: ExtendedIntersectionObserverCallback; }) { const elementIds = useMemo(() => new WeakMap(), []); - const [subject] = useState(() => new Subject([], false)); + const [subject] = useState(() => new Subject([])); const handleIntersection = useCallback( (entries, observer) => { diff --git a/src/services/channel-metadata.ts b/src/services/channel-metadata.ts index 5a12d8505..c8d10a1a3 100644 --- a/src/services/channel-metadata.ts +++ b/src/services/channel-metadata.ts @@ -30,7 +30,7 @@ const RELAY_REQUEST_BATCH_TIME = 1000; /** This class is ued to batch requests to a single relay */ class ChannelMetadataRelayLoader { private subscription: NostrSubscription; - private events = new SuperMap>(() => new Subject()); + private events = new SuperMap>(() => new Subject()); private requestNext = new Set(); private requested = new Map(); @@ -234,7 +234,7 @@ class ChannelMetadataService { for (const relay of relayUrls) { const request = this.loaders.get(relay).requestMetadata(channelId); - sub.connectWithHandler(request, (event, next, current) => { + sub.connectWithMapper(request, (event, next, current) => { if (!current || event.created_at > current.created_at) { next(event); this.saveToCache(channelId, event); diff --git a/src/services/db/index.ts b/src/services/db/index.ts index b0f9f710b..570308597 100644 --- a/src/services/db/index.ts +++ b/src/services/db/index.ts @@ -203,6 +203,7 @@ export async function deleteDatabase() { db.close(); log("Deleting"); await deleteDB(dbName); + localDatabase.close(); await nostrIDBDelete(); window.location.reload(); } diff --git a/src/services/delete-events.ts b/src/services/delete-events.ts index c914a9119..99b1d7263 100644 --- a/src/services/delete-events.ts +++ b/src/services/delete-events.ts @@ -1,10 +1,10 @@ import { kinds } from "nostr-tools"; -import Subject from "../classes/subject"; import { getEventUID } from "../helpers/nostr/events"; import { NostrEvent } from "../types/nostr-event"; +import ControlledObservable from "../classes/controlled-observable"; -const deleteEventStream = new Subject(); +const deleteEventStream = new ControlledObservable(); function handleEvent(deleteEvent: NostrEvent) { if (deleteEvent.kind !== kinds.EventDeletion) return; diff --git a/src/services/event-reactions.ts b/src/services/event-reactions.ts index d4a6841e6..60ebae9f1 100644 --- a/src/services/event-reactions.ts +++ b/src/services/event-reactions.ts @@ -74,7 +74,7 @@ class EventReactionsService { if (filters.length > 0) { const request = new NostrRequest([relay]); - request.onEvent.subscribe(this.handleEvent, this); + request.onEvent.subscribe((e) => this.handleEvent(e)); request.start(filters); } } diff --git a/src/services/event-zaps.ts b/src/services/event-zaps.ts index 463a905b6..9a9679921 100644 --- a/src/services/event-zaps.ts +++ b/src/services/event-zaps.ts @@ -74,7 +74,7 @@ class EventZapsService { if (filter.length > 0) { const request = new NostrRequest([relay]); - request.onEvent.subscribe(this.handleEvent, this); + request.onEvent.subscribe((e) => this.handleEvent(e)); request.start(filter); } } diff --git a/src/services/local-relay.ts b/src/services/local-relay.ts index 2b1618cfd..5e2484434 100644 --- a/src/services/local-relay.ts +++ b/src/services/local-relay.ts @@ -1,7 +1,6 @@ -import { CacheRelay, openDB, pruneLastUsed } from "nostr-idb"; +import { CacheRelay, openDB } from "nostr-idb"; import { Relay } from "nostr-tools"; import { logger } from "../helpers/debug"; -import _throttle from "lodash.throttle"; import { safeRelayUrl } from "../helpers/relay"; // save the local relay from query params to localStorage diff --git a/src/services/nostr-connect.ts b/src/services/nostr-connect.ts index c59416190..118559413 100644 --- a/src/services/nostr-connect.ts +++ b/src/services/nostr-connect.ts @@ -1,6 +1,7 @@ import { finalizeEvent, generateSecretKey, getPublicKey, kinds, nip04, nip19 } from "nostr-tools"; import dayjs from "dayjs"; import { nanoid } from "nanoid"; +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; import NostrMultiSubscription from "../classes/nostr-multi-subscription"; import { getPubkeyFromDecodeResult, isHexKey, normalizeToHexPubkey } from "../helpers/nip19"; @@ -10,7 +11,6 @@ import { DraftNostrEvent, NostrEvent, isPTag } from "../types/nostr-event"; import createDefer, { Deferred } from "../classes/deferred"; import { truncatedId } from "../helpers/nostr/events"; import { NostrConnectAccount } from "./account"; -import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; import { safeRelayUrl } from "../helpers/relay"; export function isErrorResponse(response: any): response is NostrConnectErrorResponse { @@ -79,7 +79,7 @@ export class NostrConnectClient { this.secretKey = secretKey || bytesToHex(generateSecretKey()); this.publicKey = getPublicKey(hexToBytes(this.secretKey)); - this.sub.onEvent.subscribe(this.handleEvent, this); + this.sub.onEvent.subscribe((e) => this.handleEvent(e)); this.sub.setQueryMap( createSimpleQueryMap(this.relays, { kinds: [kinds.NostrConnect, 24134], diff --git a/src/services/relay-stats.ts b/src/services/relay-stats.ts index 06bce9bf0..ee39b1cd1 100644 --- a/src/services/relay-stats.ts +++ b/src/services/relay-stats.ts @@ -51,7 +51,7 @@ class RelayStatsService { if (!info.pubkey) return sub.next(null); const request = new NostrRequest([relay, MONITOR_RELAY]); - request.onEvent.subscribe(this.handleEvent, this); + request.onEvent.subscribe((e) => this.handleEvent(e)); request.start({ kinds: [SELF_REPORTED_KIND], authors: [info.pubkey] }); }); } @@ -75,7 +75,7 @@ class RelayStatsService { const relays = Array.from(this.pendingMonitorStats); const request = new NostrRequest([MONITOR_RELAY]); - request.onEvent.subscribe(this.handleEvent, this); + request.onEvent.subscribe((e) => this.handleEvent(e)); request.start({ since: 1704196800, kinds: [MONITOR_STATS_KIND], "#d": relays, authors: [MONITOR_PUBKEY] }); this.pendingMonitorStats.clear(); diff --git a/src/services/replaceable-event-requester.ts b/src/services/replaceable-event-requester.ts index 3b3eb1f18..f2f5a2ccd 100644 --- a/src/services/replaceable-event-requester.ts +++ b/src/services/replaceable-event-requester.ts @@ -6,7 +6,6 @@ import { Filter } from "nostr-tools"; import NostrSubscription from "../classes/nostr-subscription"; import SuperMap from "../classes/super-map"; import { NostrEvent } from "../types/nostr-event"; -import Subject from "../classes/subject"; import { NostrQuery } from "../types/nostr-query"; import { logger } from "../helpers/debug"; import { nameOrPubkey } from "./user-metadata"; @@ -14,6 +13,8 @@ import { getEventCoordinate } from "../helpers/nostr/events"; import createDefer, { Deferred } from "../classes/deferred"; import { localRelay } from "./local-relay"; import { relayRequest } from "../helpers/relay"; +import EventStore from "../classes/event-store"; +import Subject from "../classes/subject"; type Pubkey = string; type Relay = string; @@ -39,7 +40,7 @@ const RELAY_REQUEST_BATCH_TIME = 500; /** This class is ued to batch requests to a single relay */ class ReplaceableEventRelayLoader { private subscription: NostrSubscription; - private events = new SuperMap>(() => new Subject()); + events = new EventStore(); private requestNext = new Set(); private requested = new Map(); @@ -58,14 +59,12 @@ class ReplaceableEventRelayLoader { private handleEvent(event: NostrEvent) { const cord = getEventCoordinate(event); - // remove the pubkey from the waiting list + // remove the cord from the waiting list this.requested.delete(cord); - const sub = this.events.get(cord); - - const current = sub.value; + const current = this.events.getEvent(cord); if (!current || event.created_at > current.created_at) { - sub.next(event); + this.events.addEvent(event); } } private handleEOSE() { @@ -73,19 +72,13 @@ class ReplaceableEventRelayLoader { this.requested.clear(); } - getEvent(kind: number, pubkey: string, d?: string) { - return this.events.get(createCoordinate(kind, pubkey, d)); - } - requestEvent(kind: number, pubkey: string, d?: string) { const cord = createCoordinate(kind, pubkey, d); - const event = this.events.get(cord); - - if (!event.value) { + const event = this.events.getEvent(cord); + if (!event) { this.requestNext.add(cord); this.updateThrottle(); } - return event; } @@ -152,11 +145,14 @@ const READ_CACHE_BATCH_TIME = 250; const WRITE_CACHE_BATCH_TIME = 250; class ReplaceableEventLoaderService { - private events = new SuperMap>(() => new Subject()); + private subjects = new SuperMap>(() => new Subject()); + private loaders = new SuperMap((relay) => { + const loader = new ReplaceableEventRelayLoader(relay, this.log.extend(relay)); + loader.events.onEvent.subscribe((e) => this.handleEvent(e)); + return loader; + }); - private loaders = new SuperMap( - (relay) => new ReplaceableEventRelayLoader(relay, this.log.extend(relay)), - ); + events = new EventStore(); log = logger.extend("ReplaceableEventLoader"); dbLog = this.log.extend("database"); @@ -164,18 +160,17 @@ class ReplaceableEventLoaderService { handleEvent(event: NostrEvent, saveToCache = true) { const cord = getEventCoordinate(event); - const sub = this.events.get(cord); - const current = sub.value; + const subject = this.subjects.get(cord); + const current = subject.value; if (!current || event.created_at > current.created_at) { - sub.next(event); - if (saveToCache) { - this.saveToCache(cord, event); - } + subject.next(event); + this.events.addEvent(event); + if (saveToCache) this.saveToCache(cord, event); } } getEvent(kind: number, pubkey: string, d?: string) { - return this.events.get(createCoordinate(kind, pubkey, d)); + return this.subjects.get(createCoordinate(kind, pubkey, d)); } private readFromCachePromises = new Map>(); @@ -248,25 +243,16 @@ class ReplaceableEventLoaderService { private requestEventFromRelays(relays: Iterable, kind: number, pubkey: string, d?: string) { const cord = createCoordinate(kind, pubkey, d); - const sub = this.events.get(cord); + const sub = this.subjects.get(cord); - for (const relay of relays) { - const request = this.loaders.get(relay).requestEvent(kind, pubkey, d); - - sub.connectWithHandler(request, (event, next, current) => { - if (!current || event.created_at > current.created_at) { - next(event); - this.saveToCache(cord, event); - } - }); - } + for (const relay of relays) this.loaders.get(relay).requestEvent(kind, pubkey, d); return sub; } requestEvent(relays: Iterable, kind: number, pubkey: string, d?: string, opts: RequestOptions = {}) { const cord = createCoordinate(kind, pubkey, d); - const sub = this.events.get(cord); + const sub = this.subjects.get(cord); if (!sub.value) { this.loadFromCache(cord).then((loaded) => { diff --git a/src/services/settings/app-settings.ts b/src/services/settings/app-settings.ts index 926e35dd9..46b26af9c 100644 --- a/src/services/settings/app-settings.ts +++ b/src/services/settings/app-settings.ts @@ -12,6 +12,7 @@ appSettings.subscribe((event) => { log(`Changed`, event); }); +let accountSub: ZenObservable.Subscription; accountService.current.subscribe(() => { const account = accountService.current.value; @@ -20,7 +21,7 @@ accountService.current.subscribe(() => { return; } - appSettings.disconnectAll(); + if (accountSub) accountSub.unsubscribe(); if (account.localSettings) { appSettings.next(account.localSettings); @@ -30,8 +31,8 @@ accountService.current.subscribe(() => { const subject = userAppSettings.requestAppSettings(account.pubkey, clientRelaysService.readRelays.value, { alwaysRequest: true, }); - appSettings.next(defaultSettings); - appSettings.connect(subject); + appSettings.next(subject.value || defaultSettings); + accountSub = subject.subscribe((s) => appSettings.next(s)); }); // clientRelaysService.relays.subscribe(() => { diff --git a/src/services/settings/user-app-settings.ts b/src/services/settings/user-app-settings.ts index 9fee29c98..321b86122 100644 --- a/src/services/settings/user-app-settings.ts +++ b/src/services/settings/user-app-settings.ts @@ -26,7 +26,7 @@ class UserAppSettings { SETTING_EVENT_IDENTIFIER, opts, ); - sub.connectWithHandler(requestSub, (event, next) => next(parseAppSettings(event))); + sub.connectWithMapper(requestSub, (event, next) => next(parseAppSettings(event))); return sub; } diff --git a/src/services/signing.tsx b/src/services/signing.tsx index 9d9741951..db7d83776 100644 --- a/src/services/signing.tsx +++ b/src/services/signing.tsx @@ -181,7 +181,7 @@ class SigningService { await client.ensureConnected(); return await client.nip04Encrypt(pubkey, text); default: - throw new Error("Unknown connection type"); + throw new Error("Unknown account type"); } } } diff --git a/src/services/single-event.ts b/src/services/single-event.ts index 7d2847f7a..79df75e4a 100644 --- a/src/services/single-event.ts +++ b/src/services/single-event.ts @@ -1,22 +1,22 @@ import _throttle from "lodash.throttle"; import NostrRequest from "../classes/nostr-request"; -import Subject from "../classes/subject"; import SuperMap from "../classes/super-map"; import { NostrEvent } from "../types/nostr-event"; import { localRelay } from "./local-relay"; import { relayRequest, safeRelayUrls } from "../helpers/relay"; import { logger } from "../helpers/debug"; +import Subject from "../classes/subject"; const RELAY_REQUEST_BATCH_TIME = 500; class SingleEventService { - private cache = new SuperMap>(() => new Subject()); + private subjects = new SuperMap>(() => new Subject()); pending = new Map(); log = logger.extend("SingleEvent"); requestEvent(id: string, relays: Iterable) { - const subject = this.cache.get(id); + const subject = this.subjects.get(id); if (subject.value) return subject; const safeURLs = safeRelayUrls(Array.from(relays)); @@ -28,8 +28,7 @@ class SingleEventService { } handleEvent(event: NostrEvent, cache = true) { - this.cache.get(event.id).next(event); - + this.subjects.get(event.id).next(event); if (cache) localRelay.publish(event); } @@ -62,7 +61,7 @@ class SingleEventService { for (const [relay, ids] of Object.entries(idsFromRelays)) { const request = new NostrRequest([relay]); - request.onEvent.subscribe(this.handleEvent, this); + request.onEvent.subscribe((event) => this.handleEvent(event)); request.start({ ids }); } this.pending.clear(); @@ -71,4 +70,9 @@ class SingleEventService { const singleEventService = new SingleEventService(); +if (import.meta.env.DEV) { + //@ts-expect-error + window.singleEventService = singleEventService; +} + export default singleEventService; diff --git a/src/services/user-mailboxes.ts b/src/services/user-mailboxes.ts index 61c99174d..a2fe92757 100644 --- a/src/services/user-mailboxes.ts +++ b/src/services/user-mailboxes.ts @@ -29,18 +29,19 @@ function nip65ToUserMailboxes(event: NostrEvent): UserMailboxes { } class UserMailboxesService { - private subjects = new SuperMap>(() => new Subject()); + private subjects = new SuperMap>((pubkey) => + replaceableEventLoaderService.getEvent(kinds.RelayList, pubkey).map(nip65ToUserMailboxes), + ); getMailboxes(pubkey: string) { return this.subjects.get(pubkey); } requestMailboxes(pubkey: string, relays: Iterable, opts: RequestOptions = {}) { const sub = this.subjects.get(pubkey); - const requestSub = replaceableEventLoaderService.requestEvent(relays, kinds.RelayList, pubkey, undefined, opts); - sub.connectWithHandler(requestSub, (event, next) => next(nip65ToUserMailboxes(event))); + replaceableEventLoaderService.requestEvent(relays, kinds.RelayList, pubkey, undefined, opts); // also fetch the relays from the users contacts const contactsSub = replaceableEventLoaderService.requestEvent(relays, kinds.Contacts, pubkey, undefined, opts); - sub.connectWithHandler(contactsSub, (event, next, value) => { + sub.connectWithMapper(contactsSub, (event, next, value) => { // NOTE: only use relays from contact list if the user dose not have a NIP-65 relay list const relays = relaysFromContactsEvent(event); if (relays.length > 0 && !value) { @@ -60,12 +61,8 @@ class UserMailboxesService { async loadFromCache(pubkey: string) { const sub = this.subjects.get(pubkey); - - // load from cache await replaceableEventLoaderService.loadFromCache(createCoordinate(kinds.RelayList, pubkey)); - - const requestSub = replaceableEventLoaderService.getEvent(kinds.RelayList, pubkey); - sub.connectWithHandler(requestSub, (event, next) => next(nip65ToUserMailboxes(event))); + return sub; } receiveEvent(event: NostrEvent) { diff --git a/src/services/user-metadata.ts b/src/services/user-metadata.ts index 798ee48fd..8de1379e7 100644 --- a/src/services/user-metadata.ts +++ b/src/services/user-metadata.ts @@ -1,53 +1,22 @@ -import db from "./db"; import { kinds } from "nostr-tools"; import _throttle from "lodash.throttle"; -import { Kind0ParsedContent, getSearchNames, parseKind0Event } from "../helpers/user-metadata"; +import { Kind0ParsedContent, parseKind0Event } from "../helpers/user-metadata"; import SuperMap from "../classes/super-map"; import Subject from "../classes/subject"; import replaceableEventLoaderService, { RequestOptions } from "./replaceable-event-requester"; -const WRITE_USER_SEARCH_BATCH_TIME = 500; - class UserMetadataService { private metadata = new SuperMap>((pubkey) => { - const sub = new Subject(); - sub.subscribe((metadata) => { - if (metadata) { - this.writeSearchQueue.add(pubkey); - this.writeSearchDataThrottle(); - } - }); - return sub; + return replaceableEventLoaderService.getEvent(0, pubkey).map(parseKind0Event); }); getSubject(pubkey: string) { return this.metadata.get(pubkey); } requestMetadata(pubkey: string, relays: Iterable, opts: RequestOptions = {}) { - const sub = this.metadata.get(pubkey); - const requestSub = replaceableEventLoaderService.requestEvent(relays, kinds.Metadata, pubkey, undefined, opts); - sub.connectWithHandler(requestSub, (event, next) => next(parseKind0Event(event))); - return sub; - } - - private writeSearchQueue = new Set(); - private writeSearchDataThrottle = _throttle(this.writeSearchData.bind(this), WRITE_USER_SEARCH_BATCH_TIME); - private async writeSearchData() { - if (this.writeSearchQueue.size === 0) return; - - const keys = Array.from(this.writeSearchQueue); - this.writeSearchQueue.clear(); - - const transaction = db.transaction("userSearch", "readwrite"); - for (const pubkey of keys) { - const metadata = this.getSubject(pubkey).value; - if (metadata) { - const names = getSearchNames(metadata); - transaction.objectStore("userSearch").put({ pubkey, names }); - } - } - transaction.commit(); - await transaction.done; + const subject = this.metadata.get(pubkey); + replaceableEventLoaderService.requestEvent(relays, kinds.Metadata, pubkey, undefined, opts); + return subject; } } diff --git a/src/services/username-search.ts b/src/services/username-search.ts new file mode 100644 index 000000000..b09019234 --- /dev/null +++ b/src/services/username-search.ts @@ -0,0 +1,33 @@ +import _throttle from "lodash.throttle"; +import { getSearchNames } from "../helpers/user-metadata"; +import db from "./db"; +import replaceableEventLoaderService from "./replaceable-event-requester"; +import userMetadataService from "./user-metadata"; + +const WRITE_USER_SEARCH_BATCH_TIME = 500; + +const writeSearchQueue = new Set(); +const writeSearchData = _throttle(async () => { + if (writeSearchQueue.size === 0) return; + + const keys = Array.from(writeSearchQueue); + writeSearchQueue.clear(); + + const transaction = db.transaction("userSearch", "readwrite"); + for (const pubkey of keys) { + const metadata = userMetadataService.getSubject(pubkey).value; + if (metadata) { + const names = getSearchNames(metadata); + transaction.objectStore("userSearch").put({ pubkey, names }); + } + } + transaction.commit(); + await transaction.done; +}, WRITE_USER_SEARCH_BATCH_TIME); + +replaceableEventLoaderService.events.onEvent.subscribe((event) => { + if (event.kind === 0) { + writeSearchQueue.add(event.pubkey); + writeSearchData(); + } +}); diff --git a/src/views/relays/relay/relay-details.tsx b/src/views/relays/relay/relay-details.tsx index c96c7afa0..701b53bc1 100644 --- a/src/views/relays/relay/relay-details.tsx +++ b/src/views/relays/relay/relay-details.tsx @@ -155,9 +155,11 @@ export default function RelayDetailsTab({ relay }: { relay: string }) { const loadMore = useCallback(() => { setLoading(true); const request = new NostrRequest([relay]); - request.onEvent.subscribe(store.addEvent, store); const throttle = _throttle(() => update({}), 100); - request.onEvent.subscribe(() => throttle()); + request.onEvent.subscribe((e) => { + store.addEvent(e); + throttle(); + }); request.onComplete.then(() => setLoading(false)); const query: NostrQuery = { limit: 500 }; diff --git a/src/views/streams/dashboard/users-card.tsx b/src/views/streams/dashboard/users-card.tsx index 2d2268bb2..8a1222401 100644 --- a/src/views/streams/dashboard/users-card.tsx +++ b/src/views/streams/dashboard/users-card.tsx @@ -1,8 +1,8 @@ import { ReactNode, memo, useMemo, useState } from "react"; import { Button, ButtonGroup, Divider, Flex, Heading } from "@chakra-ui/react"; import dayjs from "dayjs"; +import { useInterval, useObservable } from "react-use"; -import useSubject from "../../../hooks/use-subject"; import useCurrentAccount from "../../../hooks/use-current-account"; import useStreamChatTimeline from "../stream/stream-chat/use-stream-chat-timeline"; import UserAvatar from "../../../components/user-avatar"; @@ -12,7 +12,6 @@ import { useMuteModalContext } from "../../../providers/route/mute-modal-provide import useUserMuteList from "../../../hooks/use-user-mute-list"; import { isPubkeyInList } from "../../../helpers/nostr/lists"; import { ParsedStream } from "../../../helpers/nostr/stream"; -import { useInterval } from "react-use"; function Countdown({ time }: { time: number }) { const [now, setNow] = useState(dayjs().unix()); @@ -61,7 +60,7 @@ function UsersCard({ stream }: { stream: ParsedStream }) { const streamChatTimeline = useStreamChatTimeline(stream); // refresh when a new event - useSubject(streamChatTimeline.events.onEvent); + useObservable(streamChatTimeline.events.onEvent); const chatEvents = streamChatTimeline.events.getSortedEvents(); const muteList = useUserMuteList(account.pubkey); diff --git a/src/views/streams/dashboard/zaps-card.tsx b/src/views/streams/dashboard/zaps-card.tsx index aad1449fb..51c11d77e 100644 --- a/src/views/streams/dashboard/zaps-card.tsx +++ b/src/views/streams/dashboard/zaps-card.tsx @@ -1,8 +1,8 @@ import { memo } from "react"; import { Flex } from "@chakra-ui/react"; import { kinds } from "nostr-tools"; +import { useObservable } from "react-use"; -import useSubject from "../../../hooks/use-subject"; import useStreamChatTimeline from "../stream/stream-chat/use-stream-chat-timeline"; import ZapMessageMemo from "../stream/stream-chat/zap-message"; import { ParsedStream } from "../../../helpers/nostr/stream"; @@ -11,7 +11,7 @@ function ZapsCard({ stream }: { stream: ParsedStream }) { const streamChatTimeline = useStreamChatTimeline(stream); // refresh when a new event - useSubject(streamChatTimeline.events.onEvent); + useObservable(streamChatTimeline.events.onEvent); const zapMessages = streamChatTimeline.events.getSortedEvents().filter((event) => { if (stream.starts && event.created_at < stream.starts) return false; if (stream.ends && event.created_at > stream.ends) return false; diff --git a/src/views/tools/network-dm-graph.tsx b/src/views/tools/network-dm-graph.tsx index 5b97fb31c..5e430d486 100644 --- a/src/views/tools/network-dm-graph.tsx +++ b/src/views/tools/network-dm-graph.tsx @@ -5,6 +5,7 @@ import ForceGraph, { LinkObject, NodeObject } from "react-force-graph-3d"; import { kinds } from "nostr-tools"; import dayjs from "dayjs"; import { useNavigate } from "react-router-dom"; +import { useDebounce, useObservable } from "react-use"; import { Group, Mesh, @@ -25,8 +26,6 @@ import { useUserMetadata } from "../../hooks/use-user-metadata"; import EventStore from "../../classes/event-store"; import NostrRequest from "../../classes/nostr-request"; import { isPTag } from "../../types/nostr-event"; -import { useDebounce } from "react-use"; -import useSubject from "../../hooks/use-subject"; import { ChevronLeftIcon } from "../../components/icons"; import { useReadRelays } from "../../hooks/use-client-relays"; @@ -53,7 +52,7 @@ function NetworkDMGraphPage() { store.clear(); const request = new NostrRequest(relays); - request.onEvent.subscribe(store.addEvent, store); + request.onEvent.subscribe((e) => store.addEvent(e)); request.start({ authors: contactsPubkeys, kinds: [kinds.EncryptedDirectMessage], @@ -71,7 +70,7 @@ function NetworkDMGraphPage() { const selfMetadata = useUserMetadata(account.pubkey); const usersMetadata = useUsersMetadata(contactsPubkeys); - const newEventTrigger = useSubject(store.onEvent); + const newEventTrigger = useObservable(store.onEvent); const graphData = useMemo(() => { if (store.events.size === 0) return { nodes: [], links: [] }; diff --git a/yarn.lock b/yarn.lock index 8af1869ba..aa3d921cc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3012,6 +3012,11 @@ resolved "https://registry.yarnpkg.com/@types/webxr/-/webxr-0.5.13.tgz#5fd07863819c30869d66b765926d0b5a53a7e9e0" integrity sha512-Hi4K3aTEoaa31Cep75AA9wK5q2iZgC1L70serPbI11L4YieoZpu5LvLr6FZXyIdqkkGPh1WMuDf6oSPHJXBkoA== +"@types/zen-observable@^0.8.7": + version "0.8.7" + resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.7.tgz#114e2ffc8d5be4915fdd5bc90668fc0ceaadb760" + integrity sha512-LKzNTjj+2j09wAo/vvVjzgw5qckJJzhdGgWHW7j69QIGdq/KnZrMAMIHQiWGl3Ccflh5/CudBAntTPYdprPltA== + "@uiw/codemirror-extensions-basic-setup@4.21.21": version "4.21.21" resolved "https://registry.yarnpkg.com/@uiw/codemirror-extensions-basic-setup/-/codemirror-extensions-basic-setup-4.21.21.tgz#243ef309cb53253b14187649a7abc0d996420a20" @@ -7204,6 +7209,11 @@ yocto-queue@^0.1.0: resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b" integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q== +zen-observable@^0.10.0: + version "0.10.0" + resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.10.0.tgz#ee10eba75272897dbee5f152ab26bb5e0107f0c8" + integrity sha512-iI3lT0iojZhKwT5DaFy2Ce42n3yFcLdFyOh01G7H0flMY60P8MJuVFEoJoNwXlmAyQ45GrjL6AcZmmlv8A5rbw== + zustand@^4.4.7: version "4.5.0" resolved "https://registry.yarnpkg.com/zustand/-/zustand-4.5.0.tgz#141354af56f91de378aa6c4b930032ab338f3ef0"