diff --git a/src/classes/batch-kind-loader.ts b/src/classes/batch-kind-loader.ts index 93c5bdf94..429664d34 100644 --- a/src/classes/batch-kind-loader.ts +++ b/src/classes/batch-kind-loader.ts @@ -9,6 +9,7 @@ import PersistentSubscription from "./persistent-subscription"; import Process from "./process"; import BracketsX from "../components/icons/brackets-x"; import processManager from "../services/process-manager"; +import createDefer, { Deferred } from "./deferred"; export function createCoordinate(kind: number, pubkey: string, d?: string) { return `${kind}:${pubkey}${d ? ":" + d : ""}`; @@ -25,6 +26,7 @@ export default class BatchKindLoader { private requestNext = new Set(); private requested = new Map(); + private promises = new Map>(); log: Debugger; @@ -45,21 +47,52 @@ export default class BatchKindLoader { const current = this.events.getEvent(key); if (!current || event.created_at > current.created_at) { this.events.addEvent(event); + + // if there is a promise waiting, resolve with event + const defer = this.promises.get(key); + if (defer) { + this.promises.delete(key); + defer.resolve(event); + } } } private handleEOSE() { // relays says it has nothing left this.requested.clear(); + + // prune requests + const timeout = dayjs().subtract(1, "minute"); + for (const [key, date] of this.requested) { + if (dayjs(date).isBefore(timeout)) { + this.requested.delete(key); + + // if there is a promise waiting for this event, resolve null + const defer = this.promises.get(key); + if (defer) { + this.promises.delete(key); + defer.resolve(null); + } + } + } } - requestEvent(kind: number, pubkey: string, d?: string) { + requestEvent(kind: number, pubkey: string, d?: string): Promise { const key = createCoordinate(kind, pubkey, d); const event = this.events.getEvent(key); + if (!event) { + if (this.promises.has(key)) return this.promises.get(key)!; + + const p = createDefer(); + this.promises.set(key, p); + this.requestNext.add(key); this.updateThrottle(); + + return p; } - return event; + + return Promise.resolve(event); } updateThrottle = _throttle(this.update, RELAY_REQUEST_BATCH_TIME); @@ -73,15 +106,6 @@ export default class BatchKindLoader { } this.requestNext.clear(); - // prune requests - const timeout = dayjs().subtract(1, "minute"); - for (const [key, date] of this.requested) { - if (dayjs(date).isBefore(timeout)) { - this.requested.delete(key); - needsUpdate = true; - } - } - // update the subscription if (needsUpdate) { if (this.requested.size > 0) { diff --git a/src/classes/multi-subscription.ts b/src/classes/multi-subscription.ts index 169a167dd..e32dbc971 100644 --- a/src/classes/multi-subscription.ts +++ b/src/classes/multi-subscription.ts @@ -103,7 +103,7 @@ export default class MultiSubscription { } // create cache sub if it does not exist - if (!this.cacheSubscription) { + if (!this.cacheSubscription && localRelay) { this.cacheSubscription = new PersistentSubscription(localRelay as AbstractRelay, { onevent: (event) => this.handleEvent(event), }); @@ -111,7 +111,7 @@ export default class MultiSubscription { } // update cache sub filters if they changed - if (!isFilterEqual(this.cacheSubscription.filters, this.filters)) { + if (this.cacheSubscription && !isFilterEqual(this.cacheSubscription.filters, this.filters)) { this.cacheSubscription.filters = this.filters; this.cacheSubscription.fire(); } diff --git a/src/services/replaceable-events.ts b/src/services/replaceable-events.ts index 2be5090ec..fcbcb04fc 100644 --- a/src/services/replaceable-events.ts +++ b/src/services/replaceable-events.ts @@ -1,12 +1,10 @@ -import { Filter, NostrEvent } from "nostr-tools"; +import { AbstractRelay, NostrEvent } from "nostr-tools"; import _throttle from "lodash.throttle"; import SuperMap from "../classes/super-map"; import { logger } from "../helpers/debug"; import { getEventCoordinate } from "../helpers/nostr/event"; -import createDefer, { Deferred } from "../classes/deferred"; import { localRelay } from "./local-relay"; -import { relayRequest } from "../helpers/relay"; import EventStore from "../classes/event-store"; import Subject from "../classes/subject"; import BatchKindLoader, { createCoordinate } from "../classes/batch-kind-loader"; @@ -30,13 +28,14 @@ export function getHumanReadableCoordinate(kind: number, pubkey: string, d?: str return `${kind}:${truncateId(pubkey)}${d ? ":" + d : ""}`; } -const READ_CACHE_BATCH_TIME = 250; const WRITE_CACHE_BATCH_TIME = 250; class ReplaceableEventsService { process: Process; private subjects = new SuperMap>(() => new Subject()); + + private cacheLoader: BatchKindLoader | null = null; private loaders = new SuperMap((relay) => { const loader = new BatchKindLoader(relayPoolService.requestRelay(relay), this.log.extend(relay)); loader.events.onEvent.subscribe((e) => this.handleEvent(e)); @@ -47,13 +46,18 @@ class ReplaceableEventsService { events = new EventStore(); log = logger.extend("ReplaceableEventLoader"); - dbLog = this.log.extend("database"); constructor() { this.process = new Process("ReplaceableEventsService", this); this.process.icon = UserSquare; this.process.active = true; processManager.registerProcess(this.process); + + if (localRelay) { + this.cacheLoader = new BatchKindLoader(localRelay as AbstractRelay, this.log.extend("cache-relay")); + this.cacheLoader.events.onEvent.subscribe((e) => this.handleEvent(e)); + this.process.addChild(this.cacheLoader.process); + } } handleEvent(event: NostrEvent, saveToCache = true) { @@ -73,69 +77,13 @@ class ReplaceableEventsService { return this.subjects.get(createCoordinate(kind, pubkey, d)); } - private readFromCachePromises = new Map>(); - private readFromCacheThrottle = _throttle(this.readFromCache, READ_CACHE_BATCH_TIME); - private async readFromCache() { - if (this.readFromCachePromises.size === 0) return; - if (!localRelay) return; - - const loading = new Map>(); - - const kindFilters: Record = {}; - for (const [cord, p] of this.readFromCachePromises) { - const [kindStr, pubkey, d] = cord.split(":") as [string, string] | [string, string, string]; - const kind = parseInt(kindStr); - kindFilters[kind] = kindFilters[kind] || { kinds: [kind] }; - - const arr = (kindFilters[kind].authors = kindFilters[kind].authors || []); - arr.push(pubkey); - - if (d) { - const arr = (kindFilters[kind]["#d"] = kindFilters[kind]["#d"] || []); - arr.push(d); - } - - loading.set(cord, p); - } - const filters = Object.values(kindFilters); - - for (const [cord] of loading) this.readFromCachePromises.delete(cord); - - const events = await relayRequest(localRelay, filters); - for (const event of events) { - this.handleEvent(event, false); - const cord = getEventCoordinate(event); - const promise = loading.get(cord); - if (promise) promise.resolve(true); - loading.delete(cord); - } - - // resolve remaining promises - for (const [_, promise] of loading) promise.resolve(); - - if (events.length > 0) this.dbLog(`Read ${events.length} events from database`); - } - loadFromCache(cord: string) { - if (!localRelay) return Promise.resolve(false); - const dedupe = this.readFromCachePromises.get(cord); - if (dedupe) return dedupe; - - // add to read queue - const promise = createDefer(); - this.readFromCachePromises.set(cord, promise); - - this.readFromCacheThrottle(); - - return promise; - } - private writeCacheQueue = new Map(); private writeToCacheThrottle = _throttle(this.writeToCache, WRITE_CACHE_BATCH_TIME); private async writeToCache() { if (this.writeCacheQueue.size === 0) return; if (localRelay) { - this.dbLog(`Writing ${this.writeCacheQueue.size} events to database`); + this.log(`Sending ${this.writeCacheQueue.size} events to cache relay`); for (const [_, event] of this.writeCacheQueue) localRelay.publish(event); } this.writeCacheQueue.clear(); @@ -158,13 +106,14 @@ class ReplaceableEventsService { const key = createCoordinate(kind, pubkey, d); const sub = this.subjects.get(key); - if (!sub.value) { - this.loadFromCache(key).then((loaded) => { + if (!sub.value && this.cacheLoader) { + this.cacheLoader.requestEvent(kind, pubkey, d).then((loaded) => { if (!loaded && !sub.value) this.requestEventFromRelays(relays, kind, pubkey, d); }); } - if (opts?.alwaysRequest || (!sub.value && opts.ignoreCache)) { + if (opts?.alwaysRequest || !this.cacheLoader || (!sub.value && opts.ignoreCache)) { + this.log("Skipping cache for", key); this.requestEventFromRelays(relays, kind, pubkey, d); } diff --git a/src/services/single-event.ts b/src/services/single-event.ts index 66e56c0eb..638866e33 100644 --- a/src/services/single-event.ts +++ b/src/services/single-event.ts @@ -41,7 +41,7 @@ class SingleEventLoader { // this.pending.get(id).add(relay); this.idsFromRelays.get(relay).add(id); } - this.idsFromRelays.get(localRelay as AbstractRelay).add(id); + if (localRelay) this.idsFromRelays.get(localRelay as AbstractRelay).add(id); this.updateSubscriptionsThrottle(); return subject;