chore: major refactor

This commit is contained in:
Ricardo Arturo Cabral Mejia
2022-08-13 05:29:53 +00:00
parent a7d8561bea
commit bd5360ee7e
29 changed files with 926 additions and 401 deletions

View File

@@ -1,127 +1,69 @@
import { Server } from 'http'
import WebSocket, { WebSocketServer } from 'ws'
import WebSocket, { OPEN, WebSocketServer } from 'ws'
import { isEventMatchingFilter } from '../utils/event'
import { createOutgoingEventMessage } from '../messages'
import { Event } from '../@types/event'
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
import { Message } from '../@types/messages'
import { IWebSocketServerAdapter } from '../@types/servers'
import { SubscriptionId, SubscriptionFilter } from '../@types/subscription'
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
import { WebServerAdapter } from './web-server-adapter'
import { Factory } from '../@types/base'
import { propEq } from 'ramda'
const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000
export class WebSocketServerAdapter extends WebServerAdapter implements IWebSocketServerAdapter {
private subscriptions: WeakMap<
WebSocket,
Map<SubscriptionId, SubscriptionFilter[]>
>
private heartbeats: WeakMap<WebSocket, boolean>
private readonly handlers: IMessageHandler[] = []
private webSocketsAdapters: WeakMap<WebSocket, IWebSocketAdapter>
private heartbeatInterval: NodeJS.Timer
public constructor(
webServer: Server,
private readonly webSocketServer: WebSocketServer,
private readonly messageHandlerFactory: Factory<IMessageHandler, [Message, IWebSocketServerAdapter]>,
private readonly createWebSocketAdapter: Factory<IWebSocketAdapter, [WebSocket, IWebSocketServerAdapter]>
) {
super(webServer)
this.subscriptions = new WeakMap<WebSocket,
Map<SubscriptionId, SubscriptionFilter[]>>()
this.heartbeats = new WeakMap<WebSocket, boolean>()
this.webSocketsAdapters = new WeakMap()
this.webSocketServer
.on('connection', this.onWebSocketServerConnection.bind(this))
.on('close', this.onWebSocketServerClose.bind(this))
this.webSocketServer.on('connection', this.onWebSocketServerConnection.bind(this))
this.webSocketServer.on('close', this.onWebSocketServerClose.bind(this))
this.heartbeatInterval = setInterval(this.onWebSocketServerHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
}
public getSubscriptions(client: WebSocket): Map<string, SubscriptionFilter[]> | undefined {
return this.subscriptions.get(client)
public getClients(): Set<WebSocket.WebSocket> {
return this.webSocketServer.clients
}
public getConnectedClients(): number {
return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length
}
public async broadcastEvent(event: Event): Promise<void> {
this.webSocketServer.clients.forEach((client: WebSocket) => {
if (client.readyState !== WebSocket.OPEN) {
this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
if (!propEq('readyState', OPEN)(webSocket)) {
return
}
this.subscriptions.get(client)?.forEach((filters, subscriptionId) => {
if (
!filters.map(isEventMatchingFilter).some((isMatch) => isMatch(event))
) {
return
}
console.log('Event sent', event.id)
client.send(
JSON.stringify(createOutgoingEventMessage(subscriptionId, event))
)
})
this.webSocketsAdapters.get(webSocket).emit('broadcast', event)
})
}
private onWebSocketServerConnection(client: WebSocket) {
this.heartbeats.set(client, true)
this.subscriptions.set(client, new Map())
console.debug(`new client - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
client.on('message', (raw: WebSocket.RawData) => this.onWebSocketClientMessage(client, raw))
client.on('close', (code: number) => this.onWebSocketClientClose(client, code))
client.on('pong', (data: Buffer) => this.onWebSocketClientPong(client, data))
}
private async onWebSocketClientMessage(client: WebSocket, raw: WebSocket.RawData) {
let abort
try {
const message = JSON.parse(raw.toString('utf-8'))
const messageHandler = this.messageHandlerFactory([message, this]) as IMessageHandler & IAbortable
if (typeof messageHandler.abort === 'function') {
abort = messageHandler.abort.bind(messageHandler)
client.once('close', abort)
}
await messageHandler?.handleMessage(message, client)
} catch (error) {
console.error(`Unable to handle message: ${error.message}`)
} finally {
if (abort) {
client.removeEventListener('close', abort)
}
}
}
private onWebSocketClientPong(client: WebSocket, _data: Buffer) {
this.heartbeats.set(client, true)
this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, this]))
}
private onWebSocketServerHeartbeat() {
this.webSocketServer.clients.forEach((client) => {
if (!this.heartbeats.get(client)) {
console.warn('terminating client')
client.terminate()
return
}
this.heartbeats.set(client, false)
client.ping()
})
console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit('heartbeat'))
}
private onWebSocketServerClose() {
console.debug('websocket server closing')
clearInterval(this.heartbeatInterval)
this.webSocketServer.removeAllListeners()
}
private onWebSocketClientClose(client: WebSocket, code: number) {
console.debug('client closing', code)
this.subscriptions.delete(client)
client.removeAllListeners()
}
}