From 58c8a1371aa86cc9a317e1bccf56b4e31e544c7d Mon Sep 17 00:00:00 2001 From: Ricardo Arturo Cabral Mejia Date: Wed, 31 Aug 2022 12:52:21 +0000 Subject: [PATCH] feat: clustering --- .nvmrc | 2 +- .nycrc.json | 3 +- knexfile.js | 2 +- src/@types/adapters.ts | 1 + src/@types/runes.ts | 14 +++ src/adapters/web-socket-adapter.ts | 21 ++-- src/adapters/web-socket-server-adapter.ts | 21 +++- src/constants/adapter.ts | 8 +- src/handlers/subscribe-message-handler.ts | 9 +- src/handlers/unsubscribe-message-handler.ts | 3 +- src/index.ts | 125 ++++++++++++++++---- src/utils/event.ts | 4 - src/utils/runes/alternative.ts | 7 +- src/utils/runes/restriction.ts | 11 +- src/utils/runes/rune-like.ts | 13 +- 15 files changed, 185 insertions(+), 59 deletions(-) create mode 100644 src/@types/runes.ts diff --git a/.nvmrc b/.nvmrc index f274881..39e593e 100644 --- a/.nvmrc +++ b/.nvmrc @@ -1 +1 @@ -v16.16.0 +v18.8.0 diff --git a/.nycrc.json b/.nycrc.json index 47e7dce..02c7e4d 100644 --- a/.nycrc.json +++ b/.nycrc.json @@ -33,7 +33,8 @@ ], "exclude": [ "src/@types", - "src/constants" + "src/constants", + "src/database" ], "require": [ "ts-node/register", diff --git a/knexfile.js b/knexfile.js index 0f77275..e1638e1 100644 --- a/knexfile.js +++ b/knexfile.js @@ -7,7 +7,7 @@ module.exports = { password: process.env.DB_PASSWORD ?? 'postgres', database: process.env.DB_NAME ?? 'nostr-ts-relay', }, - pool: { min: 0, max: 7 }, + pool: { min: 4, max: 16 }, seeds: { directory: './seeds', }, diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index ee0bb11..5754b13 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -4,6 +4,7 @@ import { WebSocket } from 'ws' export interface IWebSocketServerAdapter extends EventEmitter { getConnectedClients(): number getClients(): Set + terminate(): Promise } export interface IWebServerAdapter extends EventEmitter { diff --git a/src/@types/runes.ts b/src/@types/runes.ts new file mode 100644 index 0000000..3443cc4 --- /dev/null +++ b/src/@types/runes.ts @@ -0,0 +1,14 @@ +export interface IAlternative { + test(values: Record): string | undefined + encode(): string +} + +export interface IRestriction { + test(values: Record): string | undefined + encode(): string +} + +export interface IRuneLike { + test(values: Record): [boolean, string] + encode(): string +} diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index bb32bb5..a529b89 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -6,13 +6,13 @@ import { IAbortable, IMessageHandler } from '../@types/message-handlers' import { IncomingMessage, OutgoingMessage } from '../@types/messages' import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters' import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' +import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter' import { attemptValidation } from '../utils/validation' import { createOutgoingEventMessage } from '../utils/messages' import { Event } from '../@types/event' import { Factory } from '../@types/base' import { isEventMatchingFilter } from '../utils/event' import { messageSchema } from '../schemas/message-schema' -import { WebSocketAdapterEvent } from '../constants/adapter' export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter { private id: string @@ -39,11 +39,12 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter .on('pong', this.onClientPong.bind(this)) this - .on('heartbeat', this.onHeartbeat.bind(this)) - .on('subscribe', this.onSubscribed.bind(this)) - .on('unsubscribe', this.onUnsubscribed.bind(this)) - .on(WebSocketAdapterEvent.Send, this.onSend.bind(this)) + .on(WebSocketAdapterEvent.Heartbeat, this.onHeartbeat.bind(this)) + .on(WebSocketAdapterEvent.Subscribe, this.onSubscribed.bind(this)) + .on(WebSocketAdapterEvent.Unsubscribe, this.onUnsubscribed.bind(this)) + .on(WebSocketAdapterEvent.Event, this.onSendEvent.bind(this)) .on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this)) + .on(WebSocketAdapterEvent.Message, this.onSendMessage.bind(this)) } public onUnsubscribed(subscriptionId: string): void { @@ -55,10 +56,10 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } public onBroadcast(event: Event): void { - this.webSocketServer.emit('broadcast', event) + this.webSocketServer.emit(WebSocketServerAdapterEvent.Broadcast, event) } - public onSend(event: Event): void { + public onSendEvent(event: Event): void { this.subscriptions.forEach((filters, subscriptionId) => { if ( Array.from(filters).map(isEventMatchingFilter).some((Matches) => Matches(event)) @@ -68,10 +69,14 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter }) } - public sendMessage(message: OutgoingMessage): void { + private sendMessage(message: OutgoingMessage): void { this.client.send(JSON.stringify(message)) } + private onSendMessage(message: OutgoingMessage): void { + this.sendMessage(message) + } + public onHeartbeat(): void { if (!this.alive) { this.terminate() diff --git a/src/adapters/web-socket-server-adapter.ts b/src/adapters/web-socket-server-adapter.ts index 8cbb984..294cd23 100644 --- a/src/adapters/web-socket-server-adapter.ts +++ b/src/adapters/web-socket-server-adapter.ts @@ -36,13 +36,23 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) } + public async terminate(): Promise { + return void Promise.all( + [ + ...Array.from(this.webSocketServer.clients).map((webSocket: WebSocket) => + webSocket.terminate() + ), + ], + ) + } + private onBroadcast(event: Event) { this.webSocketServer.clients.forEach((webSocket: WebSocket) => { if (!propEq('readyState', OPEN)(webSocket)) { return } - this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Send, event) + this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Event, event) }) } @@ -62,14 +72,19 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock private onHeartbeat() { console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) - this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit('heartbeat')) + this.webSocketServer.clients.forEach((webSocket) => + this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Heartbeat) + ) } protected onClose() { + this.webSocketServer.clients.forEach((webSocket: WebSocket) => + + webSocket.terminate() + ) console.debug('websocket server closing') clearInterval(this.heartbeatInterval) this.webSocketServer.removeAllListeners() super.onClose() } - } diff --git a/src/constants/adapter.ts b/src/constants/adapter.ts index df4d2c4..79bdbca 100644 --- a/src/constants/adapter.ts +++ b/src/constants/adapter.ts @@ -1,6 +1,10 @@ export enum WebSocketAdapterEvent { - Send = 'send', - Broadcast = 'broadcast' + Event = 'event', + Message = 'message', + Broadcast = 'broadcast', + Subscribe = 'subscribe', + Unsubscribe = 'unsubscribe', + Heartbeat = 'heartbeat' } export enum WebSocketServerAdapterEvent { diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index a897767..ed71978 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -10,6 +10,7 @@ import { Event } from '../@types/event' import { IEventRepository } from '../@types/repositories' import { IWebSocketAdapter } from '../@types/adapters' import { SubscribeMessage } from '../@types/messages' +import { WebSocketAdapterEvent } from '../constants/adapter' export class SubscribeMessageHandler implements IMessageHandler, IAbortable { @@ -30,10 +31,12 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { const subscriptionId = message[1] as SubscriptionId const filters = message.slice(2) as SubscriptionFilter[] - this.webSocket.emit('subscribe', subscriptionId, new Set(filters)) + this.webSocket.emit(WebSocketAdapterEvent.Subscribe, subscriptionId, new Set(filters)) - const sendEvent = (event: Event) => this.webSocket.sendMessage(createOutgoingEventMessage(subscriptionId, event)) - const sendEOSE = () => this.webSocket.sendMessage(createEndOfStoredEventsNoticeMessage(subscriptionId)) + const sendEvent = (event: Event) => + this.webSocket.emit(WebSocketAdapterEvent.Message, createOutgoingEventMessage(subscriptionId, event)) + const sendEOSE = () => + this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId)) const findEvents = this.eventRepository.findByFilters(filters).stream() try { diff --git a/src/handlers/unsubscribe-message-handler.ts b/src/handlers/unsubscribe-message-handler.ts index 7c12843..b3d7fc2 100644 --- a/src/handlers/unsubscribe-message-handler.ts +++ b/src/handlers/unsubscribe-message-handler.ts @@ -2,6 +2,7 @@ import { IWebSocketAdapter } from '../@types/adapters' import { IMessageHandler } from '../@types/message-handlers' import { UnsubscribeMessage } from '../@types/messages' +import { WebSocketAdapterEvent } from '../constants/adapter' export class UnsubscribeMessageHandler implements IMessageHandler { @@ -10,6 +11,6 @@ export class UnsubscribeMessageHandler implements IMessageHandler { ) { } public async handleMessage(message: UnsubscribeMessage): Promise { - this.webSocket.emit('unsubscribe', message[1]) + this.webSocket.emit(WebSocketAdapterEvent.Unsubscribe, message[1]) } } diff --git a/src/index.ts b/src/index.ts index 67701ab..8eee187 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,7 @@ -import * as http from 'http' +import cluster, { Worker } from 'cluster' +import { cpus } from 'os' +import http from 'http' +import process from 'process' import { WebSocketServer } from 'ws' import { EventRepository } from './repositories/event-repository' @@ -6,29 +9,109 @@ import { getDbClient } from './database/client' import { webSocketAdapterFactory } from './factories/websocket-adapter-factory' import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter' -const server = http.createServer() -const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 }) const dbClient = getDbClient() const eventRepository = new EventRepository(dbClient) -const adapter = new WebSocketServerAdapter( - server, - wss, - webSocketAdapterFactory(eventRepository) -) - +const numCpus = cpus().length const port = Number(process.env.SERVER_PORT) || 8008 -adapter.listen(port) -process.on('SIGINT', async function () { - console.log('\rCaught interrupt signal') - wss.clients.forEach((client) => client.terminate()) - await new Promise((resolve, reject) => - wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) + +const newWorker = (): Worker => { + let timeout + const worker = cluster.fork() + worker + .on('listening', () => { + console.log(`worker ${worker.process.pid} listening`) + // worker.send('shutdown') + // worker.disconnect() + // timeout = setTimeout(() => { + // worker.kill() + // }, 5000) + }) + .on('disconnect', () => { + console.log(`worker ${worker.process.pid} disconnect`) + clearTimeout(timeout) + }) + .on('exit', (code, signal) => { + console.log(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`) + }) + + return worker +} + +if (cluster.isPrimary) { + console.log(`Primary ${process.pid} is running`) + + for (let i = 0; i < numCpus; i++) { + newWorker() + } + + cluster.on('exit', (deadWorker) => { + const worker = newWorker() + + // Note the process IDs + const newPID = worker.process.pid + const oldPID = deadWorker.process.pid + + // Log the event + console.log('worker ' + oldPID + ' died.') + console.log('worker ' + newPID + ' born.') + }) + + process.on('SIGINT', function () { + console.log('\rCaught interrupt signal') + + //await Promise.all(apps.map((app) => app.terminate())) + // for (const id in cluster.workers) { + // apps.get(cluster.workers[id]) + // } + + // await new Promise((resolve, reject) => + // wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) + // ) + // await new Promise((resolve, reject) => + // server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) + // ) + + for (const id in cluster.workers) { + console.log('id', id) + console.log(`shutting down worker ${cluster.workers[id].process.pid}`) + cluster.workers[id].send('shutdown') + } + + console.log('Disconnecting from db') + dbClient.destroy(() => { + console.log('Exiting') + process.exit() + }) + }) +} else if (cluster.isWorker) { + + const server = http.createServer() + const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 }) + const adapter = new WebSocketServerAdapter( + server, + wss, + webSocketAdapterFactory(eventRepository) ) - await new Promise((resolve, reject) => - server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) - ) - dbClient.destroy() - process.exit() -}) + + adapter.listen(port) + + process.on('message', async (msg) => { + console.log('worker received', msg) + if (msg === 'shutdown') { + console.log('disconnecting all clients') + wss.clients.forEach((client) => client.terminate()) + wss.close() + // server.close() + // await new Promise((resolve, reject) => + // wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) + // ) + // await new Promise((resolve, reject) => + // server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) + // ) + } + }) + + console.log(`Worker ${process.pid} started and listening on port ${port}`) +} diff --git a/src/utils/event.ts b/src/utils/event.ts index 63be785..f4092ea 100644 --- a/src/utils/event.ts +++ b/src/utils/event.ts @@ -172,10 +172,6 @@ export const isEphemeralEvent = (event: Event): boolean => { return event.kind >= 20000 && event.kind < 30000 } -export const isNullEvent = (event: Event): boolean => { - return event.kind === Number.MAX_SAFE_INTEGER -} - export const isDeleteEvent = (event: Event): boolean => { return event.kind === EventKinds.DELETE } diff --git a/src/utils/runes/alternative.ts b/src/utils/runes/alternative.ts index c45aaa4..82add1e 100644 --- a/src/utils/runes/alternative.ts +++ b/src/utils/runes/alternative.ts @@ -1,3 +1,4 @@ +import { IAlternative } from '../../@types/runes' const punctuations = /[!"#\$%&'()*+-.\/:;<=>?@\[\\\]^`{|}~]/ @@ -5,7 +6,7 @@ const hasPunctuation = (input) => punctuations.test(input) // Reference: https://github.com/rustyrussell/runes/blob/master/runes/runes.py -export class Alternative { +export class Alternative implements IAlternative { public constructor( private readonly field: string, private readonly cond: string, @@ -78,7 +79,7 @@ export class Alternative { return `${this.field}${this.cond}${this.value.replace(/[\\|&]/g, '\\$&')}` } - public static decode(encodedStr: string): [Alternative, string] { + public static decode(encodedStr: string): [IAlternative, string] { let cond = undefined let endOff = 0 @@ -118,7 +119,7 @@ export class Alternative { return [new Alternative(field, cond, value), encodedStr.slice(endOff)] } - public static from(encodedStr: string): Alternative { + public static from(encodedStr: string): IAlternative { const [field, cond, value] = encodedStr.replace(/\s+/g, '').split(new RegExp(`(${punctuations.source})`, 'g')) return new Alternative(field, cond, value) diff --git a/src/utils/runes/restriction.ts b/src/utils/runes/restriction.ts index 5174cad..7381617 100644 --- a/src/utils/runes/restriction.ts +++ b/src/utils/runes/restriction.ts @@ -1,9 +1,10 @@ +import { IAlternative, IRestriction } from '../../@types/runes' import { Alternative } from './alternative' -export class Restriction { +export class Restriction implements IRestriction { public constructor( - private readonly alternatives: Alternative[] + private readonly alternatives: IAlternative[] ) { if (!alternatives.length) { throw new Error('Restriction must have some alternatives') @@ -27,10 +28,10 @@ export class Restriction { return this.alternatives.map((alternative) => alternative.encode()).join('|') } - public static decode(encodedStr: string): [Restriction, string] { + public static decode(encodedStr: string): [IRestriction, string] { let encStr = encodedStr - let alternative: Alternative - const alternatives: Alternative[] = [] + let alternative: IAlternative + const alternatives: IAlternative[] = [] while (encStr.length) { if (encStr.startsWith('&')) { encStr = encStr.slice(1) diff --git a/src/utils/runes/rune-like.ts b/src/utils/runes/rune-like.ts index 0ba98ef..6e45697 100644 --- a/src/utils/runes/rune-like.ts +++ b/src/utils/runes/rune-like.ts @@ -1,9 +1,10 @@ +import { IRestriction, IRuneLike } from '../../@types/runes' import { Restriction } from './restriction' -export class RuneLike { +export class RuneLike implements IRuneLike { public constructor( - private readonly restrictions: Restriction[] = [] + private readonly restrictions: IRestriction[] ) { } public test(values: Record): [boolean, string] { @@ -17,13 +18,13 @@ export class RuneLike { return [true, ''] } - public encode() { + public encode(): string { return this.restrictions.map((restriction) => restriction.encode()).join('&') } - public static from(encodedStr: string): RuneLike { - const restrictions: Restriction[] = [] - let restriction: Restriction + public static from(encodedStr: string): IRuneLike { + const restrictions: IRestriction[] = [] + let restriction: IRestriction let encStr = encodedStr.replace(/\s+/g, '') while (encStr.length) {