From 3e5c34e3fe44f57557dcfa4720a10ced9741ef6a Mon Sep 17 00:00:00 2001 From: hzrd149 Date: Sat, 4 May 2024 13:25:53 -0500 Subject: [PATCH] rebuild zap and reaction services using batch relation loader --- src/classes/batch-relation-loader.ts | 146 +++++++++++++++++++++++++++ src/services/event-reactions.ts | 126 +++++++++++------------ src/services/event-zaps.ts | 126 ++++++++++------------- src/services/single-event.ts | 1 + 4 files changed, 260 insertions(+), 139 deletions(-) create mode 100644 src/classes/batch-relation-loader.ts diff --git a/src/classes/batch-relation-loader.ts b/src/classes/batch-relation-loader.ts new file mode 100644 index 000000000..80ab2a346 --- /dev/null +++ b/src/classes/batch-relation-loader.ts @@ -0,0 +1,146 @@ +import { NostrEvent, AbstractRelay } from "nostr-tools"; +import _throttle from "lodash.throttle"; +import debug, { Debugger } from "debug"; + +import PersistentSubscription from "./persistent-subscription"; +import Process from "./process"; +import processManager from "../services/process-manager"; +import createDefer, { Deferred } from "./deferred"; +import Dataflow04 from "../components/icons/dataflow-04"; +import SuperMap from "./super-map"; +import Subject from "./subject"; + +/** Batches requests for events that reference another event (via #e tag) from a single relay */ +export default class BatchRelationLoader { + kinds: number[]; + relay: AbstractRelay; + process: Process; + + requested = new Set(); + references = new SuperMap>(() => new Map()); + + onEventUpdate = new Subject(); + + subscription: PersistentSubscription; + + // a map of events that are waiting for the current request to finish + private next = new Map>>(); + + // a map of events currently being requested from the relay + private pending = new Map>>(); + + log: Debugger; + + constructor(relay: AbstractRelay, kinds: number[], log?: Debugger) { + this.relay = relay; + this.kinds = kinds; + this.log = log || debug("BatchRelationLoader"); + this.process = new Process("BatchRelationLoader", this, [relay]); + this.process.icon = Dataflow04; + processManager.registerProcess(this.process); + + this.subscription = new PersistentSubscription(this.relay, { + onevent: (event) => this.handleEvent(event), + oneose: () => this.handleEOSE(), + }); + this.process.addChild(this.subscription.process); + } + + requestEvents(uid: string): Promise> { + // if there is a cache only return it if we have requested this id before + if (this.references.has(uid) && this.requested.has(uid)) { + return Promise.resolve(this.references.get(uid)); + } + + if (this.pending.has(uid)) return this.pending.get(uid)!; + if (this.next.has(uid)) return this.next.get(uid)!; + + const defer = createDefer>(); + this.next.set(uid, defer); + + // request subscription update + this.requestUpdate(); + + return defer; + } + + requestUpdate = _throttle( + () => { + // don't do anything if the subscription is already running + if (this.process.active) return; + + this.process.active = true; + this.update(); + }, + 500, + { leading: false, trailing: true }, + ); + + private handleEvent(event: NostrEvent) { + // add event to cache + const updateIds = new Set(); + for (const tag of event.tags) { + if (tag[0] === "e" && tag[1]) { + const id = tag[1]; + this.references.get(id).set(event.id, event); + updateIds.add(id); + } else if (tag[0] === "a" && tag[1]) { + const cord = tag[1]; + this.references.get(cord).set(event.id, event); + updateIds.add(cord); + } + } + + for (const id of updateIds) this.onEventUpdate.next(id); + } + private handleEOSE() { + // resolve all pending from the last request + for (const [uid, defer] of this.pending) { + defer.resolve(this.references.get(uid)); + } + + // reset + this.pending.clear(); + this.process.active = false; + + // do next request or close the subscription + if (this.next.size > 0) this.requestUpdate(); + } + + update() { + // copy everything from next to pending + for (const [uid, defer] of this.next) this.pending.set(uid, defer); + this.next.clear(); + + // update subscription + if (this.pending.size > 0) { + this.log(`Updating filters ${this.pending.size} events`); + + const ids: string[] = []; + const cords: string[] = []; + const uids = Array.from(this.pending.keys()); + for (const uid of uids) { + this.requested.add(uid); + + if (uid.includes(":")) cords.push(uid); + else ids.push(uid); + } + + this.subscription.filters = []; + if (ids.length > 0) this.subscription.filters.push({ "#e": ids, kinds: this.kinds }); + if (ids.length > 0) this.subscription.filters.push({ "#a": cords, kinds: this.kinds }); + + this.subscription.update(); + this.process.active = true; + } else { + this.log("Closing"); + this.subscription.close(); + this.process.active = false; + } + } + + destroy() { + this.process.remove(); + processManager.unregisterProcess(this.process); + } +} diff --git a/src/services/event-reactions.ts b/src/services/event-reactions.ts index 939ce9e17..c4d2bad5c 100644 --- a/src/services/event-reactions.ts +++ b/src/services/event-reactions.ts @@ -1,92 +1,82 @@ -import { Filter, kinds, nip25 } from "nostr-tools"; +import { AbstractRelay, kinds } from "nostr-tools"; import _throttle from "lodash.throttle"; import Subject from "../classes/subject"; import SuperMap from "../classes/super-map"; import { NostrEvent } from "../types/nostr-event"; import { localRelay } from "./local-relay"; -import { relayRequest } from "../helpers/relay"; import relayPoolService from "./relay-pool"; - -type eventId = string; -type relay = string; +import Process from "../classes/process"; +import { LightningIcon } from "../components/icons"; +import processManager from "./process-manager"; +import BatchRelationLoader from "../classes/batch-relation-loader"; +import { logger } from "../helpers/debug"; class EventReactionsService { - subjects = new SuperMap>(() => new Subject([])); - pending = new SuperMap>(() => new Set()); + log = logger.extend("EventReactionsService"); + process: Process; - requestReactions(eventId: string, relays: Iterable, alwaysRequest = true) { - const subject = this.subjects.get(eventId); + subjects = new SuperMap>(() => new Subject([])); - if (!subject.value || alwaysRequest) { - for (const relay of relays) { - this.pending.get(eventId).add(relay); + loaders = new SuperMap((relay) => { + const loader = new BatchRelationLoader(relay, [kinds.Reaction], this.log.extend(relay.url)); + this.process.addChild(loader.process); + loader.onEventUpdate.subscribe((id) => { + this.updateSubject(id); + }); + return loader; + }); + + constructor() { + this.process = new Process("EventReactionsService", this); + this.process.icon = LightningIcon; + this.process.active = true; + + processManager.registerProcess(this.process); + } + + // merged results from all loaders for a single event + private updateSubject(id: string) { + const ids = new Set(); + const events: NostrEvent[] = []; + const subject = this.subjects.get(id); + + for (const [relay, loader] of this.loaders) { + if (loader.references.has(id)) { + const other = loader.references.get(id); + for (const [_, e] of other) { + if (!ids.has(e.id)) { + ids.add(e.id); + events.push(e); + } + } } } - this.throttleBatchRequest(); + + subject.next(events); + } + + requestReactions(eventUID: string, urls: Iterable, alwaysRequest = true) { + const subject = this.subjects.get(eventUID); + if (subject.value && !alwaysRequest) return; + + if (localRelay) { + this.loaders.get(localRelay as AbstractRelay).requestEvents(eventUID); + } + + const relays = relayPoolService.getRelays(urls); + for (const relay of relays) { + this.loaders.get(relay).requestEvents(eventUID); + } return subject; } - - handleEvent(event: NostrEvent, cache = true) { - if (event.kind !== kinds.Reaction) return; - const pointer = nip25.getReactedEventPointer(event); - if (!pointer?.id) return; - - const subject = this.subjects.get(pointer.id); - if (!subject.value) { - subject.next([event]); - } else if (!subject.value.some((e) => e.id === event.id)) { - subject.next([...subject.value, event]); - } - - if (cache && localRelay) localRelay.publish(event); - } - - throttleBatchRequest = _throttle(this.batchRequests, 2000); - batchRequests() { - if (this.pending.size === 0) return; - - // load events from cache - const uids = Array.from(this.pending.keys()); - const ids = uids.filter((id) => !id.includes(":")); - const cords = uids.filter((id) => id.includes(":")); - const filters: Filter[] = []; - if (ids.length > 0) filters.push({ "#e": ids, kinds: [kinds.Reaction] }); - if (cords.length > 0) filters.push({ "#a": cords, kinds: [kinds.Reaction] }); - if (filters.length > 0 && localRelay) { - relayRequest(localRelay, filters).then((events) => events.forEach((e) => this.handleEvent(e, false))); - } - - const idsFromRelays: Record = {}; - for (const [id, relays] of this.pending) { - for (const relay of relays) { - idsFromRelays[relay] = idsFromRelays[relay] ?? []; - idsFromRelays[relay].push(id); - } - } - - for (const [relay, ids] of Object.entries(idsFromRelays)) { - const eventIds = ids.filter((id) => !id.includes(":")); - const coordinates = ids.filter((id) => id.includes(":")); - const filters: Filter[] = []; - if (eventIds.length > 0) filters.push({ "#e": eventIds, kinds: [kinds.Reaction] }); - if (coordinates.length > 0) filters.push({ "#a": coordinates, kinds: [kinds.Reaction] }); - - if (filters.length > 0) { - const subscription = relayPoolService - .requestRelay(relay) - .subscribe(filters, { onevent: (event) => this.handleEvent(event), oneose: () => subscription.close() }); - } - } - this.pending.clear(); - } } const eventReactionsService = new EventReactionsService(); if (import.meta.env.DEV) { - //@ts-expect-error + // @ts-ignore window.eventReactionsService = eventReactionsService; } diff --git a/src/services/event-zaps.ts b/src/services/event-zaps.ts index c57565758..98f463981 100644 --- a/src/services/event-zaps.ts +++ b/src/services/event-zaps.ts @@ -1,91 +1,75 @@ -import { Filter, kinds } from "nostr-tools"; +import { AbstractRelay, kinds } from "nostr-tools"; import _throttle from "lodash.throttle"; import Subject from "../classes/subject"; import SuperMap from "../classes/super-map"; -import { NostrEvent, isATag, isETag } from "../types/nostr-event"; -import { relayRequest } from "../helpers/relay"; +import { NostrEvent } from "../types/nostr-event"; import { localRelay } from "./local-relay"; import relayPoolService from "./relay-pool"; - -type eventUID = string; -type relay = string; +import Process from "../classes/process"; +import { LightningIcon } from "../components/icons"; +import processManager from "./process-manager"; +import BatchRelationLoader from "../classes/batch-relation-loader"; +import { logger } from "../helpers/debug"; class EventZapsService { - subjects = new SuperMap>(() => new Subject([])); - pending = new SuperMap>(() => new Set()); + log = logger.extend("EventZapsService"); + process: Process; - requestZaps(eventUID: eventUID, relays: Iterable, alwaysRequest = true) { - const subject = this.subjects.get(eventUID); + subjects = new SuperMap>(() => new Subject([])); - if (!subject.value || alwaysRequest) { - for (const relay of relays) { - this.pending.get(eventUID).add(relay); - } - } - this.throttleBatchRequest(); + loaders = new SuperMap((relay) => { + const loader = new BatchRelationLoader(relay, [kinds.Zap], this.log.extend(relay.url)); + this.process.addChild(loader.process); + loader.onEventUpdate.subscribe((id) => { + this.updateSubject(id); + }); + return loader; + }); - return subject; + constructor() { + this.process = new Process("EventZapsService", this); + this.process.icon = LightningIcon; + this.process.active = true; + + processManager.registerProcess(this.process); } - handleEvent(event: NostrEvent, cache = true) { - if (event.kind !== kinds.Zap) return; - const eventUID = event.tags.find(isETag)?.[1] ?? event.tags.find(isATag)?.[1]; - if (!eventUID) return; + // merged results from all loaders for a single event + private updateSubject(id: string) { + const ids = new Set(); + const events: NostrEvent[] = []; + const subject = this.subjects.get(id); - const subject = this.subjects.get(eventUID); - if (!subject.value) { - subject.next([event]); - } else if (!subject.value.some((e) => e.id === event.id)) { - subject.next([...subject.value, event]); - } - - if (cache && localRelay) localRelay.publish(event); - } - - throttleBatchRequest = _throttle(this.batchRequests, 2000); - batchRequests() { - if (this.pending.size === 0) return; - - // load events from cache - const uids = Array.from(this.pending.keys()); - const ids = uids.filter((id) => !id.includes(":")); - const cords = uids.filter((id) => id.includes(":")); - const filters: Filter[] = []; - if (ids.length > 0) filters.push({ "#e": ids, kinds: [kinds.Zap] }); - if (cords.length > 0) filters.push({ "#a": cords, kinds: [kinds.Zap] }); - if (filters.length > 0 && localRelay) { - relayRequest(localRelay, filters).then((events) => events.forEach((e) => this.handleEvent(e, false))); - } - - const idsFromRelays: Record = {}; - for (const [id, relays] of this.pending) { - for (const relay of relays) { - idsFromRelays[relay] = idsFromRelays[relay] ?? []; - idsFromRelays[relay].push(id); - } - } - - for (const [url, ids] of Object.entries(idsFromRelays)) { - const eventIds = ids.filter((id) => !id.includes(":")); - const coordinates = ids.filter((id) => id.includes(":")); - const filter: Filter[] = []; - if (eventIds.length > 0) filter.push({ "#e": eventIds, kinds: [kinds.Zap] }); - if (coordinates.length > 0) filter.push({ "#a": coordinates, kinds: [kinds.Zap] }); - - if (filter.length > 0) { - const relay = relayPoolService.getRelay(url); - if (relay) { - if (!relay.connected) relayPoolService.requestConnect(relay); - - const sub = relay.subscribe(filter, { - onevent: (event) => this.handleEvent(event), - oneose: () => sub.close(), - }); + for (const [relay, loader] of this.loaders) { + if (loader.references.has(id)) { + const other = loader.references.get(id); + for (const [_, e] of other) { + if (!ids.has(e.id)) { + ids.add(e.id); + events.push(e); + } } } } - this.pending.clear(); + + subject.next(events); + } + + requestZaps(eventUID: string, urls: Iterable, alwaysRequest = true) { + const subject = this.subjects.get(eventUID); + if (subject.value && !alwaysRequest) return; + + if (localRelay) { + this.loaders.get(localRelay as AbstractRelay).requestEvents(eventUID); + } + + const relays = relayPoolService.getRelays(urls); + for (const relay of relays) { + this.loaders.get(relay).requestEvents(eventUID); + } + + return subject; } } diff --git a/src/services/single-event.ts b/src/services/single-event.ts index cb46eced3..66867d483 100644 --- a/src/services/single-event.ts +++ b/src/services/single-event.ts @@ -34,6 +34,7 @@ class SingleEventService { constructor() { this.process = new Process("SingleEventService", this); this.process.icon = Code02; + this.process.active = true; processManager.registerProcess(this.process); // when an event is added to the store, pass it along to the subjects