From 17227107cb66e4d91a96e594a3ef370c18fca705 Mon Sep 17 00:00:00 2001 From: Ricardo Arturo Cabral Mejia Date: Sun, 11 Sep 2022 22:35:28 +0000 Subject: [PATCH] fix: clustering & broadcasting --- src/@types/adapters.ts | 2 - src/adapters/web-server-adapter.ts | 2 +- src/adapters/web-socket-adapter.ts | 20 +++-- src/adapters/web-socket-server-adapter.ts | 20 ++--- src/index.ts | 96 ++++++----------------- 5 files changed, 46 insertions(+), 94 deletions(-) diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index 5754b13..08556a9 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -1,9 +1,7 @@ import { EventEmitter } from 'node:stream' -import { WebSocket } from 'ws' export interface IWebSocketServerAdapter extends EventEmitter { getConnectedClients(): number - getClients(): Set terminate(): Promise } diff --git a/src/adapters/web-server-adapter.ts b/src/adapters/web-server-adapter.ts index a5c9ed7..b3cd7df 100644 --- a/src/adapters/web-server-adapter.ts +++ b/src/adapters/web-server-adapter.ts @@ -53,7 +53,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter } protected onClose() { - console.log('web server closing') + console.log(`worker ${process.pid} web server closing`) this.webServer.removeAllListeners() } } diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index a529b89..6fd609b 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -20,6 +20,9 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter private alive: boolean private subscriptions: Map> + private sent = 0 + private received = 0 + public constructor( private readonly client: WebSocket, private readonly request: IncomingHttpMessage, @@ -30,8 +33,8 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter this.alive = true this.subscriptions = new Map() - this.id = Buffer.from(request.headers['sec-websocket-key'], 'base64').toString('hex') - this.clientAddress = request.headers['x-forwarded-for'] as string + this.id = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex') + this.clientAddress = this.request.headers['x-forwarded-for'] as string this.client .on('message', this.onClientMessage.bind(this)) @@ -70,6 +73,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } private sendMessage(message: OutgoingMessage): void { + this.sent++ this.client.send(JSON.stringify(message)) } @@ -92,6 +96,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } private terminate(): void { + console.debug(`worker ${process.pid} - terminating client`) this.client.terminate() } @@ -106,14 +111,16 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter this.client.prependOnceListener('close', abort) } + this.received++ + await messageHandler?.handleMessage(message) } catch (error) { if (error instanceof Error && error.name === 'AbortError') { - console.error('Message handler aborted') + console.error(`worker ${process.pid} - message handler aborted`) } else if (error instanceof Error && error.name === 'ValidationError') { - console.error('Invalid message', (error as any).annotate()) + console.error(`worker ${process.pid} - invalid message`, (error as any).annotate()) } else { - console.error(`Unable to handle message: ${error.message}`) + console.error(`worker ${process.pid} - unable to handle message: ${error.message}`) } } finally { if (abort) { @@ -128,8 +135,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter private onClientClose(code: number) { this.alive = false - const connected = this.webSocketServer.getConnectedClients() - console.debug(`client disconnected code ${code} - ${connected}/${this.webSocketServer.getClients().size} clients connected`) + console.debug(`worker ${process.pid} - client disconnected with code ${code}`) this.removeAllListeners() this.client.removeAllListeners() diff --git a/src/adapters/web-socket-server-adapter.ts b/src/adapters/web-socket-server-adapter.ts index 294cd23..9941e89 100644 --- a/src/adapters/web-socket-server-adapter.ts +++ b/src/adapters/web-socket-server-adapter.ts @@ -28,11 +28,13 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.webSocketsAdapters = new WeakMap() - this.webSocketServer - .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this)) - .on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this)) + this .on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this)) + this.webSocketServer + .on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this)) + .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this)) + this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) } @@ -56,33 +58,27 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock }) } - public getClients(): Set { - return this.webSocketServer.clients - } - public getConnectedClients(): number { return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length } private onConnection(client: WebSocket, req: IncomingMessage) { - console.debug(`new client - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) - + console.debug(`worker ${process.pid} - new client - ${this.webSocketServer.clients.size} clients`) this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this])) } private onHeartbeat() { - console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Heartbeat) ) } + protected onClose() { this.webSocketServer.clients.forEach((webSocket: WebSocket) => - webSocket.terminate() ) - console.debug('websocket server closing') + console.debug(`worker ${process.pid} - websocket server closing`) clearInterval(this.heartbeatInterval) this.webSocketServer.removeAllListeners() super.onClose() diff --git a/src/index.ts b/src/index.ts index 8eee187..a4cc862 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,83 +9,41 @@ import { getDbClient } from './database/client' import { webSocketAdapterFactory } from './factories/websocket-adapter-factory' import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter' -const dbClient = getDbClient() -const eventRepository = new EventRepository(dbClient) - -const numCpus = cpus().length -const port = Number(process.env.SERVER_PORT) || 8008 - - const newWorker = (): Worker => { - let timeout - const worker = cluster.fork() - worker - .on('listening', () => { - console.log(`worker ${worker.process.pid} listening`) - // worker.send('shutdown') - // worker.disconnect() - // timeout = setTimeout(() => { - // worker.kill() - // }, 5000) - }) - .on('disconnect', () => { - console.log(`worker ${worker.process.pid} disconnect`) - clearTimeout(timeout) - }) - .on('exit', (code, signal) => { - console.log(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`) - }) - - return worker + return cluster.fork() } if (cluster.isPrimary) { - console.log(`Primary ${process.pid} is running`) + console.log(`primary ${process.pid} - running`) + + const numCpus = cpus().length for (let i = 0; i < numCpus; i++) { newWorker() } - cluster.on('exit', (deadWorker) => { + cluster.on('exit', (deadWorker, code, signal) => { + console.log(`worker ${deadWorker.process.pid} - exiting`) + if (signal === 'SIGINT') { + return + } const worker = newWorker() - // Note the process IDs const newPID = worker.process.pid const oldPID = deadWorker.process.pid - // Log the event console.log('worker ' + oldPID + ' died.') console.log('worker ' + newPID + ' born.') }) - process.on('SIGINT', function () { - console.log('\rCaught interrupt signal') - - //await Promise.all(apps.map((app) => app.terminate())) - // for (const id in cluster.workers) { - // apps.get(cluster.workers[id]) - // } - - // await new Promise((resolve, reject) => - // wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) - // ) - // await new Promise((resolve, reject) => - // server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) - // ) - - for (const id in cluster.workers) { - console.log('id', id) - console.log(`shutting down worker ${cluster.workers[id].process.pid}`) - cluster.workers[id].send('shutdown') - } - - console.log('Disconnecting from db') - dbClient.destroy(() => { - console.log('Exiting') - process.exit() - }) + process.on('exit', () => { + console.log('Primary exiting') }) } else if (cluster.isWorker) { + const port = Number(process.env.SERVER_PORT) || 8008 + + const dbClient = getDbClient() + const eventRepository = new EventRepository(dbClient) const server = http.createServer() const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 }) @@ -97,21 +55,15 @@ if (cluster.isPrimary) { adapter.listen(port) - process.on('message', async (msg) => { - console.log('worker received', msg) - if (msg === 'shutdown') { - console.log('disconnecting all clients') - wss.clients.forEach((client) => client.terminate()) - wss.close() - // server.close() - // await new Promise((resolve, reject) => - // wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) - // ) - // await new Promise((resolve, reject) => - // server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined)) - // ) - } + process.on('SIGINT', () => { + wss.close(() => { + server.close(() => { + dbClient.destroy(() => { + process.exit(0) + }) + }) + }) }) - console.log(`Worker ${process.pid} started and listening on port ${port}`) + console.log(`worker ${process.pid} - listening on port ${port}`) }