cleanup core classes

This commit is contained in:
hzrd149 2024-02-16 12:33:55 +00:00
parent 8bf1db3d26
commit 21c9400c94
10 changed files with 166 additions and 145 deletions

View File

@ -0,0 +1,93 @@
import { Debugger } from "debug";
import { Filter, NostrEvent, matchFilters } from "nostr-tools";
import _throttle from "lodash.throttle";
import NostrRequest from "./nostr-request";
import Subject from "./subject";
import { logger } from "../helpers/debug";
import EventStore from "./event-store";
import deleteEventService from "../services/delete-events";
import { mergeFilter } from "../helpers/nostr/filter";
import { isATag, isETag } from "../types/nostr-event";
const DEFAULT_CHUNK_SIZE = 100;
export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;
export default class ChunkedRequest {
relay: string;
filters: Filter[];
chunkSize = DEFAULT_CHUNK_SIZE;
private log: Debugger;
private subs: ZenObservable.Subscription[] = [];
loading = false;
events: EventStore;
/** set to true when the next chunk produces 0 events */
complete = false;
onChunkFinish = new Subject<number>();
constructor(relay: string, filters: Filter[], log?: Debugger) {
this.relay = relay;
this.filters = filters;
this.log = log || logger.extend(relay);
this.events = new EventStore(relay);
// TODO: find a better place for this
this.subs.push(deleteEventService.stream.subscribe((e) => this.handleDeleteEvent(e)));
}
loadNextChunk() {
this.loading = true;
let filters: Filter[] = mergeFilter(this.filters, { limit: this.chunkSize });
let oldestEvent = this.getLastEvent();
if (oldestEvent) {
filters = mergeFilter(filters, { until: oldestEvent.created_at - 1 });
}
const request = new NostrRequest([this.relay]);
let gotEvents = 0;
request.onEvent.subscribe((e) => {
this.handleEvent(e);
gotEvents++;
});
request.onComplete.then(() => {
this.loading = false;
if (gotEvents === 0) {
this.complete = true;
this.log("Complete");
} else this.log(`Got ${gotEvents} events`);
this.onChunkFinish.next(gotEvents);
});
request.start(filters);
}
private handleEvent(event: NostrEvent) {
if (!matchFilters(this.filters, event)) return;
return this.events.addEvent(event);
}
private handleDeleteEvent(deleteEvent: NostrEvent) {
const cord = deleteEvent.tags.find(isATag)?.[1];
const eventId = deleteEvent.tags.find(isETag)?.[1];
if (cord) this.events.deleteEvent(cord);
if (eventId) this.events.deleteEvent(eventId);
}
cleanup() {
for (const sub of this.subs) sub.unsubscribe();
this.subs = [];
}
getFirstEvent(nth = 0, eventFilter?: EventFilter) {
return this.events.getFirstEvent(nth, eventFilter);
}
getLastEvent(nth = 0, eventFilter?: EventFilter) {
return this.events.getLastEvent(nth, eventFilter);
}
}

View File

@ -1,7 +1,6 @@
import { nanoid } from "nanoid";
import { Filter, NostrEvent } from "nostr-tools";
import { NostrEvent } from "../types/nostr-event";
import { NostrRequestFilter } from "../types/nostr-relay";
import relayPoolService from "../services/relay-pool";
import Relay, { CountResponse, IncomingCount, IncomingEOSE, IncomingEvent } from "./relay";
import createDefer from "./deferred";
@ -64,7 +63,7 @@ export default class NostrRequest {
}
}
start(filter: NostrRequestFilter, type: "REQ" | "COUNT" = "REQ") {
start(filter: Filter | Filter[], type: "REQ" | "COUNT" = "REQ") {
if (this.state !== NostrRequest.IDLE) {
throw new Error("cant restart a nostr request");
}

View File

@ -1,110 +1,25 @@
import dayjs from "dayjs";
import { Debugger } from "debug";
import { Filter, matchFilters } from "nostr-tools";
import { Filter, NostrEvent } from "nostr-tools";
import _throttle from "lodash.throttle";
import { NostrEvent, isATag, isETag } from "../types/nostr-event";
import { NostrRequestFilter, RelayQueryMap } from "../types/nostr-relay";
import NostrRequest from "./nostr-request";
import { RelayQueryMap } from "../types/nostr-relay";
import NostrMultiSubscription from "./nostr-multi-subscription";
import Subject, { PersistentSubject } from "./subject";
import { PersistentSubject } from "./subject";
import { logger } from "../helpers/debug";
import EventStore from "./event-store";
import { isReplaceable } from "../helpers/nostr/event";
import replaceableEventsService from "../services/replaceable-events";
import deleteEventService from "../services/delete-events";
import {
addQueryToFilter,
isFilterEqual,
isQueryMapEqual,
mapQueryMap,
stringifyFilter,
} from "../helpers/nostr/filter";
import { mergeFilter, isFilterEqual, isQueryMapEqual, mapQueryMap, stringifyFilter } from "../helpers/nostr/filter";
import { localRelay } from "../services/local-relay";
import { relayRequest } from "../helpers/relay";
import SuperMap from "./super-map";
import ChunkedRequest from "./chunked-request";
const BLOCK_SIZE = 100;
export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;
export class RelayBlockLoader {
relay: string;
filter: NostrRequestFilter;
blockSize = BLOCK_SIZE;
private log: Debugger;
private subs: ZenObservable.Subscription[] = [];
loading = false;
events: EventStore;
/** set to true when the next block produces 0 events */
complete = false;
onBlockFinish = new Subject<number>();
constructor(relay: string, filter: NostrRequestFilter, log?: Debugger) {
this.relay = relay;
this.filter = filter;
this.log = log || logger.extend(relay);
this.events = new EventStore(relay);
this.subs.push(deleteEventService.stream.subscribe((e) => this.handleDeleteEvent(e)));
}
loadNextBlock() {
this.loading = true;
let filter: NostrRequestFilter = addQueryToFilter(this.filter, { limit: this.blockSize });
let oldestEvent = this.getLastEvent();
if (oldestEvent) {
filter = addQueryToFilter(filter, { until: oldestEvent.created_at - 1 });
}
const request = new NostrRequest([this.relay]);
let gotEvents = 0;
request.onEvent.subscribe((e) => {
this.handleEvent(e);
gotEvents++;
});
request.onComplete.then(() => {
this.loading = false;
if (gotEvents === 0) {
this.complete = true;
this.log("Complete");
} else this.log(`Got ${gotEvents} events`);
this.onBlockFinish.next(gotEvents);
});
request.start(filter);
}
private handleEvent(event: NostrEvent) {
if (!matchFilters(Array.isArray(this.filter) ? this.filter : [this.filter], event)) return;
return this.events.addEvent(event);
}
private handleDeleteEvent(deleteEvent: NostrEvent) {
const cord = deleteEvent.tags.find(isATag)?.[1];
const eventId = deleteEvent.tags.find(isETag)?.[1];
if (cord) this.events.deleteEvent(cord);
if (eventId) this.events.deleteEvent(eventId);
}
cleanup() {
for (const sub of this.subs) sub.unsubscribe();
this.subs = [];
}
getFirstEvent(nth = 0, eventFilter?: EventFilter) {
return this.events.getFirstEvent(nth, eventFilter);
}
getLastEvent(nth = 0, eventFilter?: EventFilter) {
return this.events.getLastEvent(nth, eventFilter);
}
}
export default class TimelineLoader {
cursor = dayjs().unix();
queryMap: RelayQueryMap = {};
@ -121,7 +36,7 @@ export default class TimelineLoader {
private log: Debugger;
private subscription: NostrMultiSubscription;
private blockLoaders = new Map<string, RelayBlockLoader>();
private chunkLoaders = new Map<string, ChunkedRequest>();
constructor(name: string) {
this.name = name;
@ -152,20 +67,23 @@ export default class TimelineLoader {
this.events.addEvent(event);
if (cache) localRelay.publish(event);
}
private blockLoaderSubs = new SuperMap<RelayBlockLoader, ZenObservable.Subscription[]>(() => []);
private connectToBlockLoader(loader: RelayBlockLoader) {
this.events.connect(loader.events);
const subs = this.blockLoaderSubs.get(loader);
subs.push(loader.onBlockFinish.subscribe(this.updateLoading.bind(this)));
subs.push(loader.onBlockFinish.subscribe(this.updateComplete.bind(this)));
private handleChunkFinished() {
this.updateLoading();
this.updateComplete();
}
private disconnectToBlockLoader(loader: RelayBlockLoader) {
private chunkLoaderSubs = new SuperMap<ChunkedRequest, ZenObservable.Subscription[]>(() => []);
private connectToChunkLoader(loader: ChunkedRequest) {
this.events.connect(loader.events);
const subs = this.chunkLoaderSubs.get(loader);
subs.push(loader.onChunkFinish.subscribe(this.handleChunkFinished.bind(this)));
}
private disconnectToChunkLoader(loader: ChunkedRequest) {
loader.cleanup();
this.events.disconnect(loader.events);
const subs = this.blockLoaderSubs.get(loader);
const subs = this.chunkLoaderSubs.get(loader);
for (const sub of subs) sub.unsubscribe();
this.blockLoaderSubs.delete(loader);
this.chunkLoaderSubs.delete(loader);
}
private loadQueriesFromCache(queryMap: RelayQueryMap) {
@ -189,26 +107,26 @@ export default class TimelineLoader {
// remove relays
for (const relay of Object.keys(this.queryMap)) {
const loader = this.blockLoaders.get(relay);
const loader = this.chunkLoaders.get(relay);
if (!loader) continue;
if (!queryMap[relay]) {
this.disconnectToBlockLoader(loader);
this.blockLoaders.delete(relay);
this.disconnectToChunkLoader(loader);
this.chunkLoaders.delete(relay);
}
}
for (const [relay, filter] of Object.entries(queryMap)) {
// remove outdated loaders
if (this.queryMap[relay] && !isFilterEqual(this.queryMap[relay], filter)) {
const old = this.blockLoaders.get(relay)!;
this.disconnectToBlockLoader(old);
this.blockLoaders.delete(relay);
const old = this.chunkLoaders.get(relay)!;
this.disconnectToChunkLoader(old);
this.chunkLoaders.delete(relay);
}
if (!this.blockLoaders.has(relay)) {
const loader = new RelayBlockLoader(relay, filter, this.log.extend(relay));
this.blockLoaders.set(relay, loader);
this.connectToBlockLoader(loader);
if (!this.chunkLoaders.has(relay)) {
const loader = new ChunkedRequest(relay, Array.isArray(filter) ? filter : [filter], this.log.extend(relay));
this.chunkLoaders.set(relay, loader);
this.connectToChunkLoader(loader);
}
}
@ -219,10 +137,10 @@ export default class TimelineLoader {
// update the subscription query map and add limit
this.subscription.setQueryMap(
mapQueryMap(this.queryMap, (filter) => addQueryToFilter(filter, { limit: BLOCK_SIZE / 2 })),
mapQueryMap(this.queryMap, (filter) => mergeFilter(filter, { limit: BLOCK_SIZE / 2 })),
);
this.triggerBlockLoads();
this.triggerChunkLoad();
}
setEventFilter(filter?: EventFilter) {
@ -231,33 +149,33 @@ export default class TimelineLoader {
}
setCursor(cursor: number) {
this.cursor = cursor;
this.triggerBlockLoads();
this.triggerChunkLoad();
}
triggerBlockLoads() {
triggerChunkLoad() {
let triggeredLoad = false;
for (const [relay, loader] of this.blockLoaders) {
for (const [relay, loader] of this.chunkLoaders) {
if (loader.complete || loader.loading) continue;
const event = loader.getLastEvent(this.loadNextBlockBuffer, this.eventFilter);
if (!event || event.created_at >= this.cursor) {
loader.loadNextBlock();
loader.loadNextChunk();
triggeredLoad = true;
}
}
if (triggeredLoad) this.updateLoading();
}
loadNextBlock() {
loadAllNextChunks() {
let triggeredLoad = false;
for (const [relay, loader] of this.blockLoaders) {
for (const [relay, loader] of this.chunkLoaders) {
if (loader.complete || loader.loading) continue;
loader.loadNextBlock();
loader.loadNextChunk();
triggeredLoad = true;
}
if (triggeredLoad) this.updateLoading();
}
private updateLoading() {
for (const [relay, loader] of this.blockLoaders) {
for (const [relay, loader] of this.chunkLoaders) {
if (loader.loading) {
if (!this.loading.value) {
this.loading.next(true);
@ -268,7 +186,7 @@ export default class TimelineLoader {
if (this.loading.value) this.loading.next(false);
}
private updateComplete() {
for (const [relay, loader] of this.blockLoaders) {
for (const [relay, loader] of this.chunkLoaders) {
if (!loader.complete) {
this.complete.next(false);
return;
@ -290,8 +208,8 @@ export default class TimelineLoader {
}
reset() {
this.cursor = dayjs().unix();
for (const [_, loader] of this.blockLoaders) this.disconnectToBlockLoader(loader);
this.blockLoaders.clear();
for (const [_, loader] of this.chunkLoaders) this.disconnectToChunkLoader(loader);
this.chunkLoaders.clear();
this.forgetEvents();
}
@ -299,8 +217,8 @@ export default class TimelineLoader {
cleanup() {
this.close();
for (const [_, loader] of this.blockLoaders) this.disconnectToBlockLoader(loader);
this.blockLoaders.clear();
for (const [_, loader] of this.chunkLoaders) this.disconnectToChunkLoader(loader);
this.chunkLoaders.clear();
this.events.cleanup();
}

View File

@ -21,7 +21,14 @@ export default function TimelineActionAndStatus({ timeline }: { timeline: Timeli
}
return (
<Button onClick={() => timeline.loadNextBlock()} flexShrink={0} size="lg" mx="auto" colorScheme="primary" my="4">
<Button
onClick={() => timeline.loadAllNextChunks()}
flexShrink={0}
size="lg"
mx="auto"
colorScheme="primary"
my="4"
>
Load More
</Button>
);

View File

@ -4,7 +4,7 @@ import { useDnsIdentity } from "../../hooks/use-dns-identity";
import { useUserMetadata } from "../../hooks/use-user-metadata";
import { VerificationFailed, VerificationMissing, VerifiedIcon } from "../icons";
export const UserDnsIdentityIcon = ({ pubkey, onlyIcon }: { pubkey: string; onlyIcon?: boolean }) => {
export function UserDnsIdentityIcon({ pubkey, onlyIcon }: { pubkey: string; onlyIcon?: boolean }) {
const metadata = useUserMetadata(pubkey);
const identity = useDnsIdentity(metadata?.nip05);
@ -32,4 +32,6 @@ export const UserDnsIdentityIcon = ({ pubkey, onlyIcon }: { pubkey: string; only
{metadata.nip05.startsWith("_@") ? metadata.nip05.substr(2) : metadata.nip05} {renderIcon()}
</Text>
);
};
}
export default UserDnsIdentityIcon;

View File

@ -1,19 +1,21 @@
import stringify from "json-stringify-deterministic";
import { NostrRequestFilter, RelayQueryMap } from "../../types/nostr-relay";
import { RelayQueryMap } from "../../types/nostr-relay";
import { Filter } from "nostr-tools";
import { safeRelayUrls } from "../relay";
export function addQueryToFilter(filter: NostrRequestFilter, query: Filter) {
export function mergeFilter(filter: Filter, query: Filter): Filter;
export function mergeFilter(filter: Filter[], query: Filter): Filter[];
export function mergeFilter(filter: Filter | Filter[], query: Filter) {
if (Array.isArray(filter)) {
return filter.map((f) => ({ ...f, ...query }));
}
return { ...filter, ...query };
}
export function stringifyFilter(filter: NostrRequestFilter) {
export function stringifyFilter(filter: Filter | Filter[]) {
return stringify(filter);
}
export function isFilterEqual(a: NostrRequestFilter, b: NostrRequestFilter) {
export function isFilterEqual(a: Filter | Filter[], b: Filter | Filter[]) {
return stringifyFilter(a) === stringifyFilter(b);
}
@ -21,14 +23,14 @@ export function isQueryMapEqual(a: RelayQueryMap, b: RelayQueryMap) {
return stringify(a) === stringify(b);
}
export function mapQueryMap(queryMap: RelayQueryMap, fn: (filter: NostrRequestFilter) => NostrRequestFilter) {
export function mapQueryMap(queryMap: RelayQueryMap, fn: (filters: Filter[]) => Filter[]) {
const newMap: RelayQueryMap = {};
for (const [relay, filter] of Object.entries(queryMap)) newMap[relay] = fn(filter);
for (const [relay, filters] of Object.entries(queryMap)) newMap[relay] = fn(filters);
return newMap;
}
export function createSimpleQueryMap(relays: Iterable<string>, filter: NostrRequestFilter) {
export function createSimpleQueryMap(relays: Iterable<string>, filters: Filter | Filter[]) {
const map: RelayQueryMap = {};
for (const relay of safeRelayUrls(relays)) map[relay] = filter;
for (const relay of safeRelayUrls(relays)) map[relay] = Array.isArray(filters) ? filters : [filters];
return map;
}

View File

@ -7,7 +7,7 @@ export function useTimelineCurserIntersectionCallback(timeline: TimelineLoader)
// if the cursor is set too far ahead and the last block did not overlap with the cursor
// we need to keep loading blocks until the timeline is complete or the blocks pass the cursor
useInterval(() => {
timeline.triggerBlockLoads();
timeline.triggerChunkLoad();
}, 1000);
return useIntersectionMapCallback(
@ -25,7 +25,7 @@ export function useTimelineCurserIntersectionCallback(timeline: TimelineLoader)
if (oldestEvent) {
timeline.setCursor(oldestEvent.created_at);
timeline.triggerBlockLoads();
timeline.triggerChunkLoad();
}
},
[timeline],

View File

@ -1,10 +1,10 @@
import { useEffect, useMemo } from "react";
import { useUnmount } from "react-use";
import { NostrEvent } from "nostr-tools";
import { NostrRequestFilter } from "../types/nostr-relay";
import timelineCacheService from "../services/timeline-cache";
import { EventFilter } from "../classes/timeline-loader";
import { NostrEvent } from "../types/nostr-event";
import { createSimpleQueryMap } from "../helpers/nostr/filter";
type Options = {

View File

@ -1,5 +1,5 @@
import _throttle from "lodash/throttle";
import { Filter, NostrEvent } from "nostr-tools";
import _throttle from "lodash.throttle";
import SuperMap from "../classes/super-map";
import { logger } from "../helpers/debug";

View File

@ -5,4 +5,4 @@ export type NostrQuery = Filter;
export type NostrRequestFilter = Filter | Filter[];
export type RelayQueryMap = Record<string, NostrRequestFilter>;
export type RelayQueryMap = Record<string, Filter[]>;