Rebuild observable class

This commit is contained in:
hzrd149 2024-02-15 22:27:39 +00:00
parent c1017ae25b
commit df094b2c4f
40 changed files with 375 additions and 345 deletions

View File

@ -0,0 +1,5 @@
---
"nostrudel": patch
---
Rebuild observable class

View File

@ -1,3 +1,5 @@
{
"tabWidth": 2,
"useTabs": false,
"printWidth": 120
}

View File

@ -13,5 +13,6 @@
"webln"
],
"typescript.enablePromptUseWorkspaceTsdk": true,
"typescript.tsdk": "node_modules/typescript/lib"
"typescript.tsdk": "node_modules/typescript/lib",
"deno.enable": false
}

View File

@ -79,7 +79,8 @@
"three-spritetext": "^1.8.1",
"three-stdlib": "^2.29.4",
"webln": "^0.3.2",
"yet-another-react-lightbox": "^3.15.6"
"yet-another-react-lightbox": "^3.15.6",
"zen-observable": "^0.10.0"
},
"devDependencies": {
"@changesets/cli": "^2.27.1",
@ -95,6 +96,7 @@
"@types/react-dom": "^18.2.18",
"@types/three": "^0.160.0",
"@types/webscopeio__react-textarea-autocomplete": "^4.7.5",
"@types/zen-observable": "^0.8.7",
"@vitejs/plugin-react": "^4.2.1",
"camelcase": "^8.0.0",
"prettier": "^3.1.1",

View File

@ -0,0 +1,64 @@
import Observable from "zen-observable";
export default class ControlledObservable<T> implements Observable<T> {
private observable: Observable<T>;
private subscriptions = new Set<ZenObservable.SubscriptionObserver<T>>();
private _complete = false;
get closed() {
return this._complete;
}
get used() {
return this.subscriptions.size > 0;
}
constructor(subscriber?: ZenObservable.Subscriber<T>) {
this.observable = new Observable((observer) => {
this.subscriptions.add(observer);
const cleanup = subscriber && subscriber(observer);
return () => {
this.subscriptions.delete(observer);
if (typeof cleanup === "function") cleanup();
else if (cleanup?.unsubscribe) cleanup.unsubscribe();
};
});
this.subscribe = this.observable.subscribe.bind(this.observable);
this.map = this.observable.map.bind(this.observable);
this.flatMap = this.observable.flatMap.bind(this.observable);
this.forEach = this.observable.forEach.bind(this.observable);
this.reduce = this.observable.reduce.bind(this.observable);
this.filter = this.observable.filter.bind(this.observable);
this.concat = this.observable.concat.bind(this.observable);
}
next(v: T) {
if (this._complete) return;
for (const observer of this.subscriptions) {
observer.next(v);
}
}
error(err: any) {
if (this._complete) return;
for (const observer of this.subscriptions) {
observer.error(err);
}
}
complete() {
if (this._complete) return;
this._complete = true;
for (const observer of this.subscriptions) {
observer.complete();
}
}
[Symbol.observable]() {
return this.observable;
}
subscribe: Observable<T>["subscribe"];
map: Observable<T>["map"];
flatMap: Observable<T>["flatMap"];
forEach: Observable<T>["forEach"];
reduce: Observable<T>["reduce"];
filter: Observable<T>["filter"];
concat: Observable<T>["concat"];
}

View File

@ -1,7 +1,8 @@
import { getEventUID, isReplaceable, sortByDate } from "../helpers/nostr/events";
import replaceableEventLoaderService from "../services/replaceable-event-requester";
import { NostrEvent, isDTag } from "../types/nostr-event";
import Subject from "./subject";
import { NostrEvent } from "nostr-tools";
import { getEventUID, sortByDate } from "../helpers/nostr/events";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";
import deleteEventService from "../services/delete-events";
export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;
@ -11,20 +12,27 @@ export default class EventStore {
customSort?: typeof sortByDate;
private deleteSub: ZenObservable.Subscription;
constructor(name?: string, customSort?: typeof sortByDate) {
this.name = name;
this.customSort = customSort;
this.deleteSub = deleteEventService.stream.subscribe((event) => {
const uid = getEventUID(event);
this.deleteEvent(uid);
if (uid !== event.id) this.deleteEvent(event.id);
});
}
getSortedEvents() {
return Array.from(this.events.values()).sort(this.customSort || sortByDate);
}
onEvent = new Subject<NostrEvent>(undefined, false);
onDelete = new Subject<string>(undefined, false);
onClear = new Subject(undefined, false);
onEvent = new ControlledObservable<NostrEvent>();
onDelete = new ControlledObservable<string>();
onClear = new ControlledObservable();
private replaceableEventSubs = new Map<string, Subject<NostrEvent>>();
private handleEvent(event: NostrEvent) {
const id = getEventUID(event);
const existing = this.events.get(id);
@ -37,16 +45,6 @@ export default class EventStore {
addEvent(event: NostrEvent) {
const id = getEventUID(event);
this.handleEvent(event);
if (isReplaceable(event.kind)) {
// pass the event on
replaceableEventLoaderService.handleEvent(event);
// subscribe to any future changes
const sub = replaceableEventLoaderService.getEvent(event.kind, event.pubkey, event.tags.find(isDTag)?.[1]);
sub.subscribe(this.handleEvent, this);
this.replaceableEventSubs.set(id, sub);
}
}
getEvent(id: string) {
return this.events.get(id);
@ -56,32 +54,36 @@ export default class EventStore {
this.events.delete(id);
this.onDelete.next(id);
}
if (this.replaceableEventSubs.has(id)) {
this.replaceableEventSubs.get(id)?.unsubscribe(this.handleEvent, this);
this.replaceableEventSubs.delete(id);
}
}
clear() {
this.events.clear();
this.onClear.next(undefined);
for (const [_, sub] of this.replaceableEventSubs) {
sub.unsubscribe(this.handleEvent, this);
}
}
cleanup() {
this.clear();
}
connect(other: EventStore) {
other.onEvent.subscribe(this.addEvent, this);
other.onDelete.subscribe(this.deleteEvent, this);
private storeSubs = new SuperMap<EventStore, ZenObservable.Subscription[]>(() => []);
connect(other: EventStore, fullSync = true) {
const subs = this.storeSubs.get(other);
subs.push(
other.onEvent.subscribe((e) => {
if (fullSync || this.events.has(getEventUID(e))) this.addEvent(e);
}),
);
subs.push(other.onDelete.subscribe(this.deleteEvent.bind(this)));
}
disconnect(other: EventStore) {
other.onEvent.unsubscribe(this.addEvent, this);
other.onDelete.unsubscribe(this.deleteEvent, this);
const subs = this.storeSubs.get(other);
for (const sub of subs) sub.unsubscribe();
this.storeSubs.delete(other);
}
cleanup() {
this.clear();
for (const [_, subs] of this.storeSubs) {
for (const sub of subs) sub.unsubscribe();
}
this.storeSubs.clear();
this.deleteSub.unsubscribe();
}
getFirstEvent(nth = 0, filter?: EventFilter) {

View File

@ -1,11 +1,12 @@
import { nanoid } from "nanoid";
import { Subject } from "./subject";
import { NostrEvent } from "../types/nostr-event";
import { NostrOutgoingRequest, NostrRequestFilter, RelayQueryMap } from "../types/nostr-query";
import Relay, { IncomingEvent } from "./relay";
import relayPoolService from "../services/relay-pool";
import { isFilterEqual, isQueryMapEqual } from "../helpers/nostr/filter";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";
export default class NostrMultiSubscription {
static INIT = "initial";
@ -18,7 +19,7 @@ export default class NostrMultiSubscription {
relays: Relay[] = [];
state = NostrMultiSubscription.INIT;
onEvent = new Subject<NostrEvent>();
onEvent = new ControlledObservable<NostrEvent>();
seenEvents = new Set<string>();
constructor(name?: string) {
@ -36,18 +37,20 @@ export default class NostrMultiSubscription {
}
}
private relaySubs = new SuperMap<Relay, ZenObservable.Subscription[]>(() => []);
/** listen for event and open events from relays */
private connectToRelay(relay: Relay) {
relay.onEvent.subscribe(this.handleEvent, this);
relay.onOpen.subscribe(this.handleRelayConnect, this);
relay.onClose.subscribe(this.handleRelayDisconnect, this);
const subs = this.relaySubs.get(relay);
subs.push(relay.onEvent.subscribe(this.handleEvent.bind(this)));
subs.push(relay.onOpen.subscribe(this.handleRelayConnect.bind(this)));
subs.push(relay.onClose.subscribe(this.handleRelayDisconnect.bind(this)));
relayPoolService.addClaim(relay.url, this);
}
/** stop listing to events from relays */
private disconnectFromRelay(relay: Relay) {
relay.onEvent.unsubscribe(this.handleEvent, this);
relay.onOpen.unsubscribe(this.handleRelayConnect, this);
relay.onClose.unsubscribe(this.handleRelayDisconnect, this);
const subs = this.relaySubs.get(relay);
for (const sub of subs) sub.unsubscribe();
this.relaySubs.delete(relay);
relayPoolService.removeClaim(relay.url, this);
// if the subscription is open and had sent a request to the relay

View File

@ -4,7 +4,9 @@ import { NostrEvent } from "nostr-tools";
import relayPoolService from "../services/relay-pool";
import createDefer from "./deferred";
import Relay, { IncomingCommandResult } from "./relay";
import Subject, { PersistentSubject } from "./subject";
import { PersistentSubject } from "./subject";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";
export default class NostrPublishAction {
id = nanoid();
@ -13,10 +15,11 @@ export default class NostrPublishAction {
event: NostrEvent;
results = new PersistentSubject<IncomingCommandResult[]>([]);
onResult = new Subject<IncomingCommandResult>(undefined, false);
onResult = new ControlledObservable<IncomingCommandResult>();
onComplete = createDefer<IncomingCommandResult[]>();
private remaining = new Set<Relay>();
private relayResultSubs = new SuperMap<Relay, ZenObservable.Subscription[]>(() => []);
constructor(label: string, relays: Iterable<string>, event: NostrEvent, timeout: number = 5000) {
this.label = label;
@ -26,7 +29,7 @@ export default class NostrPublishAction {
for (const url of relays) {
const relay = relayPoolService.requestRelay(url);
this.remaining.add(relay);
relay.onCommandResult.subscribe(this.handleResult, this);
this.relayResultSubs.get(relay).push(relay.onCommandResult.subscribe(this.handleResult.bind(this)));
// send event
relay.send(["EVENT", event]);
@ -42,7 +45,8 @@ export default class NostrPublishAction {
this.onResult.next(result);
relay.onCommandResult.unsubscribe(this.handleResult, this);
this.relayResultSubs.get(relay).forEach((s) => s.unsubscribe());
this.relayResultSubs.delete(relay);
this.remaining.delete(relay);
if (this.remaining.size === 0) this.onComplete.resolve(this.results.value);
}

View File

@ -3,8 +3,9 @@ import { CountResponse, NostrEvent } from "../types/nostr-event";
import { NostrRequestFilter } from "../types/nostr-query";
import relayPoolService from "../services/relay-pool";
import Relay, { IncomingCount, IncomingEOSE, IncomingEvent } from "./relay";
import Subject from "./subject";
import createDefer from "./deferred";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";
const REQUEST_DEFAULT_TIMEOUT = 1000 * 5;
export default class NostrRequest {
@ -12,23 +13,26 @@ export default class NostrRequest {
static RUNNING = "running";
static COMPLETE = "complete";
id: string;
id = nanoid();
timeout: number;
relays: Set<Relay>;
state = NostrRequest.IDLE;
onEvent = new Subject<NostrEvent>(undefined, false);
onCount = new Subject<CountResponse>(undefined, false);
onEvent = new ControlledObservable<NostrEvent>();
onCount = new ControlledObservable<CountResponse>();
/** @deprecated */
onComplete = createDefer<void>();
seenEvents = new Set<string>();
private relaySubs: SuperMap<Relay, ZenObservable.Subscription[]> = new SuperMap(() => []);
constructor(relayUrls: Iterable<string>, timeout?: number) {
this.id = nanoid();
this.relays = new Set(Array.from(relayUrls).map((url) => relayPoolService.requestRelay(url)));
for (const relay of this.relays) {
relay.onEOSE.subscribe(this.handleEOSE, this);
relay.onEvent.subscribe(this.handleEvent, this);
relay.onCount.subscribe(this.handleCount, this);
const subs = this.relaySubs.get(relay);
subs.push(relay.onEOSE.subscribe(this.handleEOSE.bind(this)));
subs.push(relay.onEvent.subscribe(this.handleEvent.bind(this)));
subs.push(relay.onCount.subscribe(this.handleCount.bind(this)));
}
this.timeout = timeout ?? REQUEST_DEFAULT_TIMEOUT;
@ -40,8 +44,8 @@ export default class NostrRequest {
this.relays.delete(relay);
relay.send(["CLOSE", this.id]);
relay.onEOSE.unsubscribe(this.handleEOSE, this);
relay.onEvent.unsubscribe(this.handleEvent, this);
this.relaySubs.get(relay).forEach((sub) => sub.unsubscribe());
this.relaySubs.delete(relay);
if (this.relays.size === 0) {
this.state = NostrRequest.COMPLETE;
@ -87,9 +91,9 @@ export default class NostrRequest {
this.state = NostrRequest.COMPLETE;
for (const relay of this.relays) {
relay.send(["CLOSE", this.id]);
relay.onEOSE.unsubscribe(this.handleEOSE, this);
relay.onEvent.unsubscribe(this.handleEvent, this);
this.relaySubs.get(relay).forEach((sub) => sub.unsubscribe());
}
this.relaySubs.clear();
this.onComplete.resolve();
return this;

View File

@ -4,7 +4,7 @@ import { NostrEvent } from "../types/nostr-event";
import { NostrOutgoingMessage, NostrRequestFilter } from "../types/nostr-query";
import Relay, { IncomingEOSE } from "./relay";
import relayPoolService from "../services/relay-pool";
import { Subject } from "./subject";
import ControlledObservable from "./controlled-observable";
export default class NostrSubscription {
static INIT = "initial";
@ -16,8 +16,10 @@ export default class NostrSubscription {
query?: NostrRequestFilter;
relay: Relay;
state = NostrSubscription.INIT;
onEvent = new Subject<NostrEvent>();
onEOSE = new Subject<IncomingEOSE>();
onEvent = new ControlledObservable<NostrEvent>();
onEOSE = new ControlledObservable<IncomingEOSE>();
private subs: ZenObservable.Subscription[] = [];
constructor(relayUrl: string | URL, query?: NostrRequestFilter, name?: string) {
this.id = nanoid();
@ -26,14 +28,18 @@ export default class NostrSubscription {
this.relay = relayPoolService.requestRelay(relayUrl);
this.onEvent.connectWithHandler(this.relay.onEvent, (event, next) => {
if (this.state === NostrSubscription.OPEN) {
next(event.body);
}
});
this.onEOSE.connectWithHandler(this.relay.onEOSE, (eose, next) => {
if (this.state === NostrSubscription.OPEN) next(eose);
});
this.subs.push(
this.relay.onEvent.subscribe((message) => {
if (this.state === NostrSubscription.OPEN && message.subId === this.id) {
this.onEvent.next(message.body);
}
}),
);
this.subs.push(
this.relay.onEOSE.subscribe((eose) => {
if (this.state === NostrSubscription.OPEN && eose.subId === this.id) this.onEOSE.next(eose);
}),
);
}
send(message: NostrOutgoingMessage) {
@ -72,6 +78,9 @@ export default class NostrSubscription {
// unsubscribe from relay messages
relayPoolService.removeClaim(this.relay.url, this);
for (const sub of this.subs) sub.unsubscribe();
this.subs = [];
return this;
}
}

View File

@ -1,6 +1,5 @@
import { relaysFromContactsEvent } from "../helpers/nostr/contacts";
import { getRelaysFromMailbox } from "../helpers/nostr/mailbox";
import { safeJson } from "../helpers/parse";
import { safeRelayUrl } from "../helpers/relay";
import relayPoolService from "../services/relay-pool";
import { NostrEvent } from "../types/nostr-event";

View File

@ -2,8 +2,9 @@ import { offlineMode } from "../services/offline-mode";
import relayScoreboardService from "../services/relay-scoreboard";
import { RawIncomingNostrEvent, NostrEvent, CountResponse } from "../types/nostr-event";
import { NostrOutgoingMessage } from "../types/nostr-query";
import ControlledObservable from "./controlled-observable";
import createDefer, { Deferred } from "./deferred";
import { PersistentSubject, Subject } from "./subject";
import { PersistentSubject } from "./subject";
export type IncomingEvent = {
type: "EVENT";
@ -46,13 +47,13 @@ const CONNECTION_TIMEOUT = 1000 * 30;
export default class Relay {
url: string;
status = new PersistentSubject<number>(WebSocket.CLOSED);
onOpen = new Subject<Relay>(undefined, false);
onClose = new Subject<Relay>(undefined, false);
onEvent = new Subject<IncomingEvent>(undefined, false);
onNotice = new Subject<IncomingNotice>(undefined, false);
onCount = new Subject<IncomingCount>(undefined, false);
onEOSE = new Subject<IncomingEOSE>(undefined, false);
onCommandResult = new Subject<IncomingCommandResult>(undefined, false);
onOpen = new ControlledObservable<Relay>();
onClose = new ControlledObservable<Relay>();
onEvent = new ControlledObservable<IncomingEvent>();
onNotice = new ControlledObservable<IncomingNotice>();
onCount = new ControlledObservable<IncomingCount>();
onEOSE = new ControlledObservable<IncomingEOSE>();
onCommandResult = new ControlledObservable<IncomingCommandResult>();
ws?: WebSocket;
private connectionPromises: Deferred<void>[] = [];

View File

@ -1,111 +1,62 @@
export type ListenerFn<T> = (value: T) => void;
interface Connectable<Value> {
value?: Value;
subscribe(listener: ListenerFn<Value>, ctx?: Object): this;
unsubscribe(listener: ListenerFn<Value>, ctx?: Object): this;
}
interface ConnectableApi<T> {
connect(connectable: Connectable<T>): this;
disconnect(connectable: Connectable<T>): this;
}
export type Connection<From, To = From, Prev = To> = (value: From, next: (value: To) => any, prevValue: Prev) => void;
import Observable from "zen-observable";
import { nanoid } from "nanoid";
import ControlledObservable from "./controlled-observable";
export class Subject<Value> implements Connectable<Value> {
listeners: [ListenerFn<Value>, Object | undefined][] = [];
/** An observable that is always open and stores the last value */
export default class Subject<T> {
private observable: ControlledObservable<T>;
id = nanoid(8);
value: T | undefined;
value?: Value;
cacheValue: boolean;
constructor(value?: Value, cacheValue = true) {
this.cacheValue = cacheValue;
if (this.cacheValue) this.value = value;
constructor(value?: T) {
this.observable = new ControlledObservable();
this.value = value;
this.subscribe = this.observable.subscribe.bind(this.observable);
}
next(value: Value) {
if (this.value === value) return;
if (this.cacheValue) this.value = value;
for (const [listener, ctx] of this.listeners) {
if (ctx) listener.call(ctx, value);
else listener(value);
}
return this;
next(v: T) {
this.value = v;
this.observable.next(v);
}
error(err: any) {
this.observable.error(err);
}
private findListener(callback: ListenerFn<Value>, ctx?: Object) {
return this.listeners.find((l) => {
return l[0] === callback && l[1] === ctx;
[Symbol.observable]() {
return this.observable;
}
subscribe: Observable<T>["subscribe"];
map<R>(callback: (value: T) => R, defaultValue?: R): Subject<R> {
const child = new Subject(defaultValue);
this.subscribe((value) => {
try {
child.next(callback(value));
} catch (e) {
child.error(e);
}
});
return child;
}
/** @deprecated */
connectWithMapper<R>(
subject: Subject<R>,
map: (value: R, next: (value: T) => void, current: T | undefined) => void,
): ZenObservable.Subscription {
return subject.subscribe((value) => {
map(value, (v) => this.next(v), this.value);
});
}
subscribe(listener: ListenerFn<Value>, ctx?: Object, initCall = true) {
if (!this.findListener(listener, ctx)) {
this.listeners.push([listener, ctx]);
if (initCall) {
if (this.value !== undefined) {
if (ctx) listener.call(ctx, this.value);
else listener(this.value);
}
}
}
return this;
}
unsubscribe(listener: ListenerFn<Value>, ctx?: Object) {
const entry = this.findListener(listener, ctx);
if (entry) {
this.listeners = this.listeners.filter((l) => l !== entry);
}
return this;
}
get hasListeners() {
return this.listeners.length > 0;
}
upstream = new Map<Connectable<any>, ListenerFn<any>>();
connect(connectable: Connectable<Value>) {
if (!this.upstream.has(connectable)) {
const handler = this.next.bind(this);
this.upstream.set(connectable, handler);
connectable.subscribe(handler, this);
if (connectable.value !== undefined) {
handler(connectable.value);
}
}
return this;
}
connectWithHandler<From>(connectable: Connectable<From>, connection: Connection<From, Value, typeof this.value>) {
if (!this.upstream.has(connectable)) {
const handler = (value: From) => {
connection(value, this.next.bind(this), this.value);
};
this.upstream.set(connectable, handler);
connectable.subscribe(handler, this);
}
return this;
}
disconnect(connectable: Connectable<any>) {
const handler = this.upstream.get(connectable);
if (handler) {
this.upstream.delete(connectable);
connectable.unsubscribe(handler, this);
}
return this;
}
disconnectAll() {
for (const [connectable, listener] of this.upstream) {
this.disconnect(connectable);
}
}
}
export class PersistentSubject<Value> extends Subject<Value> implements ConnectableApi<Value> {
value: Value;
constructor(value: Value) {
super(value, true);
export class PersistentSubject<T> extends Subject<T> {
value: T;
constructor(value: T) {
super();
this.value = value;
}
}
export default Subject;

View File

@ -22,6 +22,7 @@ import {
} from "../helpers/nostr/filter";
import { localRelay } from "../services/local-relay";
import { relayRequest } from "../helpers/relay";
import SuperMap from "./super-map";
const BLOCK_SIZE = 100;
@ -32,6 +33,7 @@ export class RelayBlockLoader {
filter: NostrRequestFilter;
blockSize = BLOCK_SIZE;
private log: Debugger;
private subs: ZenObservable.Subscription[] = [];
loading = false;
events: EventStore;
@ -47,7 +49,7 @@ export class RelayBlockLoader {
this.log = log || logger.extend(relay);
this.events = new EventStore(relay);
deleteEventService.stream.subscribe(this.handleDeleteEvent, this);
this.subs.push(deleteEventService.stream.subscribe((e) => this.handleDeleteEvent(e)));
}
loadNextBlock() {
@ -91,7 +93,8 @@ export class RelayBlockLoader {
}
cleanup() {
deleteEventService.stream.unsubscribe(this.handleDeleteEvent, this);
for (const sub of this.subs) sub.unsubscribe();
this.subs = [];
}
getFirstEvent(nth = 0, eventFilter?: EventFilter) {
@ -124,16 +127,15 @@ export default class TimelineLoader {
this.name = name;
this.log = logger.extend("TimelineLoader:" + name);
this.events = new EventStore(name);
this.events.connect(replaceableEventLoaderService.events, false);
this.subscription = new NostrMultiSubscription(name);
this.subscription.onEvent.subscribe(this.handleEvent, this);
this.subscription.onEvent.subscribe(this.handleEvent.bind(this));
// update the timeline when there are new events
this.events.onEvent.subscribe(this.throttleUpdateTimeline, this);
this.events.onDelete.subscribe(this.throttleUpdateTimeline, this);
this.events.onClear.subscribe(this.throttleUpdateTimeline, this);
deleteEventService.stream.subscribe(this.handleDeleteEvent, this);
this.events.onEvent.subscribe(this.throttleUpdateTimeline.bind(this));
this.events.onDelete.subscribe(this.throttleUpdateTimeline.bind(this));
this.events.onClear.subscribe(this.throttleUpdateTimeline.bind(this));
}
private throttleUpdateTimeline = _throttle(this.updateTimeline, 10);
@ -150,24 +152,20 @@ export default class TimelineLoader {
this.events.addEvent(event);
if (cache) localRelay.publish(event);
}
private handleDeleteEvent(deleteEvent: NostrEvent) {
const cord = deleteEvent.tags.find(isATag)?.[1];
const eventId = deleteEvent.tags.find(isETag)?.[1];
if (cord) this.events.deleteEvent(cord);
if (eventId) this.events.deleteEvent(eventId);
}
private blockLoaderSubs = new SuperMap<RelayBlockLoader, ZenObservable.Subscription[]>(() => []);
private connectToBlockLoader(loader: RelayBlockLoader) {
this.events.connect(loader.events);
loader.onBlockFinish.subscribe(this.updateLoading, this);
loader.onBlockFinish.subscribe(this.updateComplete, this);
const subs = this.blockLoaderSubs.get(loader);
subs.push(loader.onBlockFinish.subscribe(this.updateLoading.bind(this)));
subs.push(loader.onBlockFinish.subscribe(this.updateComplete.bind(this)));
}
private disconnectToBlockLoader(loader: RelayBlockLoader) {
loader.cleanup();
this.events.disconnect(loader.events);
loader.onBlockFinish.unsubscribe(this.updateLoading, this);
loader.onBlockFinish.unsubscribe(this.updateComplete, this);
const subs = this.blockLoaderSubs.get(loader);
for (const sub of subs) sub.unsubscribe();
this.blockLoaderSubs.delete(loader);
}
private loadQueriesFromCache(queryMap: RelayQueryMap) {
@ -305,7 +303,5 @@ export default class TimelineLoader {
this.blockLoaders.clear();
this.events.cleanup();
deleteEventService.stream.unsubscribe(this.handleDeleteEvent, this);
}
}

View File

@ -132,9 +132,9 @@ function GenericNoteTimeline({ timeline }: { timeline: TimelineLoader }) {
}
};
intersectionSubject.subscribe(listener);
const sub = intersectionSubject.subscribe(listener);
return () => {
intersectionSubject.unsubscribe(listener);
sub.unsubscribe();
};
}, [
setPinDate,

View File

@ -1,8 +1,9 @@
import { useEffect, useMemo, useState } from "react";
import { useMemo, useState } from "react";
import eventReactionsService from "../services/event-reactions";
import { useReadRelays } from "./use-client-relays";
import { NostrEvent } from "../types/nostr-event";
import Subject from "../classes/subject";
import useSubjects from "./use-subjects";
export default function useEventsReactions(
eventIds: string[],
@ -29,17 +30,7 @@ export default function useEventsReactions(
const [_, update] = useState(0);
// subscribe to subjects
useEffect(() => {
const listener = () => update((v) => v + 1);
for (const [_, sub] of Object.entries(subjects)) {
sub?.subscribe(listener, undefined, false);
}
return () => {
for (const [_, sub] of Object.entries(subjects)) {
sub?.unsubscribe(listener, undefined);
}
};
}, [subjects, update]);
useSubjects(Object.values(subjects));
return reactions;
}

View File

@ -1,16 +1,16 @@
import { useEffect, useState } from "react";
import { PersistentSubject, Subject } from "../classes/subject";
import { useEffect, useRef, useState } from "react";
import Subject, { PersistentSubject } from "../classes/subject";
function useSubject<Value extends unknown>(subject: PersistentSubject<Value>): Value;
function useSubject<Value extends unknown>(subject?: PersistentSubject<Value>): Value | undefined;
function useSubject<Value extends unknown>(subject?: Subject<Value>): Value | undefined;
function useSubject<Value extends unknown>(subject?: Subject<Value>) {
const [_, setValue] = useState(subject?.value);
const subRef = useRef(subject);
useEffect(() => {
subject?.subscribe(setValue, undefined, false);
return () => {
subject?.unsubscribe(setValue, undefined);
};
if (subject?.value !== undefined) setValue(subject?.value);
const sub = subject?.subscribe((v) => setValue(v));
return () => sub?.unsubscribe();
}, [subject, setValue]);
return subject?.value;

View File

@ -1,5 +1,5 @@
import { useEffect, useState } from "react";
import { PersistentSubject, Subject } from "../classes/subject";
import Subject, { PersistentSubject } from "../classes/subject";
function useSubjects<Value extends unknown>(
subjects: (Subject<Value> | PersistentSubject<Value> | undefined)[] = [],
@ -9,13 +9,9 @@ function useSubjects<Value extends unknown>(
useEffect(() => {
const listener = () => update((v) => v + 1);
for (const sub of subjects) {
sub?.subscribe(listener, undefined, false);
}
const subs = subjects.map((s) => s?.subscribe(listener));
return () => {
for (const sub of subjects) {
sub?.unsubscribe(listener, undefined);
}
for (const sub of subs) sub?.unsubscribe();
};
}, [subjects, update]);

View File

@ -75,7 +75,7 @@ export default function IntersectionObserverProvider({
callback: ExtendedIntersectionObserverCallback;
}) {
const elementIds = useMemo(() => new WeakMap<Element, string>(), []);
const [subject] = useState(() => new Subject<ExtendedIntersectionObserverEntry[]>([], false));
const [subject] = useState(() => new Subject<ExtendedIntersectionObserverEntry[]>([]));
const handleIntersection = useCallback<IntersectionObserverCallback>(
(entries, observer) => {

View File

@ -30,7 +30,7 @@ const RELAY_REQUEST_BATCH_TIME = 1000;
/** This class is ued to batch requests to a single relay */
class ChannelMetadataRelayLoader {
private subscription: NostrSubscription;
private events = new SuperMap<Pubkey, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
private events = new SuperMap<string, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
private requestNext = new Set<string>();
private requested = new Map<string, Date>();
@ -234,7 +234,7 @@ class ChannelMetadataService {
for (const relay of relayUrls) {
const request = this.loaders.get(relay).requestMetadata(channelId);
sub.connectWithHandler(request, (event, next, current) => {
sub.connectWithMapper(request, (event, next, current) => {
if (!current || event.created_at > current.created_at) {
next(event);
this.saveToCache(channelId, event);

View File

@ -203,6 +203,7 @@ export async function deleteDatabase() {
db.close();
log("Deleting");
await deleteDB(dbName);
localDatabase.close();
await nostrIDBDelete();
window.location.reload();
}

View File

@ -1,10 +1,10 @@
import { kinds } from "nostr-tools";
import Subject from "../classes/subject";
import { getEventUID } from "../helpers/nostr/events";
import { NostrEvent } from "../types/nostr-event";
import ControlledObservable from "../classes/controlled-observable";
const deleteEventStream = new Subject<NostrEvent>();
const deleteEventStream = new ControlledObservable<NostrEvent>();
function handleEvent(deleteEvent: NostrEvent) {
if (deleteEvent.kind !== kinds.EventDeletion) return;

View File

@ -74,7 +74,7 @@ class EventReactionsService {
if (filters.length > 0) {
const request = new NostrRequest([relay]);
request.onEvent.subscribe(this.handleEvent, this);
request.onEvent.subscribe((e) => this.handleEvent(e));
request.start(filters);
}
}

View File

@ -74,7 +74,7 @@ class EventZapsService {
if (filter.length > 0) {
const request = new NostrRequest([relay]);
request.onEvent.subscribe(this.handleEvent, this);
request.onEvent.subscribe((e) => this.handleEvent(e));
request.start(filter);
}
}

View File

@ -1,7 +1,6 @@
import { CacheRelay, openDB, pruneLastUsed } from "nostr-idb";
import { CacheRelay, openDB } from "nostr-idb";
import { Relay } from "nostr-tools";
import { logger } from "../helpers/debug";
import _throttle from "lodash.throttle";
import { safeRelayUrl } from "../helpers/relay";
// save the local relay from query params to localStorage

View File

@ -1,6 +1,7 @@
import { finalizeEvent, generateSecretKey, getPublicKey, kinds, nip04, nip19 } from "nostr-tools";
import dayjs from "dayjs";
import { nanoid } from "nanoid";
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import NostrMultiSubscription from "../classes/nostr-multi-subscription";
import { getPubkeyFromDecodeResult, isHexKey, normalizeToHexPubkey } from "../helpers/nip19";
@ -10,7 +11,6 @@ import { DraftNostrEvent, NostrEvent, isPTag } from "../types/nostr-event";
import createDefer, { Deferred } from "../classes/deferred";
import { truncatedId } from "../helpers/nostr/events";
import { NostrConnectAccount } from "./account";
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import { safeRelayUrl } from "../helpers/relay";
export function isErrorResponse(response: any): response is NostrConnectErrorResponse {
@ -79,7 +79,7 @@ export class NostrConnectClient {
this.secretKey = secretKey || bytesToHex(generateSecretKey());
this.publicKey = getPublicKey(hexToBytes(this.secretKey));
this.sub.onEvent.subscribe(this.handleEvent, this);
this.sub.onEvent.subscribe((e) => this.handleEvent(e));
this.sub.setQueryMap(
createSimpleQueryMap(this.relays, {
kinds: [kinds.NostrConnect, 24134],

View File

@ -51,7 +51,7 @@ class RelayStatsService {
if (!info.pubkey) return sub.next(null);
const request = new NostrRequest([relay, MONITOR_RELAY]);
request.onEvent.subscribe(this.handleEvent, this);
request.onEvent.subscribe((e) => this.handleEvent(e));
request.start({ kinds: [SELF_REPORTED_KIND], authors: [info.pubkey] });
});
}
@ -75,7 +75,7 @@ class RelayStatsService {
const relays = Array.from(this.pendingMonitorStats);
const request = new NostrRequest([MONITOR_RELAY]);
request.onEvent.subscribe(this.handleEvent, this);
request.onEvent.subscribe((e) => this.handleEvent(e));
request.start({ since: 1704196800, kinds: [MONITOR_STATS_KIND], "#d": relays, authors: [MONITOR_PUBKEY] });
this.pendingMonitorStats.clear();

View File

@ -6,7 +6,6 @@ import { Filter } from "nostr-tools";
import NostrSubscription from "../classes/nostr-subscription";
import SuperMap from "../classes/super-map";
import { NostrEvent } from "../types/nostr-event";
import Subject from "../classes/subject";
import { NostrQuery } from "../types/nostr-query";
import { logger } from "../helpers/debug";
import { nameOrPubkey } from "./user-metadata";
@ -14,6 +13,8 @@ import { getEventCoordinate } from "../helpers/nostr/events";
import createDefer, { Deferred } from "../classes/deferred";
import { localRelay } from "./local-relay";
import { relayRequest } from "../helpers/relay";
import EventStore from "../classes/event-store";
import Subject from "../classes/subject";
type Pubkey = string;
type Relay = string;
@ -39,7 +40,7 @@ const RELAY_REQUEST_BATCH_TIME = 500;
/** This class is ued to batch requests to a single relay */
class ReplaceableEventRelayLoader {
private subscription: NostrSubscription;
private events = new SuperMap<Pubkey, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
events = new EventStore();
private requestNext = new Set<string>();
private requested = new Map<string, Date>();
@ -58,14 +59,12 @@ class ReplaceableEventRelayLoader {
private handleEvent(event: NostrEvent) {
const cord = getEventCoordinate(event);
// remove the pubkey from the waiting list
// remove the cord from the waiting list
this.requested.delete(cord);
const sub = this.events.get(cord);
const current = sub.value;
const current = this.events.getEvent(cord);
if (!current || event.created_at > current.created_at) {
sub.next(event);
this.events.addEvent(event);
}
}
private handleEOSE() {
@ -73,19 +72,13 @@ class ReplaceableEventRelayLoader {
this.requested.clear();
}
getEvent(kind: number, pubkey: string, d?: string) {
return this.events.get(createCoordinate(kind, pubkey, d));
}
requestEvent(kind: number, pubkey: string, d?: string) {
const cord = createCoordinate(kind, pubkey, d);
const event = this.events.get(cord);
if (!event.value) {
const event = this.events.getEvent(cord);
if (!event) {
this.requestNext.add(cord);
this.updateThrottle();
}
return event;
}
@ -152,11 +145,14 @@ const READ_CACHE_BATCH_TIME = 250;
const WRITE_CACHE_BATCH_TIME = 250;
class ReplaceableEventLoaderService {
private events = new SuperMap<Pubkey, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
private subjects = new SuperMap<Pubkey, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
private loaders = new SuperMap<Relay, ReplaceableEventRelayLoader>((relay) => {
const loader = new ReplaceableEventRelayLoader(relay, this.log.extend(relay));
loader.events.onEvent.subscribe((e) => this.handleEvent(e));
return loader;
});
private loaders = new SuperMap<Relay, ReplaceableEventRelayLoader>(
(relay) => new ReplaceableEventRelayLoader(relay, this.log.extend(relay)),
);
events = new EventStore();
log = logger.extend("ReplaceableEventLoader");
dbLog = this.log.extend("database");
@ -164,18 +160,17 @@ class ReplaceableEventLoaderService {
handleEvent(event: NostrEvent, saveToCache = true) {
const cord = getEventCoordinate(event);
const sub = this.events.get(cord);
const current = sub.value;
const subject = this.subjects.get(cord);
const current = subject.value;
if (!current || event.created_at > current.created_at) {
sub.next(event);
if (saveToCache) {
this.saveToCache(cord, event);
}
subject.next(event);
this.events.addEvent(event);
if (saveToCache) this.saveToCache(cord, event);
}
}
getEvent(kind: number, pubkey: string, d?: string) {
return this.events.get(createCoordinate(kind, pubkey, d));
return this.subjects.get(createCoordinate(kind, pubkey, d));
}
private readFromCachePromises = new Map<string, Deferred<boolean>>();
@ -248,25 +243,16 @@ class ReplaceableEventLoaderService {
private requestEventFromRelays(relays: Iterable<string>, kind: number, pubkey: string, d?: string) {
const cord = createCoordinate(kind, pubkey, d);
const sub = this.events.get(cord);
const sub = this.subjects.get(cord);
for (const relay of relays) {
const request = this.loaders.get(relay).requestEvent(kind, pubkey, d);
sub.connectWithHandler(request, (event, next, current) => {
if (!current || event.created_at > current.created_at) {
next(event);
this.saveToCache(cord, event);
}
});
}
for (const relay of relays) this.loaders.get(relay).requestEvent(kind, pubkey, d);
return sub;
}
requestEvent(relays: Iterable<string>, kind: number, pubkey: string, d?: string, opts: RequestOptions = {}) {
const cord = createCoordinate(kind, pubkey, d);
const sub = this.events.get(cord);
const sub = this.subjects.get(cord);
if (!sub.value) {
this.loadFromCache(cord).then((loaded) => {

View File

@ -12,6 +12,7 @@ appSettings.subscribe((event) => {
log(`Changed`, event);
});
let accountSub: ZenObservable.Subscription;
accountService.current.subscribe(() => {
const account = accountService.current.value;
@ -20,7 +21,7 @@ accountService.current.subscribe(() => {
return;
}
appSettings.disconnectAll();
if (accountSub) accountSub.unsubscribe();
if (account.localSettings) {
appSettings.next(account.localSettings);
@ -30,8 +31,8 @@ accountService.current.subscribe(() => {
const subject = userAppSettings.requestAppSettings(account.pubkey, clientRelaysService.readRelays.value, {
alwaysRequest: true,
});
appSettings.next(defaultSettings);
appSettings.connect(subject);
appSettings.next(subject.value || defaultSettings);
accountSub = subject.subscribe((s) => appSettings.next(s));
});
// clientRelaysService.relays.subscribe(() => {

View File

@ -26,7 +26,7 @@ class UserAppSettings {
SETTING_EVENT_IDENTIFIER,
opts,
);
sub.connectWithHandler(requestSub, (event, next) => next(parseAppSettings(event)));
sub.connectWithMapper(requestSub, (event, next) => next(parseAppSettings(event)));
return sub;
}

View File

@ -181,7 +181,7 @@ class SigningService {
await client.ensureConnected();
return await client.nip04Encrypt(pubkey, text);
default:
throw new Error("Unknown connection type");
throw new Error("Unknown account type");
}
}
}

View File

@ -1,22 +1,22 @@
import _throttle from "lodash.throttle";
import NostrRequest from "../classes/nostr-request";
import Subject from "../classes/subject";
import SuperMap from "../classes/super-map";
import { NostrEvent } from "../types/nostr-event";
import { localRelay } from "./local-relay";
import { relayRequest, safeRelayUrls } from "../helpers/relay";
import { logger } from "../helpers/debug";
import Subject from "../classes/subject";
const RELAY_REQUEST_BATCH_TIME = 500;
class SingleEventService {
private cache = new SuperMap<string, Subject<NostrEvent>>(() => new Subject());
private subjects = new SuperMap<string, Subject<NostrEvent>>(() => new Subject<NostrEvent>());
pending = new Map<string, string[]>();
log = logger.extend("SingleEvent");
requestEvent(id: string, relays: Iterable<string>) {
const subject = this.cache.get(id);
const subject = this.subjects.get(id);
if (subject.value) return subject;
const safeURLs = safeRelayUrls(Array.from(relays));
@ -28,8 +28,7 @@ class SingleEventService {
}
handleEvent(event: NostrEvent, cache = true) {
this.cache.get(event.id).next(event);
this.subjects.get(event.id).next(event);
if (cache) localRelay.publish(event);
}
@ -62,7 +61,7 @@ class SingleEventService {
for (const [relay, ids] of Object.entries(idsFromRelays)) {
const request = new NostrRequest([relay]);
request.onEvent.subscribe(this.handleEvent, this);
request.onEvent.subscribe((event) => this.handleEvent(event));
request.start({ ids });
}
this.pending.clear();
@ -71,4 +70,9 @@ class SingleEventService {
const singleEventService = new SingleEventService();
if (import.meta.env.DEV) {
//@ts-expect-error
window.singleEventService = singleEventService;
}
export default singleEventService;

View File

@ -29,18 +29,19 @@ function nip65ToUserMailboxes(event: NostrEvent): UserMailboxes {
}
class UserMailboxesService {
private subjects = new SuperMap<string, Subject<UserMailboxes>>(() => new Subject<UserMailboxes>());
private subjects = new SuperMap<string, Subject<UserMailboxes>>((pubkey) =>
replaceableEventLoaderService.getEvent(kinds.RelayList, pubkey).map(nip65ToUserMailboxes),
);
getMailboxes(pubkey: string) {
return this.subjects.get(pubkey);
}
requestMailboxes(pubkey: string, relays: Iterable<string>, opts: RequestOptions = {}) {
const sub = this.subjects.get(pubkey);
const requestSub = replaceableEventLoaderService.requestEvent(relays, kinds.RelayList, pubkey, undefined, opts);
sub.connectWithHandler(requestSub, (event, next) => next(nip65ToUserMailboxes(event)));
replaceableEventLoaderService.requestEvent(relays, kinds.RelayList, pubkey, undefined, opts);
// also fetch the relays from the users contacts
const contactsSub = replaceableEventLoaderService.requestEvent(relays, kinds.Contacts, pubkey, undefined, opts);
sub.connectWithHandler(contactsSub, (event, next, value) => {
sub.connectWithMapper(contactsSub, (event, next, value) => {
// NOTE: only use relays from contact list if the user dose not have a NIP-65 relay list
const relays = relaysFromContactsEvent(event);
if (relays.length > 0 && !value) {
@ -60,12 +61,8 @@ class UserMailboxesService {
async loadFromCache(pubkey: string) {
const sub = this.subjects.get(pubkey);
// load from cache
await replaceableEventLoaderService.loadFromCache(createCoordinate(kinds.RelayList, pubkey));
const requestSub = replaceableEventLoaderService.getEvent(kinds.RelayList, pubkey);
sub.connectWithHandler(requestSub, (event, next) => next(nip65ToUserMailboxes(event)));
return sub;
}
receiveEvent(event: NostrEvent) {

View File

@ -1,53 +1,22 @@
import db from "./db";
import { kinds } from "nostr-tools";
import _throttle from "lodash.throttle";
import { Kind0ParsedContent, getSearchNames, parseKind0Event } from "../helpers/user-metadata";
import { Kind0ParsedContent, parseKind0Event } from "../helpers/user-metadata";
import SuperMap from "../classes/super-map";
import Subject from "../classes/subject";
import replaceableEventLoaderService, { RequestOptions } from "./replaceable-event-requester";
const WRITE_USER_SEARCH_BATCH_TIME = 500;
class UserMetadataService {
private metadata = new SuperMap<string, Subject<Kind0ParsedContent>>((pubkey) => {
const sub = new Subject<Kind0ParsedContent>();
sub.subscribe((metadata) => {
if (metadata) {
this.writeSearchQueue.add(pubkey);
this.writeSearchDataThrottle();
}
});
return sub;
return replaceableEventLoaderService.getEvent(0, pubkey).map(parseKind0Event);
});
getSubject(pubkey: string) {
return this.metadata.get(pubkey);
}
requestMetadata(pubkey: string, relays: Iterable<string>, opts: RequestOptions = {}) {
const sub = this.metadata.get(pubkey);
const requestSub = replaceableEventLoaderService.requestEvent(relays, kinds.Metadata, pubkey, undefined, opts);
sub.connectWithHandler(requestSub, (event, next) => next(parseKind0Event(event)));
return sub;
}
private writeSearchQueue = new Set<string>();
private writeSearchDataThrottle = _throttle(this.writeSearchData.bind(this), WRITE_USER_SEARCH_BATCH_TIME);
private async writeSearchData() {
if (this.writeSearchQueue.size === 0) return;
const keys = Array.from(this.writeSearchQueue);
this.writeSearchQueue.clear();
const transaction = db.transaction("userSearch", "readwrite");
for (const pubkey of keys) {
const metadata = this.getSubject(pubkey).value;
if (metadata) {
const names = getSearchNames(metadata);
transaction.objectStore("userSearch").put({ pubkey, names });
}
}
transaction.commit();
await transaction.done;
const subject = this.metadata.get(pubkey);
replaceableEventLoaderService.requestEvent(relays, kinds.Metadata, pubkey, undefined, opts);
return subject;
}
}

View File

@ -0,0 +1,33 @@
import _throttle from "lodash.throttle";
import { getSearchNames } from "../helpers/user-metadata";
import db from "./db";
import replaceableEventLoaderService from "./replaceable-event-requester";
import userMetadataService from "./user-metadata";
const WRITE_USER_SEARCH_BATCH_TIME = 500;
const writeSearchQueue = new Set<string>();
const writeSearchData = _throttle(async () => {
if (writeSearchQueue.size === 0) return;
const keys = Array.from(writeSearchQueue);
writeSearchQueue.clear();
const transaction = db.transaction("userSearch", "readwrite");
for (const pubkey of keys) {
const metadata = userMetadataService.getSubject(pubkey).value;
if (metadata) {
const names = getSearchNames(metadata);
transaction.objectStore("userSearch").put({ pubkey, names });
}
}
transaction.commit();
await transaction.done;
}, WRITE_USER_SEARCH_BATCH_TIME);
replaceableEventLoaderService.events.onEvent.subscribe((event) => {
if (event.kind === 0) {
writeSearchQueue.add(event.pubkey);
writeSearchData();
}
});

View File

@ -155,9 +155,11 @@ export default function RelayDetailsTab({ relay }: { relay: string }) {
const loadMore = useCallback(() => {
setLoading(true);
const request = new NostrRequest([relay]);
request.onEvent.subscribe(store.addEvent, store);
const throttle = _throttle(() => update({}), 100);
request.onEvent.subscribe(() => throttle());
request.onEvent.subscribe((e) => {
store.addEvent(e);
throttle();
});
request.onComplete.then(() => setLoading(false));
const query: NostrQuery = { limit: 500 };

View File

@ -1,8 +1,8 @@
import { ReactNode, memo, useMemo, useState } from "react";
import { Button, ButtonGroup, Divider, Flex, Heading } from "@chakra-ui/react";
import dayjs from "dayjs";
import { useInterval, useObservable } from "react-use";
import useSubject from "../../../hooks/use-subject";
import useCurrentAccount from "../../../hooks/use-current-account";
import useStreamChatTimeline from "../stream/stream-chat/use-stream-chat-timeline";
import UserAvatar from "../../../components/user-avatar";
@ -12,7 +12,6 @@ import { useMuteModalContext } from "../../../providers/route/mute-modal-provide
import useUserMuteList from "../../../hooks/use-user-mute-list";
import { isPubkeyInList } from "../../../helpers/nostr/lists";
import { ParsedStream } from "../../../helpers/nostr/stream";
import { useInterval } from "react-use";
function Countdown({ time }: { time: number }) {
const [now, setNow] = useState(dayjs().unix());
@ -61,7 +60,7 @@ function UsersCard({ stream }: { stream: ParsedStream }) {
const streamChatTimeline = useStreamChatTimeline(stream);
// refresh when a new event
useSubject(streamChatTimeline.events.onEvent);
useObservable(streamChatTimeline.events.onEvent);
const chatEvents = streamChatTimeline.events.getSortedEvents();
const muteList = useUserMuteList(account.pubkey);

View File

@ -1,8 +1,8 @@
import { memo } from "react";
import { Flex } from "@chakra-ui/react";
import { kinds } from "nostr-tools";
import { useObservable } from "react-use";
import useSubject from "../../../hooks/use-subject";
import useStreamChatTimeline from "../stream/stream-chat/use-stream-chat-timeline";
import ZapMessageMemo from "../stream/stream-chat/zap-message";
import { ParsedStream } from "../../../helpers/nostr/stream";
@ -11,7 +11,7 @@ function ZapsCard({ stream }: { stream: ParsedStream }) {
const streamChatTimeline = useStreamChatTimeline(stream);
// refresh when a new event
useSubject(streamChatTimeline.events.onEvent);
useObservable(streamChatTimeline.events.onEvent);
const zapMessages = streamChatTimeline.events.getSortedEvents().filter((event) => {
if (stream.starts && event.created_at < stream.starts) return false;
if (stream.ends && event.created_at > stream.ends) return false;

View File

@ -5,6 +5,7 @@ import ForceGraph, { LinkObject, NodeObject } from "react-force-graph-3d";
import { kinds } from "nostr-tools";
import dayjs from "dayjs";
import { useNavigate } from "react-router-dom";
import { useDebounce, useObservable } from "react-use";
import {
Group,
Mesh,
@ -25,8 +26,6 @@ import { useUserMetadata } from "../../hooks/use-user-metadata";
import EventStore from "../../classes/event-store";
import NostrRequest from "../../classes/nostr-request";
import { isPTag } from "../../types/nostr-event";
import { useDebounce } from "react-use";
import useSubject from "../../hooks/use-subject";
import { ChevronLeftIcon } from "../../components/icons";
import { useReadRelays } from "../../hooks/use-client-relays";
@ -53,7 +52,7 @@ function NetworkDMGraphPage() {
store.clear();
const request = new NostrRequest(relays);
request.onEvent.subscribe(store.addEvent, store);
request.onEvent.subscribe((e) => store.addEvent(e));
request.start({
authors: contactsPubkeys,
kinds: [kinds.EncryptedDirectMessage],
@ -71,7 +70,7 @@ function NetworkDMGraphPage() {
const selfMetadata = useUserMetadata(account.pubkey);
const usersMetadata = useUsersMetadata(contactsPubkeys);
const newEventTrigger = useSubject(store.onEvent);
const newEventTrigger = useObservable(store.onEvent);
const graphData = useMemo(() => {
if (store.events.size === 0) return { nodes: [], links: [] };

View File

@ -3012,6 +3012,11 @@
resolved "https://registry.yarnpkg.com/@types/webxr/-/webxr-0.5.13.tgz#5fd07863819c30869d66b765926d0b5a53a7e9e0"
integrity sha512-Hi4K3aTEoaa31Cep75AA9wK5q2iZgC1L70serPbI11L4YieoZpu5LvLr6FZXyIdqkkGPh1WMuDf6oSPHJXBkoA==
"@types/zen-observable@^0.8.7":
version "0.8.7"
resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.7.tgz#114e2ffc8d5be4915fdd5bc90668fc0ceaadb760"
integrity sha512-LKzNTjj+2j09wAo/vvVjzgw5qckJJzhdGgWHW7j69QIGdq/KnZrMAMIHQiWGl3Ccflh5/CudBAntTPYdprPltA==
"@uiw/codemirror-extensions-basic-setup@4.21.21":
version "4.21.21"
resolved "https://registry.yarnpkg.com/@uiw/codemirror-extensions-basic-setup/-/codemirror-extensions-basic-setup-4.21.21.tgz#243ef309cb53253b14187649a7abc0d996420a20"
@ -7204,6 +7209,11 @@ yocto-queue@^0.1.0:
resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b"
integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==
zen-observable@^0.10.0:
version "0.10.0"
resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.10.0.tgz#ee10eba75272897dbee5f152ab26bb5e0107f0c8"
integrity sha512-iI3lT0iojZhKwT5DaFy2Ce42n3yFcLdFyOh01G7H0flMY60P8MJuVFEoJoNwXlmAyQ45GrjL6AcZmmlv8A5rbw==
zustand@^4.4.7:
version "4.5.0"
resolved "https://registry.yarnpkg.com/zustand/-/zustand-4.5.0.tgz#141354af56f91de378aa6c4b930032ab338f3ef0"