feat: implement static mirroring

Signed-off-by: Ricardo Arturo Cabral Mejía <me@ricardocabral.io>
This commit is contained in:
Ricardo Arturo Cabral Mejía 2023-01-29 01:06:12 -05:00
parent 282f7db0a1
commit cd4c60a139
11 changed files with 248 additions and 23 deletions

View File

@ -47,8 +47,11 @@ Running `nostream` for the first time creates the settings file in `<project_roo
| info.description | Public description of your relay. (e.g. Toronto Bitcoin Group Public Relay) |
| info.pubkey | Relay operator's Nostr pubkey in hex format. |
| info.contact | Relay operator's contact. (e.g. mailto:operator@relay-your-domain.com) |
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
| mirroring.static[].address | Address of mirrored relay. (e.g. ws://100.100.100.100:8008) |
| mirroring.static[].filters | Subscription filters used to mirror. |
| mirroring.static[].secret | Secret to pass to relays. Nostream relays only. Optional. |
| workers.count | Number of workers to spin up to handle incoming connections. |
| | Spin workers as many CPUs are available when set to zero. Defaults to zero. |
| limits.event.eventId.minLeadingZeroBits | Leading zero bits required on every incoming event for proof of work. |

View File

@ -35,6 +35,8 @@ network:
idleTimeout: 60
workers:
count: 0
mirroring:
static: []
limits:
invoice:
rateLimits:

View File

@ -7,6 +7,8 @@ export type TagName = string
export type Signature = string
export type Tag = TagBase & string[]
export type Secret = string
export interface TagBase {
0: TagName
[index: number]: string

View File

@ -1,7 +1,7 @@
import { ContextMetadata, EventId, Pubkey, Tag } from './base'
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventKinds } from '../constants/base'
export interface Event {
export interface BaseEvent {
id: EventId
pubkey: Pubkey
created_at: number
@ -9,9 +9,14 @@ export interface Event {
tags: Tag[]
sig: string
content: string
}
export interface Event extends BaseEvent {
[ContextMetadataKey]?: ContextMetadata
}
export type RelayedEvent = Event
export type UnsignedEvent = Omit<Event, 'sig'>
export type UnidentifiedEvent = Omit<UnsignedEvent, 'id'>

View File

@ -1,7 +1,7 @@
import { ContextMetadata, EventId, Range } from './base'
import { ContextMetadata, EventId, Range, Secret } from './base'
import { Event, RelayedEvent } from './event'
import { SubscriptionFilter, SubscriptionId } from './subscription'
import { ContextMetadataKey } from '../constants/base'
import { Event } from './event'
export enum MessageType {
REQ = 'REQ',
@ -36,9 +36,12 @@ export type SubscribeMessage = {
export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event]
export type IncomingRelayedEventMessage = [MessageType.EVENT, RelayedEvent, Secret]
export interface EventMessage {
0: MessageType.EVENT
1: Event
2?: Secret
}
export interface OutgoingEventMessage {

View File

@ -1,6 +1,7 @@
import { Pubkey, Secret } from './base'
import { EventKinds } from '../constants/base'
import { MessageType } from './messages'
import { Pubkey } from './base'
import { SubscriptionFilter } from './subscription'
export interface Info {
relay_url: string
@ -151,6 +152,24 @@ export interface PaymentsProcessors {
zebedee?: ZebedeePaymentsProcessor
}
export interface Local {
secret: Secret
}
export interface Remote {
secret: Secret
}
export interface Mirror {
address: string
filters?: SubscriptionFilter[]
secret?: Secret
}
export interface Mirroring {
static?: Mirror[]
}
export interface Settings {
info: Info
payments?: Payments
@ -158,4 +177,5 @@ export interface Settings {
network: Network
workers?: Worker
limits?: Limits
mirroring?: Mirroring
}

View File

@ -14,7 +14,7 @@ import { SettingsStatic } from '../utils/settings'
const debug = createLogger('app-primary')
export class App implements IRunnable {
private workers: WeakMap<Worker, string>
private workers: WeakMap<Worker, Record<string, string>>
private watchers: FSWatcher[] | undefined
public constructor(
@ -66,18 +66,32 @@ export class App implements IRunnable {
? Number(process.env.WORKER_COUNT)
: this.settings().workers?.count || cpus().length
for (let i = 0; i < workerCount; i++) {
debug('starting worker')
const worker = this.cluster.fork({
WORKER_TYPE: 'worker',
})
this.workers.set(worker, 'worker')
const createWorker = (env: Record<string, string>) => {
const worker = this.cluster.fork(env)
this.workers.set(worker, env)
}
const worker = this.cluster.fork({
for (let i = 0; i < workerCount; i++) {
debug('starting worker')
createWorker({
WORKER_TYPE: 'worker',
})
}
createWorker({
WORKER_TYPE: 'maintenance',
})
this.workers.set(worker, 'maintenance')
const mirrors = settings?.mirroring?.static
if (Array.isArray(mirrors) && mirrors.length) {
for (let i = 0; i < mirrors.length; i++) {
createWorker({
WORKER_TYPE: 'static-mirroring',
MIRROR_INDEX: i.toString(),
})
}
}
logCentered(`${workerCount} workers started`, width)
@ -111,14 +125,13 @@ export class App implements IRunnable {
}
setTimeout(() => {
debug('starting worker')
const workerType = this.workers.get(deadWorker)
if (!workerType) {
const workerEnv = this.workers.get(deadWorker)
if (!workerEnv) {
throw new Error('Mistakes were made')
}
const newWorker = this.cluster.fork({
WORKER_TYPE: workerType,
})
this.workers.set(newWorker, workerType)
const newWorker = this.cluster.fork(workerEnv)
this.workers.set(newWorker, workerEnv)
debug('started worker %s', newWorker.process.pid)
}, 10000)
}

View File

@ -0,0 +1,149 @@
import { anyPass, map, path } from 'ramda'
import { RawData, WebSocket } from 'ws'
import cluster from 'cluster'
import { randomUUID } from 'crypto'
import { createRelayedEventMessage, createSubscriptionMessage } from '../utils/messages'
import { isEventIdValid, isEventMatchingFilter, isEventSignatureValid } from '../utils/event'
import { Mirror, Settings } from '../@types/settings'
import { createLogger } from '../factories/logger-factory'
import { IRunnable } from '../@types/base'
import { OutgoingEventMessage } from '../@types/messages'
import { WebSocketServerAdapterEvent } from '../constants/adapter'
const debug = createLogger('static-mirror-worker')
export class StaticMirroringWorker implements IRunnable {
private client: WebSocket | undefined
private config: Mirror
public constructor(
private readonly process: NodeJS.Process,
private readonly settings: () => Settings,
) {
this.process
.on('message', this.onMessage.bind(this))
.on('SIGINT', this.onExit.bind(this))
.on('SIGHUP', this.onExit.bind(this))
.on('SIGTERM', this.onExit.bind(this))
.on('uncaughtException', this.onError.bind(this))
.on('unhandledRejection', this.onError.bind(this))
}
public run(): void {
const currentSettings = this.settings()
console.log('mirroring', currentSettings.mirroring)
this.config = path(['mirroring', 'static', process.env.MIRROR_INDEX], currentSettings) as Mirror
let since = Math.floor(Date.now() / 1000) - 60*10
const createMirror = (config: Mirror) => {
const subscriptionId = `mirror-${randomUUID()}`
debug('connecting to %s', config.address)
return new WebSocket(config.address, { timeout: 5000 })
.on('open', function () {
debug('connected to %s', config.address)
if (Array.isArray(config.filters) && config.filters?.length) {
const filters = config.filters.map((filter) => ({ ...filter, since }))
debug('subscribing with %s: %o', subscriptionId, filters)
this.send(JSON.stringify(createSubscriptionMessage(subscriptionId, filters)))
}
})
.on('message', async function (raw: RawData) {
try {
const message = JSON.parse(raw.toString('utf8')) as OutgoingEventMessage
debug('received: %o', message)
if (!Array.isArray(message)) {
return
}
if (message[0] !== 'EVENT' || message[1] !== subscriptionId) {
return
}
const event = message[2]
if (!anyPass(map(isEventMatchingFilter, config.filters))(event)) {
return
}
if (!await isEventIdValid(event) || !await isEventSignatureValid(event)) {
return
}
since = Math.floor(Date.now()) - 30
if (cluster.isWorker && typeof process.send === 'function') {
process.send({
eventName: WebSocketServerAdapterEvent.Broadcast,
event,
source: config.address,
})
}
} catch (error) {
debug('unable to process message: %o', error)
}
})
.on('close', (code, reason) => {
debug(`disconnected (${code}): ${reason.toString()}`)
setTimeout(() => {
this.client.removeAllListeners()
this.client = createMirror(config)
}, 5000)
})
.on('error', function (error) {
debug('connection error: %o', error)
})
}
this.client = createMirror(this.config)
}
private onMessage(message: { eventName: string, event: unknown, source: string }): void {
if (
message.eventName !== WebSocketServerAdapterEvent.Broadcast
|| message.source === this.config.address
|| !this.client
|| this.client.readyState !== WebSocket.OPEN
) {
return
}
debug('received broadcast: %o', message.event)
const eventToRelay = createRelayedEventMessage(message.event as any, this.config.secret)
debug('relaying: %o', eventToRelay)
this.client.send(JSON.stringify(eventToRelay))
}
private onError(error: Error) {
debug('error: %o', error)
throw error
}
private onExit() {
debug('exiting')
this.close(() => {
this.process.exit(0)
})
}
public close(callback?: () => void) {
debug('closing')
if (this.client) {
this.client.terminate()
}
if (typeof callback === 'function') {
callback()
}
}
}

View File

@ -0,0 +1,6 @@
import { createSettings } from './settings-factory'
import { StaticMirroringWorker } from '../app/static-mirroring-worker'
export const staticMirroringWorkerFactory = () => {
return new StaticMirroringWorker(process, createSettings)
}

View File

@ -4,6 +4,7 @@ dotenv.config()
import { appFactory } from './factories/app-factory'
import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory'
import { staticMirroringWorkerFactory } from './factories/static-mirroring.worker-factory'
import { workerFactory } from './factories/worker-factory'
export const getRunner = () => {
@ -15,6 +16,8 @@ export const getRunner = () => {
return workerFactory()
case 'maintenance':
return maintenanceWorkerFactory()
case 'static-mirroring':
return staticMirroringWorkerFactory()
default:
throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`)
}

View File

@ -1,12 +1,15 @@
import {
EndOfStoredEventsNotice,
IncomingEventMessage,
IncomingRelayedEventMessage,
MessageType,
NoticeMessage,
OutgoingMessage,
SubscribeMessage,
} from '../@types/messages'
import { Event } from '../@types/event'
import { Event, RelayedEvent } from '../@types/event'
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
import { EventId } from '../@types/base'
import { SubscriptionId } from '../@types/subscription'
export const createNoticeMessage = (notice: string): NoticeMessage => {
return [MessageType.NOTICE, notice]
@ -30,3 +33,19 @@ export const createEndOfStoredEventsNoticeMessage = (
export const createCommandResult = (eventId: EventId, successful: boolean, message: string) => {
return [MessageType.OK, eventId, successful, message]
}
export const createSubscriptionMessage = (
subscriptionId: SubscriptionId,
filters: SubscriptionFilter[]
): SubscribeMessage => {
return [MessageType.REQ, subscriptionId, ...filters] as any
}
export const createRelayedEventMessage =
(event: RelayedEvent, secret?: string): IncomingRelayedEventMessage | IncomingEventMessage => {
if (!secret) {
return [MessageType.EVENT, event]
}
return [MessageType.EVENT, event, secret]
}