fix: clustering & broadcasting

This commit is contained in:
Ricardo Arturo Cabral Mejia 2022-09-11 22:35:28 +00:00
parent 58c8a1371a
commit 17227107cb
No known key found for this signature in database
GPG Key ID: 5931EBF43A650245
5 changed files with 46 additions and 94 deletions

View File

@ -1,9 +1,7 @@
import { EventEmitter } from 'node:stream'
import { WebSocket } from 'ws'
export interface IWebSocketServerAdapter extends EventEmitter {
getConnectedClients(): number
getClients(): Set<WebSocket>
terminate(): Promise<void>
}

View File

@ -53,7 +53,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter
}
protected onClose() {
console.log('web server closing')
console.log(`worker ${process.pid} web server closing`)
this.webServer.removeAllListeners()
}
}

View File

@ -20,6 +20,9 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private alive: boolean
private subscriptions: Map<SubscriptionId, Set<SubscriptionFilter>>
private sent = 0
private received = 0
public constructor(
private readonly client: WebSocket,
private readonly request: IncomingHttpMessage,
@ -30,8 +33,8 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
this.alive = true
this.subscriptions = new Map()
this.id = Buffer.from(request.headers['sec-websocket-key'], 'base64').toString('hex')
this.clientAddress = request.headers['x-forwarded-for'] as string
this.id = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
this.clientAddress = this.request.headers['x-forwarded-for'] as string
this.client
.on('message', this.onClientMessage.bind(this))
@ -70,6 +73,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
}
private sendMessage(message: OutgoingMessage): void {
this.sent++
this.client.send(JSON.stringify(message))
}
@ -92,6 +96,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
}
private terminate(): void {
console.debug(`worker ${process.pid} - terminating client`)
this.client.terminate()
}
@ -106,14 +111,16 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
this.client.prependOnceListener('close', abort)
}
this.received++
await messageHandler?.handleMessage(message)
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
console.error('Message handler aborted')
console.error(`worker ${process.pid} - message handler aborted`)
} else if (error instanceof Error && error.name === 'ValidationError') {
console.error('Invalid message', (error as any).annotate())
console.error(`worker ${process.pid} - invalid message`, (error as any).annotate())
} else {
console.error(`Unable to handle message: ${error.message}`)
console.error(`worker ${process.pid} - unable to handle message: ${error.message}`)
}
} finally {
if (abort) {
@ -128,8 +135,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private onClientClose(code: number) {
this.alive = false
const connected = this.webSocketServer.getConnectedClients()
console.debug(`client disconnected code ${code} - ${connected}/${this.webSocketServer.getClients().size} clients connected`)
console.debug(`worker ${process.pid} - client disconnected with code ${code}`)
this.removeAllListeners()
this.client.removeAllListeners()

View File

@ -28,11 +28,13 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
this.webSocketsAdapters = new WeakMap()
this.webSocketServer
.on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this))
.on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this))
this
.on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this))
this.webSocketServer
.on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this))
.on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this))
this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
}
@ -56,33 +58,27 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
})
}
public getClients(): Set<WebSocket.WebSocket> {
return this.webSocketServer.clients
}
public getConnectedClients(): number {
return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length
}
private onConnection(client: WebSocket, req: IncomingMessage) {
console.debug(`new client - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
console.debug(`worker ${process.pid} - new client - ${this.webSocketServer.clients.size} clients`)
this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this]))
}
private onHeartbeat() {
console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
this.webSocketServer.clients.forEach((webSocket) =>
this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Heartbeat)
)
}
protected onClose() {
this.webSocketServer.clients.forEach((webSocket: WebSocket) =>
webSocket.terminate()
)
console.debug('websocket server closing')
console.debug(`worker ${process.pid} - websocket server closing`)
clearInterval(this.heartbeatInterval)
this.webSocketServer.removeAllListeners()
super.onClose()

View File

@ -9,83 +9,41 @@ import { getDbClient } from './database/client'
import { webSocketAdapterFactory } from './factories/websocket-adapter-factory'
import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter'
const dbClient = getDbClient()
const eventRepository = new EventRepository(dbClient)
const numCpus = cpus().length
const port = Number(process.env.SERVER_PORT) || 8008
const newWorker = (): Worker => {
let timeout
const worker = cluster.fork()
worker
.on('listening', () => {
console.log(`worker ${worker.process.pid} listening`)
// worker.send('shutdown')
// worker.disconnect()
// timeout = setTimeout(() => {
// worker.kill()
// }, 5000)
})
.on('disconnect', () => {
console.log(`worker ${worker.process.pid} disconnect`)
clearTimeout(timeout)
})
.on('exit', (code, signal) => {
console.log(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`)
})
return worker
return cluster.fork()
}
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`)
console.log(`primary ${process.pid} - running`)
const numCpus = cpus().length
for (let i = 0; i < numCpus; i++) {
newWorker()
}
cluster.on('exit', (deadWorker) => {
cluster.on('exit', (deadWorker, code, signal) => {
console.log(`worker ${deadWorker.process.pid} - exiting`)
if (signal === 'SIGINT') {
return
}
const worker = newWorker()
// Note the process IDs
const newPID = worker.process.pid
const oldPID = deadWorker.process.pid
// Log the event
console.log('worker ' + oldPID + ' died.')
console.log('worker ' + newPID + ' born.')
})
process.on('SIGINT', function () {
console.log('\rCaught interrupt signal')
//await Promise.all(apps.map((app) => app.terminate()))
// for (const id in cluster.workers) {
// apps.get(cluster.workers[id])
// }
// await new Promise((resolve, reject) =>
// wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
// )
// await new Promise((resolve, reject) =>
// server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
// )
for (const id in cluster.workers) {
console.log('id', id)
console.log(`shutting down worker ${cluster.workers[id].process.pid}`)
cluster.workers[id].send('shutdown')
}
console.log('Disconnecting from db')
dbClient.destroy(() => {
console.log('Exiting')
process.exit()
})
process.on('exit', () => {
console.log('Primary exiting')
})
} else if (cluster.isWorker) {
const port = Number(process.env.SERVER_PORT) || 8008
const dbClient = getDbClient()
const eventRepository = new EventRepository(dbClient)
const server = http.createServer()
const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 })
@ -97,21 +55,15 @@ if (cluster.isPrimary) {
adapter.listen(port)
process.on('message', async (msg) => {
console.log('worker received', msg)
if (msg === 'shutdown') {
console.log('disconnecting all clients')
wss.clients.forEach((client) => client.terminate())
wss.close()
// server.close()
// await new Promise((resolve, reject) =>
// wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
// )
// await new Promise((resolve, reject) =>
// server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
// )
}
process.on('SIGINT', () => {
wss.close(() => {
server.close(() => {
dbClient.destroy(() => {
process.exit(0)
})
})
})
})
console.log(`Worker ${process.pid} started and listening on port ${port}`)
console.log(`worker ${process.pid} - listening on port ${port}`)
}