rebuild zap and reaction services using batch relation loader

This commit is contained in:
hzrd149 2024-05-04 13:25:53 -05:00
parent a288fbf15c
commit 3e5c34e3fe
4 changed files with 260 additions and 139 deletions

View File

@ -0,0 +1,146 @@
import { NostrEvent, AbstractRelay } from "nostr-tools";
import _throttle from "lodash.throttle";
import debug, { Debugger } from "debug";
import PersistentSubscription from "./persistent-subscription";
import Process from "./process";
import processManager from "../services/process-manager";
import createDefer, { Deferred } from "./deferred";
import Dataflow04 from "../components/icons/dataflow-04";
import SuperMap from "./super-map";
import Subject from "./subject";
/** Batches requests for events that reference another event (via #e tag) from a single relay */
export default class BatchRelationLoader {
kinds: number[];
relay: AbstractRelay;
process: Process;
requested = new Set<string>();
references = new SuperMap<string, Map<string, NostrEvent>>(() => new Map());
onEventUpdate = new Subject<string>();
subscription: PersistentSubscription;
// a map of events that are waiting for the current request to finish
private next = new Map<string, Deferred<Map<string, NostrEvent>>>();
// a map of events currently being requested from the relay
private pending = new Map<string, Deferred<Map<string, NostrEvent>>>();
log: Debugger;
constructor(relay: AbstractRelay, kinds: number[], log?: Debugger) {
this.relay = relay;
this.kinds = kinds;
this.log = log || debug("BatchRelationLoader");
this.process = new Process("BatchRelationLoader", this, [relay]);
this.process.icon = Dataflow04;
processManager.registerProcess(this.process);
this.subscription = new PersistentSubscription(this.relay, {
onevent: (event) => this.handleEvent(event),
oneose: () => this.handleEOSE(),
});
this.process.addChild(this.subscription.process);
}
requestEvents(uid: string): Promise<Map<string, NostrEvent>> {
// if there is a cache only return it if we have requested this id before
if (this.references.has(uid) && this.requested.has(uid)) {
return Promise.resolve(this.references.get(uid));
}
if (this.pending.has(uid)) return this.pending.get(uid)!;
if (this.next.has(uid)) return this.next.get(uid)!;
const defer = createDefer<Map<string, NostrEvent>>();
this.next.set(uid, defer);
// request subscription update
this.requestUpdate();
return defer;
}
requestUpdate = _throttle(
() => {
// don't do anything if the subscription is already running
if (this.process.active) return;
this.process.active = true;
this.update();
},
500,
{ leading: false, trailing: true },
);
private handleEvent(event: NostrEvent) {
// add event to cache
const updateIds = new Set<string>();
for (const tag of event.tags) {
if (tag[0] === "e" && tag[1]) {
const id = tag[1];
this.references.get(id).set(event.id, event);
updateIds.add(id);
} else if (tag[0] === "a" && tag[1]) {
const cord = tag[1];
this.references.get(cord).set(event.id, event);
updateIds.add(cord);
}
}
for (const id of updateIds) this.onEventUpdate.next(id);
}
private handleEOSE() {
// resolve all pending from the last request
for (const [uid, defer] of this.pending) {
defer.resolve(this.references.get(uid));
}
// reset
this.pending.clear();
this.process.active = false;
// do next request or close the subscription
if (this.next.size > 0) this.requestUpdate();
}
update() {
// copy everything from next to pending
for (const [uid, defer] of this.next) this.pending.set(uid, defer);
this.next.clear();
// update subscription
if (this.pending.size > 0) {
this.log(`Updating filters ${this.pending.size} events`);
const ids: string[] = [];
const cords: string[] = [];
const uids = Array.from(this.pending.keys());
for (const uid of uids) {
this.requested.add(uid);
if (uid.includes(":")) cords.push(uid);
else ids.push(uid);
}
this.subscription.filters = [];
if (ids.length > 0) this.subscription.filters.push({ "#e": ids, kinds: this.kinds });
if (ids.length > 0) this.subscription.filters.push({ "#a": cords, kinds: this.kinds });
this.subscription.update();
this.process.active = true;
} else {
this.log("Closing");
this.subscription.close();
this.process.active = false;
}
}
destroy() {
this.process.remove();
processManager.unregisterProcess(this.process);
}
}

View File

@ -1,92 +1,82 @@
import { Filter, kinds, nip25 } from "nostr-tools";
import { AbstractRelay, kinds } from "nostr-tools";
import _throttle from "lodash.throttle";
import Subject from "../classes/subject";
import SuperMap from "../classes/super-map";
import { NostrEvent } from "../types/nostr-event";
import { localRelay } from "./local-relay";
import { relayRequest } from "../helpers/relay";
import relayPoolService from "./relay-pool";
type eventId = string;
type relay = string;
import Process from "../classes/process";
import { LightningIcon } from "../components/icons";
import processManager from "./process-manager";
import BatchRelationLoader from "../classes/batch-relation-loader";
import { logger } from "../helpers/debug";
class EventReactionsService {
subjects = new SuperMap<eventId, Subject<NostrEvent[]>>(() => new Subject<NostrEvent[]>([]));
pending = new SuperMap<eventId, Set<relay>>(() => new Set());
log = logger.extend("EventReactionsService");
process: Process;
requestReactions(eventId: string, relays: Iterable<string>, alwaysRequest = true) {
const subject = this.subjects.get(eventId);
subjects = new SuperMap<string, Subject<NostrEvent[]>>(() => new Subject<NostrEvent[]>([]));
if (!subject.value || alwaysRequest) {
for (const relay of relays) {
this.pending.get(eventId).add(relay);
loaders = new SuperMap<AbstractRelay, BatchRelationLoader>((relay) => {
const loader = new BatchRelationLoader(relay, [kinds.Reaction], this.log.extend(relay.url));
this.process.addChild(loader.process);
loader.onEventUpdate.subscribe((id) => {
this.updateSubject(id);
});
return loader;
});
constructor() {
this.process = new Process("EventReactionsService", this);
this.process.icon = LightningIcon;
this.process.active = true;
processManager.registerProcess(this.process);
}
// merged results from all loaders for a single event
private updateSubject(id: string) {
const ids = new Set<string>();
const events: NostrEvent[] = [];
const subject = this.subjects.get(id);
for (const [relay, loader] of this.loaders) {
if (loader.references.has(id)) {
const other = loader.references.get(id);
for (const [_, e] of other) {
if (!ids.has(e.id)) {
ids.add(e.id);
events.push(e);
}
}
}
}
this.throttleBatchRequest();
subject.next(events);
}
requestReactions(eventUID: string, urls: Iterable<string | URL | AbstractRelay>, alwaysRequest = true) {
const subject = this.subjects.get(eventUID);
if (subject.value && !alwaysRequest) return;
if (localRelay) {
this.loaders.get(localRelay as AbstractRelay).requestEvents(eventUID);
}
const relays = relayPoolService.getRelays(urls);
for (const relay of relays) {
this.loaders.get(relay).requestEvents(eventUID);
}
return subject;
}
handleEvent(event: NostrEvent, cache = true) {
if (event.kind !== kinds.Reaction) return;
const pointer = nip25.getReactedEventPointer(event);
if (!pointer?.id) return;
const subject = this.subjects.get(pointer.id);
if (!subject.value) {
subject.next([event]);
} else if (!subject.value.some((e) => e.id === event.id)) {
subject.next([...subject.value, event]);
}
if (cache && localRelay) localRelay.publish(event);
}
throttleBatchRequest = _throttle(this.batchRequests, 2000);
batchRequests() {
if (this.pending.size === 0) return;
// load events from cache
const uids = Array.from(this.pending.keys());
const ids = uids.filter((id) => !id.includes(":"));
const cords = uids.filter((id) => id.includes(":"));
const filters: Filter[] = [];
if (ids.length > 0) filters.push({ "#e": ids, kinds: [kinds.Reaction] });
if (cords.length > 0) filters.push({ "#a": cords, kinds: [kinds.Reaction] });
if (filters.length > 0 && localRelay) {
relayRequest(localRelay, filters).then((events) => events.forEach((e) => this.handleEvent(e, false)));
}
const idsFromRelays: Record<relay, eventId[]> = {};
for (const [id, relays] of this.pending) {
for (const relay of relays) {
idsFromRelays[relay] = idsFromRelays[relay] ?? [];
idsFromRelays[relay].push(id);
}
}
for (const [relay, ids] of Object.entries(idsFromRelays)) {
const eventIds = ids.filter((id) => !id.includes(":"));
const coordinates = ids.filter((id) => id.includes(":"));
const filters: Filter[] = [];
if (eventIds.length > 0) filters.push({ "#e": eventIds, kinds: [kinds.Reaction] });
if (coordinates.length > 0) filters.push({ "#a": coordinates, kinds: [kinds.Reaction] });
if (filters.length > 0) {
const subscription = relayPoolService
.requestRelay(relay)
.subscribe(filters, { onevent: (event) => this.handleEvent(event), oneose: () => subscription.close() });
}
}
this.pending.clear();
}
}
const eventReactionsService = new EventReactionsService();
if (import.meta.env.DEV) {
//@ts-expect-error
// @ts-ignore
window.eventReactionsService = eventReactionsService;
}

View File

@ -1,91 +1,75 @@
import { Filter, kinds } from "nostr-tools";
import { AbstractRelay, kinds } from "nostr-tools";
import _throttle from "lodash.throttle";
import Subject from "../classes/subject";
import SuperMap from "../classes/super-map";
import { NostrEvent, isATag, isETag } from "../types/nostr-event";
import { relayRequest } from "../helpers/relay";
import { NostrEvent } from "../types/nostr-event";
import { localRelay } from "./local-relay";
import relayPoolService from "./relay-pool";
type eventUID = string;
type relay = string;
import Process from "../classes/process";
import { LightningIcon } from "../components/icons";
import processManager from "./process-manager";
import BatchRelationLoader from "../classes/batch-relation-loader";
import { logger } from "../helpers/debug";
class EventZapsService {
subjects = new SuperMap<eventUID, Subject<NostrEvent[]>>(() => new Subject<NostrEvent[]>([]));
pending = new SuperMap<eventUID, Set<relay>>(() => new Set());
log = logger.extend("EventZapsService");
process: Process;
requestZaps(eventUID: eventUID, relays: Iterable<string>, alwaysRequest = true) {
const subject = this.subjects.get(eventUID);
subjects = new SuperMap<string, Subject<NostrEvent[]>>(() => new Subject<NostrEvent[]>([]));
if (!subject.value || alwaysRequest) {
for (const relay of relays) {
this.pending.get(eventUID).add(relay);
}
}
this.throttleBatchRequest();
loaders = new SuperMap<AbstractRelay, BatchRelationLoader>((relay) => {
const loader = new BatchRelationLoader(relay, [kinds.Zap], this.log.extend(relay.url));
this.process.addChild(loader.process);
loader.onEventUpdate.subscribe((id) => {
this.updateSubject(id);
});
return loader;
});
return subject;
constructor() {
this.process = new Process("EventZapsService", this);
this.process.icon = LightningIcon;
this.process.active = true;
processManager.registerProcess(this.process);
}
handleEvent(event: NostrEvent, cache = true) {
if (event.kind !== kinds.Zap) return;
const eventUID = event.tags.find(isETag)?.[1] ?? event.tags.find(isATag)?.[1];
if (!eventUID) return;
// merged results from all loaders for a single event
private updateSubject(id: string) {
const ids = new Set<string>();
const events: NostrEvent[] = [];
const subject = this.subjects.get(id);
const subject = this.subjects.get(eventUID);
if (!subject.value) {
subject.next([event]);
} else if (!subject.value.some((e) => e.id === event.id)) {
subject.next([...subject.value, event]);
}
if (cache && localRelay) localRelay.publish(event);
}
throttleBatchRequest = _throttle(this.batchRequests, 2000);
batchRequests() {
if (this.pending.size === 0) return;
// load events from cache
const uids = Array.from(this.pending.keys());
const ids = uids.filter((id) => !id.includes(":"));
const cords = uids.filter((id) => id.includes(":"));
const filters: Filter[] = [];
if (ids.length > 0) filters.push({ "#e": ids, kinds: [kinds.Zap] });
if (cords.length > 0) filters.push({ "#a": cords, kinds: [kinds.Zap] });
if (filters.length > 0 && localRelay) {
relayRequest(localRelay, filters).then((events) => events.forEach((e) => this.handleEvent(e, false)));
}
const idsFromRelays: Record<relay, eventUID[]> = {};
for (const [id, relays] of this.pending) {
for (const relay of relays) {
idsFromRelays[relay] = idsFromRelays[relay] ?? [];
idsFromRelays[relay].push(id);
}
}
for (const [url, ids] of Object.entries(idsFromRelays)) {
const eventIds = ids.filter((id) => !id.includes(":"));
const coordinates = ids.filter((id) => id.includes(":"));
const filter: Filter[] = [];
if (eventIds.length > 0) filter.push({ "#e": eventIds, kinds: [kinds.Zap] });
if (coordinates.length > 0) filter.push({ "#a": coordinates, kinds: [kinds.Zap] });
if (filter.length > 0) {
const relay = relayPoolService.getRelay(url);
if (relay) {
if (!relay.connected) relayPoolService.requestConnect(relay);
const sub = relay.subscribe(filter, {
onevent: (event) => this.handleEvent(event),
oneose: () => sub.close(),
});
for (const [relay, loader] of this.loaders) {
if (loader.references.has(id)) {
const other = loader.references.get(id);
for (const [_, e] of other) {
if (!ids.has(e.id)) {
ids.add(e.id);
events.push(e);
}
}
}
}
this.pending.clear();
subject.next(events);
}
requestZaps(eventUID: string, urls: Iterable<string | URL | AbstractRelay>, alwaysRequest = true) {
const subject = this.subjects.get(eventUID);
if (subject.value && !alwaysRequest) return;
if (localRelay) {
this.loaders.get(localRelay as AbstractRelay).requestEvents(eventUID);
}
const relays = relayPoolService.getRelays(urls);
for (const relay of relays) {
this.loaders.get(relay).requestEvents(eventUID);
}
return subject;
}
}

View File

@ -34,6 +34,7 @@ class SingleEventService {
constructor() {
this.process = new Process("SingleEventService", this);
this.process.icon = Code02;
this.process.active = true;
processManager.registerProcess(this.process);
// when an event is added to the store, pass it along to the subjects