mirror of
https://github.com/Cameri/nostream.git
synced 2025-07-14 15:52:23 +02:00
feat: implement static mirroring
Signed-off-by: Ricardo Arturo Cabral Mejía <me@ricardocabral.io>
This commit is contained in:
@ -47,8 +47,11 @@ Running `nostream` for the first time creates the settings file in `<project_roo
|
|||||||
| info.description | Public description of your relay. (e.g. Toronto Bitcoin Group Public Relay) |
|
| info.description | Public description of your relay. (e.g. Toronto Bitcoin Group Public Relay) |
|
||||||
| info.pubkey | Relay operator's Nostr pubkey in hex format. |
|
| info.pubkey | Relay operator's Nostr pubkey in hex format. |
|
||||||
| info.contact | Relay operator's contact. (e.g. mailto:operator@relay-your-domain.com) |
|
| info.contact | Relay operator's contact. (e.g. mailto:operator@relay-your-domain.com) |
|
||||||
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
|
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
|
||||||
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
|
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
|
||||||
|
| mirroring.static[].address | Address of mirrored relay. (e.g. ws://100.100.100.100:8008) |
|
||||||
|
| mirroring.static[].filters | Subscription filters used to mirror. |
|
||||||
|
| mirroring.static[].secret | Secret to pass to relays. Nostream relays only. Optional. |
|
||||||
| workers.count | Number of workers to spin up to handle incoming connections. |
|
| workers.count | Number of workers to spin up to handle incoming connections. |
|
||||||
| | Spin workers as many CPUs are available when set to zero. Defaults to zero. |
|
| | Spin workers as many CPUs are available when set to zero. Defaults to zero. |
|
||||||
| limits.event.eventId.minLeadingZeroBits | Leading zero bits required on every incoming event for proof of work. |
|
| limits.event.eventId.minLeadingZeroBits | Leading zero bits required on every incoming event for proof of work. |
|
||||||
|
@ -35,6 +35,8 @@ network:
|
|||||||
idleTimeout: 60
|
idleTimeout: 60
|
||||||
workers:
|
workers:
|
||||||
count: 0
|
count: 0
|
||||||
|
mirroring:
|
||||||
|
static: []
|
||||||
limits:
|
limits:
|
||||||
invoice:
|
invoice:
|
||||||
rateLimits:
|
rateLimits:
|
||||||
|
@ -7,6 +7,8 @@ export type TagName = string
|
|||||||
export type Signature = string
|
export type Signature = string
|
||||||
export type Tag = TagBase & string[]
|
export type Tag = TagBase & string[]
|
||||||
|
|
||||||
|
export type Secret = string
|
||||||
|
|
||||||
export interface TagBase {
|
export interface TagBase {
|
||||||
0: TagName
|
0: TagName
|
||||||
[index: number]: string
|
[index: number]: string
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import { ContextMetadata, EventId, Pubkey, Tag } from './base'
|
import { ContextMetadata, EventId, Pubkey, Tag } from './base'
|
||||||
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventKinds } from '../constants/base'
|
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventKinds } from '../constants/base'
|
||||||
|
|
||||||
export interface Event {
|
export interface BaseEvent {
|
||||||
id: EventId
|
id: EventId
|
||||||
pubkey: Pubkey
|
pubkey: Pubkey
|
||||||
created_at: number
|
created_at: number
|
||||||
@ -9,9 +9,14 @@ export interface Event {
|
|||||||
tags: Tag[]
|
tags: Tag[]
|
||||||
sig: string
|
sig: string
|
||||||
content: string
|
content: string
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Event extends BaseEvent {
|
||||||
[ContextMetadataKey]?: ContextMetadata
|
[ContextMetadataKey]?: ContextMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type RelayedEvent = Event
|
||||||
|
|
||||||
export type UnsignedEvent = Omit<Event, 'sig'>
|
export type UnsignedEvent = Omit<Event, 'sig'>
|
||||||
|
|
||||||
export type UnidentifiedEvent = Omit<UnsignedEvent, 'id'>
|
export type UnidentifiedEvent = Omit<UnsignedEvent, 'id'>
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import { ContextMetadata, EventId, Range } from './base'
|
import { ContextMetadata, EventId, Range, Secret } from './base'
|
||||||
|
import { Event, RelayedEvent } from './event'
|
||||||
import { SubscriptionFilter, SubscriptionId } from './subscription'
|
import { SubscriptionFilter, SubscriptionId } from './subscription'
|
||||||
import { ContextMetadataKey } from '../constants/base'
|
import { ContextMetadataKey } from '../constants/base'
|
||||||
import { Event } from './event'
|
|
||||||
|
|
||||||
export enum MessageType {
|
export enum MessageType {
|
||||||
REQ = 'REQ',
|
REQ = 'REQ',
|
||||||
@ -36,9 +36,12 @@ export type SubscribeMessage = {
|
|||||||
|
|
||||||
export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event]
|
export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event]
|
||||||
|
|
||||||
|
export type IncomingRelayedEventMessage = [MessageType.EVENT, RelayedEvent, Secret]
|
||||||
|
|
||||||
export interface EventMessage {
|
export interface EventMessage {
|
||||||
0: MessageType.EVENT
|
0: MessageType.EVENT
|
||||||
1: Event
|
1: Event
|
||||||
|
2?: Secret
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface OutgoingEventMessage {
|
export interface OutgoingEventMessage {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
|
import { Pubkey, Secret } from './base'
|
||||||
import { EventKinds } from '../constants/base'
|
import { EventKinds } from '../constants/base'
|
||||||
import { MessageType } from './messages'
|
import { MessageType } from './messages'
|
||||||
import { Pubkey } from './base'
|
import { SubscriptionFilter } from './subscription'
|
||||||
|
|
||||||
export interface Info {
|
export interface Info {
|
||||||
relay_url: string
|
relay_url: string
|
||||||
@ -151,6 +152,24 @@ export interface PaymentsProcessors {
|
|||||||
zebedee?: ZebedeePaymentsProcessor
|
zebedee?: ZebedeePaymentsProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface Local {
|
||||||
|
secret: Secret
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Remote {
|
||||||
|
secret: Secret
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Mirror {
|
||||||
|
address: string
|
||||||
|
filters?: SubscriptionFilter[]
|
||||||
|
secret?: Secret
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Mirroring {
|
||||||
|
static?: Mirror[]
|
||||||
|
}
|
||||||
|
|
||||||
export interface Settings {
|
export interface Settings {
|
||||||
info: Info
|
info: Info
|
||||||
payments?: Payments
|
payments?: Payments
|
||||||
@ -158,4 +177,5 @@ export interface Settings {
|
|||||||
network: Network
|
network: Network
|
||||||
workers?: Worker
|
workers?: Worker
|
||||||
limits?: Limits
|
limits?: Limits
|
||||||
|
mirroring?: Mirroring
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ import { SettingsStatic } from '../utils/settings'
|
|||||||
const debug = createLogger('app-primary')
|
const debug = createLogger('app-primary')
|
||||||
|
|
||||||
export class App implements IRunnable {
|
export class App implements IRunnable {
|
||||||
private workers: WeakMap<Worker, string>
|
private workers: WeakMap<Worker, Record<string, string>>
|
||||||
private watchers: FSWatcher[] | undefined
|
private watchers: FSWatcher[] | undefined
|
||||||
|
|
||||||
public constructor(
|
public constructor(
|
||||||
@ -66,18 +66,32 @@ export class App implements IRunnable {
|
|||||||
? Number(process.env.WORKER_COUNT)
|
? Number(process.env.WORKER_COUNT)
|
||||||
: this.settings().workers?.count || cpus().length
|
: this.settings().workers?.count || cpus().length
|
||||||
|
|
||||||
for (let i = 0; i < workerCount; i++) {
|
const createWorker = (env: Record<string, string>) => {
|
||||||
debug('starting worker')
|
const worker = this.cluster.fork(env)
|
||||||
const worker = this.cluster.fork({
|
this.workers.set(worker, env)
|
||||||
WORKER_TYPE: 'worker',
|
|
||||||
})
|
|
||||||
this.workers.set(worker, 'worker')
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const worker = this.cluster.fork({
|
for (let i = 0; i < workerCount; i++) {
|
||||||
|
debug('starting worker')
|
||||||
|
createWorker({
|
||||||
|
WORKER_TYPE: 'worker',
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
createWorker({
|
||||||
WORKER_TYPE: 'maintenance',
|
WORKER_TYPE: 'maintenance',
|
||||||
})
|
})
|
||||||
this.workers.set(worker, 'maintenance')
|
|
||||||
|
const mirrors = settings?.mirroring?.static
|
||||||
|
|
||||||
|
if (Array.isArray(mirrors) && mirrors.length) {
|
||||||
|
for (let i = 0; i < mirrors.length; i++) {
|
||||||
|
createWorker({
|
||||||
|
WORKER_TYPE: 'static-mirroring',
|
||||||
|
MIRROR_INDEX: i.toString(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
logCentered(`${workerCount} workers started`, width)
|
logCentered(`${workerCount} workers started`, width)
|
||||||
|
|
||||||
@ -111,14 +125,13 @@ export class App implements IRunnable {
|
|||||||
}
|
}
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
debug('starting worker')
|
debug('starting worker')
|
||||||
const workerType = this.workers.get(deadWorker)
|
const workerEnv = this.workers.get(deadWorker)
|
||||||
if (!workerType) {
|
if (!workerEnv) {
|
||||||
throw new Error('Mistakes were made')
|
throw new Error('Mistakes were made')
|
||||||
}
|
}
|
||||||
const newWorker = this.cluster.fork({
|
const newWorker = this.cluster.fork(workerEnv)
|
||||||
WORKER_TYPE: workerType,
|
this.workers.set(newWorker, workerEnv)
|
||||||
})
|
|
||||||
this.workers.set(newWorker, workerType)
|
|
||||||
debug('started worker %s', newWorker.process.pid)
|
debug('started worker %s', newWorker.process.pid)
|
||||||
}, 10000)
|
}, 10000)
|
||||||
}
|
}
|
||||||
|
149
src/app/static-mirroring-worker.ts
Normal file
149
src/app/static-mirroring-worker.ts
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
import { anyPass, map, path } from 'ramda'
|
||||||
|
import { RawData, WebSocket } from 'ws'
|
||||||
|
import cluster from 'cluster'
|
||||||
|
import { randomUUID } from 'crypto'
|
||||||
|
|
||||||
|
import { createRelayedEventMessage, createSubscriptionMessage } from '../utils/messages'
|
||||||
|
import { isEventIdValid, isEventMatchingFilter, isEventSignatureValid } from '../utils/event'
|
||||||
|
import { Mirror, Settings } from '../@types/settings'
|
||||||
|
import { createLogger } from '../factories/logger-factory'
|
||||||
|
import { IRunnable } from '../@types/base'
|
||||||
|
import { OutgoingEventMessage } from '../@types/messages'
|
||||||
|
import { WebSocketServerAdapterEvent } from '../constants/adapter'
|
||||||
|
|
||||||
|
const debug = createLogger('static-mirror-worker')
|
||||||
|
|
||||||
|
export class StaticMirroringWorker implements IRunnable {
|
||||||
|
private client: WebSocket | undefined
|
||||||
|
private config: Mirror
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
private readonly process: NodeJS.Process,
|
||||||
|
private readonly settings: () => Settings,
|
||||||
|
) {
|
||||||
|
this.process
|
||||||
|
.on('message', this.onMessage.bind(this))
|
||||||
|
.on('SIGINT', this.onExit.bind(this))
|
||||||
|
.on('SIGHUP', this.onExit.bind(this))
|
||||||
|
.on('SIGTERM', this.onExit.bind(this))
|
||||||
|
.on('uncaughtException', this.onError.bind(this))
|
||||||
|
.on('unhandledRejection', this.onError.bind(this))
|
||||||
|
}
|
||||||
|
|
||||||
|
public run(): void {
|
||||||
|
const currentSettings = this.settings()
|
||||||
|
|
||||||
|
console.log('mirroring', currentSettings.mirroring)
|
||||||
|
|
||||||
|
this.config = path(['mirroring', 'static', process.env.MIRROR_INDEX], currentSettings) as Mirror
|
||||||
|
|
||||||
|
let since = Math.floor(Date.now() / 1000) - 60*10
|
||||||
|
|
||||||
|
const createMirror = (config: Mirror) => {
|
||||||
|
const subscriptionId = `mirror-${randomUUID()}`
|
||||||
|
|
||||||
|
debug('connecting to %s', config.address)
|
||||||
|
|
||||||
|
return new WebSocket(config.address, { timeout: 5000 })
|
||||||
|
.on('open', function () {
|
||||||
|
debug('connected to %s', config.address)
|
||||||
|
|
||||||
|
if (Array.isArray(config.filters) && config.filters?.length) {
|
||||||
|
const filters = config.filters.map((filter) => ({ ...filter, since }))
|
||||||
|
|
||||||
|
debug('subscribing with %s: %o', subscriptionId, filters)
|
||||||
|
|
||||||
|
this.send(JSON.stringify(createSubscriptionMessage(subscriptionId, filters)))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.on('message', async function (raw: RawData) {
|
||||||
|
try {
|
||||||
|
const message = JSON.parse(raw.toString('utf8')) as OutgoingEventMessage
|
||||||
|
debug('received: %o', message)
|
||||||
|
|
||||||
|
if (!Array.isArray(message)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message[0] !== 'EVENT' || message[1] !== subscriptionId) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const event = message[2]
|
||||||
|
|
||||||
|
if (!anyPass(map(isEventMatchingFilter, config.filters))(event)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!await isEventIdValid(event) || !await isEventSignatureValid(event)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
since = Math.floor(Date.now()) - 30
|
||||||
|
|
||||||
|
if (cluster.isWorker && typeof process.send === 'function') {
|
||||||
|
process.send({
|
||||||
|
eventName: WebSocketServerAdapterEvent.Broadcast,
|
||||||
|
event,
|
||||||
|
source: config.address,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
debug('unable to process message: %o', error)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.on('close', (code, reason) => {
|
||||||
|
debug(`disconnected (${code}): ${reason.toString()}`)
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
this.client.removeAllListeners()
|
||||||
|
this.client = createMirror(config)
|
||||||
|
}, 5000)
|
||||||
|
})
|
||||||
|
.on('error', function (error) {
|
||||||
|
debug('connection error: %o', error)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
this.client = createMirror(this.config)
|
||||||
|
}
|
||||||
|
|
||||||
|
private onMessage(message: { eventName: string, event: unknown, source: string }): void {
|
||||||
|
if (
|
||||||
|
message.eventName !== WebSocketServerAdapterEvent.Broadcast
|
||||||
|
|| message.source === this.config.address
|
||||||
|
|| !this.client
|
||||||
|
|| this.client.readyState !== WebSocket.OPEN
|
||||||
|
) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
debug('received broadcast: %o', message.event)
|
||||||
|
|
||||||
|
const eventToRelay = createRelayedEventMessage(message.event as any, this.config.secret)
|
||||||
|
debug('relaying: %o', eventToRelay)
|
||||||
|
this.client.send(JSON.stringify(eventToRelay))
|
||||||
|
}
|
||||||
|
|
||||||
|
private onError(error: Error) {
|
||||||
|
debug('error: %o', error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
|
||||||
|
private onExit() {
|
||||||
|
debug('exiting')
|
||||||
|
this.close(() => {
|
||||||
|
this.process.exit(0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
public close(callback?: () => void) {
|
||||||
|
debug('closing')
|
||||||
|
if (this.client) {
|
||||||
|
this.client.terminate()
|
||||||
|
}
|
||||||
|
if (typeof callback === 'function') {
|
||||||
|
callback()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
6
src/factories/static-mirroring.worker-factory.ts
Normal file
6
src/factories/static-mirroring.worker-factory.ts
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
import { createSettings } from './settings-factory'
|
||||||
|
import { StaticMirroringWorker } from '../app/static-mirroring-worker'
|
||||||
|
|
||||||
|
export const staticMirroringWorkerFactory = () => {
|
||||||
|
return new StaticMirroringWorker(process, createSettings)
|
||||||
|
}
|
@ -4,6 +4,7 @@ dotenv.config()
|
|||||||
|
|
||||||
import { appFactory } from './factories/app-factory'
|
import { appFactory } from './factories/app-factory'
|
||||||
import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory'
|
import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory'
|
||||||
|
import { staticMirroringWorkerFactory } from './factories/static-mirroring.worker-factory'
|
||||||
import { workerFactory } from './factories/worker-factory'
|
import { workerFactory } from './factories/worker-factory'
|
||||||
|
|
||||||
export const getRunner = () => {
|
export const getRunner = () => {
|
||||||
@ -15,6 +16,8 @@ export const getRunner = () => {
|
|||||||
return workerFactory()
|
return workerFactory()
|
||||||
case 'maintenance':
|
case 'maintenance':
|
||||||
return maintenanceWorkerFactory()
|
return maintenanceWorkerFactory()
|
||||||
|
case 'static-mirroring':
|
||||||
|
return staticMirroringWorkerFactory()
|
||||||
default:
|
default:
|
||||||
throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`)
|
throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`)
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,15 @@
|
|||||||
import {
|
import {
|
||||||
EndOfStoredEventsNotice,
|
EndOfStoredEventsNotice,
|
||||||
|
IncomingEventMessage,
|
||||||
|
IncomingRelayedEventMessage,
|
||||||
MessageType,
|
MessageType,
|
||||||
NoticeMessage,
|
NoticeMessage,
|
||||||
OutgoingMessage,
|
OutgoingMessage,
|
||||||
|
SubscribeMessage,
|
||||||
} from '../@types/messages'
|
} from '../@types/messages'
|
||||||
import { Event } from '../@types/event'
|
import { Event, RelayedEvent } from '../@types/event'
|
||||||
|
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
|
||||||
import { EventId } from '../@types/base'
|
import { EventId } from '../@types/base'
|
||||||
import { SubscriptionId } from '../@types/subscription'
|
|
||||||
|
|
||||||
export const createNoticeMessage = (notice: string): NoticeMessage => {
|
export const createNoticeMessage = (notice: string): NoticeMessage => {
|
||||||
return [MessageType.NOTICE, notice]
|
return [MessageType.NOTICE, notice]
|
||||||
@ -30,3 +33,19 @@ export const createEndOfStoredEventsNoticeMessage = (
|
|||||||
export const createCommandResult = (eventId: EventId, successful: boolean, message: string) => {
|
export const createCommandResult = (eventId: EventId, successful: boolean, message: string) => {
|
||||||
return [MessageType.OK, eventId, successful, message]
|
return [MessageType.OK, eventId, successful, message]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const createSubscriptionMessage = (
|
||||||
|
subscriptionId: SubscriptionId,
|
||||||
|
filters: SubscriptionFilter[]
|
||||||
|
): SubscribeMessage => {
|
||||||
|
return [MessageType.REQ, subscriptionId, ...filters] as any
|
||||||
|
}
|
||||||
|
|
||||||
|
export const createRelayedEventMessage =
|
||||||
|
(event: RelayedEvent, secret?: string): IncomingRelayedEventMessage | IncomingEventMessage => {
|
||||||
|
if (!secret) {
|
||||||
|
return [MessageType.EVENT, event]
|
||||||
|
}
|
||||||
|
|
||||||
|
return [MessageType.EVENT, event, secret]
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user