add eose timeout to wasm relay

This commit is contained in:
hzrd149
2024-07-29 10:36:37 -05:00
parent 44ab0894a9
commit a4313d83cc
2 changed files with 79 additions and 22 deletions

View File

@@ -10,6 +10,7 @@ import replaceableEventsService from "./replaceable-events";
import userAppSettings from "./settings/user-app-settings";
import userMailboxesService from "./user-mailboxes";
import userMetadataService from "./user-metadata";
import { USER_BLOSSOM_SERVER_LIST_KIND } from "blossom-client-sdk";
const log = logger.extend("user-event-sync");
@@ -21,6 +22,9 @@ function downloadEvents() {
userMetadataService.requestMetadata(account.pubkey, [...relays, COMMON_CONTACT_RELAY], { alwaysRequest: true });
userMailboxesService.requestMailboxes(account.pubkey, [...relays, COMMON_CONTACT_RELAY], { alwaysRequest: true });
userAppSettings.requestAppSettings(account.pubkey, relays, { alwaysRequest: true });
replaceableEventsService.requestEvent(relays, USER_BLOSSOM_SERVER_LIST_KIND, account.pubkey, undefined, {
alwaysRequest: true,
});
log("Loading contacts list");
replaceableEventsService.requestEvent(

View File

@@ -1,7 +1,9 @@
import { type WorkerRelayInterface } from "@snort/worker-relay";
import { nanoid } from "nanoid";
import { SimpleRelay, Subscription, SubscriptionOptions } from "nostr-idb";
import { SimpleRelay } from "nostr-idb";
import { Filter, NostrEvent } from "nostr-tools";
import { SubscriptionParams } from "nostr-tools/abstract-relay";
import { logger } from "../../helpers/debug";
import { WASM_RELAY_SUPPORTED } from "./supported";
@@ -13,12 +15,8 @@ export default class WasmRelay implements SimpleRelay {
static SUPPORTED = WASM_RELAY_SUPPORTED;
private subscriptions: Map<
string,
SubscriptionOptions & {
filters: Filter[];
}
> = new Map();
public baseEoseTimeout: number = 4400;
private subscriptions: Map<string, WasmRelaySubscription> = new Map();
async connect() {
if (this.connected || this.worker) return;
@@ -45,7 +43,7 @@ export default class WasmRelay implements SimpleRelay {
return await this.worker.count(["REQ", params.id || nanoid(8), ...filters]);
}
private async executeSubscription(sub: Subscription) {
async executeSubscription(sub: WasmRelaySubscription) {
if (!this.worker) throw new Error("Worker not setup");
const start = new Date().valueOf();
@@ -56,28 +54,24 @@ export default class WasmRelay implements SimpleRelay {
const delta = new Date().valueOf() - start;
this.log(`Finished ${sub.id} took ${delta}ms and got ${events.length} events`);
if (sub.onevent) {
for (const event of events) sub.onevent(event);
for (const event of events) {
if (!sub.alreadyHaveEvent || sub.alreadyHaveEvent(event.id)) {
sub.onevent(event);
sub.receivedEvent?.(this, event.id);
}
}
if (sub.oneose) sub.oneose();
});
}
subscribe(filters: Filter[], options: Partial<SubscriptionOptions>): Subscription {
subscribe(filters: Filter[], params: Partial<SubscriptionParams & { id: string }>): WasmRelaySubscription {
// remove any duplicate subscriptions
if (options.id && this.subscriptions.has(options.id)) {
this.subscriptions.delete(options.id);
if (params.id && this.subscriptions.has(params.id)) {
this.subscriptions.delete(params.id);
}
const id = options.id || nanoid(8);
const sub = {
id,
filters,
close: () => this.subscriptions.delete(id),
fire: () => this.executeSubscription(sub),
...options,
};
const id = params.id || nanoid(8);
const sub = new WasmRelaySubscription(this, id, filters, params);
this.subscriptions.set(id, sub);
@@ -94,3 +88,62 @@ export default class WasmRelay implements SimpleRelay {
}
}
}
class WasmRelaySubscription {
readonly relay: WasmRelay;
readonly id: string;
closed: boolean = false;
eosed: boolean = false;
filters: Filter[];
alreadyHaveEvent: ((id: string) => boolean) | undefined;
receivedEvent: ((relay: WasmRelay, id: string) => void) | undefined;
onevent: (evt: NostrEvent) => void;
oneose: (() => void) | undefined;
onclose: ((reason: string) => void) | undefined;
eoseTimeout: number;
private eoseTimeoutHandle: ReturnType<typeof setTimeout> | undefined;
constructor(relay: WasmRelay, id: string, filters: Filter[], params: SubscriptionParams) {
this.relay = relay;
this.filters = filters;
this.id = id;
this.alreadyHaveEvent = params.alreadyHaveEvent;
// @ts-expect-error
this.receivedEvent = params.receivedEvent;
this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout;
this.oneose = params.oneose;
this.onclose = params.onclose;
this.onevent =
params.onevent ||
((event) => {
console.warn(
`onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`,
event,
);
});
}
public fire() {
this.relay.executeSubscription(this);
this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout);
}
public receivedEose() {
if (this.eosed) return;
clearTimeout(this.eoseTimeoutHandle);
this.eosed = true;
this.oneose?.();
}
public close(reason: string = "closed by caller") {
if (!this.closed && this.relay.connected) {
this.relay.unsubscribe(this.id);
this.closed = true;
}
this.onclose?.(reason);
}
}