diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index cc9a7ef..ee0bb11 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -1,13 +1,9 @@ import { EventEmitter } from 'node:stream' import { WebSocket } from 'ws' -import { Event } from './event' -import { OutgoingMessage } from './messages' - -export interface IWebSocketServerAdapter { +export interface IWebSocketServerAdapter extends EventEmitter { getConnectedClients(): number getClients(): Set - broadcastEvent(event: Event): Promise } export interface IWebServerAdapter extends EventEmitter { @@ -15,7 +11,4 @@ export interface IWebServerAdapter extends EventEmitter { } -export interface IWebSocketAdapter extends EventEmitter { - getWebSocketServer(): IWebSocketServerAdapter - sendMessage(message: OutgoingMessage): void -} +export type IWebSocketAdapter = EventEmitter diff --git a/src/adapters/web-server-adapter.ts b/src/adapters/web-server-adapter.ts index 74ef12b..a5c9ed7 100644 --- a/src/adapters/web-server-adapter.ts +++ b/src/adapters/web-server-adapter.ts @@ -52,7 +52,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter socket.end('HTTP/1.1 400 Bad Request\r\n\r\n') } - private onClose() { + protected onClose() { console.log('web server closing') this.webServer.removeAllListeners() } diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index 9a848eb..bb32bb5 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -12,6 +12,7 @@ import { Event } from '../@types/event' import { Factory } from '../@types/base' import { isEventMatchingFilter } from '../utils/event' import { messageSchema } from '../schemas/message-schema' +import { WebSocketAdapterEvent } from '../constants/adapter' export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter { private id: string @@ -41,11 +42,8 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter .on('heartbeat', this.onHeartbeat.bind(this)) .on('subscribe', this.onSubscribed.bind(this)) .on('unsubscribe', this.onUnsubscribed.bind(this)) - .on('broadcast', this.onBroadcast.bind(this)) - } - - public getWebSocketServer(): IWebSocketServerAdapter { - return this.webSocketServer + .on(WebSocketAdapterEvent.Send, this.onSend.bind(this)) + .on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this)) } public onUnsubscribed(subscriptionId: string): void { @@ -57,6 +55,10 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } public onBroadcast(event: Event): void { + this.webSocketServer.emit('broadcast', event) + } + + public onSend(event: Event): void { this.subscriptions.forEach((filters, subscriptionId) => { if ( Array.from(filters).map(isEventMatchingFilter).some((Matches) => Matches(event)) diff --git a/src/adapters/web-socket-server-adapter.ts b/src/adapters/web-socket-server-adapter.ts index 35ad862..8cbb984 100644 --- a/src/adapters/web-socket-server-adapter.ts +++ b/src/adapters/web-socket-server-adapter.ts @@ -2,6 +2,7 @@ import { IncomingMessage, Server } from 'http' import WebSocket, { OPEN, WebSocketServer } from 'ws' import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters' +import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter' import { Event } from '../@types/event' import { Factory } from '../@types/base' import { propEq } from 'ramda' @@ -28,10 +29,21 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.webSocketsAdapters = new WeakMap() this.webSocketServer - .on('connection', this.onWebSocketServerConnection.bind(this)) - .on('close', this.onWebSocketServerClose.bind(this)) + .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this)) + .on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this)) + .on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this)) - this.heartbeatInterval = setInterval(this.onWebSocketServerHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) + this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) + } + + private onBroadcast(event: Event) { + this.webSocketServer.clients.forEach((webSocket: WebSocket) => { + if (!propEq('readyState', OPEN)(webSocket)) { + return + } + + this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Send, event) + }) } public getClients(): Set { @@ -42,31 +54,22 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length } - public async broadcastEvent(event: Event): Promise { - this.webSocketServer.clients.forEach((webSocket: WebSocket) => { - if (!propEq('readyState', OPEN)(webSocket)) { - return - } - - this.webSocketsAdapters.get(webSocket).emit('broadcast', event) - }) - } - - private onWebSocketServerConnection(client: WebSocket, req: IncomingMessage) { + private onConnection(client: WebSocket, req: IncomingMessage) { console.debug(`new client - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this])) } - private onWebSocketServerHeartbeat() { + private onHeartbeat() { console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`) this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit('heartbeat')) } - private onWebSocketServerClose() { + protected onClose() { console.debug('websocket server closing') clearInterval(this.heartbeatInterval) this.webSocketServer.removeAllListeners() + super.onClose() } } diff --git a/src/constants/adapter.ts b/src/constants/adapter.ts new file mode 100644 index 0000000..df4d2c4 --- /dev/null +++ b/src/constants/adapter.ts @@ -0,0 +1,10 @@ +export enum WebSocketAdapterEvent { + Send = 'send', + Broadcast = 'broadcast' +} + +export enum WebSocketServerAdapterEvent { + Broadcast = 'broadcast', + Close = 'close', + Connection = 'connection' +} diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts index ab1a524..627450a 100644 --- a/src/handlers/event-message-handler.ts +++ b/src/handlers/event-message-handler.ts @@ -9,7 +9,7 @@ import { IWebSocketAdapter } from '../@types/adapters' export class EventMessageHandler implements IMessageHandler { public constructor( private readonly webSocket: IWebSocketAdapter, - private readonly strategyFactory: Factory>, [Event, IWebSocketAdapter]> + private readonly strategyFactory: Factory>, [Event, IWebSocketAdapter]> ) { } public async handleMessage(message: IncomingEventMessage): Promise { diff --git a/src/handlers/event-strategies/default-event-strategy.ts b/src/handlers/event-strategies/default-event-strategy.ts index d9a5db7..b1f3e5a 100644 --- a/src/handlers/event-strategies/default-event-strategy.ts +++ b/src/handlers/event-strategies/default-event-strategy.ts @@ -2,29 +2,20 @@ import { Event } from '../../@types/event' import { IEventRepository } from '../../@types/repositories' import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' +import { WebSocketAdapterEvent } from '../../constants/adapter' -export class DefaultEventStrategy implements IEventStrategy> { +export class DefaultEventStrategy implements IEventStrategy> { public constructor( private readonly webSocket: IWebSocketAdapter, private readonly eventRepository: IEventRepository, ) { } - public async execute(event: Event): Promise { - try { - const count = await this.eventRepository.create(event) - if (!count) { - return true - } - - await this.webSocket.getWebSocketServer().broadcastEvent(event) - - return true - } catch (error) { - console.error('Unable to handle event. Reason:', error) - - return false + public async execute(event: Event): Promise { + const count = await this.eventRepository.create(event) + if (!count) { + return } + this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) } - } diff --git a/src/handlers/event-strategies/delete-event-strategy.ts b/src/handlers/event-strategies/delete-event-strategy.ts index 58fab84..1909207 100644 --- a/src/handlers/event-strategies/delete-event-strategy.ts +++ b/src/handlers/event-strategies/delete-event-strategy.ts @@ -2,32 +2,33 @@ import { Event } from '../../@types/event' import { EventTags } from '../../constants/base' import { IEventRepository } from '../../@types/repositories' import { IEventStrategy } from '../../@types/message-handlers' +import { IWebSocketAdapter } from '../../@types/adapters' +import { WebSocketAdapterEvent } from '../../constants/adapter' -export class DeleteEventStrategy implements IEventStrategy> { +export class DeleteEventStrategy implements IEventStrategy> { public constructor( + private readonly webSocket: IWebSocketAdapter, private readonly eventRepository: IEventRepository, ) { } - public async execute(event: Event): Promise { - try { - const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event) - - if (!eTags.length) { - return - } - - await this.eventRepository.deleteByPubkeyAndIds( - event.pubkey, - eTags.map((tag) => tag[1]) - ) + public async execute(event: Event): Promise { + const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event) + if (!eTags.length) { return - } catch (error) { - console.error('Unable to handle event. Reason:', error) - - return false } + + const count = await this.eventRepository.create(event) + if (!count) { + return + } + + await this.eventRepository.deleteByPubkeyAndIds( + event.pubkey, + eTags.map((tag) => tag[1]) + ) + this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) } } diff --git a/src/handlers/event-strategies/ephemeral-event-strategy.ts b/src/handlers/event-strategies/ephemeral-event-strategy.ts index 80afbc7..39883b2 100644 --- a/src/handlers/event-strategies/ephemeral-event-strategy.ts +++ b/src/handlers/event-strategies/ephemeral-event-strategy.ts @@ -1,20 +1,15 @@ import { Event } from '../../@types/event' import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' +import { WebSocketAdapterEvent } from '../../constants/adapter' -export class EphemeralEventStrategy implements IEventStrategy> { +export class EphemeralEventStrategy implements IEventStrategy> { public constructor( private readonly webSocket: IWebSocketAdapter, ) { } - public async execute(event: Event): Promise { - try { - await this.webSocket.getWebSocketServer().broadcastEvent(event) - } catch (error) { - console.error('Unable to handle event. Reason:', error) - - return false - } + public async execute(event: Event): Promise { + this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) } } diff --git a/src/handlers/event-strategies/null-event-strategy.ts b/src/handlers/event-strategies/null-event-strategy.ts index 121151d..4d8f91c 100644 --- a/src/handlers/event-strategies/null-event-strategy.ts +++ b/src/handlers/event-strategies/null-event-strategy.ts @@ -3,8 +3,8 @@ import { IEventStrategy } from '../../@types/message-handlers' /** * An event strategy that refuses to do anything useful */ -export class NullEventStrategy implements IEventStrategy> { - public async execute(): Promise { - return true +export class NullEventStrategy implements IEventStrategy> { + public async execute(): Promise { + return } } diff --git a/src/handlers/event-strategies/replaceable-event-strategy.ts b/src/handlers/event-strategies/replaceable-event-strategy.ts index 3ceb33d..69fd9d2 100644 --- a/src/handlers/event-strategies/replaceable-event-strategy.ts +++ b/src/handlers/event-strategies/replaceable-event-strategy.ts @@ -2,28 +2,21 @@ import { Event } from '../../@types/event' import { IEventRepository } from '../../@types/repositories' import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' +import { WebSocketAdapterEvent } from '../../constants/adapter' -export class ReplaceableEventStrategy implements IEventStrategy> { +export class ReplaceableEventStrategy implements IEventStrategy> { public constructor( private readonly webSocket: IWebSocketAdapter, private readonly eventRepository: IEventRepository, ) { } - public async execute(event: Event): Promise { - try { - const count = await this.eventRepository.upsert(event) - if (!count) { - return true - } - - await this.webSocket.getWebSocketServer().broadcastEvent(event) - - return true - } catch (error) { - console.error('Unable to handle event. Reason:', error) - - return false + public async execute(event: Event): Promise { + const count = await this.eventRepository.upsert(event) + if (!count) { + return } + + this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) } } diff --git a/test/unit/handlers/event-strategies/default-event-strategy.spec.ts b/test/unit/handlers/event-strategies/default-event-strategy.spec.ts new file mode 100644 index 0000000..c232870 --- /dev/null +++ b/test/unit/handlers/event-strategies/default-event-strategy.spec.ts @@ -0,0 +1,78 @@ +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import Sinon from 'sinon' + +chai.use(chaiAsPromised) + +import { DatabaseClient } from '../../../../src/@types/base' +import { DefaultEventStrategy } from '../../../../src/handlers/event-strategies/default-event-strategy' +import { Event } from '../../../../src/@types/event' +import { EventRepository } from '../../../../src/repositories/event-repository' +import { IEventRepository } from '../../../../src/@types/repositories' +import { IEventStrategy } from '../../../../src/@types/message-handlers' +import { IWebSocketAdapter } from '../../../../src/@types/adapters' +import { WebSocketAdapterEvent } from '../../../../src/constants/adapter' + +const { expect } = chai + +describe('DefaultEventStrategy', () => { + const event: Event = {} as any + let webSocket: IWebSocketAdapter + let eventRepository: IEventRepository + + let webSocketEmitStub: Sinon.SinonStub + let eventRepositoryCreateStub: Sinon.SinonStub + + let strategy: IEventStrategy> + + let sandbox: Sinon.SinonSandbox + + beforeEach(() => { + sandbox = Sinon.createSandbox() + + eventRepositoryCreateStub = sandbox.stub(EventRepository.prototype, 'create') + + webSocketEmitStub = sandbox.stub() + webSocket = { + emit: webSocketEmitStub, + } as any + const client: DatabaseClient = {} as any + eventRepository = new EventRepository(client) + + strategy = new DefaultEventStrategy(webSocket, eventRepository) + }) + + afterEach(() => { + sandbox.restore() + }) + + describe('execute', () => { + it('creates event', async () => { + await strategy.execute(event) + + expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly(event) + }) + + it('broadcast event if event is created', async () => { + eventRepositoryCreateStub.resolves(1) + + await strategy.execute(event) + + expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly(event) + expect(webSocketEmitStub).to.have.been.calledOnceWithExactly( + WebSocketAdapterEvent.Broadcast, + event + ) + }) + + it('rejects if unable to create event', async () => { + const error = new Error() + eventRepositoryCreateStub.rejects(error) + + await expect(strategy.execute(event)).to.eventually.be.rejectedWith(error) + + expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly(event) + expect(webSocketEmitStub).not.to.have.been.called + }) + }) +})