small fixes to batch kind loader

This commit is contained in:
hzrd149
2024-05-01 15:14:45 -05:00
parent 7506141f54
commit f9e099c0c8
4 changed files with 52 additions and 79 deletions

View File

@@ -9,6 +9,7 @@ import PersistentSubscription from "./persistent-subscription";
import Process from "./process";
import BracketsX from "../components/icons/brackets-x";
import processManager from "../services/process-manager";
import createDefer, { Deferred } from "./deferred";
export function createCoordinate(kind: number, pubkey: string, d?: string) {
return `${kind}:${pubkey}${d ? ":" + d : ""}`;
@@ -25,6 +26,7 @@ export default class BatchKindLoader {
private requestNext = new Set<string>();
private requested = new Map<string, Date>();
private promises = new Map<string, Deferred<NostrEvent | null>>();
log: Debugger;
@@ -45,21 +47,52 @@ export default class BatchKindLoader {
const current = this.events.getEvent(key);
if (!current || event.created_at > current.created_at) {
this.events.addEvent(event);
// if there is a promise waiting, resolve with event
const defer = this.promises.get(key);
if (defer) {
this.promises.delete(key);
defer.resolve(event);
}
}
}
private handleEOSE() {
// relays says it has nothing left
this.requested.clear();
// prune requests
const timeout = dayjs().subtract(1, "minute");
for (const [key, date] of this.requested) {
if (dayjs(date).isBefore(timeout)) {
this.requested.delete(key);
// if there is a promise waiting for this event, resolve null
const defer = this.promises.get(key);
if (defer) {
this.promises.delete(key);
defer.resolve(null);
}
}
}
}
requestEvent(kind: number, pubkey: string, d?: string) {
requestEvent(kind: number, pubkey: string, d?: string): Promise<NostrEvent | null> {
const key = createCoordinate(kind, pubkey, d);
const event = this.events.getEvent(key);
if (!event) {
if (this.promises.has(key)) return this.promises.get(key)!;
const p = createDefer<NostrEvent | null>();
this.promises.set(key, p);
this.requestNext.add(key);
this.updateThrottle();
return p;
}
return event;
return Promise.resolve(event);
}
updateThrottle = _throttle(this.update, RELAY_REQUEST_BATCH_TIME);
@@ -73,15 +106,6 @@ export default class BatchKindLoader {
}
this.requestNext.clear();
// prune requests
const timeout = dayjs().subtract(1, "minute");
for (const [key, date] of this.requested) {
if (dayjs(date).isBefore(timeout)) {
this.requested.delete(key);
needsUpdate = true;
}
}
// update the subscription
if (needsUpdate) {
if (this.requested.size > 0) {

View File

@@ -103,7 +103,7 @@ export default class MultiSubscription {
}
// create cache sub if it does not exist
if (!this.cacheSubscription) {
if (!this.cacheSubscription && localRelay) {
this.cacheSubscription = new PersistentSubscription(localRelay as AbstractRelay, {
onevent: (event) => this.handleEvent(event),
});
@@ -111,7 +111,7 @@ export default class MultiSubscription {
}
// update cache sub filters if they changed
if (!isFilterEqual(this.cacheSubscription.filters, this.filters)) {
if (this.cacheSubscription && !isFilterEqual(this.cacheSubscription.filters, this.filters)) {
this.cacheSubscription.filters = this.filters;
this.cacheSubscription.fire();
}

View File

@@ -1,12 +1,10 @@
import { Filter, NostrEvent } from "nostr-tools";
import { AbstractRelay, NostrEvent } from "nostr-tools";
import _throttle from "lodash.throttle";
import SuperMap from "../classes/super-map";
import { logger } from "../helpers/debug";
import { getEventCoordinate } from "../helpers/nostr/event";
import createDefer, { Deferred } from "../classes/deferred";
import { localRelay } from "./local-relay";
import { relayRequest } from "../helpers/relay";
import EventStore from "../classes/event-store";
import Subject from "../classes/subject";
import BatchKindLoader, { createCoordinate } from "../classes/batch-kind-loader";
@@ -30,13 +28,14 @@ export function getHumanReadableCoordinate(kind: number, pubkey: string, d?: str
return `${kind}:${truncateId(pubkey)}${d ? ":" + d : ""}`;
}
const READ_CACHE_BATCH_TIME = 250;
const WRITE_CACHE_BATCH_TIME = 250;
class ReplaceableEventsService {
process: Process;
private subjects = new SuperMap<string, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
private cacheLoader: BatchKindLoader | null = null;
private loaders = new SuperMap<string, BatchKindLoader>((relay) => {
const loader = new BatchKindLoader(relayPoolService.requestRelay(relay), this.log.extend(relay));
loader.events.onEvent.subscribe((e) => this.handleEvent(e));
@@ -47,13 +46,18 @@ class ReplaceableEventsService {
events = new EventStore();
log = logger.extend("ReplaceableEventLoader");
dbLog = this.log.extend("database");
constructor() {
this.process = new Process("ReplaceableEventsService", this);
this.process.icon = UserSquare;
this.process.active = true;
processManager.registerProcess(this.process);
if (localRelay) {
this.cacheLoader = new BatchKindLoader(localRelay as AbstractRelay, this.log.extend("cache-relay"));
this.cacheLoader.events.onEvent.subscribe((e) => this.handleEvent(e));
this.process.addChild(this.cacheLoader.process);
}
}
handleEvent(event: NostrEvent, saveToCache = true) {
@@ -73,69 +77,13 @@ class ReplaceableEventsService {
return this.subjects.get(createCoordinate(kind, pubkey, d));
}
private readFromCachePromises = new Map<string, Deferred<boolean>>();
private readFromCacheThrottle = _throttle(this.readFromCache, READ_CACHE_BATCH_TIME);
private async readFromCache() {
if (this.readFromCachePromises.size === 0) return;
if (!localRelay) return;
const loading = new Map<string, Deferred<boolean>>();
const kindFilters: Record<number, Filter> = {};
for (const [cord, p] of this.readFromCachePromises) {
const [kindStr, pubkey, d] = cord.split(":") as [string, string] | [string, string, string];
const kind = parseInt(kindStr);
kindFilters[kind] = kindFilters[kind] || { kinds: [kind] };
const arr = (kindFilters[kind].authors = kindFilters[kind].authors || []);
arr.push(pubkey);
if (d) {
const arr = (kindFilters[kind]["#d"] = kindFilters[kind]["#d"] || []);
arr.push(d);
}
loading.set(cord, p);
}
const filters = Object.values(kindFilters);
for (const [cord] of loading) this.readFromCachePromises.delete(cord);
const events = await relayRequest(localRelay, filters);
for (const event of events) {
this.handleEvent(event, false);
const cord = getEventCoordinate(event);
const promise = loading.get(cord);
if (promise) promise.resolve(true);
loading.delete(cord);
}
// resolve remaining promises
for (const [_, promise] of loading) promise.resolve();
if (events.length > 0) this.dbLog(`Read ${events.length} events from database`);
}
loadFromCache(cord: string) {
if (!localRelay) return Promise.resolve(false);
const dedupe = this.readFromCachePromises.get(cord);
if (dedupe) return dedupe;
// add to read queue
const promise = createDefer<boolean>();
this.readFromCachePromises.set(cord, promise);
this.readFromCacheThrottle();
return promise;
}
private writeCacheQueue = new Map<string, NostrEvent>();
private writeToCacheThrottle = _throttle(this.writeToCache, WRITE_CACHE_BATCH_TIME);
private async writeToCache() {
if (this.writeCacheQueue.size === 0) return;
if (localRelay) {
this.dbLog(`Writing ${this.writeCacheQueue.size} events to database`);
this.log(`Sending ${this.writeCacheQueue.size} events to cache relay`);
for (const [_, event] of this.writeCacheQueue) localRelay.publish(event);
}
this.writeCacheQueue.clear();
@@ -158,13 +106,14 @@ class ReplaceableEventsService {
const key = createCoordinate(kind, pubkey, d);
const sub = this.subjects.get(key);
if (!sub.value) {
this.loadFromCache(key).then((loaded) => {
if (!sub.value && this.cacheLoader) {
this.cacheLoader.requestEvent(kind, pubkey, d).then((loaded) => {
if (!loaded && !sub.value) this.requestEventFromRelays(relays, kind, pubkey, d);
});
}
if (opts?.alwaysRequest || (!sub.value && opts.ignoreCache)) {
if (opts?.alwaysRequest || !this.cacheLoader || (!sub.value && opts.ignoreCache)) {
this.log("Skipping cache for", key);
this.requestEventFromRelays(relays, kind, pubkey, d);
}

View File

@@ -41,7 +41,7 @@ class SingleEventLoader {
// this.pending.get(id).add(relay);
this.idsFromRelays.get(relay).add(id);
}
this.idsFromRelays.get(localRelay as AbstractRelay).add(id);
if (localRelay) this.idsFromRelays.get(localRelay as AbstractRelay).add(id);
this.updateSubscriptionsThrottle();
return subject;