diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 5400256..c63d3f3 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -47,8 +47,11 @@ Running `nostream` for the first time creates the settings file in ` export type UnidentifiedEvent = Omit diff --git a/src/@types/messages.ts b/src/@types/messages.ts index 09e8646..63f24b6 100644 --- a/src/@types/messages.ts +++ b/src/@types/messages.ts @@ -1,7 +1,7 @@ -import { ContextMetadata, EventId, Range } from './base' +import { ContextMetadata, EventId, Range, Secret } from './base' +import { Event, RelayedEvent } from './event' import { SubscriptionFilter, SubscriptionId } from './subscription' import { ContextMetadataKey } from '../constants/base' -import { Event } from './event' export enum MessageType { REQ = 'REQ', @@ -36,9 +36,12 @@ export type SubscribeMessage = { export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event] +export type IncomingRelayedEventMessage = [MessageType.EVENT, RelayedEvent, Secret] + export interface EventMessage { 0: MessageType.EVENT 1: Event + 2?: Secret } export interface OutgoingEventMessage { diff --git a/src/@types/settings.ts b/src/@types/settings.ts index cd19855..818e596 100644 --- a/src/@types/settings.ts +++ b/src/@types/settings.ts @@ -1,6 +1,7 @@ +import { Pubkey, Secret } from './base' import { EventKinds } from '../constants/base' import { MessageType } from './messages' -import { Pubkey } from './base' +import { SubscriptionFilter } from './subscription' export interface Info { relay_url: string @@ -151,6 +152,24 @@ export interface PaymentsProcessors { zebedee?: ZebedeePaymentsProcessor } +export interface Local { + secret: Secret +} + +export interface Remote { + secret: Secret +} + +export interface Mirror { + address: string + filters?: SubscriptionFilter[] + secret?: Secret +} + +export interface Mirroring { + static?: Mirror[] +} + export interface Settings { info: Info payments?: Payments @@ -158,4 +177,5 @@ export interface Settings { network: Network workers?: Worker limits?: Limits + mirroring?: Mirroring } diff --git a/src/app/app.ts b/src/app/app.ts index bd289d4..5715c79 100644 --- a/src/app/app.ts +++ b/src/app/app.ts @@ -14,7 +14,7 @@ import { SettingsStatic } from '../utils/settings' const debug = createLogger('app-primary') export class App implements IRunnable { - private workers: WeakMap + private workers: WeakMap> private watchers: FSWatcher[] | undefined public constructor( @@ -66,18 +66,32 @@ export class App implements IRunnable { ? Number(process.env.WORKER_COUNT) : this.settings().workers?.count || cpus().length - for (let i = 0; i < workerCount; i++) { - debug('starting worker') - const worker = this.cluster.fork({ - WORKER_TYPE: 'worker', - }) - this.workers.set(worker, 'worker') + const createWorker = (env: Record) => { + const worker = this.cluster.fork(env) + this.workers.set(worker, env) } - const worker = this.cluster.fork({ + for (let i = 0; i < workerCount; i++) { + debug('starting worker') + createWorker({ + WORKER_TYPE: 'worker', + }) + } + + createWorker({ WORKER_TYPE: 'maintenance', }) - this.workers.set(worker, 'maintenance') + + const mirrors = settings?.mirroring?.static + + if (Array.isArray(mirrors) && mirrors.length) { + for (let i = 0; i < mirrors.length; i++) { + createWorker({ + WORKER_TYPE: 'static-mirroring', + MIRROR_INDEX: i.toString(), + }) + } + } logCentered(`${workerCount} workers started`, width) @@ -111,14 +125,13 @@ export class App implements IRunnable { } setTimeout(() => { debug('starting worker') - const workerType = this.workers.get(deadWorker) - if (!workerType) { + const workerEnv = this.workers.get(deadWorker) + if (!workerEnv) { throw new Error('Mistakes were made') } - const newWorker = this.cluster.fork({ - WORKER_TYPE: workerType, - }) - this.workers.set(newWorker, workerType) + const newWorker = this.cluster.fork(workerEnv) + this.workers.set(newWorker, workerEnv) + debug('started worker %s', newWorker.process.pid) }, 10000) } diff --git a/src/app/static-mirroring-worker.ts b/src/app/static-mirroring-worker.ts new file mode 100644 index 0000000..1a2993a --- /dev/null +++ b/src/app/static-mirroring-worker.ts @@ -0,0 +1,149 @@ +import { anyPass, map, path } from 'ramda' +import { RawData, WebSocket } from 'ws' +import cluster from 'cluster' +import { randomUUID } from 'crypto' + +import { createRelayedEventMessage, createSubscriptionMessage } from '../utils/messages' +import { isEventIdValid, isEventMatchingFilter, isEventSignatureValid } from '../utils/event' +import { Mirror, Settings } from '../@types/settings' +import { createLogger } from '../factories/logger-factory' +import { IRunnable } from '../@types/base' +import { OutgoingEventMessage } from '../@types/messages' +import { WebSocketServerAdapterEvent } from '../constants/adapter' + +const debug = createLogger('static-mirror-worker') + +export class StaticMirroringWorker implements IRunnable { + private client: WebSocket | undefined + private config: Mirror + + public constructor( + private readonly process: NodeJS.Process, + private readonly settings: () => Settings, + ) { + this.process + .on('message', this.onMessage.bind(this)) + .on('SIGINT', this.onExit.bind(this)) + .on('SIGHUP', this.onExit.bind(this)) + .on('SIGTERM', this.onExit.bind(this)) + .on('uncaughtException', this.onError.bind(this)) + .on('unhandledRejection', this.onError.bind(this)) + } + + public run(): void { + const currentSettings = this.settings() + + console.log('mirroring', currentSettings.mirroring) + + this.config = path(['mirroring', 'static', process.env.MIRROR_INDEX], currentSettings) as Mirror + + let since = Math.floor(Date.now() / 1000) - 60*10 + + const createMirror = (config: Mirror) => { + const subscriptionId = `mirror-${randomUUID()}` + + debug('connecting to %s', config.address) + + return new WebSocket(config.address, { timeout: 5000 }) + .on('open', function () { + debug('connected to %s', config.address) + + if (Array.isArray(config.filters) && config.filters?.length) { + const filters = config.filters.map((filter) => ({ ...filter, since })) + + debug('subscribing with %s: %o', subscriptionId, filters) + + this.send(JSON.stringify(createSubscriptionMessage(subscriptionId, filters))) + } + }) + .on('message', async function (raw: RawData) { + try { + const message = JSON.parse(raw.toString('utf8')) as OutgoingEventMessage + debug('received: %o', message) + + if (!Array.isArray(message)) { + return + } + + if (message[0] !== 'EVENT' || message[1] !== subscriptionId) { + return + } + + const event = message[2] + + if (!anyPass(map(isEventMatchingFilter, config.filters))(event)) { + return + } + + if (!await isEventIdValid(event) || !await isEventSignatureValid(event)) { + return + } + + since = Math.floor(Date.now()) - 30 + + if (cluster.isWorker && typeof process.send === 'function') { + process.send({ + eventName: WebSocketServerAdapterEvent.Broadcast, + event, + source: config.address, + }) + } + } catch (error) { + debug('unable to process message: %o', error) + } + }) + .on('close', (code, reason) => { + debug(`disconnected (${code}): ${reason.toString()}`) + + setTimeout(() => { + this.client.removeAllListeners() + this.client = createMirror(config) + }, 5000) + }) + .on('error', function (error) { + debug('connection error: %o', error) + }) + } + + this.client = createMirror(this.config) + } + + private onMessage(message: { eventName: string, event: unknown, source: string }): void { + if ( + message.eventName !== WebSocketServerAdapterEvent.Broadcast + || message.source === this.config.address + || !this.client + || this.client.readyState !== WebSocket.OPEN + ) { + return + } + + debug('received broadcast: %o', message.event) + + const eventToRelay = createRelayedEventMessage(message.event as any, this.config.secret) + debug('relaying: %o', eventToRelay) + this.client.send(JSON.stringify(eventToRelay)) + } + + private onError(error: Error) { + debug('error: %o', error) + throw error + } + + private onExit() { + debug('exiting') + this.close(() => { + this.process.exit(0) + }) + } + + public close(callback?: () => void) { + debug('closing') + if (this.client) { + this.client.terminate() + } + if (typeof callback === 'function') { + callback() + } + } +} diff --git a/src/factories/static-mirroring.worker-factory.ts b/src/factories/static-mirroring.worker-factory.ts new file mode 100644 index 0000000..9990814 --- /dev/null +++ b/src/factories/static-mirroring.worker-factory.ts @@ -0,0 +1,6 @@ +import { createSettings } from './settings-factory' +import { StaticMirroringWorker } from '../app/static-mirroring-worker' + +export const staticMirroringWorkerFactory = () => { + return new StaticMirroringWorker(process, createSettings) +} diff --git a/src/index.ts b/src/index.ts index 0e072d8..7d8ef98 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,6 +4,7 @@ dotenv.config() import { appFactory } from './factories/app-factory' import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory' +import { staticMirroringWorkerFactory } from './factories/static-mirroring.worker-factory' import { workerFactory } from './factories/worker-factory' export const getRunner = () => { @@ -15,6 +16,8 @@ export const getRunner = () => { return workerFactory() case 'maintenance': return maintenanceWorkerFactory() + case 'static-mirroring': + return staticMirroringWorkerFactory() default: throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`) } diff --git a/src/utils/messages.ts b/src/utils/messages.ts index 6f5e8af..a0971e2 100644 --- a/src/utils/messages.ts +++ b/src/utils/messages.ts @@ -1,12 +1,15 @@ import { EndOfStoredEventsNotice, + IncomingEventMessage, + IncomingRelayedEventMessage, MessageType, NoticeMessage, OutgoingMessage, + SubscribeMessage, } from '../@types/messages' -import { Event } from '../@types/event' +import { Event, RelayedEvent } from '../@types/event' +import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' import { EventId } from '../@types/base' -import { SubscriptionId } from '../@types/subscription' export const createNoticeMessage = (notice: string): NoticeMessage => { return [MessageType.NOTICE, notice] @@ -30,3 +33,19 @@ export const createEndOfStoredEventsNoticeMessage = ( export const createCommandResult = (eventId: EventId, successful: boolean, message: string) => { return [MessageType.OK, eventId, successful, message] } + +export const createSubscriptionMessage = ( + subscriptionId: SubscriptionId, + filters: SubscriptionFilter[] +): SubscribeMessage => { + return [MessageType.REQ, subscriptionId, ...filters] as any +} + +export const createRelayedEventMessage = + (event: RelayedEvent, secret?: string): IncomingRelayedEventMessage | IncomingEventMessage => { + if (!secret) { + return [MessageType.EVENT, event] + } + + return [MessageType.EVENT, event, secret] + }