rebuild timeline loader to use query maps

This commit is contained in:
hzrd149
2023-11-28 17:11:04 -06:00
parent 7d4693571c
commit abc70e2db2
16 changed files with 276 additions and 350 deletions

View File

@@ -1,10 +1,16 @@
import { nanoid } from "nanoid";
import stringify from "json-stringify-deterministic";
import { Subject } from "./subject";
import { NostrEvent } from "../types/nostr-event";
import { NostrOutgoingMessage, NostrOutgoingRequest, NostrRequestFilter } from "../types/nostr-query";
import { NostrOutgoingRequest, NostrRequestFilter, RelayQueryMap } from "../types/nostr-query";
import Relay, { IncomingEvent } from "./relay";
import relayPoolService from "../services/relay-pool";
function isFilterEqual(a: NostrRequestFilter, b: NostrRequestFilter) {
return stringify(a) === stringify(b);
}
export default class NostrMultiSubscription {
static INIT = "initial";
static OPEN = "open";
@@ -12,20 +18,16 @@ export default class NostrMultiSubscription {
id: string;
name?: string;
query?: NostrRequestFilter;
relayUrls: string[];
relays: Relay[];
queryMap: RelayQueryMap = {};
relays: Relay[] = [];
state = NostrMultiSubscription.INIT;
onEvent = new Subject<NostrEvent>();
seenEvents = new Set<string>();
constructor(relayUrls: string[], query?: NostrRequestFilter, name?: string) {
constructor(name?: string) {
this.id = nanoid();
this.query = query;
this.name = name;
this.relayUrls = relayUrls;
this.relays = this.relayUrls.map((url) => relayPoolService.requestRelay(url));
}
private handleEvent(event: IncomingEvent) {
if (this.state === NostrMultiSubscription.OPEN && event.subId === this.id && !this.seenEvents.has(event.body.id)) {
@@ -34,16 +36,67 @@ export default class NostrMultiSubscription {
}
}
/** listen for event and open events from relays */
private connectToRelay(relay: Relay) {
relay.onEvent.subscribe(this.handleEvent, this);
relay.onOpen.subscribe(this.handleRelayConnect, this);
relay.onClose.subscribe(this.handleRelayDisconnect, this);
relayPoolService.addClaim(relay.url, this);
}
/** stop listing to events from relays */
private disconnectFromRelay(relay: Relay) {
relay.onEvent.unsubscribe(this.handleEvent, this);
relay.onOpen.unsubscribe(this.handleRelayConnect, this);
relay.onClose.unsubscribe(this.handleRelayDisconnect, this);
relayPoolService.removeClaim(relay.url, this);
// if the subscription is open and had sent a request to the relay
if (this.state === NostrMultiSubscription.OPEN && this.relayQueries.has(relay)) {
relay.send(["CLOSE", this.id]);
}
this.relayQueries.delete(relay);
}
setQueryMap(queryMap: RelayQueryMap) {
if (isFilterEqual(this.queryMap, queryMap)) return;
// add and remove relays
for (const url of Object.keys(queryMap)) {
if (!this.queryMap[url]) {
if (this.relays.some((r) => r.url === url)) continue;
// add relay
const relay = relayPoolService.requestRelay(url);
this.relays.push(relay);
this.connectToRelay(relay);
}
}
for (const url of Object.keys(this.queryMap)) {
if (!queryMap[url]) {
const relay = this.relays.find((r) => r.url === url);
if (!relay) continue;
this.relays = this.relays.filter((r) => r !== relay);
this.disconnectFromRelay(relay);
}
}
this.queryMap = queryMap;
this.updateRelayQueries();
}
private relayQueries = new WeakMap<Relay, NostrRequestFilter>();
private updateRelayQueries() {
if (!this.query || this.state !== NostrMultiSubscription.OPEN) return;
const message: NostrOutgoingRequest = Array.isArray(this.query)
? ["REQ", this.id, ...this.query]
: ["REQ", this.id, this.query];
if (this.state !== NostrMultiSubscription.OPEN) return;
for (const relay of this.relays) {
if (this.relayQueries.get(relay) !== this.query) {
const filter = this.queryMap[relay.url];
const message: NostrOutgoingRequest = Array.isArray(filter)
? ["REQ", this.id, ...filter]
: ["REQ", this.id, filter];
const currentFilter = this.relayQueries.get(relay);
if (!currentFilter || !isFilterEqual(currentFilter, filter)) {
this.relayQueries.set(relay, filter);
relay.send(message);
}
}
@@ -54,80 +107,27 @@ export default class NostrMultiSubscription {
private handleRelayDisconnect(relay: Relay) {
this.relayQueries.delete(relay);
}
sendToAll(message: NostrOutgoingMessage) {
for (const relay of this.relays) {
relay.send(message);
}
}
/** listen for event and open events from relays */
private connectToRelays() {
for (const relay of this.relays) {
relay.onEvent.subscribe(this.handleEvent, this);
relay.onOpen.subscribe(this.handleRelayConnect, this);
relay.onClose.subscribe(this.handleRelayDisconnect, this);
relayPoolService.addClaim(relay.url, this);
}
}
/** stop listing to events from relays */
private disconnectFromRelays() {
for (const relay of this.relays) {
relay.onEvent.unsubscribe(this.handleEvent, this);
relay.onOpen.unsubscribe(this.handleRelayConnect, this);
relay.onClose.unsubscribe(this.handleRelayDisconnect, this);
relayPoolService.removeClaim(relay.url, this);
}
}
open() {
if (!this.query) throw new Error("Cant open without a query");
if (this.state === NostrMultiSubscription.OPEN) return this;
this.state = NostrMultiSubscription.OPEN;
this.connectToRelays();
// reconnect to all relays
for (const relay of this.relays) this.connectToRelay(relay);
// send queries
this.updateRelayQueries();
return this;
}
setQuery(query: NostrRequestFilter) {
this.query = query;
this.updateRelayQueries();
return this;
}
setRelays(relayUrls: string[]) {
this.disconnectFromRelays();
const newRelays = relayUrls.map((url) => relayPoolService.requestRelay(url));
for (const relay of this.relays) {
if (!newRelays.includes(relay)) {
// if the subscription is open and the relay is connected
if (this.state === NostrMultiSubscription.OPEN && relay.connected) {
// close the connection to this relay
relay.send(["CLOSE", this.id]);
}
}
}
// set new relays
this.relayUrls = relayUrls;
this.relays = newRelays;
if (this.state === NostrMultiSubscription.OPEN) {
this.connectToRelays();
this.updateRelayQueries();
}
}
close() {
if (this.state !== NostrMultiSubscription.OPEN) return this;
// forget all seen events
this.forgetEvents();
// unsubscribe from relay messages
for (const relay of this.relays) this.disconnectFromRelay(relay);
// set state
this.state = NostrMultiSubscription.CLOSED;
// send close message
this.sendToAll(["CLOSE", this.id]);
// forget all seen events
this.seenEvents.clear();
// unsubscribe from relay messages
this.disconnectFromRelays();
return this;
}

View File

@@ -3,7 +3,9 @@ import { NostrEvent } from "../types/nostr-event";
import NostrRequest from "./nostr-request";
import NostrMultiSubscription from "./nostr-multi-subscription";
import { PersistentSubject } from "./subject";
import { createSimpleQueryMap } from "../helpers/nostr/filter";
/** @deprecated */
export default class ThreadLoader {
loading = new PersistentSubject(false);
focusId = new PersistentSubject<string>("");
@@ -16,7 +18,7 @@ export default class ThreadLoader {
constructor(relays: string[], eventId: string) {
this.relays = relays;
this.subscription = new NostrMultiSubscription(relays);
this.subscription = new NostrMultiSubscription();
this.subscription.onEvent.subscribe((event) => {
this.events.next({ ...this.events.value, [event.id]: event });
@@ -68,13 +70,13 @@ export default class ThreadLoader {
setRelays(relays: string[]) {
this.relays = relays;
this.subscription.setRelays(relays);
this.subscription.setQueryMap(createSimpleQueryMap(this.relays, { "#e": [this.rootId.value], kinds: [1] }));
this.loadEvent();
}
private updateSubscription() {
if (this.rootId.value) {
this.subscription.setQuery({ "#e": [this.rootId.value], kinds: [1] });
this.subscription.setQueryMap(createSimpleQueryMap(this.relays, { "#e": [this.rootId.value], kinds: [1] }));
if (this.subscription.state !== NostrMultiSubscription.OPEN) {
this.subscription.open();
}

View File

@@ -1,7 +1,9 @@
import dayjs from "dayjs";
import { Debugger } from "debug";
import stringify from "json-stringify-deterministic";
import { NostrEvent, isATag, isETag } from "../types/nostr-event";
import { NostrQuery, NostrRequestFilter } from "../types/nostr-query";
import { NostrQuery, NostrRequestFilter, RelayQueryMap } from "../types/nostr-query";
import NostrRequest from "./nostr-request";
import NostrMultiSubscription from "./nostr-multi-subscription";
import Subject, { PersistentSubject } from "./subject";
@@ -10,13 +12,7 @@ import EventStore from "./event-store";
import { isReplaceable } from "../helpers/nostr/events";
import replaceableEventLoaderService from "../services/replaceable-event-requester";
import deleteEventService from "../services/delete-events";
function addToQuery(filter: NostrRequestFilter, query: NostrQuery) {
if (Array.isArray(filter)) {
return filter.map((f) => ({ ...f, ...query }));
}
return { ...filter, ...query };
}
import { addQueryToFilter, isFilterEqual, mapQueryMap } from "../helpers/nostr/filter";
const BLOCK_SIZE = 30;
@@ -24,7 +20,7 @@ export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;
export class RelayBlockLoader {
relay: string;
query: NostrRequestFilter;
filter: NostrRequestFilter;
blockSize = BLOCK_SIZE;
private log: Debugger;
@@ -35,9 +31,9 @@ export class RelayBlockLoader {
onBlockFinish = new Subject<void>();
constructor(relay: string, query: NostrRequestFilter, log?: Debugger) {
constructor(relay: string, filter: NostrRequestFilter, log?: Debugger) {
this.relay = relay;
this.query = query;
this.filter = filter;
this.log = log || logger.extend(relay);
this.events = new EventStore(relay);
@@ -47,10 +43,10 @@ export class RelayBlockLoader {
loadNextBlock() {
this.loading = true;
let query: NostrRequestFilter = addToQuery(this.query, { limit: this.blockSize });
let filter: NostrRequestFilter = addQueryToFilter(this.filter, { limit: this.blockSize });
let oldestEvent = this.getLastEvent();
if (oldestEvent) {
query = addToQuery(query, { until: oldestEvent.created_at - 1 });
filter = addQueryToFilter(filter, { until: oldestEvent.created_at - 1 });
}
const request = new NostrRequest([this.relay]);
@@ -70,7 +66,11 @@ export class RelayBlockLoader {
this.onBlockFinish.next();
});
request.start(query);
request.start(filter);
}
private handleEvent(event: NostrEvent) {
return this.events.addEvent(event);
}
private handleDeleteEvent(deleteEvent: NostrEvent) {
@@ -81,10 +81,6 @@ export class RelayBlockLoader {
if (eventId) this.events.deleteEvent(eventId);
}
private handleEvent(event: NostrEvent) {
return this.events.addEvent(event);
}
cleanup() {
deleteEventService.stream.unsubscribe(this.handleDeleteEvent, this);
}
@@ -99,8 +95,7 @@ export class RelayBlockLoader {
export default class TimelineLoader {
cursor = dayjs().unix();
query?: NostrRequestFilter;
relays: string[] = [];
queryMap: RelayQueryMap = {};
events: EventStore;
timeline = new PersistentSubject<NostrEvent[]>([]);
@@ -114,14 +109,14 @@ export default class TimelineLoader {
private log: Debugger;
private subscription: NostrMultiSubscription;
blockLoaders = new Map<string, RelayBlockLoader>();
private blockLoaders = new Map<string, RelayBlockLoader>();
constructor(name: string) {
this.name = name;
this.log = logger.extend("TimelineLoader:" + name);
this.events = new EventStore(name);
this.subscription = new NostrMultiSubscription([], undefined, name);
this.subscription = new NostrMultiSubscription(name);
this.subscription.onEvent.subscribe(this.handleEvent, this);
// update the timeline when there are new events
@@ -153,72 +148,71 @@ export default class TimelineLoader {
if (eventId) this.events.deleteEvent(eventId);
}
private createBlockLoaders() {
if (!this.query) return;
for (const relay of this.relays) {
if (!this.blockLoaders.has(relay)) {
const loader = new RelayBlockLoader(relay, this.query, this.log.extend(relay));
this.blockLoaders.set(relay, loader);
private connectToBlockLoader(loader: RelayBlockLoader) {
this.events.connect(loader.events);
loader.onBlockFinish.subscribe(this.updateLoading, this);
loader.onBlockFinish.subscribe(this.updateComplete, this);
}
}
}
private removeBlockLoaders(filter?: (loader: RelayBlockLoader) => boolean) {
for (const [relay, loader] of this.blockLoaders) {
if (!filter || filter(loader)) {
private disconnectToBlockLoader(loader: RelayBlockLoader) {
loader.cleanup();
this.events.disconnect(loader.events);
loader.onBlockFinish.unsubscribe(this.updateLoading, this);
loader.onBlockFinish.unsubscribe(this.updateComplete, this);
}
setQueryMap(queryMap: RelayQueryMap) {
if (isFilterEqual(this.queryMap, queryMap)) return;
this.log("set query map", queryMap);
// remove relays
for (const relay of Object.keys(this.queryMap)) {
const loader = this.blockLoaders.get(relay);
if (!loader) continue;
if (!queryMap[relay]) {
this.disconnectToBlockLoader(loader);
this.blockLoaders.delete(relay);
}
}
for (const [relay, filter] of Object.entries(queryMap)) {
// remove outdated loaders
if (this.queryMap[relay] && !isFilterEqual(this.queryMap[relay], filter)) {
const old = this.blockLoaders.get(relay)!;
this.disconnectToBlockLoader(old);
this.blockLoaders.delete(relay);
}
setRelays(relays: string[]) {
if (this.relays.sort().join("|") === relays.sort().join("|")) return;
// remove loaders
this.removeBlockLoaders((loader) => !relays.includes(loader.relay));
this.relays = relays;
this.createBlockLoaders();
this.subscription.setRelays(relays);
this.updateComplete();
if (!this.blockLoaders.has(relay)) {
const loader = new RelayBlockLoader(relay, filter, this.log.extend(relay));
this.blockLoaders.set(relay, loader);
this.connectToBlockLoader(loader);
}
}
setQuery(query: NostrRequestFilter) {
if (JSON.stringify(this.query) === JSON.stringify(query)) return;
// remove all loaders
this.removeBlockLoaders();
this.queryMap = queryMap;
this.log("set query", query);
this.query = query;
// update the subscription query map and add limit
this.subscription.setQueryMap(
mapQueryMap(this.queryMap, (filter) => addQueryToFilter(filter, { limit: BLOCK_SIZE / 2 })),
);
// forget all events
// TODO: maybe smartly prune the events based on the new filter
this.forgetEvents();
// create any missing loaders
this.createBlockLoaders();
// update the complete flag
this.updateComplete();
// update the subscription with the new query
this.subscription.setQuery(addToQuery(query, { limit: BLOCK_SIZE / 2 }));
this.triggerBlockLoads();
}
setFilter(filter?: EventFilter) {
setEventFilter(filter?: EventFilter) {
this.eventFilter = filter;
this.updateTimeline();
}
setCursor(cursor: number) {
this.cursor = cursor;
this.loadNextBlocks();
this.triggerBlockLoads();
}
loadNextBlocks() {
triggerBlockLoads() {
let triggeredLoad = false;
for (const [relay, loader] of this.blockLoaders) {
if (loader.complete || loader.loading) continue;
@@ -230,8 +224,7 @@ export default class TimelineLoader {
}
if (triggeredLoad) this.updateLoading();
}
/** @deprecated */
loadMore() {
loadNextBlock() {
let triggeredLoad = false;
for (const [relay, loader] of this.blockLoaders) {
if (loader.complete || loader.loading) continue;
@@ -268,25 +261,27 @@ export default class TimelineLoader {
this.subscription.close();
}
forgetEvents() {
this.events.clear();
this.timeline.next([]);
this.subscription.forgetEvents();
}
reset() {
this.cursor = dayjs().unix();
this.removeBlockLoaders();
for (const [_, loader] of this.blockLoaders) this.disconnectToBlockLoader(loader);
this.blockLoaders.clear();
this.forgetEvents();
}
/** close the subscription and remove any event listeners for this timeline */
cleanup() {
this.close();
this.removeBlockLoaders();
for (const [_, loader] of this.blockLoaders) this.disconnectToBlockLoader(loader);
this.blockLoaders.clear();
this.events.cleanup();
deleteEventService.stream.unsubscribe(this.handleDeleteEvent, this);
}
// TODO: this is only needed because the current logic dose not remove events when the relay they where fetched from is removed
/** @deprecated */
forgetEvents() {
this.events.clear();
this.timeline.next([]);
this.subscription.forgetEvents();
}
}

View File

@@ -31,7 +31,7 @@ export default function ContactsWindow({
const [expanded, setExpanded] = useState(true);
// TODO: find a better way to load recent contacts
const [from, setFrom] = useState(() => dayjs().subtract(2, "days"));
const [from, setFrom] = useState(() => dayjs().subtract(2, "days").unix());
const conversations = useSubject(directMessagesService.conversations);
useEffect(() => directMessagesService.loadDateRange(from), [from]);
const sortedConversations = useMemo(() => {

View File

@@ -21,7 +21,7 @@ export default function TimelineActionAndStatus({ timeline }: { timeline: Timeli
}
return (
<Button onClick={() => timeline.loadMore()} flexShrink={0} size="lg" mx="auto" colorScheme="primary" my="4">
<Button onClick={() => timeline.loadNextBlock()} flexShrink={0} size="lg" mx="auto" colorScheme="primary" my="4">
Load More
</Button>
);

View File

@@ -88,6 +88,7 @@ function EventRow({
export default function TimelineHealth({ timeline }: { timeline: TimelineLoader }) {
const events = useSubject(timeline.timeline);
const relays = Array.from(Object.keys(timeline.queryMap));
return (
<>
@@ -103,7 +104,7 @@ export default function TimelineHealth({ timeline }: { timeline: TimelineLoader
</Th>
<Th p="2">Content</Th>
<Th />
{timeline.relays.map((relay) => (
{relays.map((relay) => (
<Th key={relay} title={relay} w="0.1rem" p="0">
<Tooltip label={relay}>
<Box p="2">
@@ -116,7 +117,7 @@ export default function TimelineHealth({ timeline }: { timeline: TimelineLoader
</Thead>
<Tbody>
{events.map((event) => (
<EventRow key={event.id} event={event} relays={timeline.relays} />
<EventRow key={event.id} event={event} relays={relays} />
))}
</Tbody>
</Table>

View File

@@ -0,0 +1,25 @@
import stringify from "json-stringify-deterministic";
import { NostrQuery, NostrRequestFilter, RelayQueryMap } from "../../types/nostr-query";
export function addQueryToFilter(filter: NostrRequestFilter, query: NostrQuery) {
if (Array.isArray(filter)) {
return filter.map((f) => ({ ...f, ...query }));
}
return { ...filter, ...query };
}
export function isFilterEqual(a: NostrRequestFilter, b: NostrRequestFilter) {
return stringify(a) === stringify(b);
}
export function mapQueryMap(queryMap: RelayQueryMap, fn: (filter: NostrRequestFilter) => NostrRequestFilter) {
const newMap: RelayQueryMap = {};
for (const [relay, filter] of Object.entries(queryMap)) newMap[relay] = fn(filter);
return newMap;
}
export function createSimpleQueryMap(relays: string[], filter: NostrRequestFilter) {
const map: RelayQueryMap = {};
for (const relay of relays) map[relay] = filter;
return map;
}

View File

@@ -14,9 +14,6 @@ export const Trackers = [
"udp://tracker.opentrackr.org:1337",
"udp://explodie.org:6969",
"udp://tracker.empire-js.us:1337",
"wss://tracker.btorrent.xyz",
"wss://tracker.openwebtorrent.com",
":wss://tracker.fastcast.nze",
];
export function getTorrentTitle(torrent: NostrEvent) {

View File

@@ -1,4 +1,5 @@
import { RelayConfig } from "../classes/relay";
import { NostrQuery, NostrRequestFilter } from "../types/nostr-query";
import { safeRelayUrl } from "./url";
export function normalizeRelayConfigs(relays: RelayConfig[]) {
@@ -12,3 +13,42 @@ export function normalizeRelayConfigs(relays: RelayConfig[]) {
return newArr;
}, [] as RelayConfig[]);
}
export function splitNostrFilterByPubkeys(
filter: NostrRequestFilter,
relayPubkeyMap: Record<string, string[]>,
): Record<string, NostrRequestFilter> {
if (Array.isArray(filter)) {
const dir: Record<string, NostrQuery[]> = {};
for (const query of filter) {
const split = splitQueryByPubkeys(query, relayPubkeyMap);
for (const [relay, splitQuery] of Object.entries(split)) {
dir[relay] = dir[relay] || [];
dir[relay].push(splitQuery);
}
}
return dir;
} else return splitQueryByPubkeys(filter, relayPubkeyMap);
}
export function splitQueryByPubkeys(query: NostrQuery, relayPubkeyMap: Record<string, string[]>) {
const filtersByRelay: Record<string, NostrQuery> = {};
const allPubkeys = new Set(Object.values(relayPubkeyMap).flat());
for (const [relay, pubkeys] of Object.entries(relayPubkeyMap)) {
if (query.authors || query["#p"]) {
filtersByRelay[relay] = {
...query,
...filtersByRelay[relay],
};
if (query.authors)
filtersByRelay[relay].authors = query.authors.filter((p) => !allPubkeys.has(p)).concat(pubkeys);
if (query["#p"]) filtersByRelay[relay]["#p"] = query["#p"].filter((p) => !allPubkeys.has(p)).concat(pubkeys);
} else filtersByRelay[relay] = query;
}
return filtersByRelay;
}

View File

@@ -7,7 +7,7 @@ export function useTimelineCurserIntersectionCallback(timeline: TimelineLoader)
// if the cursor is set too far ahead and the last block did not overlap with the cursor
// we need to keep loading blocks until the timeline is complete or the blocks pass the cursor
useInterval(() => {
timeline.loadNextBlocks();
timeline.triggerBlockLoads();
}, 1000);
return useIntersectionMapCallback(
@@ -25,7 +25,7 @@ export function useTimelineCurserIntersectionCallback(timeline: TimelineLoader)
if (oldestEvent) {
timeline.setCursor(oldestEvent.created_at);
timeline.loadNextBlocks();
timeline.triggerBlockLoads();
}
},
[timeline],

View File

@@ -1,9 +1,11 @@
import { useEffect, useMemo } from "react";
import { useUnmount } from "react-use";
import { NostrRequestFilter } from "../types/nostr-query";
import timelineCacheService from "../services/timeline-cache";
import { EventFilter } from "../classes/timeline-loader";
import { NostrEvent } from "../types/nostr-event";
import { createSimpleQueryMap } from "../helpers/nostr/filter";
type Options = {
enabled?: boolean;
@@ -16,13 +18,10 @@ export default function useTimelineLoader(key: string, relays: string[], query:
const timeline = useMemo(() => timelineCacheService.createTimeline(key), [key]);
useEffect(() => {
timeline.setQuery(query);
}, [timeline, JSON.stringify(query)]);
timeline.setQueryMap(createSimpleQueryMap(relays, query));
}, [timeline, JSON.stringify(query), relays.join("|")]);
useEffect(() => {
timeline.setRelays(relays);
}, [timeline, relays.join("|")]);
useEffect(() => {
timeline.setFilter(opts?.eventFilter);
timeline.setEventFilter(opts?.eventFilter);
}, [timeline, opts?.eventFilter]);
useEffect(() => {
if (opts?.cursor !== undefined) {
@@ -36,7 +35,6 @@ export default function useTimelineLoader(key: string, relays: string[], query:
const enabled = opts?.enabled ?? true;
useEffect(() => {
if (enabled) {
timeline.setQuery(query);
timeline.open();
} else timeline.close();
}, [timeline, enabled]);

View File

@@ -7,7 +7,7 @@ import clientRelaysService from "./client-relays";
import SuperMap from "../classes/super-map";
import { PersistentSubject } from "../classes/subject";
import accountService from "./account";
import { NostrQuery } from "../types/nostr-query";
import { createSimpleQueryMap } from "../helpers/nostr/filter";
export function getMessageRecipient(event: NostrEvent): string | undefined {
return event.tags.find(isPTag)?.[1];
@@ -19,20 +19,13 @@ class DirectMessagesService {
outgoingSub: NostrMultiSubscription;
conversations = new PersistentSubject<string[]>([]);
messages = new SuperMap<string, PersistentSubject<NostrEvent[]>>(() => new PersistentSubject<NostrEvent[]>([]));
from = dayjs().subtract(2, "day").unix();
constructor() {
this.incomingSub = new NostrMultiSubscription(
clientRelaysService.getReadUrls(),
undefined,
"incoming-direct-messages",
);
this.incomingSub = new NostrMultiSubscription("incoming-direct-messages");
this.incomingSub.onEvent.subscribe(this.receiveEvent, this);
this.outgoingSub = new NostrMultiSubscription(
clientRelaysService.getReadUrls(),
undefined,
"outgoing-direct-messages",
);
this.outgoingSub = new NostrMultiSubscription("outgoing-direct-messages");
this.outgoingSub.onEvent.subscribe(this.receiveEvent, this);
// reset the messages when the account changes
@@ -41,32 +34,36 @@ class DirectMessagesService {
this.conversations.next([]);
if (!newAccount) return;
// update subscriptions
if (this.incomingSub.query) {
this.incomingSub.setQuery({
...this.incomingSub.query,
"#p": [newAccount.pubkey],
since: dayjs().subtract(1, "day").unix(),
});
}
if (this.outgoingSub.query) {
this.outgoingSub.setQuery({
...this.outgoingSub.query,
authors: [newAccount.pubkey],
since: dayjs().subtract(1, "day").unix(),
});
}
this.updateSubscriptions();
});
// update relays when they change
clientRelaysService.readRelays.subscribe((relays) => {
const urls = relays.map((r) => r.url);
this.incomingSub.setRelays(urls);
this.outgoingSub.setRelays(urls);
clientRelaysService.readRelays.subscribe(() => {
this.updateSubscriptions();
});
}
private updateSubscriptions() {
const account = accountService.current.value;
if (!account) return;
const readRelays = clientRelaysService.getReadUrls();
this.incomingSub.setQueryMap(
createSimpleQueryMap(readRelays, {
"#p": [account.pubkey],
kinds: [Kind.EncryptedDirectMessage],
since: this.from,
}),
);
this.outgoingSub.setQueryMap(
createSimpleQueryMap(readRelays, {
authors: [account.pubkey],
kinds: [Kind.EncryptedDirectMessage],
since: this.from,
}),
);
}
receiveEvent(event: NostrEvent) {
const from = event.pubkey;
const to = getMessageRecipient(event);
@@ -91,35 +88,13 @@ class DirectMessagesService {
return this.messages.size;
}
loadDateRange(from: dayjs.ConfigType) {
loadDateRange(from: number) {
const account = accountService.current.value;
if (!account) return;
if (dayjs.unix(this.from).isBefore(this.from)) return;
if (
!Array.isArray(this.incomingSub.query) &&
this.incomingSub.query?.since &&
dayjs.unix(this.incomingSub.query.since).isBefore(from)
) {
// "since" is already set on the subscription and its older than "from"
return;
}
const incomingQuery: NostrQuery = {
kinds: [Kind.EncryptedDirectMessage],
"#p": [account.pubkey],
since: dayjs(from).unix(),
};
this.incomingSub.setQuery(incomingQuery);
const outgoingQuery: NostrQuery = {
kinds: [Kind.EncryptedDirectMessage],
authors: [account.pubkey],
since: dayjs(from).unix(),
};
this.outgoingSub.setQuery(outgoingQuery);
this.incomingSub.setRelays(clientRelaysService.getReadUrls());
this.outgoingSub.setRelays(clientRelaysService.getReadUrls());
this.from = from;
this.updateSubscriptions();
if (this.incomingSub.state !== NostrMultiSubscription.OPEN) {
this.incomingSub.open();
@@ -133,4 +108,9 @@ class DirectMessagesService {
/** @deprecated */
const directMessagesService = new DirectMessagesService();
if (import.meta.env.DEV) {
// @ts-ignore
window.directMessagesService = directMessagesService;
}
export default directMessagesService;

View File

@@ -1,114 +0,0 @@
import { RelayMode } from "../classes/relay";
import Subject, { PersistentSubject } from "../classes/subject";
import SuperMap from "../classes/super-map";
import { unique } from "../helpers/array";
import accountService from "./account";
import clientRelaysService from "./client-relays";
import relayScoreboardService from "./relay-scoreboard";
import userContactsService, { UserContacts } from "./user-contacts";
import userRelaysService, { ParsedUserRelays } from "./user-relays";
type pubkey = string;
type relay = string;
class PubkeyRelayAssignmentService {
pubkeys = new Map<pubkey, relay[]>();
pubkeyRelays = new SuperMap<string, Subject<ParsedUserRelays>>(() => new Subject());
assignments = new PersistentSubject<Record<pubkey, relay[]>>({});
constructor() {
let sub: Subject<UserContacts>;
accountService.current.subscribe((account) => {
if (sub) {
sub.unsubscribe(this.handleUserContacts, this);
}
if (account) {
this.pubkeys.clear();
this.pubkeyRelays.clear();
const contactsSub = userContactsService.requestContacts(account.pubkey, account.relays ?? []);
contactsSub.subscribe(this.handleUserContacts, this);
sub = contactsSub;
}
});
}
private handleUserContacts(contacts: UserContacts) {
for (const pubkey of contacts.contacts) {
const relay = contacts.contactRelay[pubkey];
pubkeyRelayAssignmentService.addPubkey(pubkey, relay ? [relay] : []);
}
}
addPubkey(pubkey: string, relays: string[] = []) {
if (this.pubkeys.has(pubkey)) return;
this.pubkeys.set(pubkey, relays);
const readRelays = clientRelaysService.getReadUrls();
const subject = userRelaysService.requestRelays(pubkey, unique([...readRelays, ...relays]));
this.pubkeyRelays.set(pubkey, subject);
// subject.subscribe(this.updateAssignments, this);
}
removePubkey(pubkey: string) {
if (!this.pubkeys.has(pubkey)) return;
this.pubkeys.delete(pubkey);
this.pubkeyRelays.delete(pubkey);
}
updateAssignments() {
const allRelays = new Set<relay>();
for (const [pubkey, userRelays] of this.pubkeyRelays) {
if (!userRelays.value) continue;
for (const relayConfig of userRelays.value.relays) {
// only use relays the users are writing to
if (relayConfig.mode & RelayMode.WRITE) {
allRelays.add(relayConfig.url);
}
}
}
const relayScores = new Map<relay, number>();
for (const relay of allRelays) {
relayScores.set(relay, relayScoreboardService.getRelayScore(relay));
}
const readRelays = clientRelaysService.getReadUrls();
const assignments: Record<pubkey, relay[]> = {};
for (const [pubkey] of this.pubkeys) {
let userRelays =
this.pubkeyRelays
.get(pubkey)
.value?.relays.filter((r) => r.mode & RelayMode.WRITE)
.map((r) => r.url) ?? [];
if (userRelays.length === 0) userRelays = Array.from(readRelays);
const rankedOptions = Array.from(userRelays).sort(
(a, b) => (relayScores.get(b) ?? 0) - (relayScores.get(a) ?? 0),
);
assignments[pubkey] = rankedOptions.slice(0, 3);
for (const relay of assignments[pubkey]) {
relayScores.set(relay, (relayScores.get(relay) ?? 0) + 1);
}
}
this.assignments.next(assignments);
}
}
const pubkeyRelayAssignmentService = new PubkeyRelayAssignmentService();
setInterval(() => {
pubkeyRelayAssignmentService.updateAssignments();
}, 1000 * 5);
if (import.meta.env.DEV) {
//@ts-ignore
window.pubkeyRelayAssignmentService = pubkeyRelayAssignmentService;
}
export default pubkeyRelayAssignmentService;

View File

@@ -28,3 +28,5 @@ export type NostrQuery = {
};
export type NostrRequestFilter = NostrQuery | NostrQuery[];
export type RelayQueryMap = Record<string, NostrRequestFilter>;

View File

@@ -47,7 +47,7 @@ function ContactCard({ pubkey }: { pubkey: string }) {
}
function DirectMessagesPage() {
const [from, setFrom] = useState(dayjs().subtract(2, "days"));
const [from, setFrom] = useState(dayjs().subtract(2, "days").unix());
const conversations = useSubject(directMessagesService.conversations);
useEffect(() => directMessagesService.loadDateRange(from), [from]);
@@ -55,7 +55,7 @@ function DirectMessagesPage() {
const [loading, setLoading] = useState(false);
const loadMore = () => {
setLoading(true);
setFrom((date) => dayjs(date).subtract(2, "days"));
setFrom((date) => dayjs(date).subtract(2, "days").unix());
setTimeout(() => {
setLoading(false);
}, 1000);

View File

@@ -84,7 +84,7 @@ function TorrentsPage() {
);
const query = useMemo(
() => (tags.length > 0 ? { ...filter, kinds: [TORRENT_KIND], "#t": tags } : { ...filter, kinds: [TORRENT_KIND] }),
[tags.join(",")],
[tags.join(","), filter],
);
const timeline = useTimelineLoader(`${listId}-torrents`, relays, query, { eventFilter, enabled: !!filter });