diff --git a/src/classes/chunked-request.ts b/src/classes/chunked-request.ts new file mode 100644 index 000000000..7bf7ee360 --- /dev/null +++ b/src/classes/chunked-request.ts @@ -0,0 +1,93 @@ +import { Debugger } from "debug"; +import { Filter, NostrEvent, matchFilters } from "nostr-tools"; +import _throttle from "lodash.throttle"; + +import NostrRequest from "./nostr-request"; +import Subject from "./subject"; +import { logger } from "../helpers/debug"; +import EventStore from "./event-store"; +import deleteEventService from "../services/delete-events"; +import { mergeFilter } from "../helpers/nostr/filter"; +import { isATag, isETag } from "../types/nostr-event"; + +const DEFAULT_CHUNK_SIZE = 100; + +export type EventFilter = (event: NostrEvent, store: EventStore) => boolean; + +export default class ChunkedRequest { + relay: string; + filters: Filter[]; + chunkSize = DEFAULT_CHUNK_SIZE; + private log: Debugger; + private subs: ZenObservable.Subscription[] = []; + + loading = false; + events: EventStore; + /** set to true when the next chunk produces 0 events */ + complete = false; + + onChunkFinish = new Subject(); + + constructor(relay: string, filters: Filter[], log?: Debugger) { + this.relay = relay; + this.filters = filters; + + this.log = log || logger.extend(relay); + this.events = new EventStore(relay); + + // TODO: find a better place for this + this.subs.push(deleteEventService.stream.subscribe((e) => this.handleDeleteEvent(e))); + } + + loadNextChunk() { + this.loading = true; + let filters: Filter[] = mergeFilter(this.filters, { limit: this.chunkSize }); + let oldestEvent = this.getLastEvent(); + if (oldestEvent) { + filters = mergeFilter(filters, { until: oldestEvent.created_at - 1 }); + } + + const request = new NostrRequest([this.relay]); + + let gotEvents = 0; + request.onEvent.subscribe((e) => { + this.handleEvent(e); + gotEvents++; + }); + request.onComplete.then(() => { + this.loading = false; + if (gotEvents === 0) { + this.complete = true; + this.log("Complete"); + } else this.log(`Got ${gotEvents} events`); + this.onChunkFinish.next(gotEvents); + }); + + request.start(filters); + } + + private handleEvent(event: NostrEvent) { + if (!matchFilters(this.filters, event)) return; + return this.events.addEvent(event); + } + + private handleDeleteEvent(deleteEvent: NostrEvent) { + const cord = deleteEvent.tags.find(isATag)?.[1]; + const eventId = deleteEvent.tags.find(isETag)?.[1]; + + if (cord) this.events.deleteEvent(cord); + if (eventId) this.events.deleteEvent(eventId); + } + + cleanup() { + for (const sub of this.subs) sub.unsubscribe(); + this.subs = []; + } + + getFirstEvent(nth = 0, eventFilter?: EventFilter) { + return this.events.getFirstEvent(nth, eventFilter); + } + getLastEvent(nth = 0, eventFilter?: EventFilter) { + return this.events.getLastEvent(nth, eventFilter); + } +} diff --git a/src/classes/nostr-request.ts b/src/classes/nostr-request.ts index 838148a61..264d421f3 100644 --- a/src/classes/nostr-request.ts +++ b/src/classes/nostr-request.ts @@ -1,7 +1,6 @@ import { nanoid } from "nanoid"; +import { Filter, NostrEvent } from "nostr-tools"; -import { NostrEvent } from "../types/nostr-event"; -import { NostrRequestFilter } from "../types/nostr-relay"; import relayPoolService from "../services/relay-pool"; import Relay, { CountResponse, IncomingCount, IncomingEOSE, IncomingEvent } from "./relay"; import createDefer from "./deferred"; @@ -64,7 +63,7 @@ export default class NostrRequest { } } - start(filter: NostrRequestFilter, type: "REQ" | "COUNT" = "REQ") { + start(filter: Filter | Filter[], type: "REQ" | "COUNT" = "REQ") { if (this.state !== NostrRequest.IDLE) { throw new Error("cant restart a nostr request"); } diff --git a/src/classes/timeline-loader.ts b/src/classes/timeline-loader.ts index 792831dfd..bb13119d7 100644 --- a/src/classes/timeline-loader.ts +++ b/src/classes/timeline-loader.ts @@ -1,110 +1,25 @@ import dayjs from "dayjs"; import { Debugger } from "debug"; -import { Filter, matchFilters } from "nostr-tools"; +import { Filter, NostrEvent } from "nostr-tools"; import _throttle from "lodash.throttle"; -import { NostrEvent, isATag, isETag } from "../types/nostr-event"; -import { NostrRequestFilter, RelayQueryMap } from "../types/nostr-relay"; -import NostrRequest from "./nostr-request"; +import { RelayQueryMap } from "../types/nostr-relay"; import NostrMultiSubscription from "./nostr-multi-subscription"; -import Subject, { PersistentSubject } from "./subject"; +import { PersistentSubject } from "./subject"; import { logger } from "../helpers/debug"; import EventStore from "./event-store"; import { isReplaceable } from "../helpers/nostr/event"; import replaceableEventsService from "../services/replaceable-events"; -import deleteEventService from "../services/delete-events"; -import { - addQueryToFilter, - isFilterEqual, - isQueryMapEqual, - mapQueryMap, - stringifyFilter, -} from "../helpers/nostr/filter"; +import { mergeFilter, isFilterEqual, isQueryMapEqual, mapQueryMap, stringifyFilter } from "../helpers/nostr/filter"; import { localRelay } from "../services/local-relay"; import { relayRequest } from "../helpers/relay"; import SuperMap from "./super-map"; +import ChunkedRequest from "./chunked-request"; const BLOCK_SIZE = 100; export type EventFilter = (event: NostrEvent, store: EventStore) => boolean; -export class RelayBlockLoader { - relay: string; - filter: NostrRequestFilter; - blockSize = BLOCK_SIZE; - private log: Debugger; - private subs: ZenObservable.Subscription[] = []; - - loading = false; - events: EventStore; - /** set to true when the next block produces 0 events */ - complete = false; - - onBlockFinish = new Subject(); - - constructor(relay: string, filter: NostrRequestFilter, log?: Debugger) { - this.relay = relay; - this.filter = filter; - - this.log = log || logger.extend(relay); - this.events = new EventStore(relay); - - this.subs.push(deleteEventService.stream.subscribe((e) => this.handleDeleteEvent(e))); - } - - loadNextBlock() { - this.loading = true; - let filter: NostrRequestFilter = addQueryToFilter(this.filter, { limit: this.blockSize }); - let oldestEvent = this.getLastEvent(); - if (oldestEvent) { - filter = addQueryToFilter(filter, { until: oldestEvent.created_at - 1 }); - } - - const request = new NostrRequest([this.relay]); - - let gotEvents = 0; - request.onEvent.subscribe((e) => { - this.handleEvent(e); - gotEvents++; - }); - request.onComplete.then(() => { - this.loading = false; - if (gotEvents === 0) { - this.complete = true; - this.log("Complete"); - } else this.log(`Got ${gotEvents} events`); - this.onBlockFinish.next(gotEvents); - }); - - request.start(filter); - } - - private handleEvent(event: NostrEvent) { - if (!matchFilters(Array.isArray(this.filter) ? this.filter : [this.filter], event)) return; - return this.events.addEvent(event); - } - - private handleDeleteEvent(deleteEvent: NostrEvent) { - const cord = deleteEvent.tags.find(isATag)?.[1]; - const eventId = deleteEvent.tags.find(isETag)?.[1]; - - if (cord) this.events.deleteEvent(cord); - if (eventId) this.events.deleteEvent(eventId); - } - - cleanup() { - for (const sub of this.subs) sub.unsubscribe(); - this.subs = []; - } - - getFirstEvent(nth = 0, eventFilter?: EventFilter) { - return this.events.getFirstEvent(nth, eventFilter); - } - getLastEvent(nth = 0, eventFilter?: EventFilter) { - return this.events.getLastEvent(nth, eventFilter); - } -} - export default class TimelineLoader { cursor = dayjs().unix(); queryMap: RelayQueryMap = {}; @@ -121,7 +36,7 @@ export default class TimelineLoader { private log: Debugger; private subscription: NostrMultiSubscription; - private blockLoaders = new Map(); + private chunkLoaders = new Map(); constructor(name: string) { this.name = name; @@ -152,20 +67,23 @@ export default class TimelineLoader { this.events.addEvent(event); if (cache) localRelay.publish(event); } - - private blockLoaderSubs = new SuperMap(() => []); - private connectToBlockLoader(loader: RelayBlockLoader) { - this.events.connect(loader.events); - const subs = this.blockLoaderSubs.get(loader); - subs.push(loader.onBlockFinish.subscribe(this.updateLoading.bind(this))); - subs.push(loader.onBlockFinish.subscribe(this.updateComplete.bind(this))); + private handleChunkFinished() { + this.updateLoading(); + this.updateComplete(); } - private disconnectToBlockLoader(loader: RelayBlockLoader) { + + private chunkLoaderSubs = new SuperMap(() => []); + private connectToChunkLoader(loader: ChunkedRequest) { + this.events.connect(loader.events); + const subs = this.chunkLoaderSubs.get(loader); + subs.push(loader.onChunkFinish.subscribe(this.handleChunkFinished.bind(this))); + } + private disconnectToChunkLoader(loader: ChunkedRequest) { loader.cleanup(); this.events.disconnect(loader.events); - const subs = this.blockLoaderSubs.get(loader); + const subs = this.chunkLoaderSubs.get(loader); for (const sub of subs) sub.unsubscribe(); - this.blockLoaderSubs.delete(loader); + this.chunkLoaderSubs.delete(loader); } private loadQueriesFromCache(queryMap: RelayQueryMap) { @@ -189,26 +107,26 @@ export default class TimelineLoader { // remove relays for (const relay of Object.keys(this.queryMap)) { - const loader = this.blockLoaders.get(relay); + const loader = this.chunkLoaders.get(relay); if (!loader) continue; if (!queryMap[relay]) { - this.disconnectToBlockLoader(loader); - this.blockLoaders.delete(relay); + this.disconnectToChunkLoader(loader); + this.chunkLoaders.delete(relay); } } for (const [relay, filter] of Object.entries(queryMap)) { // remove outdated loaders if (this.queryMap[relay] && !isFilterEqual(this.queryMap[relay], filter)) { - const old = this.blockLoaders.get(relay)!; - this.disconnectToBlockLoader(old); - this.blockLoaders.delete(relay); + const old = this.chunkLoaders.get(relay)!; + this.disconnectToChunkLoader(old); + this.chunkLoaders.delete(relay); } - if (!this.blockLoaders.has(relay)) { - const loader = new RelayBlockLoader(relay, filter, this.log.extend(relay)); - this.blockLoaders.set(relay, loader); - this.connectToBlockLoader(loader); + if (!this.chunkLoaders.has(relay)) { + const loader = new ChunkedRequest(relay, Array.isArray(filter) ? filter : [filter], this.log.extend(relay)); + this.chunkLoaders.set(relay, loader); + this.connectToChunkLoader(loader); } } @@ -219,10 +137,10 @@ export default class TimelineLoader { // update the subscription query map and add limit this.subscription.setQueryMap( - mapQueryMap(this.queryMap, (filter) => addQueryToFilter(filter, { limit: BLOCK_SIZE / 2 })), + mapQueryMap(this.queryMap, (filter) => mergeFilter(filter, { limit: BLOCK_SIZE / 2 })), ); - this.triggerBlockLoads(); + this.triggerChunkLoad(); } setEventFilter(filter?: EventFilter) { @@ -231,33 +149,33 @@ export default class TimelineLoader { } setCursor(cursor: number) { this.cursor = cursor; - this.triggerBlockLoads(); + this.triggerChunkLoad(); } - triggerBlockLoads() { + triggerChunkLoad() { let triggeredLoad = false; - for (const [relay, loader] of this.blockLoaders) { + for (const [relay, loader] of this.chunkLoaders) { if (loader.complete || loader.loading) continue; const event = loader.getLastEvent(this.loadNextBlockBuffer, this.eventFilter); if (!event || event.created_at >= this.cursor) { - loader.loadNextBlock(); + loader.loadNextChunk(); triggeredLoad = true; } } if (triggeredLoad) this.updateLoading(); } - loadNextBlock() { + loadAllNextChunks() { let triggeredLoad = false; - for (const [relay, loader] of this.blockLoaders) { + for (const [relay, loader] of this.chunkLoaders) { if (loader.complete || loader.loading) continue; - loader.loadNextBlock(); + loader.loadNextChunk(); triggeredLoad = true; } if (triggeredLoad) this.updateLoading(); } private updateLoading() { - for (const [relay, loader] of this.blockLoaders) { + for (const [relay, loader] of this.chunkLoaders) { if (loader.loading) { if (!this.loading.value) { this.loading.next(true); @@ -268,7 +186,7 @@ export default class TimelineLoader { if (this.loading.value) this.loading.next(false); } private updateComplete() { - for (const [relay, loader] of this.blockLoaders) { + for (const [relay, loader] of this.chunkLoaders) { if (!loader.complete) { this.complete.next(false); return; @@ -290,8 +208,8 @@ export default class TimelineLoader { } reset() { this.cursor = dayjs().unix(); - for (const [_, loader] of this.blockLoaders) this.disconnectToBlockLoader(loader); - this.blockLoaders.clear(); + for (const [_, loader] of this.chunkLoaders) this.disconnectToChunkLoader(loader); + this.chunkLoaders.clear(); this.forgetEvents(); } @@ -299,8 +217,8 @@ export default class TimelineLoader { cleanup() { this.close(); - for (const [_, loader] of this.blockLoaders) this.disconnectToBlockLoader(loader); - this.blockLoaders.clear(); + for (const [_, loader] of this.chunkLoaders) this.disconnectToChunkLoader(loader); + this.chunkLoaders.clear(); this.events.cleanup(); } diff --git a/src/components/timeline-page/timeline-action-and-status.tsx b/src/components/timeline-page/timeline-action-and-status.tsx index caf18110b..81fef51a3 100644 --- a/src/components/timeline-page/timeline-action-and-status.tsx +++ b/src/components/timeline-page/timeline-action-and-status.tsx @@ -21,7 +21,14 @@ export default function TimelineActionAndStatus({ timeline }: { timeline: Timeli } return ( - ); diff --git a/src/components/user/user-dns-identity-icon.tsx b/src/components/user/user-dns-identity-icon.tsx index c930f7214..967908168 100644 --- a/src/components/user/user-dns-identity-icon.tsx +++ b/src/components/user/user-dns-identity-icon.tsx @@ -4,7 +4,7 @@ import { useDnsIdentity } from "../../hooks/use-dns-identity"; import { useUserMetadata } from "../../hooks/use-user-metadata"; import { VerificationFailed, VerificationMissing, VerifiedIcon } from "../icons"; -export const UserDnsIdentityIcon = ({ pubkey, onlyIcon }: { pubkey: string; onlyIcon?: boolean }) => { +export function UserDnsIdentityIcon({ pubkey, onlyIcon }: { pubkey: string; onlyIcon?: boolean }) { const metadata = useUserMetadata(pubkey); const identity = useDnsIdentity(metadata?.nip05); @@ -32,4 +32,6 @@ export const UserDnsIdentityIcon = ({ pubkey, onlyIcon }: { pubkey: string; only {metadata.nip05.startsWith("_@") ? metadata.nip05.substr(2) : metadata.nip05} {renderIcon()} ); -}; +} + +export default UserDnsIdentityIcon; diff --git a/src/helpers/nostr/filter.ts b/src/helpers/nostr/filter.ts index 87c33133e..38cffd7ea 100644 --- a/src/helpers/nostr/filter.ts +++ b/src/helpers/nostr/filter.ts @@ -1,19 +1,21 @@ import stringify from "json-stringify-deterministic"; -import { NostrRequestFilter, RelayQueryMap } from "../../types/nostr-relay"; +import { RelayQueryMap } from "../../types/nostr-relay"; import { Filter } from "nostr-tools"; import { safeRelayUrls } from "../relay"; -export function addQueryToFilter(filter: NostrRequestFilter, query: Filter) { +export function mergeFilter(filter: Filter, query: Filter): Filter; +export function mergeFilter(filter: Filter[], query: Filter): Filter[]; +export function mergeFilter(filter: Filter | Filter[], query: Filter) { if (Array.isArray(filter)) { return filter.map((f) => ({ ...f, ...query })); } return { ...filter, ...query }; } -export function stringifyFilter(filter: NostrRequestFilter) { +export function stringifyFilter(filter: Filter | Filter[]) { return stringify(filter); } -export function isFilterEqual(a: NostrRequestFilter, b: NostrRequestFilter) { +export function isFilterEqual(a: Filter | Filter[], b: Filter | Filter[]) { return stringifyFilter(a) === stringifyFilter(b); } @@ -21,14 +23,14 @@ export function isQueryMapEqual(a: RelayQueryMap, b: RelayQueryMap) { return stringify(a) === stringify(b); } -export function mapQueryMap(queryMap: RelayQueryMap, fn: (filter: NostrRequestFilter) => NostrRequestFilter) { +export function mapQueryMap(queryMap: RelayQueryMap, fn: (filters: Filter[]) => Filter[]) { const newMap: RelayQueryMap = {}; - for (const [relay, filter] of Object.entries(queryMap)) newMap[relay] = fn(filter); + for (const [relay, filters] of Object.entries(queryMap)) newMap[relay] = fn(filters); return newMap; } -export function createSimpleQueryMap(relays: Iterable, filter: NostrRequestFilter) { +export function createSimpleQueryMap(relays: Iterable, filters: Filter | Filter[]) { const map: RelayQueryMap = {}; - for (const relay of safeRelayUrls(relays)) map[relay] = filter; + for (const relay of safeRelayUrls(relays)) map[relay] = Array.isArray(filters) ? filters : [filters]; return map; } diff --git a/src/hooks/use-timeline-cursor-intersection-callback.ts b/src/hooks/use-timeline-cursor-intersection-callback.ts index d4014472f..a72d60c3b 100644 --- a/src/hooks/use-timeline-cursor-intersection-callback.ts +++ b/src/hooks/use-timeline-cursor-intersection-callback.ts @@ -7,7 +7,7 @@ export function useTimelineCurserIntersectionCallback(timeline: TimelineLoader) // if the cursor is set too far ahead and the last block did not overlap with the cursor // we need to keep loading blocks until the timeline is complete or the blocks pass the cursor useInterval(() => { - timeline.triggerBlockLoads(); + timeline.triggerChunkLoad(); }, 1000); return useIntersectionMapCallback( @@ -25,7 +25,7 @@ export function useTimelineCurserIntersectionCallback(timeline: TimelineLoader) if (oldestEvent) { timeline.setCursor(oldestEvent.created_at); - timeline.triggerBlockLoads(); + timeline.triggerChunkLoad(); } }, [timeline], diff --git a/src/hooks/use-timeline-loader.ts b/src/hooks/use-timeline-loader.ts index 8dcc593f6..13d214841 100644 --- a/src/hooks/use-timeline-loader.ts +++ b/src/hooks/use-timeline-loader.ts @@ -1,10 +1,10 @@ import { useEffect, useMemo } from "react"; import { useUnmount } from "react-use"; +import { NostrEvent } from "nostr-tools"; import { NostrRequestFilter } from "../types/nostr-relay"; import timelineCacheService from "../services/timeline-cache"; import { EventFilter } from "../classes/timeline-loader"; -import { NostrEvent } from "../types/nostr-event"; import { createSimpleQueryMap } from "../helpers/nostr/filter"; type Options = { diff --git a/src/services/replaceable-events.ts b/src/services/replaceable-events.ts index b7d5336eb..7648e42b1 100644 --- a/src/services/replaceable-events.ts +++ b/src/services/replaceable-events.ts @@ -1,5 +1,5 @@ -import _throttle from "lodash/throttle"; import { Filter, NostrEvent } from "nostr-tools"; +import _throttle from "lodash.throttle"; import SuperMap from "../classes/super-map"; import { logger } from "../helpers/debug"; diff --git a/src/types/nostr-relay.ts b/src/types/nostr-relay.ts index f9b5a1e99..ed6c89d3e 100644 --- a/src/types/nostr-relay.ts +++ b/src/types/nostr-relay.ts @@ -5,4 +5,4 @@ export type NostrQuery = Filter; export type NostrRequestFilter = Filter | Filter[]; -export type RelayQueryMap = Record; +export type RelayQueryMap = Record;