performance improvements to timeline loader

don't verify events from cache relay
This commit is contained in:
hzrd149 2024-04-12 20:30:34 -05:00
parent fba68bc221
commit 71ae6739e2
15 changed files with 163 additions and 138 deletions

View File

@ -1,5 +1,5 @@
import { Debugger } from "debug";
import { Filter, NostrEvent, Relay, matchFilters } from "nostr-tools";
import { Filter, NostrEvent, matchFilters } from "nostr-tools";
import _throttle from "lodash.throttle";
import Subject from "./subject";
@ -9,13 +9,14 @@ import deleteEventService from "../services/delete-events";
import { mergeFilter } from "../helpers/nostr/filter";
import { isATag, isETag } from "../types/nostr-event";
import relayPoolService from "../services/relay-pool";
import { SimpleRelay } from "nostr-idb";
const DEFAULT_CHUNK_SIZE = 100;
export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;
export default class ChunkedRequest {
relay: Relay;
relay: SimpleRelay;
filters: Filter[];
chunkSize = DEFAULT_CHUNK_SIZE;
private log: Debugger;
@ -28,7 +29,7 @@ export default class ChunkedRequest {
onChunkFinish = new Subject<number>();
constructor(relay: Relay, filters: Filter[], log?: Debugger) {
constructor(relay: SimpleRelay, filters: Filter[], log?: Debugger) {
this.relay = relay;
this.filters = filters;
@ -47,9 +48,10 @@ export default class ChunkedRequest {
filters = mergeFilter(filters, { until: oldestEvent.created_at - 1 });
}
relayPoolService.addClaim(this.relay, this);
relayPoolService.addClaim(this.relay.url, this);
let gotEvents = 0;
if (filters.length === 0) debugger;
const sub = this.relay.subscribe(filters, {
onevent: (event) => {
this.handleEvent(event);
@ -63,7 +65,7 @@ export default class ChunkedRequest {
} else this.log(`Got ${gotEvents} events`);
this.onChunkFinish.next(gotEvents);
sub.close();
relayPoolService.removeClaim(this.relay, this);
relayPoolService.removeClaim(this.relay.url, this);
},
});
}

View File

@ -1,11 +1,10 @@
import { nanoid } from "nanoid";
import { NostrEvent } from "../types/nostr-event";
import { RelayQueryMap } from "../types/nostr-relay";
import relayPoolService from "../services/relay-pool";
import { isFilterEqual, isQueryMapEqual } from "../helpers/nostr/filter";
import { isFilterEqual } from "../helpers/nostr/filter";
import ControlledObservable from "./controlled-observable";
import { Relay, Subscription } from "nostr-tools";
import { Filter, Relay, Subscription } from "nostr-tools";
export default class NostrMultiSubscription {
static INIT = "initial";
@ -14,7 +13,7 @@ export default class NostrMultiSubscription {
id: string;
name?: string;
queryMap: RelayQueryMap = {};
filters: Filter[] = [];
relays: Relay[] = [];
subscriptions = new Map<Relay, Subscription>();
@ -38,32 +37,40 @@ export default class NostrMultiSubscription {
}
private handleRemoveRelay(relay: Relay) {
relayPoolService.removeClaim(relay.url, this);
// close subscription
const sub = this.subscriptions.get(relay);
if (sub && !sub.closed) {
sub.close();
this.subscriptions.delete(relay);
}
}
setQueryMap(queryMap: RelayQueryMap) {
if (isQueryMapEqual(this.queryMap, queryMap)) return;
setFilters(filters: Filter[]) {
if (isFilterEqual(this.filters, filters)) return;
this.filters = filters;
this.updateSubscriptions();
}
setRelays(relays: Iterable<string>) {
// add and remove relays
for (const url of Object.keys(queryMap)) {
if (!this.queryMap[url]) {
if (this.relays.some((r) => r.url === url)) continue;
for (const url of relays) {
if (!this.relays.some((r) => r.url === url)) {
// add relay
const relay = relayPoolService.requestRelay(url);
this.relays.push(relay);
this.handleAddRelay(relay);
}
}
for (const url of Object.keys(this.queryMap)) {
if (!queryMap[url]) {
const relay = this.relays.find((r) => r.url === url);
if (!relay) continue;
const urlArr = Array.from(relays);
for (const relay of this.relays) {
if (!urlArr.includes(relay.url)) {
this.relays = this.relays.filter((r) => r !== relay);
this.handleRemoveRelay(relay);
}
}
this.queryMap = queryMap;
this.updateSubscriptions();
}
@ -79,7 +86,7 @@ export default class NostrMultiSubscription {
// else open and update subscriptions
for (const relay of this.relays) {
const filters = this.queryMap[relay.url];
const filters = this.filters;
let subscription = this.subscriptions.get(relay);
if (!subscription || !isFilterEqual(subscription.filters, filters)) {
@ -87,6 +94,7 @@ export default class NostrMultiSubscription {
subscription.filters = filters;
subscription.fire();
} else {
if (filters.length === 0) debugger;
subscription = relay.subscribe(filters, {
onevent: (event) => this.handleEvent(event),
onclose: () => {

View File

@ -1,12 +1,12 @@
import { nanoid } from "nanoid";
import { NostrEvent, Relay } from "nostr-tools";
import { NostrEvent, AbstractRelay } from "nostr-tools";
import relayPoolService from "../services/relay-pool";
import createDefer from "./deferred";
import { PersistentSubject } from "./subject";
import ControlledObservable from "./controlled-observable";
type Result = { relay: Relay; success: boolean; message: string };
type Result = { relay: AbstractRelay; success: boolean; message: string };
export default class NostrPublishAction {
id = nanoid();
@ -19,7 +19,7 @@ export default class NostrPublishAction {
onResult = new ControlledObservable<Result>();
onComplete = createDefer<Result[]>();
private remaining = new Set<Relay>();
private remaining = new Set<AbstractRelay>();
constructor(label: string, relays: Iterable<string>, event: NostrEvent, timeout: number = 5000) {
this.label = label;
@ -41,7 +41,7 @@ export default class NostrPublishAction {
setTimeout(this.handleTimeout.bind(this), timeout);
}
private handleResult(id: string, success: boolean, message: string, relay: Relay) {
private handleResult(id: string, success: boolean, message: string, relay: AbstractRelay) {
const result: Result = { relay, success, message };
this.results.next([...this.results.value, result]);
this.onResult.next(result);

View File

@ -1,12 +1,12 @@
import { Relay } from "nostr-tools";
import { AbstractRelay, verifyEvent } from "nostr-tools";
import { logger } from "../helpers/debug";
import { validateRelayURL } from "../helpers/relay";
import { offlineMode } from "../services/offline-mode";
import Subject from "./subject";
export default class RelayPool {
relays = new Map<string, Relay>();
onRelayCreated = new Subject<Relay>();
relays = new Map<string, AbstractRelay>();
onRelayCreated = new Subject<AbstractRelay>();
relayClaims = new Map<string, Set<any>>();
@ -28,12 +28,12 @@ export default class RelayPool {
url = validateRelayURL(url);
const key = url.toString();
if (!this.relays.has(key)) {
const newRelay = new Relay(key);
const newRelay = new AbstractRelay(key, { verifyEvent });
this.relays.set(key, newRelay);
this.onRelayCreated.next(newRelay);
}
const relay = this.relays.get(key) as Relay;
const relay = this.relays.get(key) as AbstractRelay;
if (connect && !relay.connected) {
try {
relay.connect();
@ -69,13 +69,17 @@ export default class RelayPool {
}
}
addClaim(relay: string | URL | Relay, id: any) {
const key = relay instanceof Relay ? relay.url : validateRelayURL(relay).toString();
this.getRelayClaims(key).add(id);
addClaim(relay: string | URL, id: any) {
try {
const key = validateRelayURL(relay).toString();
this.getRelayClaims(key).add(id);
} catch (error) {}
}
removeClaim(relay: string | URL | Relay, id: any) {
const key = relay instanceof Relay ? relay.url : validateRelayURL(relay).toString();
this.getRelayClaims(key).delete(id);
removeClaim(relay: string | URL, id: any) {
try {
const key = validateRelayURL(relay).toString();
this.getRelayClaims(key).delete(id);
} catch (error) {}
}
get connectedCount() {

View File

@ -3,16 +3,14 @@ import { Debugger } from "debug";
import { Filter, NostrEvent } from "nostr-tools";
import _throttle from "lodash.throttle";
import { RelayQueryMap } from "../types/nostr-relay";
import NostrMultiSubscription from "./nostr-multi-subscription";
import { PersistentSubject } from "./subject";
import { logger } from "../helpers/debug";
import EventStore from "./event-store";
import { isReplaceable } from "../helpers/nostr/event";
import replaceableEventsService from "../services/replaceable-events";
import { mergeFilter, isFilterEqual, isQueryMapEqual, mapQueryMap, stringifyFilter } from "../helpers/nostr/filter";
import { mergeFilter, isFilterEqual } from "../helpers/nostr/filter";
import { localRelay } from "../services/local-relay";
import { relayRequest } from "../helpers/relay";
import SuperMap from "./super-map";
import ChunkedRequest from "./chunked-request";
import relayPoolService from "../services/relay-pool";
@ -23,7 +21,8 @@ export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;
export default class TimelineLoader {
cursor = dayjs().unix();
queryMap: RelayQueryMap = {};
filters: Filter[] = [];
relays: string[] = [];
events: EventStore;
timeline = new PersistentSubject<NostrEvent[]>([]);
@ -37,6 +36,7 @@ export default class TimelineLoader {
private log: Debugger;
private subscription: NostrMultiSubscription;
private cacheChunkLoader: ChunkedRequest | null = null;
private chunkLoaders = new Map<string, ChunkedRequest>();
constructor(name: string) {
@ -87,65 +87,62 @@ export default class TimelineLoader {
this.chunkLoaderSubs.delete(loader);
}
private loadQueriesFromCache(queryMap: RelayQueryMap) {
const queries: Record<string, Filter[]> = {};
for (const [url, filters] of Object.entries(queryMap)) {
const key = stringifyFilter(filters);
if (!queries[key]) queries[key] = Array.isArray(filters) ? filters : [filters];
setFilters(filters: Filter[]) {
if (isFilterEqual(this.filters, filters)) return;
this.log("Set filters", filters);
// recreate all chunk loaders
for (const url of this.relays) {
const loader = this.chunkLoaders.get(url);
if (loader) {
this.disconnectToChunkLoader(loader);
this.chunkLoaders.delete(url);
}
const chunkLoader = new ChunkedRequest(relayPoolService.requestRelay(url), filters, this.log.extend(url));
this.chunkLoaders.set(url, chunkLoader);
this.connectToChunkLoader(chunkLoader);
}
for (const filters of Object.values(queries)) {
relayRequest(localRelay, filters).then((events) => {
for (const e of events) this.handleEvent(e, false);
});
}
// set filters
this.filters = filters;
// recreate cache chunk loader
if (this.cacheChunkLoader) this.disconnectToChunkLoader(this.cacheChunkLoader);
this.cacheChunkLoader = new ChunkedRequest(localRelay, this.filters, this.log.extend("local-relay"));
this.connectToChunkLoader(this.cacheChunkLoader);
// update the live subscription query map and add limit
this.subscription.setFilters(mergeFilter(filters, { limit: BLOCK_SIZE / 2 }));
}
setQueryMap(queryMap: RelayQueryMap) {
if (isQueryMapEqual(this.queryMap, queryMap)) return;
setRelays(relays: Iterable<string>) {
this.relays = Array.from(relays);
this.log("set query map", queryMap);
// remove relays
for (const relay of Object.keys(this.queryMap)) {
const loader = this.chunkLoaders.get(relay);
// remove chunk loaders
for (const url of relays) {
const loader = this.chunkLoaders.get(url);
if (!loader) continue;
if (!queryMap[relay]) {
if (!this.relays.includes(url)) {
this.disconnectToChunkLoader(loader);
this.chunkLoaders.delete(relay);
this.chunkLoaders.delete(url);
}
}
for (const [relay, filter] of Object.entries(queryMap)) {
// remove outdated loaders
if (this.queryMap[relay] && !isFilterEqual(this.queryMap[relay], filter)) {
const old = this.chunkLoaders.get(relay)!;
this.disconnectToChunkLoader(old);
this.chunkLoaders.delete(relay);
}
if (!this.chunkLoaders.has(relay)) {
const loader = new ChunkedRequest(
relayPoolService.requestRelay(relay),
Array.isArray(filter) ? filter : [filter],
this.log.extend(relay),
);
this.chunkLoaders.set(relay, loader);
this.connectToChunkLoader(loader);
// create chunk loaders only if filters are set
if (this.filters.length > 0) {
for (const url of relays) {
if (!this.chunkLoaders.has(url)) {
const loader = new ChunkedRequest(relayPoolService.requestRelay(url), this.filters, this.log.extend(url));
this.chunkLoaders.set(url, loader);
this.connectToChunkLoader(loader);
}
}
}
this.queryMap = queryMap;
// load all filters from cache relay
this.loadQueriesFromCache(queryMap);
// update the subscription query map and add limit
this.subscription.setQueryMap(
mapQueryMap(this.queryMap, (filter) => mergeFilter(filter, { limit: BLOCK_SIZE / 2 })),
);
this.triggerChunkLoad();
// update live subscription
this.subscription.setRelays(relays);
}
setEventFilter(filter?: EventFilter) {
@ -157,30 +154,48 @@ export default class TimelineLoader {
this.triggerChunkLoad();
}
private getAllLoaders() {
return this.cacheChunkLoader
? [...this.chunkLoaders.values(), this.cacheChunkLoader]
: Array.from(this.chunkLoaders.values());
}
triggerChunkLoad() {
let triggeredLoad = false;
for (const [relay, loader] of this.chunkLoaders) {
const loaders = this.getAllLoaders();
for (const loader of loaders) {
// skip loader if its already loading or complete
if (loader.complete || loader.loading) continue;
const event = loader.getLastEvent(this.loadNextBlockBuffer, this.eventFilter);
if (!event || event.created_at >= this.cursor) {
loader.loadNextChunk();
triggeredLoad = true;
}
}
if (triggeredLoad) this.updateLoading();
}
loadAllNextChunks() {
let triggeredLoad = false;
for (const [relay, loader] of this.chunkLoaders) {
const loaders = this.getAllLoaders();
for (const loader of loaders) {
// skip loader if its already loading or complete
if (loader.complete || loader.loading) continue;
loader.loadNextChunk();
triggeredLoad = true;
}
if (triggeredLoad) this.updateLoading();
}
private updateLoading() {
for (const [relay, loader] of this.chunkLoaders) {
const loaders = this.getAllLoaders();
for (const loader of loaders) {
if (loader.loading) {
if (!this.loading.value) {
this.loading.next(true);
@ -191,7 +206,9 @@ export default class TimelineLoader {
if (this.loading.value) this.loading.next(false);
}
private updateComplete() {
for (const [relay, loader] of this.chunkLoaders) {
const loaders = this.getAllLoaders();
for (const loader of loaders) {
if (!loader.complete) {
this.complete.next(false);
return;
@ -213,8 +230,10 @@ export default class TimelineLoader {
}
reset() {
this.cursor = dayjs().unix();
for (const [_, loader] of this.chunkLoaders) this.disconnectToChunkLoader(loader);
const loaders = this.getAllLoaders();
for (const loader of loaders) this.disconnectToChunkLoader(loader);
this.chunkLoaders.clear();
this.cacheChunkLoader = null;
this.forgetEvents();
}
@ -222,8 +241,10 @@ export default class TimelineLoader {
cleanup() {
this.close();
for (const [_, loader] of this.chunkLoaders) this.disconnectToChunkLoader(loader);
const loaders = this.getAllLoaders();
for (const loader of loaders) this.disconnectToChunkLoader(loader);
this.chunkLoaders.clear();
this.cacheChunkLoader = null;
this.events.cleanup();
}

View File

@ -20,7 +20,7 @@ export function renderWavlakeUrl(match: URL) {
frameBorder="0"
title="Wavlake Embed"
src={embedUrl.toString()}
style={{ width: "100%", height: 354, maxWidth: 573, ...setZIndex }}
style={{ width: "100%", height: 400, maxWidth: 600, ...setZIndex }}
></iframe>
);
}

View File

@ -2,9 +2,9 @@ import { Badge, useForceUpdate } from "@chakra-ui/react";
import { useInterval } from "react-use";
import relayPoolService from "../services/relay-pool";
import { Relay } from "nostr-tools";
import { AbstractRelay } from "nostr-tools";
const getStatusText = (relay: Relay) => {
const getStatusText = (relay: AbstractRelay) => {
// if (relay.connecting) return "Connecting...";
if (relay.connected) return "Connected";
// if (relay.closing) return "Disconnecting...";
@ -12,7 +12,7 @@ const getStatusText = (relay: Relay) => {
return "Disconnected";
// return "Unused";
};
const getStatusColor = (relay: Relay) => {
const getStatusColor = (relay: AbstractRelay) => {
// if (relay.connecting) return "yellow";
if (relay.connected) return "green";
// if (relay.closing) return "yellow";

View File

@ -77,7 +77,7 @@ function EventRow({
export default function TimelineHealth({ timeline }: { timeline: TimelineLoader }) {
const events = useSubject(timeline.timeline);
const relays = Array.from(Object.keys(timeline.queryMap));
const relays = Array.from(Object.keys(timeline.relays));
return (
<>

View File

@ -1,7 +1,5 @@
import stringify from "json-stringify-deterministic";
import { RelayQueryMap } from "../../types/nostr-relay";
import { Filter } from "nostr-tools";
import { safeRelayUrls } from "../relay";
export function mergeFilter(filter: Filter, query: Filter): Filter;
export function mergeFilter(filter: Filter[], query: Filter): Filter[];
@ -18,19 +16,3 @@ export function stringifyFilter(filter: Filter | Filter[]) {
export function isFilterEqual(a: Filter | Filter[], b: Filter | Filter[]) {
return stringifyFilter(a) === stringifyFilter(b);
}
export function isQueryMapEqual(a: RelayQueryMap, b: RelayQueryMap) {
return stringify(a) === stringify(b);
}
export function mapQueryMap(queryMap: RelayQueryMap, fn: (filters: Filter[]) => Filter[]) {
const newMap: RelayQueryMap = {};
for (const [relay, filters] of Object.entries(queryMap)) newMap[relay] = fn(filters);
return newMap;
}
export function createSimpleQueryMap(relays: Iterable<string>, filters: Filter | Filter[]) {
const map: RelayQueryMap = {};
for (const relay of safeRelayUrls(relays)) map[relay] = Array.isArray(filters) ? filters : [filters];
return map;
}

View File

@ -1,11 +1,9 @@
import { useEffect, useMemo } from "react";
import { useUnmount } from "react-use";
import { NostrEvent } from "nostr-tools";
import { Filter, NostrEvent } from "nostr-tools";
import { NostrRequestFilter } from "../types/nostr-relay";
import timelineCacheService from "../services/timeline-cache";
import { EventFilter } from "../classes/timeline-loader";
import { createSimpleQueryMap } from "../helpers/nostr/filter";
type Options = {
/** @deprecated */
@ -18,17 +16,23 @@ type Options = {
export default function useTimelineLoader(
key: string,
relays: Iterable<string>,
query: NostrRequestFilter | undefined,
filters: Filter | Filter[] | undefined,
opts?: Options,
) {
const timeline = useMemo(() => timelineCacheService.createTimeline(key), [key]);
useEffect(() => {
if (query) {
timeline.setQueryMap(createSimpleQueryMap(relays, query));
timeline.setRelays(relays);
timeline.triggerChunkLoad();
}, [Array.from(relays).join("|")]);
useEffect(() => {
if (filters) {
timeline.setFilters(Array.isArray(filters) ? filters : [filters]);
timeline.open();
timeline.triggerChunkLoad();
} else timeline.close();
}, [timeline, JSON.stringify(query), Array.from(relays).join("|")]);
}, [timeline, JSON.stringify(filters)]);
useEffect(() => {
timeline.setEventFilter(opts?.eventFilter);

View File

@ -1,4 +1,4 @@
import { Relay } from "nostr-tools";
import { AbstractRelay } from "nostr-tools";
import { PersistentSubject } from "../classes/subject";
import { getEventUID } from "../helpers/nostr/event";
import { NostrEvent } from "../types/nostr-event";
@ -22,7 +22,7 @@ function addRelay(id: string, relay: string) {
}
}
export function handleEventFromRelay(relay: Relay, event: NostrEvent) {
export function handleEventFromRelay(relay: AbstractRelay, event: NostrEvent) {
const uid = getEventUID(event);
addRelay(uid, relay.url);

View File

@ -1,9 +1,13 @@
import { CacheRelay, openDB } from "nostr-idb";
import { Relay } from "nostr-tools";
import { AbstractRelay, NostrEvent, VerifiedEvent, verifiedSymbol } from "nostr-tools";
import { logger } from "../helpers/debug";
import { safeRelayUrl } from "../helpers/relay";
import WasmRelay from "./wasm-relay";
function fakeVerify(event: NostrEvent): event is VerifiedEvent {
return (event[verifiedSymbol] = true);
}
// save the local relay from query params to localStorage
const params = new URLSearchParams(location.search);
const paramRelay = params.get("localRelay");
@ -17,7 +21,7 @@ if (paramRelay) {
export const NOSTR_RELAY_TRAY_URL = "ws://localhost:4869/";
export async function checkNostrRelayTray() {
return new Promise((res) => {
const test = new Relay(NOSTR_RELAY_TRAY_URL);
const test = new AbstractRelay(NOSTR_RELAY_TRAY_URL, { verifyEvent: fakeVerify });
test
.connect()
.then(() => {
@ -44,13 +48,15 @@ async function createRelay() {
} else if (localRelayURL.startsWith("nostr-idb://")) {
return createInternalRelay();
} else if (safeRelayUrl(localRelayURL)) {
return new Relay(safeRelayUrl(localRelayURL)!);
return new AbstractRelay(safeRelayUrl(localRelayURL)!, { verifyEvent: fakeVerify });
}
} else if (window.satellite) {
return new Relay(await window.satellite.getLocalRelay());
return new AbstractRelay(await window.satellite.getLocalRelay(), { verifyEvent: fakeVerify });
} else if (window.CACHE_RELAY_ENABLED) {
const protocol = location.protocol === "https:" ? "wss:" : "ws:";
return new Relay(new URL(protocol + location.host + "/local-relay").toString());
return new AbstractRelay(new URL(protocol + location.host + "/local-relay").toString(), {
verifyEvent: fakeVerify,
});
}
return createInternalRelay();
}

View File

@ -5,7 +5,6 @@ import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import NostrMultiSubscription from "../classes/nostr-multi-subscription";
import { getPubkeyFromDecodeResult, isHexKey, normalizeToHexPubkey } from "../helpers/nip19";
import { createSimpleQueryMap } from "../helpers/nostr/filter";
import { logger } from "../helpers/debug";
import { DraftNostrEvent, NostrEvent, isPTag } from "../types/nostr-event";
import createDefer, { Deferred } from "../classes/deferred";
@ -84,12 +83,13 @@ export class NostrConnectClient {
this.publicKey = getPublicKey(hexToBytes(this.secretKey));
this.sub.onEvent.subscribe((e) => this.handleEvent(e));
this.sub.setQueryMap(
createSimpleQueryMap(this.relays, {
this.sub.setRelays(this.relays);
this.sub.setFilters([
{
kinds: [kinds.NostrConnect, 24134],
"#p": [this.publicKey],
}),
);
},
]);
}
async open() {

View File

@ -4,5 +4,3 @@ import { Filter } from "nostr-tools";
export type NostrQuery = Filter;
export type NostrRequestFilter = Filter | Filter[];
export type RelayQueryMap = Record<string, Filter[]>;

View File

@ -15,7 +15,7 @@ import {
Text,
useDisclosure,
} from "@chakra-ui/react";
import { NostrEvent, Relay, Subscription } from "nostr-tools";
import { AbstractRelay, NostrEvent, Relay, Subscription } from "nostr-tools";
import { useLocalStorage } from "react-use";
import { Subscription as IDBSubscription, CacheRelay } from "nostr-idb";
import _throttle from "lodash.throttle";
@ -86,7 +86,7 @@ export default function EventConsoleView() {
if (sub) sub.close();
let r: Relay | CacheRelay | WasmRelay = localRelay;
let r: Relay | AbstractRelay | CacheRelay | WasmRelay = localRelay;
if (queryRelay.isOpen) {
const url = validateRelayURL(relayURL);
if (!relay || relay.url !== url.toString()) {