use local cache relay for single events

This commit is contained in:
hzrd149 2024-01-11 21:10:57 +00:00
parent 3204258e36
commit a92b93f28c
13 changed files with 82 additions and 118 deletions

View File

@ -6,6 +6,8 @@ volumes:
services:
relay:
image: scsibug/nostr-rs-relay:0.8.13
ports:
- 5000:8080
volumes:
- data:/usr/src/app/db
app:

View File

@ -54,7 +54,7 @@
"match-sorter": "^6.3.1",
"nanoid": "^5.0.4",
"ngeohash": "^0.6.3",
"nostr-idb": "/home/robert/Projects/nostr-idb/nostr-idb-v0.1.2.tgz",
"nostr-idb": "^0.2.0",
"nostr-tools": "^2.1.3",
"react": "^18.2.0",
"react-chartjs-2": "^5.2.0",

View File

@ -1,15 +1,11 @@
import { nanoid } from "nanoid";
import stringify from "json-stringify-deterministic";
import { Subject } from "./subject";
import { NostrEvent } from "../types/nostr-event";
import { NostrOutgoingRequest, NostrRequestFilter, RelayQueryMap } from "../types/nostr-query";
import Relay, { IncomingEvent } from "./relay";
import relayPoolService from "../services/relay-pool";
function isFilterEqual(a: NostrRequestFilter, b: NostrRequestFilter) {
return stringify(a) === stringify(b);
}
import { isFilterEqual } from "../helpers/nostr/filter";
export default class NostrMultiSubscription {
static INIT = "initial";

View File

@ -11,7 +11,10 @@ import EventStore from "./event-store";
import { isReplaceable } from "../helpers/nostr/events";
import replaceableEventLoaderService from "../services/replaceable-event-requester";
import deleteEventService from "../services/delete-events";
import { addQueryToFilter, isFilterEqual, mapQueryMap } from "../helpers/nostr/filter";
import { addQueryToFilter, isFilterEqual, mapQueryMap, stringifyFilter } from "../helpers/nostr/filter";
import { localCacheRelay } from "../services/local-cache-relay";
import { SimpleSubscription } from "nostr-idb";
import { Filter } from "nostr-tools";
const BLOCK_SIZE = 100;
@ -106,6 +109,7 @@ export default class TimelineLoader {
name: string;
private log: Debugger;
private subscription: NostrMultiSubscription;
private cacheSubscription?: SimpleSubscription;
private blockLoaders = new Map<string, RelayBlockLoader>();
@ -131,12 +135,12 @@ export default class TimelineLoader {
this.timeline.next(this.events.getSortedEvents().filter((e) => filter(e, this.events)));
} else this.timeline.next(this.events.getSortedEvents());
}
private handleEvent(event: NostrEvent) {
private handleEvent(event: NostrEvent, cache = true) {
// if this is a replaceable event, mirror it over to the replaceable event service
if (isReplaceable(event.kind)) {
replaceableEventLoaderService.handleEvent(event);
}
if (isReplaceable(event.kind)) replaceableEventLoaderService.handleEvent(event);
this.events.addEvent(event);
if (cache) localCacheRelay.publish(event);
}
private handleDeleteEvent(deleteEvent: NostrEvent) {
const cord = deleteEvent.tags.find(isATag)?.[1];
@ -158,6 +162,21 @@ export default class TimelineLoader {
loader.onBlockFinish.unsubscribe(this.updateComplete, this);
}
private loadQueriesFromCache(queryMap: RelayQueryMap) {
const queries: Record<string, Filter[]> = {};
for (const [url, filters] of Object.entries(queryMap)) {
const key = stringifyFilter(filters);
if (!queries[key]) queries[key] = Array.isArray(filters) ? filters : [filters];
}
for (const filters of Object.values(queries)) {
const sub: SimpleSubscription = localCacheRelay.subscribe(filters, {
onevent: (e) => this.handleEvent(e, false),
oneose: () => sub.close(),
});
}
}
setQueryMap(queryMap: RelayQueryMap) {
if (isFilterEqual(this.queryMap, queryMap)) return;
@ -190,6 +209,9 @@ export default class TimelineLoader {
this.queryMap = queryMap;
// load all filters from cache relay
this.loadQueriesFromCache(queryMap);
// update the subscription query map and add limit
this.subscription.setQueryMap(
mapQueryMap(this.queryMap, (filter) => addQueryToFilter(filter, { limit: BLOCK_SIZE / 2 })),

View File

@ -1,6 +1,6 @@
import stringify from "json-stringify-deterministic";
import { NostrQuery, NostrRequestFilter, RelayQueryMap } from "../../types/nostr-query";
import localCacheRelayService, { LOCAL_CACHE_RELAY } from "../../services/local-cache-relay";
import { LOCAL_CACHE_RELAY, LOCAL_CACHE_RELAY_ENABLED } from "../../services/local-cache-relay";
export function addQueryToFilter(filter: NostrRequestFilter, query: NostrQuery) {
if (Array.isArray(filter)) {
@ -9,8 +9,11 @@ export function addQueryToFilter(filter: NostrRequestFilter, query: NostrQuery)
return { ...filter, ...query };
}
export function stringifyFilter(filter: NostrRequestFilter) {
return stringify(filter);
}
export function isFilterEqual(a: NostrRequestFilter, b: NostrRequestFilter) {
return stringify(a) === stringify(b);
return stringifyFilter(a) === stringifyFilter(b);
}
export function mapQueryMap(queryMap: RelayQueryMap, fn: (filter: NostrRequestFilter) => NostrRequestFilter) {
@ -23,7 +26,7 @@ export function createSimpleQueryMap(relays: string[], filter: NostrRequestFilte
const map: RelayQueryMap = {};
// if the local cache relay is enabled, also ask it
if (localCacheRelayService.enabled) {
if (LOCAL_CACHE_RELAY_ENABLED) {
map[LOCAL_CACHE_RELAY] = filter;
}

View File

@ -12,7 +12,7 @@ import { logger } from "../helpers/debug";
import db from "./db";
import createDefer, { Deferred } from "../classes/deferred";
import { getChannelPointer } from "../helpers/nostr/channel";
import localCacheRelayService, { LOCAL_CACHE_RELAY } from "./local-cache-relay";
import { LOCAL_CACHE_RELAY, LOCAL_CACHE_RELAY_ENABLED } from "./local-cache-relay";
type Pubkey = string;
type Relay = string;
@ -231,7 +231,7 @@ class ChannelMetadataService {
const sub = this.metadata.get(channelId);
const relayUrls = Array.from(relays);
if (localCacheRelayService.enabled) {
if (LOCAL_CACHE_RELAY_ENABLED) {
relayUrls.unshift(LOCAL_CACHE_RELAY);
}
for (const relay of relayUrls) {

View File

@ -1,29 +0,0 @@
import { NostrEvent } from "../../types/nostr-event";
import { logger } from "../../helpers/debug";
const log = logger.extend("WriteCache");
const writeCache: NostrEvent[] = [];
export function addEventToCache(event: NostrEvent) {
writeCache.push(event);
}
export function addEventsToCache(events: NostrEvent[]) {
for (const event of events) {
writeCache.push(event);
}
}
async function writeChunk(size = 1000) {
const events: NostrEvent[] = [];
for (let i = 0; i < size; i++) {
if (writeCache.length === 0) break;
const e = writeCache.pop();
if (e) events.push(e);
}
if (events.length > 0) {
log(`Wrote ${events.length} to cache`);
}
}
setInterval(writeChunk, 1000);

View File

@ -8,7 +8,7 @@ import relayScoreboardService from "./relay-scoreboard";
import { logger } from "../helpers/debug";
import { matchFilter, matchFilters } from "nostr-tools";
import { NostrEvent } from "../types/nostr-event";
import localCacheRelayService, { LOCAL_CACHE_RELAY } from "./local-cache-relay";
import { LOCAL_CACHE_RELAY, LOCAL_CACHE_RELAY_ENABLED } from "./local-cache-relay";
function hashFilter(filter: NostrRequestFilter) {
// const encoder = new TextEncoder();
@ -43,7 +43,7 @@ class EventExistsService {
if (sub.value !== true) {
const relayUrls = Array.from(relays);
if (localCacheRelayService.enabled) relayUrls.unshift(LOCAL_CACHE_RELAY);
if (LOCAL_CACHE_RELAY_ENABLED) relayUrls.unshift(LOCAL_CACHE_RELAY);
for (const url of relayUrls) {
if (!asked.has(url) && !pending.has(url)) {

View File

@ -1,10 +1,9 @@
import { CacheRelay, openDB } from "nostr-idb";
import { Relay } from "nostr-tools";
import { logger } from "../helpers/debug";
import { NostrEvent } from "../types/nostr-event";
import relayPoolService from "./relay-pool";
import _throttle from "lodash.throttle";
const log = logger.extend(`LocalCacheRelay`);
const params = new URLSearchParams(location.search);
const paramRelay = params.get("cacheRelay");
@ -19,69 +18,34 @@ const storedCacheRelayURL = localStorage.getItem("cacheRelay");
const url = (storedCacheRelayURL && new URL(storedCacheRelayURL)) || new URL("/cache-relay", location.href);
url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
export const CACHE_RELAY_ENABLED = !!window.CACHE_RELAY_ENABLED || !!localStorage.getItem("cacheRelay");
export const LOCAL_CACHE_RELAY_ENABLED = !!window.CACHE_RELAY_ENABLED || !!localStorage.getItem("cacheRelay");
export const LOCAL_CACHE_RELAY = url.toString();
export const localCacheDatabase = await openDB();
export const localCacheRelay = CACHE_RELAY_ENABLED ? new Relay(LOCAL_CACHE_RELAY) : new CacheRelay(localCacheDatabase);
await localCacheRelay.connect();
function createRelay() {
if (LOCAL_CACHE_RELAY_ENABLED) {
log(`Using ${LOCAL_CACHE_RELAY}`);
return new Relay(LOCAL_CACHE_RELAY);
} else {
log(`Using IndexedDB`);
return new CacheRelay(localCacheDatabase);
}
}
export const localCacheRelay = createRelay();
// connect without waiting
localCacheRelay.connect().then(() => {
log("Connected");
});
// keep the relay connection alive
setInterval(() => {
if (!localCacheRelay.connected) localCacheRelay.connect();
if (!localCacheRelay.connected) localCacheRelay.connect().then(() => log("Reconnected"));
}, 1000 * 5);
const wroteEvents = new Set<string>();
const writeQueue: NostrEvent[] = [];
const BATCH_WRITE = 100;
const log = logger.extend(`LocalCacheRelay`);
async function flush() {
for (let i = 0; i < BATCH_WRITE; i++) {
const e = writeQueue.pop();
if (!e) continue;
relayPoolService.requestRelay(LOCAL_CACHE_RELAY).send(["EVENT", e]);
}
}
function report() {
if (writeQueue.length) {
log(`${writeQueue.length} events in write queue`);
}
}
function addToQueue(e: NostrEvent) {
if (!CACHE_RELAY_ENABLED) return;
if (!wroteEvents.has(e.id)) {
wroteEvents.add(e.id);
writeQueue.push(e);
}
}
if (CACHE_RELAY_ENABLED) {
log("Enabled");
relayPoolService.onRelayCreated.subscribe((relay) => {
if (relay.url !== LOCAL_CACHE_RELAY) {
relay.onEvent.subscribe((incomingEvent) => addToQueue(incomingEvent.body));
}
});
}
const localCacheRelayService = {
enabled: CACHE_RELAY_ENABLED,
addToQueue,
};
setInterval(() => {
if (CACHE_RELAY_ENABLED) flush();
}, 1000);
setInterval(() => {
if (CACHE_RELAY_ENABLED) report();
}, 1000 * 10);
if (import.meta.env.DEV) {
//@ts-ignore
window.localCacheRelayService = localCacheRelayService;
window.localCacheRelay = localCacheRelay;
}
export default localCacheRelayService;

View File

@ -255,7 +255,9 @@ class RelayScoreboardService {
const relayScoreboardService = new RelayScoreboardService();
relayScoreboardService.loadStats();
setTimeout(() => {
relayScoreboardService.loadStats();
}, 0);
setInterval(() => {
relayScoreboardService.saveStats();

View File

@ -12,7 +12,7 @@ import db from "./db";
import { nameOrPubkey } from "./user-metadata";
import { getEventCoordinate } from "../helpers/nostr/events";
import createDefer, { Deferred } from "../classes/deferred";
import localCacheRelayService, { LOCAL_CACHE_RELAY } from "./local-cache-relay";
import { LOCAL_CACHE_RELAY, LOCAL_CACHE_RELAY_ENABLED, localCacheRelay } from "./local-cache-relay";
type Pubkey = string;
type Relay = string;
@ -33,7 +33,7 @@ export function createCoordinate(kind: number, pubkey: string, d?: string) {
return `${kind}:${pubkey}${d ? ":" + d : ""}`;
}
const RELAY_REQUEST_BATCH_TIME = 1000;
const RELAY_REQUEST_BATCH_TIME = 500;
/** This class is ued to batch requests to a single relay */
class ReplaceableEventRelayLoader {
@ -225,6 +225,8 @@ class ReplaceableEventLoaderService {
this.dbLog(`Writing ${this.writeCacheQueue.size} events to database`);
const transaction = db.transaction("replaceableEvents", "readwrite");
for (const [cord, event] of this.writeCacheQueue) {
localCacheRelay.publish(event);
// TODO: remove this
transaction.objectStore("replaceableEvents").put({ addr: cord, event, created: dayjs().unix() });
}
this.writeCacheQueue.clear();
@ -236,6 +238,7 @@ class ReplaceableEventLoaderService {
this.writeToCacheThrottle();
}
/** @deprecated */
async pruneDatabaseCache() {
const keys = await db.getAllKeysFromIndex(
"replaceableEvents",
@ -257,7 +260,8 @@ class ReplaceableEventLoaderService {
const sub = this.events.get(cord);
const relayUrls = Array.from(relays);
if (localCacheRelayService.enabled) relayUrls.unshift(LOCAL_CACHE_RELAY);
// TODO: use localCacheRelay instead
if (LOCAL_CACHE_RELAY_ENABLED) relayUrls.unshift(LOCAL_CACHE_RELAY);
for (const relay of relayUrls) {
const request = this.loaders.get(relay).requestEvent(kind, pubkey, d);

View File

@ -6,10 +6,9 @@ import Subject from "../classes/subject";
import SuperMap from "../classes/super-map";
import { safeRelayUrls } from "../helpers/url";
import { NostrEvent } from "../types/nostr-event";
import localCacheRelayService, { LOCAL_CACHE_RELAY, localCacheRelay } from "./local-cache-relay";
import { addEventToCache } from "./db/cache";
import { localCacheRelay } from "./local-cache-relay";
const RELAY_REQUEST_BATCH_TIME = 1000;
const RELAY_REQUEST_BATCH_TIME = 500;
class SingleEventService {
private cache = new SuperMap<string, Subject<NostrEvent>>(() => new Subject());
@ -20,16 +19,16 @@ class SingleEventService {
if (subject.value) return subject;
const newUrls = safeRelayUrls(relays);
if (localCacheRelayService.enabled) newUrls.push(LOCAL_CACHE_RELAY);
this.pending.set(id, this.pending.get(id)?.concat(newUrls) ?? newUrls);
this.batchRequestsThrottle();
return subject;
}
handleEvent(event: NostrEvent) {
handleEvent(event: NostrEvent, cache = true) {
this.cache.get(event.id).next(event);
addEventToCache(event);
if (cache) localCacheRelay.publish(event);
}
private batchRequestsThrottle = _throttle(this.batchRequests, RELAY_REQUEST_BATCH_TIME);
@ -38,7 +37,7 @@ class SingleEventService {
// load events from local cache relay
const sub: SimpleSubscription = localCacheRelay.subscribe([{ ids: Array.from(this.pending.keys()) }], {
onevent: (e) => this.handleEvent(e),
onevent: (e) => this.handleEvent(e, false),
oneose: () => sub.close(),
});

View File

@ -5262,9 +5262,10 @@ normalize-package-data@^2.5.0:
semver "2 || 3 || 4 || 5"
validate-npm-package-license "^3.0.1"
"nostr-idb@file:../nostr-idb/nostr-idb-v0.1.2.tgz":
version "0.1.2"
resolved "file:../nostr-idb/nostr-idb-v0.1.2.tgz#4b9f4c4f6fe5d3c22347ac7e9758208b05ef5e68"
nostr-idb@^0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/nostr-idb/-/nostr-idb-0.2.0.tgz#50b74b078333d187be871c3d9086dfaebfa92183"
integrity sha512-BLqLemCzGR88Wa2gVPobmsdWondpDvbMwSgPsCjZlvflQNXpmCXCN1xJbq0j+RyC88guciW3D6yj7+477xg/9g==
dependencies:
idb "^8.0.0"
nostr-tools "^1.17.0"