use rxjs for observable streams

This commit is contained in:
hzrd149 2023-02-07 17:04:18 -06:00
parent d7832cb893
commit 919e04b620
16 changed files with 322 additions and 149 deletions

View File

@ -18,12 +18,14 @@
"@emotion/styled": "^11.10.5",
"framer-motion": "^7.10.3",
"idb": "^7.1.1",
"moment": "^2.29.4",
"noble-secp256k1": "^1.2.14",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-error-boundary": "^3.1.4",
"react-markdown": "^8.0.4",
"react-router-dom": "^6.5.0",
"react-use": "^17.4.0"
"react-use": "^17.4.0",
"rxjs": "^7.8.0"
}
}

View File

@ -0,0 +1,89 @@
import {
Button,
Modal,
ModalOverlay,
ModalContent,
ModalHeader,
ModalFooter,
ModalBody,
ModalCloseButton,
Table,
Thead,
Tbody,
Tfoot,
Tr,
Th,
Td,
TableCaption,
TableContainer,
useDisclosure,
Badge,
} from "@chakra-ui/react";
import React, { useState } from "react";
import { useInterval } from "react-use";
import relayPool from "../services/relays/relay-pool";
const getRelayStatusText = (relay) => {
if (relay.connecting) return "Connecting...";
if (relay.connected) return "Connected";
if (relay.closing) return "Disconnecting...";
if (relay.closed) return "Disconnected";
};
export const ConnectedRelays = () => {
const { isOpen, onOpen, onClose } = useDisclosure();
const [relays, setRelays] = useState([]);
useInterval(() => {
setRelays(relayPool.getRelays());
}, 1000);
const connected = relays.filter((relay) => relay.okay);
const disconnected = relays.filter((relay) => !relay.okay);
return (
<>
<Button variant="link" onClick={onOpen}>
{connected.length}/{relays.length} of relays connected
</Button>
<Modal isOpen={isOpen} onClose={onClose} size="4xl">
<ModalOverlay />
<ModalContent>
<ModalHeader>Modal Title</ModalHeader>
<ModalCloseButton />
<ModalBody>
<TableContainer>
<Table variant="simple">
<Thead>
<Tr>
<Th>Url</Th>
<Th>status</Th>
</Tr>
</Thead>
<Tbody>
{relays.map((relay) => (
<Tr keyy={relay.url}>
<Td>{relay.url}</Td>
<Td>
<Badge colorScheme={relay.okay ? "green" : "red"}>
{getRelayStatusText(relay)}
</Badge>
</Td>
</Tr>
))}
</Tbody>
</Table>
</TableContainer>
</ModalBody>
<ModalFooter>
<Button colorScheme="blue" mr={3} onClick={onClose}>
Close
</Button>
</ModalFooter>
</ModalContent>
</Modal>
</>
);
};

View File

@ -2,6 +2,7 @@ import React from "react";
import { Box, Button, Container, HStack, VStack } from "@chakra-ui/react";
import { useNavigate } from "react-router-dom";
import { ErrorBoundary } from "./error-boundary";
import { ConnectedRelays } from "./connected-relays";
export const Page = ({ children }) => {
const navigate = useNavigate();
@ -13,6 +14,7 @@ export const Page = ({ children }) => {
<Button onClick={() => navigate("/")}>Home</Button>
<Button onClick={() => navigate("/global")}>Global</Button>
<Button onClick={() => navigate("/settings")}>Settings</Button>
<ConnectedRelays />
</VStack>
<Box flexGrow={1} overflow="auto">
<ErrorBoundary>{children}</ErrorBoundary>

View File

@ -10,6 +10,7 @@ import {
} from "@chakra-ui/react";
import ReactMarkdown from "react-markdown";
import { Link } from "react-router-dom";
import moment from "moment";
import { PostModal } from "./post-modal";
export const Post = ({ event }) => {
@ -22,7 +23,8 @@ export const Post = ({ event }) => {
<CardBody>
<VStack alignItems="flex-start" justifyContent="stretch">
<Heading size="sm">
<Link to={`/user/${event.pubkey}`}>{event.pubkey}</Link>
<Link to={`/user/${event.pubkey}`}>{event.pubkey}</Link>{" "}
<span>{moment(event.created_at * 1000).fromNow()}</span>
</Heading>
<Box maxHeight="10rem" overflow="hidden" width="100%">
<ReactMarkdown>{event.content}</ReactMarkdown>

View File

@ -1,30 +0,0 @@
export class Signal {
listeners = [];
connections = new Set();
emit(event) {
for (const [fn, ctx] of this.listeners) {
if (ctx) {
fn.apply(ctx, [event]);
} else fn(event);
}
for (const signal of this.connections) {
signal.emit(event);
}
}
addListener(fn, ctx) {
this.listeners.push([fn, ctx]);
}
removeListener(fn, ctx) {
this.listeners = this.listeners.filter(
(listener) => listener.fn !== fn && listener.ctx !== ctx
);
}
addConnection(signal) {
this.connections.add(signal);
}
removeConnection(signal) {
this.connections.delete(signal);
}
}

View File

@ -1,10 +0,0 @@
import { useCallback, useEffect } from "react";
export function useSignal(signal, fn, watch = []) {
const listener = useCallback(fn, watch);
useEffect(() => {
if (!signal) return;
signal.addListener(listener);
return () => signal.removeListener(listener);
}, [signal, listener]);
}

View File

@ -2,9 +2,9 @@ import { useRef } from "react";
import { useDeepCompareEffect, useMount, useUnmount } from "react-use";
import { Subscription } from "../services/subscriptions";
export function useSubscription(urls, query, watch = []) {
export function useSubscription(urls, query, name) {
const sub = useRef(null);
sub.current = sub.current || new Subscription(urls, query);
sub.current = sub.current || new Subscription(urls, query, name);
useMount(() => {
if (sub.current) sub.current.open();

View File

@ -4,6 +4,9 @@ export class RelayPool {
relays = new Map();
relayClaims = new Map();
getRelays() {
return Array.from(this.relays.values());
}
getRelayClaims(url) {
if (!this.relayClaims.has(url)) {
this.relayClaims.set(url, new Set());

View File

@ -1,21 +1,33 @@
import { Signal } from "../../helpers/signal";
import { Subject } from "rxjs";
export class Relay {
constructor(url) {
this.url = url;
this.onOpen = new Signal();
this.onClose = new Signal();
this.onEvent = new Signal();
this.onNotice = new Signal();
this.onOpen = new Subject();
this.onClose = new Subject();
this.onEvent = new Subject();
this.onNotice = new Subject();
}
open() {
if (this.okay) return;
this.ws = new WebSocket(this.url);
this.ws.onopen = this.handleOpen.bind(this);
this.ws.onclose = this.handleClose.bind(this);
this.ws.onopen = () => {
this.onOpen.next(this);
if (import.meta.env.DEV) {
console.info(`Relay ${this.url} opened`);
}
};
this.ws.onclose = () => {
this.onClose.next(this);
if (import.meta.env.DEV) {
console.info(`Relay ${this.url} closed`);
}
};
this.ws.onmessage = this.handleMessage.bind(this);
}
send(json) {
@ -36,40 +48,35 @@ export class Relay {
get connecting() {
return this.ws?.readyState === WebSocket.CONNECTING;
}
get closing() {
return this.ws?.readyState === WebSocket.CLOSING;
}
get closed() {
return this.ws?.readyState === WebSocket.CLOSED;
}
get state() {
return this.ws?.readyState;
}
handleMessage(event) {
// skip empty events
if (!event.data) return;
try {
const data = JSON.parse(event.data);
const type = data[0];
switch (type) {
case "EVENT":
this.onEvent.emit({ type, subId: data[1], body: data[2] }, this);
this.onEvent.next({ type, subId: data[1], body: data[2] }, this);
break;
case "NOTICE":
this.onNotice.emit({ type, message: data[1] }, this);
this.onNotice.next({ type, message: data[1] }, this);
break;
}
} catch (e) {
console.log(`Failed to parse event from ${this.url}`);
console.log(event);
}
}
handleOpen() {
this.onOpen.emit(this);
if (import.meta.env.DEV) {
console.info(`Relay ${this.url} opened`);
}
}
handleClose() {
this.onClose.emit(this);
if (import.meta.env.DEV) {
console.info(`Relay ${this.url} closed`);
console.log(event.data);
}
}
}

View File

@ -1,21 +1,22 @@
import { Signal } from "../helpers/signal";
import { Subject } from "rxjs";
import relayPool from "./relays/relay-pool";
import settingsService from "./settings";
export class Subscription {
static OPEN = "open";
static IDLE = "open";
static CLOSED = "closed";
constructor(relayUrls, query) {
state = Subscription.CLOSED;
onEvent = new Subject();
cleanup = [];
constructor(relayUrls, query, name) {
this.id = String(Math.floor(Math.random() * 1000000));
this.query = query;
this.name = name;
this.relayUrls = relayUrls;
this.status = Subscription.IDLE;
this.relays = relayUrls.map((url) => relayPool.requestRelay(url));
this.onEvent = new Signal();
}
handleOpen(relay) {
// when the relay connects send the req event
@ -23,7 +24,7 @@ export class Subscription {
}
handleEvent(event) {
if (event.subId === this.id) {
this.onEvent.emit(event.body);
this.onEvent.next(event.body);
}
}
send(message) {
@ -41,12 +42,13 @@ export class Subscription {
}
}
open() {
if (this.state === Subscription.OPEN || !this.query) return;
this.state = Subscription.OPEN;
this.send(["REQ", this.id, this.query]);
for (const relay of this.relays) {
relay.onEvent.addListener(this.handleEvent, this);
relay.onOpen.addListener(this.handleOpen, this);
this.cleanup.push(relay.onEvent.subscribe(this.handleEvent.bind(this)));
this.cleanup.push(relay.onOpen.subscribe(this.handleOpen.bind(this)));
}
for (const url of this.relayUrls) {
@ -54,24 +56,22 @@ export class Subscription {
}
if (import.meta.env.DEV) {
console.info(`Subscription ${this.id} opened`);
console.info(`Subscription ${this.name || this.id} opened`);
}
}
close() {
this.status = Subscription.CLOSED;
if (this.state === Subscription.CLOSED) return;
this.state = Subscription.CLOSED;
this.send(["CLOSE", this.id]);
for (const relay of this.relays) {
relay.onEvent.removeListener(this.handleEvent, this);
relay.onOpen.removeListener(this.handleOpen, this);
}
this.cleanup.forEach((sub) => sub.unsubscribe());
for (const url of this.relayUrls) {
relayPool.removeClaim(url, this);
}
if (import.meta.env.DEV) {
console.info(`Subscription ${this.id} closed`);
console.info(`Subscription ${this.name || this.id} closed`);
}
}
}

View File

@ -0,0 +1,75 @@
import { BehaviorSubject, Subject } from "rxjs";
import settingsService from "./settings";
import { Subscription } from "./subscriptions";
function waitOnSignal(signal) {
return new Promise((res) => {
const listener = (event) => {
signal.removeListener(listener);
res(event);
};
signal.addListener(listener);
});
}
class UserMetadata {
requests = new Map();
constructor(relayUrls = []) {
this.subscription = new Subscription(relayUrls, null, "user-metadata");
this.subscription.onEvent.subscribe((event) => {
try {
const metadata = JSON.parse(event.content);
this.requests.get(event.pubkey)?.next(metadata);
} catch (e) {}
});
setInterval(() => {
this.pruneRequests();
}, 1000 * 10);
}
requestUserMetadata(pubkey) {
if (!this.requests.has(pubkey)) {
this.requests.set(pubkey, new BehaviorSubject(null));
this.updateSubscription();
}
return this.requests.get(pubkey);
}
updateSubscription() {
const pubkeys = Array.from(this.requests.keys());
if (pubkeys.length === 0) {
this.subscription.close();
} else {
this.subscription.setQuery({ authors: pubkeys, kinds: [0] });
if (this.subscription.state === Subscription.CLOSED) {
this.subscription.open();
}
}
}
pruneRequests() {
let removed = false;
const requests = Array.from(this.requests.entries());
for (const [pubkey, subject] of requests) {
if (!subject.observed) {
subject.complete();
this.requests.delete(pubkey);
removed = true;
}
}
if (removed) this.updateSubscription();
}
}
const userMetadata = new UserMetadata(await settingsService.getRelays());
if (import.meta.env.DEV) {
window.userMetadata = userMetadata;
}
export default userMetadata;

View File

@ -1,28 +1,35 @@
import React, { useState } from "react";
import React, { useEffect, useState } from "react";
import { SkeletonText } from "@chakra-ui/react";
import { useSubscription } from "../../helpers/use-subscription";
import { useSignal } from "../../hooks/use-signal";
import { useSubscription } from "../../hooks/use-subscription";
import { useRelays } from "../../providers/relay-provider";
import { Post } from "../../components/post";
import moment from "moment/moment";
export const GlobalView = () => {
const { relays } = useRelays();
const [events, setEvents] = useState({});
const sub = useSubscription(relays, { kinds: [1], limit: 10 }, []);
useSignal(
sub?.onEvent,
(event) => {
if (event.kind === 1) {
setEvents((dir) => ({ [event.id]: event, ...dir }));
}
},
[setEvents]
const sub = useSubscription(
relays,
{ kinds: [1], limit: 10, since: moment().startOf("day").valueOf() / 1000 },
"global-events"
);
useEffect(() => {
const s = sub.onEvent.subscribe((event) => {
setEvents((dir) => {
if (!dir[event.id]) {
return { [event.id]: event, ...dir };
}
return dir;
});
});
return () => s.unsubscribe();
}, [sub]);
const timeline = Object.values(events).sort(
(a, b) => a.created_at - b.created_at
(a, b) => b.created_at - a.created_at
);
if (timeline.length === 0) {

View File

@ -1,4 +1,4 @@
import React from "react";
import React, { useMemo } from "react";
import {
Heading,
Tab,
@ -9,6 +9,8 @@ import {
} from "@chakra-ui/react";
import { useParams } from "react-router-dom";
import { UserPostsTab } from "./posts";
import { useObservable } from "react-use";
import userMetadata from "../../services/user-metadata";
export const UserView = () => {
const params = useParams();
@ -18,9 +20,15 @@ export const UserView = () => {
throw new Error("No pubkey");
}
const observable = useMemo(
() => userMetadata.requestUserMetadata(params.pubkey),
[params.pubkey]
);
const metadata = useObservable(observable);
return (
<>
<Heading>{params.pubkey}</Heading>
<Heading>{metadata?.name ?? params.pubkey}</Heading>
<Tabs>
<TabList>
<Tab>Posts</Tab>

View File

@ -1,8 +1,7 @@
import React, { useState } from "react";
import React, { useEffect, useState } from "react";
import { SkeletonText } from "@chakra-ui/react";
import settingsService from "../../services/settings";
import { useSignal } from "../../hooks/use-signal";
import { useSubscription } from "../../helpers/use-subscription";
import { useSubscription } from "../../hooks/use-subscription";
import { Post } from "../../components/post";
const relayUrls = await settingsService.getRelays();
@ -10,20 +9,27 @@ const relayUrls = await settingsService.getRelays();
export const UserPostsTab = ({ pubkey }) => {
const [events, setEvents] = useState({});
const sub = useSubscription(relayUrls, { authors: [pubkey] }, [pubkey]);
useSignal(
sub?.onEvent,
(event) => {
if (event.kind === 1) {
setEvents((dir) => ({ [event.id]: event, ...dir }));
}
},
[setEvents]
const sub = useSubscription(
relayUrls,
{ authors: [pubkey], kinds: [1] },
`${pubkey} posts`
);
useEffect(() => {
const s = sub.onEvent.subscribe((event) => {
setEvents((dir) => {
if (!dir[event.id]) {
return { [event.id]: event, ...dir };
}
return dir;
});
});
return () => s.unsubscribe();
}, [sub]);
const timeline = Object.values(events).sort(
(a, b) => a.created_at - b.created_at
(a, b) => b.created_at - a.created_at
);
if (timeline.length === 0) {

View File

@ -1,41 +1,41 @@
import React, { useEffect, useState } from "react";
import { Card, CardBody, SkeletonText } from "@chakra-ui/react";
import ReactMarkdown from "react-markdown";
import { onEvent, subscribeToAuthor } from "../../services/relays";
import { useSignal } from "../../hooks/use-signal";
// import React, { useEffect, useState } from "react";
// import { Card, CardBody, SkeletonText } from "@chakra-ui/react";
// import ReactMarkdown from "react-markdown";
// import { onEvent, subscribeToAuthor } from "../../services/relays";
// import { useSignal } from "../../hooks/use-signal";
export const UserRelaysTab = ({ pubkey }) => {
const [events, setEvents] = useState({});
// export const UserRelaysTab = ({ pubkey }) => {
// const [events, setEvents] = useState({});
useEffect(() => {
if (pubkey) {
subscribeToAuthor(pubkey);
}
}, [pubkey]);
// useEffect(() => {
// if (pubkey) {
// subscribeToAuthor(pubkey);
// }
// }, [pubkey]);
useSignal(
onEvent,
(event) => {
if (event.body.kind === 1) {
setEvents((dir) => ({ [event.body.id]: event.body, ...dir }));
}
},
[setEvents]
);
// useSignal(
// onEvent,
// (event) => {
// if (event.body.kind === 1) {
// setEvents((dir) => ({ [event.body.id]: event.body, ...dir }));
// }
// },
// [setEvents]
// );
const timeline = Object.values(events).sort(
(a, b) => a.created_at - b.created_at
);
// const timeline = Object.values(events).sort(
// (a, b) => a.created_at - b.created_at
// );
if (timeline.length === 0) {
return <SkeletonText />;
}
// if (timeline.length === 0) {
// return <SkeletonText />;
// }
return timeline.map((event) => (
<Card key={event.id}>
<CardBody>
<ReactMarkdown>{event.content}</ReactMarkdown>
</CardBody>
</Card>
));
};
// return timeline.map((event) => (
// <Card key={event.id}>
// <CardBody>
// <ReactMarkdown>{event.content}</ReactMarkdown>
// </CardBody>
// </Card>
// ));
// };

View File

@ -3508,6 +3508,11 @@ minimatch@^5.0.1:
dependencies:
brace-expansion "^2.0.1"
moment@^2.29.4:
version "2.29.4"
resolved "https://registry.yarnpkg.com/moment/-/moment-2.29.4.tgz#3dbe052889fe7c1b2ed966fcb3a77328964ef108"
integrity sha512-5LC9SOxjSc2HF6vO2CyuTDNivEdoz2IvyJJGj6X8DJ0eFyfszE0QiEd+iXmBvUP3WHxSjFH/vIsA0EN00cgr8w==
mri@^1.1.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/mri/-/mri-1.2.0.tgz#6721480fec2a11a4889861115a48b6cbe7cc8f0b"
@ -3964,6 +3969,13 @@ run-parallel@^1.1.9:
dependencies:
queue-microtask "^1.2.2"
rxjs@^7.8.0:
version "7.8.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-7.8.0.tgz#90a938862a82888ff4c7359811a595e14e1e09a4"
integrity sha512-F2+gxDshqmIub1KdvZkaEfGDwLNpPvk9Fs6LD/MyQxNgMds/WH9OdDDXOmxUZpME+iSK3rQCctkL0DYyytUqMg==
dependencies:
tslib "^2.1.0"
sade@^1.7.3:
version "1.8.1"
resolved "https://registry.yarnpkg.com/sade/-/sade-1.8.1.tgz#0a78e81d658d394887be57d2a409bf703a3b2701"