chore: refactor adapters

This commit is contained in:
Ricardo Arturo Cabral Mejia
2022-08-29 03:46:26 +00:00
parent 814b489f91
commit b6c6269328
12 changed files with 158 additions and 92 deletions

View File

@@ -1,13 +1,9 @@
import { EventEmitter } from 'node:stream' import { EventEmitter } from 'node:stream'
import { WebSocket } from 'ws' import { WebSocket } from 'ws'
import { Event } from './event' export interface IWebSocketServerAdapter extends EventEmitter {
import { OutgoingMessage } from './messages'
export interface IWebSocketServerAdapter {
getConnectedClients(): number getConnectedClients(): number
getClients(): Set<WebSocket> getClients(): Set<WebSocket>
broadcastEvent(event: Event): Promise<void>
} }
export interface IWebServerAdapter extends EventEmitter { export interface IWebServerAdapter extends EventEmitter {
@@ -15,7 +11,4 @@ export interface IWebServerAdapter extends EventEmitter {
} }
export interface IWebSocketAdapter extends EventEmitter { export type IWebSocketAdapter = EventEmitter
getWebSocketServer(): IWebSocketServerAdapter
sendMessage(message: OutgoingMessage): void
}

View File

@@ -52,7 +52,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter
socket.end('HTTP/1.1 400 Bad Request\r\n\r\n') socket.end('HTTP/1.1 400 Bad Request\r\n\r\n')
} }
private onClose() { protected onClose() {
console.log('web server closing') console.log('web server closing')
this.webServer.removeAllListeners() this.webServer.removeAllListeners()
} }

View File

@@ -12,6 +12,7 @@ import { Event } from '../@types/event'
import { Factory } from '../@types/base' import { Factory } from '../@types/base'
import { isEventMatchingFilter } from '../utils/event' import { isEventMatchingFilter } from '../utils/event'
import { messageSchema } from '../schemas/message-schema' import { messageSchema } from '../schemas/message-schema'
import { WebSocketAdapterEvent } from '../constants/adapter'
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter { export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
private id: string private id: string
@@ -41,11 +42,8 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
.on('heartbeat', this.onHeartbeat.bind(this)) .on('heartbeat', this.onHeartbeat.bind(this))
.on('subscribe', this.onSubscribed.bind(this)) .on('subscribe', this.onSubscribed.bind(this))
.on('unsubscribe', this.onUnsubscribed.bind(this)) .on('unsubscribe', this.onUnsubscribed.bind(this))
.on('broadcast', this.onBroadcast.bind(this)) .on(WebSocketAdapterEvent.Send, this.onSend.bind(this))
} .on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this))
public getWebSocketServer(): IWebSocketServerAdapter {
return this.webSocketServer
} }
public onUnsubscribed(subscriptionId: string): void { public onUnsubscribed(subscriptionId: string): void {
@@ -57,6 +55,10 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
} }
public onBroadcast(event: Event): void { public onBroadcast(event: Event): void {
this.webSocketServer.emit('broadcast', event)
}
public onSend(event: Event): void {
this.subscriptions.forEach((filters, subscriptionId) => { this.subscriptions.forEach((filters, subscriptionId) => {
if ( if (
Array.from(filters).map(isEventMatchingFilter).some((Matches) => Matches(event)) Array.from(filters).map(isEventMatchingFilter).some((Matches) => Matches(event))

View File

@@ -2,6 +2,7 @@ import { IncomingMessage, Server } from 'http'
import WebSocket, { OPEN, WebSocketServer } from 'ws' import WebSocket, { OPEN, WebSocketServer } from 'ws'
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters' import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
import { Event } from '../@types/event' import { Event } from '../@types/event'
import { Factory } from '../@types/base' import { Factory } from '../@types/base'
import { propEq } from 'ramda' import { propEq } from 'ramda'
@@ -28,10 +29,21 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
this.webSocketsAdapters = new WeakMap() this.webSocketsAdapters = new WeakMap()
this.webSocketServer this.webSocketServer
.on('connection', this.onWebSocketServerConnection.bind(this)) .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this))
.on('close', this.onWebSocketServerClose.bind(this)) .on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this))
.on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this))
this.heartbeatInterval = setInterval(this.onWebSocketServerHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
}
private onBroadcast(event: Event) {
this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
if (!propEq('readyState', OPEN)(webSocket)) {
return
}
this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Send, event)
})
} }
public getClients(): Set<WebSocket.WebSocket> { public getClients(): Set<WebSocket.WebSocket> {
@@ -42,31 +54,22 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length
} }
public async broadcastEvent(event: Event): Promise<void> { private onConnection(client: WebSocket, req: IncomingMessage) {
this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
if (!propEq('readyState', OPEN)(webSocket)) {
return
}
this.webSocketsAdapters.get(webSocket).emit('broadcast', event)
})
}
private onWebSocketServerConnection(client: WebSocket, req: IncomingMessage) {
console.debug(`new client - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) console.debug(`new client - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this])) this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this]))
} }
private onWebSocketServerHeartbeat() { private onHeartbeat() {
console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit('heartbeat')) this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit('heartbeat'))
} }
private onWebSocketServerClose() { protected onClose() {
console.debug('websocket server closing') console.debug('websocket server closing')
clearInterval(this.heartbeatInterval) clearInterval(this.heartbeatInterval)
this.webSocketServer.removeAllListeners() this.webSocketServer.removeAllListeners()
super.onClose()
} }
} }

10
src/constants/adapter.ts Normal file
View File

@@ -0,0 +1,10 @@
export enum WebSocketAdapterEvent {
Send = 'send',
Broadcast = 'broadcast'
}
export enum WebSocketServerAdapterEvent {
Broadcast = 'broadcast',
Close = 'close',
Connection = 'connection'
}

View File

@@ -9,7 +9,7 @@ import { IWebSocketAdapter } from '../@types/adapters'
export class EventMessageHandler implements IMessageHandler { export class EventMessageHandler implements IMessageHandler {
public constructor( public constructor(
private readonly webSocket: IWebSocketAdapter, private readonly webSocket: IWebSocketAdapter,
private readonly strategyFactory: Factory<IEventStrategy<Event, Promise<boolean>>, [Event, IWebSocketAdapter]> private readonly strategyFactory: Factory<IEventStrategy<Event, Promise<void>>, [Event, IWebSocketAdapter]>
) { } ) { }
public async handleMessage(message: IncomingEventMessage): Promise<void> { public async handleMessage(message: IncomingEventMessage): Promise<void> {

View File

@@ -2,29 +2,20 @@ import { Event } from '../../@types/event'
import { IEventRepository } from '../../@types/repositories' import { IEventRepository } from '../../@types/repositories'
import { IEventStrategy } from '../../@types/message-handlers' import { IEventStrategy } from '../../@types/message-handlers'
import { IWebSocketAdapter } from '../../@types/adapters' import { IWebSocketAdapter } from '../../@types/adapters'
import { WebSocketAdapterEvent } from '../../constants/adapter'
export class DefaultEventStrategy implements IEventStrategy<Event, Promise<boolean>> { export class DefaultEventStrategy implements IEventStrategy<Event, Promise<void>> {
public constructor( public constructor(
private readonly webSocket: IWebSocketAdapter, private readonly webSocket: IWebSocketAdapter,
private readonly eventRepository: IEventRepository, private readonly eventRepository: IEventRepository,
) { } ) { }
public async execute(event: Event): Promise<boolean> { public async execute(event: Event): Promise<void> {
try { const count = await this.eventRepository.create(event)
const count = await this.eventRepository.create(event) if (!count) {
if (!count) { return
return true
}
await this.webSocket.getWebSocketServer().broadcastEvent(event)
return true
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
} }
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
} }
} }

View File

@@ -2,32 +2,33 @@ import { Event } from '../../@types/event'
import { EventTags } from '../../constants/base' import { EventTags } from '../../constants/base'
import { IEventRepository } from '../../@types/repositories' import { IEventRepository } from '../../@types/repositories'
import { IEventStrategy } from '../../@types/message-handlers' import { IEventStrategy } from '../../@types/message-handlers'
import { IWebSocketAdapter } from '../../@types/adapters'
import { WebSocketAdapterEvent } from '../../constants/adapter'
export class DeleteEventStrategy implements IEventStrategy<Event, Promise<boolean>> { export class DeleteEventStrategy implements IEventStrategy<Event, Promise<void>> {
public constructor( public constructor(
private readonly webSocket: IWebSocketAdapter,
private readonly eventRepository: IEventRepository, private readonly eventRepository: IEventRepository,
) { } ) { }
public async execute(event: Event): Promise<boolean> { public async execute(event: Event): Promise<void> {
try { const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event)
const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event)
if (!eTags.length) {
return
}
await this.eventRepository.deleteByPubkeyAndIds(
event.pubkey,
eTags.map((tag) => tag[1])
)
if (!eTags.length) {
return return
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
} }
const count = await this.eventRepository.create(event)
if (!count) {
return
}
await this.eventRepository.deleteByPubkeyAndIds(
event.pubkey,
eTags.map((tag) => tag[1])
)
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
} }
} }

View File

@@ -1,20 +1,15 @@
import { Event } from '../../@types/event' import { Event } from '../../@types/event'
import { IEventStrategy } from '../../@types/message-handlers' import { IEventStrategy } from '../../@types/message-handlers'
import { IWebSocketAdapter } from '../../@types/adapters' import { IWebSocketAdapter } from '../../@types/adapters'
import { WebSocketAdapterEvent } from '../../constants/adapter'
export class EphemeralEventStrategy implements IEventStrategy<Event, Promise<boolean>> { export class EphemeralEventStrategy implements IEventStrategy<Event, Promise<void>> {
public constructor( public constructor(
private readonly webSocket: IWebSocketAdapter, private readonly webSocket: IWebSocketAdapter,
) { } ) { }
public async execute(event: Event): Promise<boolean> { public async execute(event: Event): Promise<void> {
try { this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
await this.webSocket.getWebSocketServer().broadcastEvent(event)
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
}
} }
} }

View File

@@ -3,8 +3,8 @@ import { IEventStrategy } from '../../@types/message-handlers'
/** /**
* An event strategy that refuses to do anything useful * An event strategy that refuses to do anything useful
*/ */
export class NullEventStrategy implements IEventStrategy<void, Promise<boolean>> { export class NullEventStrategy implements IEventStrategy<void, Promise<void>> {
public async execute(): Promise<boolean> { public async execute(): Promise<void> {
return true return
} }
} }

View File

@@ -2,28 +2,21 @@ import { Event } from '../../@types/event'
import { IEventRepository } from '../../@types/repositories' import { IEventRepository } from '../../@types/repositories'
import { IEventStrategy } from '../../@types/message-handlers' import { IEventStrategy } from '../../@types/message-handlers'
import { IWebSocketAdapter } from '../../@types/adapters' import { IWebSocketAdapter } from '../../@types/adapters'
import { WebSocketAdapterEvent } from '../../constants/adapter'
export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<boolean>> { export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<void>> {
public constructor( public constructor(
private readonly webSocket: IWebSocketAdapter, private readonly webSocket: IWebSocketAdapter,
private readonly eventRepository: IEventRepository, private readonly eventRepository: IEventRepository,
) { } ) { }
public async execute(event: Event): Promise<boolean> { public async execute(event: Event): Promise<void> {
try { const count = await this.eventRepository.upsert(event)
const count = await this.eventRepository.upsert(event) if (!count) {
if (!count) { return
return true
}
await this.webSocket.getWebSocketServer().broadcastEvent(event)
return true
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
} }
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
} }
} }

View File

@@ -0,0 +1,78 @@
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import Sinon from 'sinon'
chai.use(chaiAsPromised)
import { DatabaseClient } from '../../../../src/@types/base'
import { DefaultEventStrategy } from '../../../../src/handlers/event-strategies/default-event-strategy'
import { Event } from '../../../../src/@types/event'
import { EventRepository } from '../../../../src/repositories/event-repository'
import { IEventRepository } from '../../../../src/@types/repositories'
import { IEventStrategy } from '../../../../src/@types/message-handlers'
import { IWebSocketAdapter } from '../../../../src/@types/adapters'
import { WebSocketAdapterEvent } from '../../../../src/constants/adapter'
const { expect } = chai
describe('DefaultEventStrategy', () => {
const event: Event = {} as any
let webSocket: IWebSocketAdapter
let eventRepository: IEventRepository
let webSocketEmitStub: Sinon.SinonStub
let eventRepositoryCreateStub: Sinon.SinonStub
let strategy: IEventStrategy<Event, Promise<void>>
let sandbox: Sinon.SinonSandbox
beforeEach(() => {
sandbox = Sinon.createSandbox()
eventRepositoryCreateStub = sandbox.stub(EventRepository.prototype, 'create')
webSocketEmitStub = sandbox.stub()
webSocket = {
emit: webSocketEmitStub,
} as any
const client: DatabaseClient = {} as any
eventRepository = new EventRepository(client)
strategy = new DefaultEventStrategy(webSocket, eventRepository)
})
afterEach(() => {
sandbox.restore()
})
describe('execute', () => {
it('creates event', async () => {
await strategy.execute(event)
expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly(event)
})
it('broadcast event if event is created', async () => {
eventRepositoryCreateStub.resolves(1)
await strategy.execute(event)
expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly(event)
expect(webSocketEmitStub).to.have.been.calledOnceWithExactly(
WebSocketAdapterEvent.Broadcast,
event
)
})
it('rejects if unable to create event', async () => {
const error = new Error()
eventRepositoryCreateStub.rejects(error)
await expect(strategy.execute(event)).to.eventually.be.rejectedWith(error)
expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly(event)
expect(webSocketEmitStub).not.to.have.been.called
})
})
})