From 0c25242c180e651457568686ebdba5f13fc9f67f Mon Sep 17 00:00:00 2001 From: hzrd149 Date: Tue, 7 Feb 2023 17:04:18 -0600 Subject: [PATCH] rework metadata and contacts service --- .prettierrc | 3 + README.md | 4 + .../request.ts => classes/nostr-request.ts} | 27 +-- .../nostr-subscription.ts} | 100 +++++++---- src/classes/pubkey-request-list.ts | 41 +++++ src/classes/pubkey-subject-cache.ts | 28 +++ src/classes/request-manager.ts | 93 ++++++++++ src/components/connected-relays.tsx | 13 +- src/components/dev-modal.tsx | 47 +++++ src/components/following-list.tsx | 4 +- src/components/post/index.tsx | 2 +- src/components/post/post-cc.tsx | 10 +- src/components/profile-button.tsx | 2 +- src/components/user-avatar-link.tsx | 2 +- src/components/user-avatar.tsx | 21 +-- src/helpers/array.ts | 3 + src/helpers/nostr-event.ts | 40 ++++- src/helpers/thread.ts | 47 +++++ src/hooks/use-event-dir.ts | 4 +- src/hooks/use-subscription.ts | 19 +- src/hooks/use-user-contacts.ts | 7 +- src/hooks/use-user-metadata.ts | 18 +- src/services/db/index.ts | 26 ++- src/services/db/schema.ts | 8 +- src/services/events.ts | 56 ++++++ src/services/relays/relay-pool.ts | 1 - src/services/user-contacts.ts | 148 ++++++++------- src/services/user-metadata.ts | 170 ++++++++---------- src/types/nostr-event.ts | 22 ++- src/views/event.tsx | 28 +-- src/views/global/index.tsx | 6 +- src/views/home/discover-tab.tsx | 12 +- src/views/profile/edit.tsx | 4 +- src/views/user/following.tsx | 19 +- src/views/user/index.tsx | 29 +-- src/views/user/relays.tsx | 24 +-- 36 files changed, 734 insertions(+), 354 deletions(-) rename src/{services/request.ts => classes/nostr-request.ts} (73%) rename src/{services/subscriptions.ts => classes/nostr-subscription.ts} (51%) create mode 100644 src/classes/pubkey-request-list.ts create mode 100644 src/classes/pubkey-subject-cache.ts create mode 100644 src/classes/request-manager.ts create mode 100644 src/components/dev-modal.tsx create mode 100644 src/helpers/array.ts create mode 100644 src/helpers/thread.ts create mode 100644 src/services/events.ts diff --git a/.prettierrc b/.prettierrc index e69de29bb..963354f23 100644 --- a/.prettierrc +++ b/.prettierrc @@ -0,0 +1,3 @@ +{ + "printWidth": 120 +} diff --git a/README.md b/README.md index a32b5f1c3..fc25cb0c6 100644 --- a/README.md +++ b/README.md @@ -14,3 +14,7 @@ - add emoji reaction button - save relay list as note - load relays from note + +create a subscription manager that takes a "canMerge" function and batches requests +create a template for a cached subscription service +create a template for a cached request service diff --git a/src/services/request.ts b/src/classes/nostr-request.ts similarity index 73% rename from src/services/request.ts rename to src/classes/nostr-request.ts index 14711f24c..729dad12d 100644 --- a/src/services/request.ts +++ b/src/classes/nostr-request.ts @@ -1,11 +1,14 @@ import { Subject, Subscription as RxSubscription } from "rxjs"; import { NostrEvent } from "../types/nostr-event"; import { NostrQuery } from "../types/nostr-query"; -import { Relay } from "./relays"; -import relayPool from "./relays/relay-pool"; +import { Relay } from "../services/relays"; +import relayPool from "../services/relays/relay-pool"; +import { IncomingEvent } from "../services/relays/relay"; + +let lastId = 0; const REQUEST_DEFAULT_TIMEOUT = 1000 * 20; -export class Request { +export class NostrRequest { static IDLE = "idle"; static RUNNING = "running"; static COMPLETE = "complete"; @@ -14,11 +17,12 @@ export class Request { timeout: number; relays: Set; relayCleanup = new Map(); - state = Request.IDLE; + state = NostrRequest.IDLE; onEvent = new Subject(); + seenEvents = new Set(); constructor(relayUrls: string[], timeout?: number) { - this.id = String(Math.floor(Math.random() * 1000000)); + this.id = `request-${lastId++}`; this.relays = new Set(relayUrls.map((url) => relayPool.requestRelay(url))); for (const relay of this.relays) { @@ -34,8 +38,9 @@ export class Request { cleanup.push( relay.onEvent.subscribe((event) => { - if (event.subId === this.id) { + if (this.state === NostrRequest.RUNNING && event.subId === this.id && !this.seenEvents.has(event.body.id)) { this.onEvent.next(event.body); + this.seenEvents.add(event.body.id); } }) ); @@ -54,15 +59,15 @@ export class Request { for (const fn of cleanup) fn.unsubscribe(); if (this.relays.size === 0) { - this.state = Request.COMPLETE; + this.state = NostrRequest.COMPLETE; this.onEvent.complete(); } } start(query: NostrQuery) { - if (this.state !== Request.IDLE) return this; + if (this.state !== NostrRequest.IDLE) return this; - this.state = Request.RUNNING; + this.state = NostrRequest.RUNNING; for (const relay of this.relays) { relay.send(["REQ", this.id, query]); } @@ -74,9 +79,9 @@ export class Request { return this; } cancel() { - if (this.state !== Request.COMPLETE) return this; + if (this.state !== NostrRequest.COMPLETE) return this; - this.state = Request.COMPLETE; + this.state = NostrRequest.COMPLETE; for (const relay of this.relays) { relay.send(["CLOSE", this.id]); } diff --git a/src/services/subscriptions.ts b/src/classes/nostr-subscription.ts similarity index 51% rename from src/services/subscriptions.ts rename to src/classes/nostr-subscription.ts index 45d558073..de4b7f956 100644 --- a/src/services/subscriptions.ts +++ b/src/classes/nostr-subscription.ts @@ -1,11 +1,14 @@ import { Subject, SubscriptionLike } from "rxjs"; import { NostrEvent } from "../types/nostr-event"; import { NostrOutgoingMessage, NostrQuery } from "../types/nostr-query"; -import { Relay } from "./relays"; -import { IncomingEvent } from "./relays/relay"; -import relayPool from "./relays/relay-pool"; +import { Relay } from "../services/relays"; +import { IncomingEvent } from "../services/relays/relay"; +import relayPool from "../services/relays/relay-pool"; -export class Subscription { +let lastId = 0; + +export class NostrSubscription { + static INIT = "initial"; static OPEN = "open"; static CLOSED = "closed"; @@ -14,12 +17,12 @@ export class Subscription { query?: NostrQuery; relayUrls: string[]; relays: Relay[]; - state = Subscription.CLOSED; + state = NostrSubscription.INIT; onEvent = new Subject(); - cleanup: SubscriptionLike[] = []; + seenEvents = new Set(); constructor(relayUrls: string[], query?: NostrQuery, name?: string) { - this.id = String(Math.floor(Math.random() * 1000000)); + this.id = String(name||lastId++); this.query = query; this.name = name; this.relayUrls = relayUrls; @@ -27,13 +30,15 @@ export class Subscription { this.relays = relayUrls.map((url) => relayPool.requestRelay(url)); } handleOpen(relay: Relay) { - if (!this.query) return; - // when the relay connects send the req event - relay.send(["REQ", this.id, this.query]); + if (this.query) { + // when the relay connects send the req event + relay.send(["REQ", this.id, this.query]); + } } handleEvent(event: IncomingEvent) { - if (event.subId === this.id) { + if (this.state === NostrSubscription.OPEN && event.subId === this.id && !this.seenEvents.has(event.body.id)) { this.onEvent.next(event.body); + this.seenEvents.add(event.body.id); } } send(message: NostrOutgoingMessage) { @@ -42,19 +47,9 @@ export class Subscription { } } - setQuery(query: NostrQuery) { - this.query = query; - - // if open, than update remote subscription - if (this.state === Subscription.OPEN) { - this.send(["REQ", this.id, this.query]); - } - } - open() { - if (this.state === Subscription.OPEN || !this.query) return; - this.state = Subscription.OPEN; - this.send(["REQ", this.id, this.query]); - + cleanup: SubscriptionLike[] = []; + /** listen for event and open events from relays */ + private subscribeToRelays() { for (const relay of this.relays) { this.cleanup.push(relay.onEvent.subscribe(this.handleEvent.bind(this))); this.cleanup.push(relay.onOpen.subscribe(this.handleOpen.bind(this))); @@ -63,24 +58,63 @@ export class Subscription { for (const url of this.relayUrls) { relayPool.addClaim(url, this); } - - if (import.meta.env.DEV) { - console.info(`Subscription: "${this.name || this.id}" opened`); - } } - close() { - if (this.state === Subscription.CLOSED) return; - this.state = Subscription.CLOSED; - this.send(["CLOSE", this.id]); - + /** listen for event and open events from relays */ + private unsubscribeToRelays() { this.cleanup.forEach((sub) => sub.unsubscribe()); for (const url of this.relayUrls) { relayPool.removeClaim(url, this); } + } + + open() { + if (!this.query) throw new Error("cant open without a query"); + if (this.state === NostrSubscription.OPEN) return this; + + this.state = NostrSubscription.OPEN; + this.send(["REQ", this.id, this.query]); + + this.subscribeToRelays(); + + if (import.meta.env.DEV) { + console.info(`Subscription: "${this.name || this.id}" opened`); + } + + return this; + } + update(query: NostrQuery) { + this.query = query; + if (this.state === NostrSubscription.OPEN) { + this.send(["REQ", this.id, this.query]); + } + return this; + } + setRelays(relays: string[]) { + this.unsubscribeToRelays(); + + // get new relays + this.relayUrls = relays; + this.relays = relays.map((url) => relayPool.requestRelay(url)); + + this.subscribeToRelays(); + } + close() { + if (this.state !== NostrSubscription.OPEN) return this; + + // set state + this.state = NostrSubscription.CLOSED; + // send close message + this.send(["CLOSE", this.id]); + // forget all seen events + this.seenEvents.clear(); + // unsubscribe from relay messages + this.unsubscribeToRelays(); if (import.meta.env.DEV) { console.info(`Subscription: "${this.name || this.id}" closed`); } + + return this; } } diff --git a/src/classes/pubkey-request-list.ts b/src/classes/pubkey-request-list.ts new file mode 100644 index 000000000..3cc40fe82 --- /dev/null +++ b/src/classes/pubkey-request-list.ts @@ -0,0 +1,41 @@ +import { unique } from "../helpers/array"; + +export class PubkeyRequestList { + needsFlush = false; + requests = new Map>(); + + hasPubkey(pubkey: string) { + return this.requests.has(pubkey); + } + addPubkey(pubkey: string, relays: string[] = []) { + const pending = this.requests.get(pubkey); + if (pending) { + if (relays.length > 0) { + this.needsFlush = true; + // get or create the list of relays + const r = this.requests.get(pubkey) ?? new Set(); + // add new relay urls to set + relays.forEach((url) => r.add(url)); + this.requests.set(pubkey, r); + } + } else { + this.needsFlush = true; + this.requests.set(pubkey, new Set(relays)); + } + } + removePubkey(pubkey: string) { + this.requests.delete(pubkey); + } + + flush() { + this.needsFlush = false; + const pubkeys = Array.from(this.requests.keys()); + const relays = unique( + Array.from(this.requests.values()) + .map((set) => Array.from(set)) + .flat() + ); + + return { pubkeys, relays }; + } +} diff --git a/src/classes/pubkey-subject-cache.ts b/src/classes/pubkey-subject-cache.ts new file mode 100644 index 000000000..a388c78e8 --- /dev/null +++ b/src/classes/pubkey-subject-cache.ts @@ -0,0 +1,28 @@ +import { BehaviorSubject } from "rxjs"; + +export class PubkeySubjectCache { + subjects = new Map>(); + + hasSubject(pubkey: string) { + return this.subjects.has(pubkey); + } + getSubject(pubkey: string) { + let subject = this.subjects.get(pubkey); + if (!subject) { + subject = new BehaviorSubject(null); + this.subjects.set(pubkey, subject); + } + return subject; + } + + prune() { + const prunedKeys: string[] = []; + for (const [key, subject] of this.subjects) { + if (!subject.observed) { + this.subjects.delete(key); + prunedKeys.push(key); + } + } + return prunedKeys; + } +} diff --git a/src/classes/request-manager.ts b/src/classes/request-manager.ts new file mode 100644 index 000000000..acfa6cd5e --- /dev/null +++ b/src/classes/request-manager.ts @@ -0,0 +1,93 @@ +import { Subject } from "rxjs"; +import { NostrEvent } from "../types/nostr-event"; +import { NostrQuery } from "../types/nostr-query"; +import { NostrRequest } from "./nostr-request"; + +function mergeSets(to: Set, from: Iterable) { + for (const el of from) { + to.add(el); + } +} + +export type getQueryKeyFn = (query: QueryT) => string; +export type mergeQueriesFn = (a: QueryT, b: QueryT) => QueryT | undefined | null; +export type getEventQueryKeyFn = (event: NostrEvent) => string; + +type PendingRequest = { + query: QueryT; + subject: Subject; + relays: Set; +}; + +/** @deprecated incomplete */ +export class RequestManager { + private getQueryKey: getQueryKeyFn; + private mergeQueries: mergeQueriesFn; + private getEventQueryKey: getEventQueryKeyFn; + + private runningRequests = new Map(); + private requestQueue = new Map>(); + + constructor( + getQueryKey: getQueryKeyFn, + mergeQueries: mergeQueriesFn, + getEventQueryKey: getEventQueryKeyFn + ) { + this.getQueryKey = getQueryKey; + this.mergeQueries = mergeQueries; + this.getEventQueryKey = getEventQueryKey; + } + + request(query: QueryT, relays: string[]) { + const key = this.getQueryKey(query); + if (this.runningRequests.has(key)) throw new Error("requesting a currently running query"); + + const pending = this.requestQueue.get(key); + if (pending) { + mergeSets(pending.relays, relays); + return pending.subject; + } + + const subject = new Subject(); + this.requestQueue.set(key, { + query, + relays: new Set(relays), + subject, + }); + + return subject; + } + + batch() { + const requests: PendingRequest[] = []; + + for (const [key, pending] of this.requestQueue) { + let wasMerged = false; + if (requests.length > 0) { + for (const request of requests) { + const merged = this.mergeQueries(request.query, pending.query); + if (merged) { + request.query = merged; + request.subject.subscribe(pending.subject); + mergeSets(request.relays, pending.relays); + wasMerged = true; + break; + } + } + } + + // if there are no requests. or pending failed to merge create new request + if (!wasMerged) { + const subject = new Subject(); + subject.subscribe(pending.subject); + requests.push({ query: pending.query, subject, relays: pending.relays }); + } + } + + for (const request of requests) { + const r = new NostrRequest(Array.from(request.relays)); + r.onEvent.subscribe(request.subject); + r.start(request.query); + } + } +} diff --git a/src/components/connected-relays.tsx b/src/components/connected-relays.tsx index b118f8425..336c219fa 100644 --- a/src/components/connected-relays.tsx +++ b/src/components/connected-relays.tsx @@ -1,11 +1,13 @@ import { useState } from "react"; -import { Text } from "@chakra-ui/react"; +import { Button, Text, useDisclosure } from "@chakra-ui/react"; import { useInterval } from "react-use"; import { Relay } from "../services/relays"; import relayPool from "../services/relays/relay-pool"; +import { DevModel } from "./dev-modal"; export const ConnectedRelays = () => { const [relays, setRelays] = useState(relayPool.getRelays()); + const { onOpen, onClose, isOpen } = useDisclosure(); useInterval(() => { setRelays(relayPool.getRelays()); @@ -15,8 +17,11 @@ export const ConnectedRelays = () => { const disconnected = relays.filter((relay) => !relay.okay); return ( - - {connected.length}/{relays.length} of relays connected - + <> + + {isOpen && } + ); }; diff --git a/src/components/dev-modal.tsx b/src/components/dev-modal.tsx new file mode 100644 index 000000000..a56df4478 --- /dev/null +++ b/src/components/dev-modal.tsx @@ -0,0 +1,47 @@ +import { + Modal, + ModalOverlay, + ModalContent, + ModalHeader, + ModalBody, + ModalCloseButton, + ModalProps, + StatGroup, + Stat, + StatLabel, + StatNumber, + useForceUpdate, +} from "@chakra-ui/react"; +import { useAsync, useInterval } from "react-use"; +import db from "../services/db"; + +export const DevModel = (props: Omit) => { + const update = useForceUpdate(); + useInterval(update, 1000 * 5); + + const { value: eventsSeen } = useAsync(() => db.count("events-seen"), []); + const { value: usersSeen } = useAsync(() => db.count("user-metadata"), []); + + return ( + + + + Stats + + + + + Events Seen + {eventsSeen ?? "loading..."} + + + + Users Seen + {usersSeen ?? "loading..."} + + + + + + ); +}; diff --git a/src/components/following-list.tsx b/src/components/following-list.tsx index 18d69ebd8..c893016bc 100644 --- a/src/components/following-list.tsx +++ b/src/components/following-list.tsx @@ -9,9 +9,9 @@ import identity from "../services/identity"; import { UserAvatar } from "./user-avatar"; const FollowingListItem = ({ pubkey }: { pubkey: string }) => { - const { metadata, loading } = useUserMetadata(pubkey); + const metadata = useUserMetadata(pubkey); - if (loading || !metadata) return ; + if (!metadata) return ; return (