fix eose timeout on persistent subscription

This commit is contained in:
hzrd149 2024-05-03 18:43:21 -05:00
parent 2e6b79ef6a
commit 7df7de7ac7
7 changed files with 29 additions and 36 deletions

View File

@ -25,6 +25,7 @@ export default class BatchKindLoader {
process: Process;
private requestNext = new Set<string>();
private requested = new Map<string, Date>();
private promises = new Map<string, Deferred<NostrEvent | null>>();
@ -44,36 +45,22 @@ export default class BatchKindLoader {
// remove the key from the waiting list
this.requested.delete(key);
const defer = this.promises.get(key);
if (defer) this.promises.delete(key);
const current = this.events.getEvent(key);
if (!current || event.created_at > current.created_at) {
this.events.addEvent(event);
// if there is a promise waiting, resolve with event
const defer = this.promises.get(key);
if (defer) {
this.promises.delete(key);
defer.resolve(event);
}
}
if (defer) defer.resolve(event);
} else if (defer) defer.resolve(null);
}
private handleEOSE() {
// relays says it has nothing left
this.requested.clear();
// prune requests
const timeout = dayjs().subtract(1, "minute");
for (const [key, date] of this.requested) {
if (dayjs(date).isBefore(timeout)) {
this.requested.delete(key);
// if there is a promise waiting for this event, resolve null
const defer = this.promises.get(key);
if (defer) {
this.promises.delete(key);
defer.resolve(null);
}
}
for (const [key, defer] of this.promises) {
this.requested.delete(key);
defer.resolve(null);
}
this.promises.clear();
}
requestEvent(kind: number, pubkey: string, d?: string): Promise<NostrEvent | null> {
@ -97,6 +84,9 @@ export default class BatchKindLoader {
updateThrottle = _throttle(this.update, RELAY_REQUEST_BATCH_TIME);
async update() {
// skip the update if the subscription is running
if (this.subscription?.closed === false && !this.subscription.eosed) return;
let needsUpdate = false;
for (const key of this.requestNext) {
if (!this.requested.has(key)) {
@ -143,7 +133,7 @@ export default class BatchKindLoader {
}
this.subscription.filters = query;
this.subscription.fire();
this.subscription.update();
this.process.active = true;
} else if (this.subscription) {
this.subscription.close();

View File

@ -97,7 +97,7 @@ export default class MultiSubscription {
if (subscription) {
subscription.filters = this.filters;
subscription.fire();
subscription.update();
}
}
}
@ -113,7 +113,7 @@ export default class MultiSubscription {
// update cache sub filters if they changed
if (this.cacheSubscription && !isFilterEqual(this.cacheSubscription.filters, this.filters)) {
this.cacheSubscription.filters = this.filters;
this.cacheSubscription.fire();
this.cacheSubscription.update();
}
}

View File

@ -33,18 +33,19 @@ export default class PersistentSubscription {
processManager.registerProcess(this.process);
}
async fire() {
async update() {
if (!this.filters || this.filters.length === 0) return this;
if (!(await relayPoolService.waitForOpen(this.relay))) return;
// recreate the subscription since strfry and other relays reject subscription updates
// recreate the subscription since nostream and other relays reject subscription updates
// if (this.subscription?.closed === false) {
// this.closed = true;
// this.subscription.close();
// }
this.closed = false;
this.eosed = false;
this.process.active = true;
// recreate the subscription if its closed since nostr-tools cant reopen a sub
@ -66,11 +67,13 @@ export default class PersistentSubscription {
this.params.onclose?.(reason);
},
});
} else {
this.subscription.filters = this.filters;
// NOTE: reset the eosed flag since nostr-tools dose not
this.subscription.eosed = false;
this.subscription.fire();
}
this.subscription.filters = this.filters;
this.subscription.fire();
return this;
}
close() {

View File

@ -77,6 +77,9 @@ async function connectRelay() {
log("Connected");
if (relay instanceof AbstractRelay) {
// set the base timeout to 1 second
relay.baseEoseTimeout = 1000;
relayPoolService.relays.set(relay.url, relay);
relay.onnotice = (notice) => relayPoolService.handleRelayNotice(relay, notice);
}

View File

@ -55,7 +55,7 @@ class ReplaceableEventsService {
if (localRelay) {
this.cacheLoader = new BatchKindLoader(localRelay as AbstractRelay, this.log.extend("cache-relay"));
this.cacheLoader.events.onEvent.subscribe((e) => this.handleEvent(e));
this.cacheLoader.events.onEvent.subscribe((e) => this.handleEvent(e, false));
this.process.addChild(this.cacheLoader.process);
}
}
@ -113,7 +113,6 @@ class ReplaceableEventsService {
}
if (opts?.alwaysRequest || !this.cacheLoader || (!sub.value && opts.ignoreCache)) {
this.log("Skipping cache for", key);
this.requestEventFromRelays(relays, kind, pubkey, d);
}

View File

@ -75,7 +75,7 @@ class SingleEventLoader {
} else {
// TODO: might be good to check if the ids have changed since last filter
subscription.filters = [{ ids: Array.from(ids) }];
subscription.fire();
subscription.update();
}
}
}

View File

@ -29,8 +29,6 @@ function VerifyEventSettings() {
raw: true,
});
console.log(selectedMethod, verifyEventMethod);
return (
<>
<FormControl>