feat: implement static mirroring

Signed-off-by: Ricardo Arturo Cabral Mejía <me@ricardocabral.io>
This commit is contained in:
Ricardo Arturo Cabral Mejía
2023-01-29 01:06:12 -05:00
parent 282f7db0a1
commit cd4c60a139
11 changed files with 248 additions and 23 deletions

View File

@ -47,8 +47,11 @@ Running `nostream` for the first time creates the settings file in `<project_roo
| info.description | Public description of your relay. (e.g. Toronto Bitcoin Group Public Relay) | | info.description | Public description of your relay. (e.g. Toronto Bitcoin Group Public Relay) |
| info.pubkey | Relay operator's Nostr pubkey in hex format. | | info.pubkey | Relay operator's Nostr pubkey in hex format. |
| info.contact | Relay operator's contact. (e.g. mailto:operator@relay-your-domain.com) | | info.contact | Relay operator's contact. (e.g. mailto:operator@relay-your-domain.com) |
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame | | network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. | | network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
| mirroring.static[].address | Address of mirrored relay. (e.g. ws://100.100.100.100:8008) |
| mirroring.static[].filters | Subscription filters used to mirror. |
| mirroring.static[].secret | Secret to pass to relays. Nostream relays only. Optional. |
| workers.count | Number of workers to spin up to handle incoming connections. | | workers.count | Number of workers to spin up to handle incoming connections. |
| | Spin workers as many CPUs are available when set to zero. Defaults to zero. | | | Spin workers as many CPUs are available when set to zero. Defaults to zero. |
| limits.event.eventId.minLeadingZeroBits | Leading zero bits required on every incoming event for proof of work. | | limits.event.eventId.minLeadingZeroBits | Leading zero bits required on every incoming event for proof of work. |

View File

@ -35,6 +35,8 @@ network:
idleTimeout: 60 idleTimeout: 60
workers: workers:
count: 0 count: 0
mirroring:
static: []
limits: limits:
invoice: invoice:
rateLimits: rateLimits:

View File

@ -7,6 +7,8 @@ export type TagName = string
export type Signature = string export type Signature = string
export type Tag = TagBase & string[] export type Tag = TagBase & string[]
export type Secret = string
export interface TagBase { export interface TagBase {
0: TagName 0: TagName
[index: number]: string [index: number]: string

View File

@ -1,7 +1,7 @@
import { ContextMetadata, EventId, Pubkey, Tag } from './base' import { ContextMetadata, EventId, Pubkey, Tag } from './base'
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventKinds } from '../constants/base' import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventKinds } from '../constants/base'
export interface Event { export interface BaseEvent {
id: EventId id: EventId
pubkey: Pubkey pubkey: Pubkey
created_at: number created_at: number
@ -9,9 +9,14 @@ export interface Event {
tags: Tag[] tags: Tag[]
sig: string sig: string
content: string content: string
}
export interface Event extends BaseEvent {
[ContextMetadataKey]?: ContextMetadata [ContextMetadataKey]?: ContextMetadata
} }
export type RelayedEvent = Event
export type UnsignedEvent = Omit<Event, 'sig'> export type UnsignedEvent = Omit<Event, 'sig'>
export type UnidentifiedEvent = Omit<UnsignedEvent, 'id'> export type UnidentifiedEvent = Omit<UnsignedEvent, 'id'>

View File

@ -1,7 +1,7 @@
import { ContextMetadata, EventId, Range } from './base' import { ContextMetadata, EventId, Range, Secret } from './base'
import { Event, RelayedEvent } from './event'
import { SubscriptionFilter, SubscriptionId } from './subscription' import { SubscriptionFilter, SubscriptionId } from './subscription'
import { ContextMetadataKey } from '../constants/base' import { ContextMetadataKey } from '../constants/base'
import { Event } from './event'
export enum MessageType { export enum MessageType {
REQ = 'REQ', REQ = 'REQ',
@ -36,9 +36,12 @@ export type SubscribeMessage = {
export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event] export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event]
export type IncomingRelayedEventMessage = [MessageType.EVENT, RelayedEvent, Secret]
export interface EventMessage { export interface EventMessage {
0: MessageType.EVENT 0: MessageType.EVENT
1: Event 1: Event
2?: Secret
} }
export interface OutgoingEventMessage { export interface OutgoingEventMessage {

View File

@ -1,6 +1,7 @@
import { Pubkey, Secret } from './base'
import { EventKinds } from '../constants/base' import { EventKinds } from '../constants/base'
import { MessageType } from './messages' import { MessageType } from './messages'
import { Pubkey } from './base' import { SubscriptionFilter } from './subscription'
export interface Info { export interface Info {
relay_url: string relay_url: string
@ -151,6 +152,24 @@ export interface PaymentsProcessors {
zebedee?: ZebedeePaymentsProcessor zebedee?: ZebedeePaymentsProcessor
} }
export interface Local {
secret: Secret
}
export interface Remote {
secret: Secret
}
export interface Mirror {
address: string
filters?: SubscriptionFilter[]
secret?: Secret
}
export interface Mirroring {
static?: Mirror[]
}
export interface Settings { export interface Settings {
info: Info info: Info
payments?: Payments payments?: Payments
@ -158,4 +177,5 @@ export interface Settings {
network: Network network: Network
workers?: Worker workers?: Worker
limits?: Limits limits?: Limits
mirroring?: Mirroring
} }

View File

@ -14,7 +14,7 @@ import { SettingsStatic } from '../utils/settings'
const debug = createLogger('app-primary') const debug = createLogger('app-primary')
export class App implements IRunnable { export class App implements IRunnable {
private workers: WeakMap<Worker, string> private workers: WeakMap<Worker, Record<string, string>>
private watchers: FSWatcher[] | undefined private watchers: FSWatcher[] | undefined
public constructor( public constructor(
@ -66,18 +66,32 @@ export class App implements IRunnable {
? Number(process.env.WORKER_COUNT) ? Number(process.env.WORKER_COUNT)
: this.settings().workers?.count || cpus().length : this.settings().workers?.count || cpus().length
for (let i = 0; i < workerCount; i++) { const createWorker = (env: Record<string, string>) => {
debug('starting worker') const worker = this.cluster.fork(env)
const worker = this.cluster.fork({ this.workers.set(worker, env)
WORKER_TYPE: 'worker',
})
this.workers.set(worker, 'worker')
} }
const worker = this.cluster.fork({ for (let i = 0; i < workerCount; i++) {
debug('starting worker')
createWorker({
WORKER_TYPE: 'worker',
})
}
createWorker({
WORKER_TYPE: 'maintenance', WORKER_TYPE: 'maintenance',
}) })
this.workers.set(worker, 'maintenance')
const mirrors = settings?.mirroring?.static
if (Array.isArray(mirrors) && mirrors.length) {
for (let i = 0; i < mirrors.length; i++) {
createWorker({
WORKER_TYPE: 'static-mirroring',
MIRROR_INDEX: i.toString(),
})
}
}
logCentered(`${workerCount} workers started`, width) logCentered(`${workerCount} workers started`, width)
@ -111,14 +125,13 @@ export class App implements IRunnable {
} }
setTimeout(() => { setTimeout(() => {
debug('starting worker') debug('starting worker')
const workerType = this.workers.get(deadWorker) const workerEnv = this.workers.get(deadWorker)
if (!workerType) { if (!workerEnv) {
throw new Error('Mistakes were made') throw new Error('Mistakes were made')
} }
const newWorker = this.cluster.fork({ const newWorker = this.cluster.fork(workerEnv)
WORKER_TYPE: workerType, this.workers.set(newWorker, workerEnv)
})
this.workers.set(newWorker, workerType)
debug('started worker %s', newWorker.process.pid) debug('started worker %s', newWorker.process.pid)
}, 10000) }, 10000)
} }

View File

@ -0,0 +1,149 @@
import { anyPass, map, path } from 'ramda'
import { RawData, WebSocket } from 'ws'
import cluster from 'cluster'
import { randomUUID } from 'crypto'
import { createRelayedEventMessage, createSubscriptionMessage } from '../utils/messages'
import { isEventIdValid, isEventMatchingFilter, isEventSignatureValid } from '../utils/event'
import { Mirror, Settings } from '../@types/settings'
import { createLogger } from '../factories/logger-factory'
import { IRunnable } from '../@types/base'
import { OutgoingEventMessage } from '../@types/messages'
import { WebSocketServerAdapterEvent } from '../constants/adapter'
const debug = createLogger('static-mirror-worker')
export class StaticMirroringWorker implements IRunnable {
private client: WebSocket | undefined
private config: Mirror
public constructor(
private readonly process: NodeJS.Process,
private readonly settings: () => Settings,
) {
this.process
.on('message', this.onMessage.bind(this))
.on('SIGINT', this.onExit.bind(this))
.on('SIGHUP', this.onExit.bind(this))
.on('SIGTERM', this.onExit.bind(this))
.on('uncaughtException', this.onError.bind(this))
.on('unhandledRejection', this.onError.bind(this))
}
public run(): void {
const currentSettings = this.settings()
console.log('mirroring', currentSettings.mirroring)
this.config = path(['mirroring', 'static', process.env.MIRROR_INDEX], currentSettings) as Mirror
let since = Math.floor(Date.now() / 1000) - 60*10
const createMirror = (config: Mirror) => {
const subscriptionId = `mirror-${randomUUID()}`
debug('connecting to %s', config.address)
return new WebSocket(config.address, { timeout: 5000 })
.on('open', function () {
debug('connected to %s', config.address)
if (Array.isArray(config.filters) && config.filters?.length) {
const filters = config.filters.map((filter) => ({ ...filter, since }))
debug('subscribing with %s: %o', subscriptionId, filters)
this.send(JSON.stringify(createSubscriptionMessage(subscriptionId, filters)))
}
})
.on('message', async function (raw: RawData) {
try {
const message = JSON.parse(raw.toString('utf8')) as OutgoingEventMessage
debug('received: %o', message)
if (!Array.isArray(message)) {
return
}
if (message[0] !== 'EVENT' || message[1] !== subscriptionId) {
return
}
const event = message[2]
if (!anyPass(map(isEventMatchingFilter, config.filters))(event)) {
return
}
if (!await isEventIdValid(event) || !await isEventSignatureValid(event)) {
return
}
since = Math.floor(Date.now()) - 30
if (cluster.isWorker && typeof process.send === 'function') {
process.send({
eventName: WebSocketServerAdapterEvent.Broadcast,
event,
source: config.address,
})
}
} catch (error) {
debug('unable to process message: %o', error)
}
})
.on('close', (code, reason) => {
debug(`disconnected (${code}): ${reason.toString()}`)
setTimeout(() => {
this.client.removeAllListeners()
this.client = createMirror(config)
}, 5000)
})
.on('error', function (error) {
debug('connection error: %o', error)
})
}
this.client = createMirror(this.config)
}
private onMessage(message: { eventName: string, event: unknown, source: string }): void {
if (
message.eventName !== WebSocketServerAdapterEvent.Broadcast
|| message.source === this.config.address
|| !this.client
|| this.client.readyState !== WebSocket.OPEN
) {
return
}
debug('received broadcast: %o', message.event)
const eventToRelay = createRelayedEventMessage(message.event as any, this.config.secret)
debug('relaying: %o', eventToRelay)
this.client.send(JSON.stringify(eventToRelay))
}
private onError(error: Error) {
debug('error: %o', error)
throw error
}
private onExit() {
debug('exiting')
this.close(() => {
this.process.exit(0)
})
}
public close(callback?: () => void) {
debug('closing')
if (this.client) {
this.client.terminate()
}
if (typeof callback === 'function') {
callback()
}
}
}

View File

@ -0,0 +1,6 @@
import { createSettings } from './settings-factory'
import { StaticMirroringWorker } from '../app/static-mirroring-worker'
export const staticMirroringWorkerFactory = () => {
return new StaticMirroringWorker(process, createSettings)
}

View File

@ -4,6 +4,7 @@ dotenv.config()
import { appFactory } from './factories/app-factory' import { appFactory } from './factories/app-factory'
import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory' import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory'
import { staticMirroringWorkerFactory } from './factories/static-mirroring.worker-factory'
import { workerFactory } from './factories/worker-factory' import { workerFactory } from './factories/worker-factory'
export const getRunner = () => { export const getRunner = () => {
@ -15,6 +16,8 @@ export const getRunner = () => {
return workerFactory() return workerFactory()
case 'maintenance': case 'maintenance':
return maintenanceWorkerFactory() return maintenanceWorkerFactory()
case 'static-mirroring':
return staticMirroringWorkerFactory()
default: default:
throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`) throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`)
} }

View File

@ -1,12 +1,15 @@
import { import {
EndOfStoredEventsNotice, EndOfStoredEventsNotice,
IncomingEventMessage,
IncomingRelayedEventMessage,
MessageType, MessageType,
NoticeMessage, NoticeMessage,
OutgoingMessage, OutgoingMessage,
SubscribeMessage,
} from '../@types/messages' } from '../@types/messages'
import { Event } from '../@types/event' import { Event, RelayedEvent } from '../@types/event'
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
import { EventId } from '../@types/base' import { EventId } from '../@types/base'
import { SubscriptionId } from '../@types/subscription'
export const createNoticeMessage = (notice: string): NoticeMessage => { export const createNoticeMessage = (notice: string): NoticeMessage => {
return [MessageType.NOTICE, notice] return [MessageType.NOTICE, notice]
@ -30,3 +33,19 @@ export const createEndOfStoredEventsNoticeMessage = (
export const createCommandResult = (eventId: EventId, successful: boolean, message: string) => { export const createCommandResult = (eventId: EventId, successful: boolean, message: string) => {
return [MessageType.OK, eventId, successful, message] return [MessageType.OK, eventId, successful, message]
} }
export const createSubscriptionMessage = (
subscriptionId: SubscriptionId,
filters: SubscriptionFilter[]
): SubscribeMessage => {
return [MessageType.REQ, subscriptionId, ...filters] as any
}
export const createRelayedEventMessage =
(event: RelayedEvent, secret?: string): IncomingRelayedEventMessage | IncomingEventMessage => {
if (!secret) {
return [MessageType.EVENT, event]
}
return [MessageType.EVENT, event, secret]
}