diff --git a/package-lock.json b/package-lock.json index 9a32c5f..5fb39fe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "MIT", "dependencies": { "@noble/secp256k1": "1.7.0", + "debug": "4.3.4", "joi": "17.6.1", "knex": "2.3.0", "pg": "8.8.0", @@ -22,6 +23,7 @@ "@cucumber/pretty-formatter": "1.0.0", "@types/chai": "^4.3.1", "@types/chai-as-promised": "^7.1.5", + "@types/debug": "4.1.7", "@types/mocha": "^9.1.1", "@types/node": "^17.0.24", "@types/pg": "^8.6.5", @@ -1077,6 +1079,15 @@ "@types/chai": "*" } }, + "node_modules/@types/debug": { + "version": "4.1.7", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.7.tgz", + "integrity": "sha512-9AonUzyTjXXhEOa0DnqpzZi6VHlqKMswga9EXjpXnnqxwLtdvPPtlO8evrI5D9S6asFRCQ6v+wpiUKbw+vKqyg==", + "dev": true, + "dependencies": { + "@types/ms": "*" + } + }, "node_modules/@types/json-schema": { "version": "7.0.11", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.11.tgz", @@ -1095,6 +1106,12 @@ "integrity": "sha512-Z61JK7DKDtdKTWwLeElSEBcWGRLY8g95ic5FoQqI9CMx0ns/Ghep3B4DfcEimiKMvtamNVULVNKEsiwV3aQmXw==", "dev": true }, + "node_modules/@types/ms": { + "version": "0.7.31", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz", + "integrity": "sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==", + "dev": true + }, "node_modules/@types/node": { "version": "17.0.24", "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.24.tgz", @@ -6507,6 +6524,15 @@ "@types/chai": "*" } }, + "@types/debug": { + "version": "4.1.7", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.7.tgz", + "integrity": "sha512-9AonUzyTjXXhEOa0DnqpzZi6VHlqKMswga9EXjpXnnqxwLtdvPPtlO8evrI5D9S6asFRCQ6v+wpiUKbw+vKqyg==", + "dev": true, + "requires": { + "@types/ms": "*" + } + }, "@types/json-schema": { "version": "7.0.11", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.11.tgz", @@ -6525,6 +6551,12 @@ "integrity": "sha512-Z61JK7DKDtdKTWwLeElSEBcWGRLY8g95ic5FoQqI9CMx0ns/Ghep3B4DfcEimiKMvtamNVULVNKEsiwV3aQmXw==", "dev": true }, + "@types/ms": { + "version": "0.7.31", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz", + "integrity": "sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==", + "dev": true + }, "@types/node": { "version": "17.0.24", "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.24.tgz", diff --git a/package.json b/package.json index a4dcffe..070dd88 100644 --- a/package.json +++ b/package.json @@ -64,6 +64,7 @@ "@cucumber/pretty-formatter": "1.0.0", "@types/chai": "^4.3.1", "@types/chai-as-promised": "^7.1.5", + "@types/debug": "4.1.7", "@types/mocha": "^9.1.1", "@types/node": "^17.0.24", "@types/pg": "^8.6.5", @@ -89,8 +90,9 @@ }, "dependencies": { "@noble/secp256k1": "1.7.0", + "debug": "4.3.4", "joi": "17.6.1", - "knex": "^2.3.0", + "knex": "2.3.0", "pg": "8.8.0", "pg-query-stream": "4.2.3", "ramda": "0.28.0", diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index de3fcba..d30a4ca 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -12,5 +12,6 @@ export interface IWebServerAdapter extends EventEmitter { export type IWebSocketAdapter = EventEmitter & { + getClientId(): string getSubscriptions(): Map } diff --git a/src/adapters/web-server-adapter.ts b/src/adapters/web-server-adapter.ts index c31abc9..5b0beb1 100644 --- a/src/adapters/web-server-adapter.ts +++ b/src/adapters/web-server-adapter.ts @@ -1,27 +1,39 @@ import { Duplex, EventEmitter } from 'stream' import { IncomingMessage, Server, ServerResponse } from 'http' + import packageJson from '../../package.json' +import { createLogger } from '../factories/logger-factory' import { ISettings } from '../@types/settings' import { IWebServerAdapter } from '../@types/adapters' -export class WebServerAdapter extends EventEmitter implements IWebServerAdapter { +const debug = createLogger('web-server-adapter') +export class WebServerAdapter extends EventEmitter implements IWebServerAdapter { public constructor( protected readonly webServer: Server, private readonly settings: () => ISettings, ) { + debug('web server starting') super() - this.webServer.on('request', this.onWebServerRequest.bind(this)) - .on('clientError', this.onWebServerSocketError.bind(this)) - .on('close', this.onClose.bind(this)) + this.webServer + .on('request', this.onRequest.bind(this)) + .on('clientError', this.onError.bind(this)) + .once('close', this.onClose.bind(this)) + .once('listening', this.onListening.bind(this)) } public listen(port: number): void { + debug('attempt to listen on port %d', port) this.webServer.listen(port) } - private onWebServerRequest(request: IncomingMessage, response: ServerResponse) { + private onListening() { + debug('listening for incoming connections') + } + + private onRequest(request: IncomingMessage, response: ServerResponse) { + debug('request received: %o', request) if (request.method === 'GET' && request.headers['accept'] === 'application/nostr+json') { const { info: { name, description, pubkey, contact }, @@ -38,14 +50,17 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter } response.setHeader('content-type', 'application/nostr+json') - response.end(JSON.stringify(relayInformationDocument)) + const body = JSON.stringify(relayInformationDocument) + response.end(body) } else { response.setHeader('content-type', 'application/text') response.end('Please use a Nostr client to connect.') } + debug('send response: %o', response) } - private onWebServerSocketError(error: Error, socket: Duplex) { + private onError(error: Error, socket: Duplex) { + debug('socket error: %o', error) if (error['code'] === 'ECONNRESET' || !socket.writable) { return } @@ -53,7 +68,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter } protected onClose() { - console.log(`worker ${process.pid} web server closing`) + debug('stopped listening to incoming connections') this.webServer.removeAllListeners() } } diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index c0091be..adab4ea 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -10,20 +10,20 @@ import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters' import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter' import { attemptValidation } from '../utils/validation' +import { createLogger } from '../factories/logger-factory' import { Event } from '../@types/event' import { Factory } from '../@types/base' import { isEventMatchingFilter } from '../utils/event' import { messageSchema } from '../schemas/message-schema' +const debug = createLogger('web-socket-adapter') + export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter { - private id: string - private clientAddress: string + public clientId: string + // private clientAddress: string private alive: boolean private subscriptions: Map - private sent = 0 - private received = 0 - public constructor( private readonly client: WebSocket, private readonly request: IncomingHttpMessage, @@ -34,8 +34,8 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter this.alive = true this.subscriptions = new Map() - this.id = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex') - this.clientAddress = this.request.headers['x-forwarded-for'] as string + this.clientId = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex') + // this.clientAddress = this.request.headers['x-forwarded-for'] as string this.client .on('message', this.onClientMessage.bind(this)) @@ -48,20 +48,30 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter .on(WebSocketAdapterEvent.Unsubscribe, this.onUnsubscribed.bind(this)) .on(WebSocketAdapterEvent.Event, this.onSendEvent.bind(this)) .on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this)) - .on(WebSocketAdapterEvent.Message, this.onSendMessage.bind(this)) + .on(WebSocketAdapterEvent.Message, this.sendMessage.bind(this)) + + debug('client %s connected', this.clientId) + } + + public getClientId(): string { + return this.clientId } public onUnsubscribed(subscriptionId: string): void { + debug('client %s unsubscribed %s', this.clientId, subscriptionId) this.subscriptions.delete(subscriptionId) } public onSubscribed(subscriptionId: string, filters: SubscriptionFilter[]): void { + debug('client %s subscribed %s to %o', this.clientId, subscriptionId, filters) this.subscriptions.set(subscriptionId, filters) } public onBroadcast(event: Event): void { + debug('client %s broadcast event: %o', this.clientId, event) this.webSocketServer.emit(WebSocketServerAdapterEvent.Broadcast, event) if (cluster.isWorker) { + debug('client %s broadcast event to primary: %o', this.clientId, event) process.send({ eventName: WebSocketServerAdapterEvent.Broadcast, event, @@ -80,22 +90,20 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } private sendMessage(message: OutgoingMessage): void { - this.sent++ + debug('sending message to client %s: %o', this.clientId, message) this.client.send(JSON.stringify(message)) } - private onSendMessage(message: OutgoingMessage): void { - this.sendMessage(message) - } - public onHeartbeat(): void { if (!this.alive) { + debug('client %s pong timed out', this.clientId) this.terminate() return } this.alive = false this.client.ping() + debug('client %s ping', this.clientId) } public getSubscriptions(): Map { @@ -103,12 +111,13 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } private terminate(): void { - console.debug(`worker ${process.pid} - terminating client`) + debug('terminating client %s', this.clientId) this.client.terminate() + debug('client %s terminated', this.clientId) } private async onClientMessage(raw: Buffer) { - let abort + let abort: () => void try { const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf-8'))) @@ -118,17 +127,15 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter this.client.prependOnceListener('close', abort) } - this.received++ - await messageHandler?.handleMessage(message) } catch (error) { if (error instanceof Error && error.name === 'AbortError') { - console.error(`worker ${process.pid} - message handler aborted`) + debug('message handler aborted') } else if (error instanceof Error && error.name === 'ValidationError') { - console.error(`worker ${process.pid} - invalid message`, (error as any).annotate()) + debug('invalid message: %o', (error as any).annotate()) this.sendMessage(createNoticeMessage(`Invalid message: ${error.message}`)) } else { - console.error(`worker ${process.pid} - unable to handle message: ${error.message}`) + debug('unable to handle message: %o', error) } } finally { if (abort) { @@ -138,13 +145,17 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } private onClientPong() { + debug('client %s pong', this.clientId) this.alive = true } private onClientClose() { + debug('client %s closing', this.clientId) this.alive = false this.removeAllListeners() this.client.removeAllListeners() + + debug('client %s closed', this.clientId) } } diff --git a/src/adapters/web-socket-server-adapter.ts b/src/adapters/web-socket-server-adapter.ts index 5ee36e3..5da8578 100644 --- a/src/adapters/web-socket-server-adapter.ts +++ b/src/adapters/web-socket-server-adapter.ts @@ -3,12 +3,14 @@ import WebSocket, { OPEN, WebSocketServer } from 'ws' import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters' import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter' +import { createLogger } from '../factories/logger-factory' import { Event } from '../@types/event' import { Factory } from '../@types/base' import { ISettings } from '../@types/settings' import { propEq } from 'ramda' import { WebServerAdapter } from './web-server-adapter' +const debug = createLogger('web-socket-server-adapter') const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000 @@ -36,9 +38,9 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.webSocketServer .on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this)) .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this)) - .on('error', (err) => { - console.error('error', err) - throw err + .on('error', (error) => { + debug('error: %o', error) + throw error }) this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) } @@ -55,8 +57,9 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock if (!propEq('readyState', OPEN)(webSocket)) { return } - - this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Event, event) + const webSocketAdapter = this.webSocketsAdapters.get(webSocket) + debug('broadcasting event to client %s: %o', webSocketAdapter.getClientId(), event) + webSocketAdapter.emit(WebSocketAdapterEvent.Event, event) }) } @@ -65,6 +68,7 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock } private onConnection(client: WebSocket, req: IncomingMessage) { + debug('client connected: %o', req.headers) this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this])) } @@ -75,13 +79,15 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock } protected onClose() { - this.webSocketServer.clients.forEach((webSocket: WebSocket) => - webSocket.terminate() - ) - console.debug(`worker ${process.pid} - websocket server closing`) + debug('closing') clearInterval(this.heartbeatInterval) + this.webSocketServer.clients.forEach((webSocket: WebSocket) => { + debug('terminating client') + webSocket.terminate() + }) this.removeAllListeners() this.webSocketServer.removeAllListeners() super.onClose() + debug('closed') } } diff --git a/src/app/app.ts b/src/app/app.ts index 6c87c3a..09b1c55 100644 --- a/src/app/app.ts +++ b/src/app/app.ts @@ -1,58 +1,68 @@ import { Cluster, Worker } from 'cluster' import { cpus } from 'os' +import { createLogger } from '../factories/logger-factory' import { IRunnable } from '../@types/base' import { ISettings } from '../@types/settings' import packageJson from '../../package.json' import { Serializable } from 'child_process' +const debug = createLogger('app-primary') + export class App implements IRunnable { public constructor( private readonly process: NodeJS.Process, private readonly cluster: Cluster, private readonly settingsFactory: () => ISettings, ) { + debug('starting') + this.cluster .on('message', this.onClusterMessage.bind(this)) .on('exit', this.onClusterExit.bind(this)) this.process .on('SIGTERM', this.onExit.bind(this)) + + debug('started') } public run(): void { - console.log(`${packageJson.name}@${packageJson.version}`) - console.log(`supported NIPs: ${packageJson.supportedNips}`) - console.log(`primary ${this.process.pid} - running`) + debug('running %s version %s', packageJson.name, packageJson.version) + debug('supported NIPs: %o', packageJson.supportedNips) const workerCount = this.settingsFactory().workers?.count || cpus().length for (let i = 0; i < workerCount; i++) { + debug('starting worker') this.cluster.fork() } } private onClusterMessage(source: Worker, message: Serializable) { + debug('message received from worker %s: %o', source.process.pid, message) for (const worker of Object.values(this.cluster.workers)) { if (source.id === worker.id) { continue } + debug('sending message to worker %s: %o', worker.process.pid, message) worker.send(message) } } private onClusterExit(deadWorker: Worker, code: number, signal: string) { - console.log(`worker ${deadWorker.process.pid} - exiting`) - if (code === 0 || signal === 'SIGINT') { - return - } - - this.cluster.fork() + debug('worker %s died', deadWorker.process.pid) + if (code === 0 || signal === 'SIGINT') { + return + } + debug('starting worker') + const newWorker = this.cluster.fork() + debug('started worker %s', newWorker.process.pid) } private onExit() { - console.log('exiting...') + debug('exiting') this.process.exit(0) } } \ No newline at end of file diff --git a/src/app/worker.ts b/src/app/worker.ts index f675cfd..2f5a6f1 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -1,6 +1,9 @@ import { IRunnable } from '../@types/base' import { IWebSocketServerAdapter } from '../@types/adapters' +import { createLogger } from '../factories/logger-factory' + +const debug = createLogger('app-worker') export class AppWorker implements IRunnable { public constructor( private readonly process: NodeJS.Process, @@ -19,27 +22,27 @@ export class AppWorker implements IRunnable { const port = Number(process.env.SERVER_PORT) || 8008 this.adapter.listen(port) - - console.log(`worker ${process.pid} - listening on port ${port}`) } private onMessage(message: { eventName: string, event: unknown }): void { + debug('broadcast message received: %o', message) this.adapter.emit(message.eventName, message.event) } private onError(error: Error) { - console.error(`worker ${process.pid} - error`, error) + debug('error: %o', error) throw error } private onExit() { - console.log(`worker ${process.pid} - exiting`) + debug('exiting') this.close(() => { this.process.exit(0) }) } public close(callback?: () => void) { + debug('closing') this.adapter.close(callback) } } diff --git a/src/database/client.ts b/src/database/client.ts index e6f3ca6..96ebfcb 100644 --- a/src/database/client.ts +++ b/src/database/client.ts @@ -1,6 +1,9 @@ import 'pg' import 'pg-query-stream' import knex, { Knex } from 'knex' +import { createLogger } from '../factories/logger-factory' + +const debug = createLogger('database-client') const createDbConfig = (): Knex.Config => ({ client: 'pg', @@ -20,9 +23,12 @@ const createDbConfig = (): Knex.Config => ({ }) let client: Knex + export const getDbClient = () => { if (!client) { - client = knex(createDbConfig()) + const config = createDbConfig() + debug('config: %o', config) + client = knex(config) } return client diff --git a/src/factories/logger-factory.ts b/src/factories/logger-factory.ts new file mode 100644 index 0000000..a27d75e --- /dev/null +++ b/src/factories/logger-factory.ts @@ -0,0 +1,19 @@ +import cluster from 'cluster' +import debug from 'debug' + +export const createLogger = ( + namespace: string, + options: { enabled?: boolean; stdout?: boolean } = { enabled: false, stdout: false } +) => { + const prefix = cluster.isWorker ? 'worker' : 'primary' + const instance = debug(prefix) + if (options.enabled) { + debug.enable(`${prefix}:${namespace}:*`) + } + if (options.stdout) { + instance.log = console.log.bind(console) + } + const fn = instance.extend(namespace) + + return fn +} diff --git a/src/handlers/delegated-event-message-handler.ts b/src/handlers/delegated-event-message-handler.ts index 5e8080e..15f5ac7 100644 --- a/src/handlers/delegated-event-message-handler.ts +++ b/src/handlers/delegated-event-message-handler.ts @@ -2,6 +2,7 @@ import { mergeDeepLeft } from 'ramda' import { DelegatedEvent, Event } from '../@types/event' import { EventDelegatorMetadataKey, EventTags } from '../constants/base' +import { createLogger } from '../factories/logger-factory' import { createNoticeMessage } from '../utils/messages' import { EventMessageHandler } from './event-message-handler' import { IMessageHandler } from '../@types/message-handlers' @@ -9,20 +10,23 @@ import { IncomingEventMessage } from '../@types/messages' import { isDelegatedEventValid } from '../utils/event' import { WebSocketAdapterEvent } from '../constants/adapter' +const debug = createLogger('delegated-event-message-handler') + export class DelegatedEventMessageHandler extends EventMessageHandler implements IMessageHandler { public async handleMessage(message: IncomingEventMessage): Promise { const [, event] = message let reason = this.canAcceptEvent(event) if (reason) { + debug('event %s rejected: %s', event.id, reason) this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`)) - console.warn(`Event ${event.id} rejected. Reason: ${reason}`) return } reason = await this.isEventValid(event) if (reason) { - console.warn(`Event ${event.id} rejected. Reason: ${reason}`) + debug('event %s rejected: %s', event.id, reason) + this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`)) return } @@ -43,7 +47,7 @@ export class DelegatedEventMessageHandler extends EventMessageHandler implements try { await strategy.execute(delegatedEvent) } catch (error) { - console.error('Error handling message:', message, error) + debug('error handling message %o: %o', message, error) } } diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts index bada685..0a2198e 100644 --- a/src/handlers/event-message-handler.ts +++ b/src/handlers/event-message-handler.ts @@ -1,6 +1,7 @@ import { EventKindsRange, ISettings } from '../@types/settings' import { getEventProofOfWork, getPubkeyProofOfWork, isEventIdValid, isEventSignatureValid } from '../utils/event' import { IEventStrategy, IMessageHandler } from '../@types/message-handlers' +import { createLogger } from '../factories/logger-factory' import { createNoticeMessage } from '../utils/messages' import { Event } from '../@types/event' import { EventKinds } from '../constants/base' @@ -9,6 +10,8 @@ import { IncomingEventMessage } from '../@types/messages' import { IWebSocketAdapter } from '../@types/adapters' import { WebSocketAdapterEvent } from '../constants/adapter' +const debug = createLogger('event-message-handler') + export class EventMessageHandler implements IMessageHandler { public constructor( protected readonly webSocket: IWebSocketAdapter, @@ -17,18 +20,20 @@ export class EventMessageHandler implements IMessageHandler { ) { } public async handleMessage(message: IncomingEventMessage): Promise { + debug('received message: %o', message) const [, event] = message let reason = await this.isEventValid(event) if (reason) { - console.warn(`Event ${event.id} rejected. Reason: ${reason}`) + debug('event %s rejected: %s', event.id, reason) + this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`)) return } reason = this.canAcceptEvent(event) if (reason) { + debug('event %s rejected: %s', event.id, reason) this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`)) - console.warn(`Event ${event.id} rejected. Reason: ${reason}`) return } @@ -41,7 +46,7 @@ export class EventMessageHandler implements IMessageHandler { try { await strategy.execute(event) } catch (error) { - console.error('Error handling message:', message, error) + debug('error handling message %o: %o', message, error) } } diff --git a/src/handlers/event-strategies/default-event-strategy.ts b/src/handlers/event-strategies/default-event-strategy.ts index b1f3e5a..2cf0bcc 100644 --- a/src/handlers/event-strategies/default-event-strategy.ts +++ b/src/handlers/event-strategies/default-event-strategy.ts @@ -1,9 +1,11 @@ +import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { IEventRepository } from '../../@types/repositories' import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' import { WebSocketAdapterEvent } from '../../constants/adapter' +const debug = createLogger('default-event-strategy') export class DefaultEventStrategy implements IEventStrategy> { public constructor( @@ -12,6 +14,7 @@ export class DefaultEventStrategy implements IEventStrategy ) { } public async execute(event: Event): Promise { + debug('received event: %o', event) const count = await this.eventRepository.create(event) if (!count) { return diff --git a/src/handlers/event-strategies/delete-event-strategy.ts b/src/handlers/event-strategies/delete-event-strategy.ts index 943ca42..4533475 100644 --- a/src/handlers/event-strategies/delete-event-strategy.ts +++ b/src/handlers/event-strategies/delete-event-strategy.ts @@ -1,3 +1,4 @@ +import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { EventTags } from '../../constants/base' import { IEventRepository } from '../../@types/repositories' @@ -5,6 +6,7 @@ import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' import { WebSocketAdapterEvent } from '../../constants/adapter' +const debug = createLogger('delete-event-strategy') export class DeleteEventStrategy implements IEventStrategy> { public constructor( @@ -13,6 +15,7 @@ export class DeleteEventStrategy implements IEventStrategy> ) { } public async execute(event: Event): Promise { + debug('received event: %o', event) await this.eventRepository.create(event) const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event) diff --git a/src/handlers/event-strategies/ephemeral-event-strategy.ts b/src/handlers/event-strategies/ephemeral-event-strategy.ts index 39883b2..8ed4cd6 100644 --- a/src/handlers/event-strategies/ephemeral-event-strategy.ts +++ b/src/handlers/event-strategies/ephemeral-event-strategy.ts @@ -1,8 +1,10 @@ +import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' import { WebSocketAdapterEvent } from '../../constants/adapter' +const debug = createLogger('ephemeral-event-strategy') export class EphemeralEventStrategy implements IEventStrategy> { public constructor( @@ -10,6 +12,7 @@ export class EphemeralEventStrategy implements IEventStrategy { + debug('received event: %o', event) this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) } } diff --git a/src/handlers/event-strategies/replaceable-event-strategy.ts b/src/handlers/event-strategies/replaceable-event-strategy.ts index 69fd9d2..2042da1 100644 --- a/src/handlers/event-strategies/replaceable-event-strategy.ts +++ b/src/handlers/event-strategies/replaceable-event-strategy.ts @@ -1,9 +1,11 @@ +import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { IEventRepository } from '../../@types/repositories' import { IEventStrategy } from '../../@types/message-handlers' import { IWebSocketAdapter } from '../../@types/adapters' import { WebSocketAdapterEvent } from '../../constants/adapter' +const debug = createLogger('replaceable-event-strategy') export class ReplaceableEventStrategy implements IEventStrategy> { public constructor( @@ -12,6 +14,7 @@ export class ReplaceableEventStrategy implements IEventStrategy { + debug('received event: %o', event) const count = await this.eventRepository.upsert(event) if (!count) { return diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index 58f1322..c074ad9 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -6,6 +6,7 @@ import { IAbortable, IMessageHandler } from '../@types/message-handlers' import { isEventMatchingFilter, toNostrEvent } from '../utils/event' import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream' import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' +import { createLogger } from '../factories/logger-factory' import { Event } from '../@types/event' import { IEventRepository } from '../@types/repositories' import { ISettings } from '../@types/settings' @@ -13,6 +14,8 @@ import { IWebSocketAdapter } from '../@types/adapters' import { SubscribeMessage } from '../@types/messages' import { WebSocketAdapterEvent } from '../constants/adapter' +const debug = createLogger('subscribe-message-handler') + export class SubscribeMessageHandler implements IMessageHandler, IAbortable { private readonly abortController: AbortController @@ -29,21 +32,24 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { } public async handleMessage(message: SubscribeMessage): Promise { + debug('received message: %o', message) const subscriptionId = message[1] as SubscriptionId const filters = uniqWith(equals, message.slice(2)) as SubscriptionFilter[] const reason = this.canSubscribe(subscriptionId, filters) if (reason) { + debug('subscription %s with %o rejected: %s', subscriptionId, filters, reason) this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Subscription request rejected: ${reason}`)) return } this.webSocket.emit(WebSocketAdapterEvent.Subscribe, subscriptionId, filters) - return this.fetchAndSend(subscriptionId, filters) + await this.fetchAndSend(subscriptionId, filters) } private async fetchAndSend(subscriptionId: string, filters: SubscriptionFilter[]): Promise { + debug('fetching events for subscription %s with %o', subscriptionId, filters) const sendEvent = (event: Event) => this.webSocket.emit(WebSocketAdapterEvent.Message, createOutgoingEventMessage(subscriptionId, event)) const sendEOSE = () => @@ -65,7 +71,10 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { ) } catch (error) { if (error instanceof Error && error.name === 'AbortError') { + debug('aborted: %o', error) findEvents.end() + } else { + debug('error streaming events: %o', error) } throw error } diff --git a/src/handlers/unsubscribe-message-handler.ts b/src/handlers/unsubscribe-message-handler.ts index b3d7fc2..42f8fb1 100644 --- a/src/handlers/unsubscribe-message-handler.ts +++ b/src/handlers/unsubscribe-message-handler.ts @@ -1,9 +1,10 @@ -import { IWebSocketAdapter } from '../@types/adapters' - +import { createLogger } from '../factories/logger-factory' import { IMessageHandler } from '../@types/message-handlers' +import { IWebSocketAdapter } from '../@types/adapters' import { UnsubscribeMessage } from '../@types/messages' import { WebSocketAdapterEvent } from '../constants/adapter' +const debug = createLogger('unsubscribe-message-handler') export class UnsubscribeMessageHandler implements IMessageHandler { public constructor( @@ -11,6 +12,7 @@ export class UnsubscribeMessageHandler implements IMessageHandler { ) { } public async handleMessage(message: UnsubscribeMessage): Promise { + debug('received message: %o', message) this.webSocket.emit(WebSocketAdapterEvent.Unsubscribe, message[1]) } } diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index e02c543..9763e9d 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -30,6 +30,7 @@ import { DatabaseClient, EventId } from '../@types/base' import { DBEvent, Event } from '../@types/event' import { IEventRepository, IQueryResult } from '../@types/repositories' import { toBuffer, toJSON } from '../utils/transform' +import { createLogger } from '../factories/logger-factory' import { EventDelegatorMetadataKey } from '../constants/base' import { isGenericTagQuery } from '../utils/filter' import { SubscriptionFilter } from '../@types/subscription' @@ -47,10 +48,13 @@ const groupByLengthSpec = groupBy( ) ) +const debug = createLogger('event-repository') + export class EventRepository implements IEventRepository { public constructor(private readonly dbClient: DatabaseClient) { } public findByFilters(filters: SubscriptionFilter[]): IQueryResult { + debug('querying for %o', filters) if (!Array.isArray(filters) || !filters.length) { throw new Error('Filters cannot be empty') } @@ -153,6 +157,7 @@ export class EventRepository implements IEventRepository { } public async create(event: Event): Promise { + debug('creating event: %o', event) return this.insert(event).then(prop('rowCount') as () => number) } @@ -180,6 +185,7 @@ export class EventRepository implements IEventRepository { public upsert(event: Event): Promise { + debug('upserting event: %o', event) const toJSON = (input: any) => JSON.stringify(input) const row = applySpec({ @@ -212,6 +218,7 @@ export class EventRepository implements IEventRepository { } public deleteByPubkeyAndIds(pubkey: string, ids: EventId[]): Promise { + debug('deleting events from %s: %o', pubkey, ids) return this.dbClient('events') .where({ event_pubkey: toBuffer(pubkey), diff --git a/src/utils/settings.ts b/src/utils/settings.ts index 9a61a95..2940ba7 100644 --- a/src/utils/settings.ts +++ b/src/utils/settings.ts @@ -3,9 +3,11 @@ import { homedir } from 'os' import { join } from 'path' import { mergeDeepRight } from 'ramda' +import { createLogger } from '../factories/logger-factory' import { ISettings } from '../@types/settings' import packageJson from '../../package.json' +const debug = createLogger('settings') export class SettingsStatic { static _settings: ISettings @@ -58,6 +60,7 @@ export class SettingsStatic { } public static loadSettings(path: string) { + debug('loading settings from %s', path) return JSON.parse( fs.readFileSync( path, @@ -67,6 +70,7 @@ export class SettingsStatic { } public static createSettings(): ISettings { + debug('creating settings') if (SettingsStatic._settings) { return SettingsStatic._settings } @@ -86,13 +90,14 @@ export class SettingsStatic { return SettingsStatic._settings } catch (error) { - console.error('Unable to read config file. Reason: %s', error.message) + debug('error reading config file at %s: %o', path, error) return defaults } } public static saveSettings(path: string, settings: ISettings) { + debug('saving settings to %s: %o', path, settings) return fs.writeFileSync( path, JSON.stringify(settings, null, 2), diff --git a/test/integration/features/nip-01/nip-01.feature b/test/integration/features/nip-01/nip-01.feature index b68def6..08ff384 100644 --- a/test/integration/features/nip-01/nip-01.feature +++ b/test/integration/features/nip-01/nip-01.feature @@ -1,6 +1,69 @@ Feature: NIP-01 - Scenario: Alice posts set_metadata event - Given I am Alice - And I subscribe to author Alice - When I send a set_metadata event as Alice - Then I receive a set_metadata event from Alice + Scenario: Alice posts a set_metadata event + Given someone is Alice + And Alice subscribes to author Alice + When Alice sends a set_metadata event + Then Alice receives a set_metadata event from Alice + + Scenario: Alice posts a text_note event + Given someone is Alice + And Alice subscribes to author Alice + When Alice sends a text_note event with content "hello world" + Then Alice receives a text_note event from Alice with content "hello world" + + Scenario: Alice posts a recommend_server event + Given someone is Alice + And Alice subscribes to author Alice + When Alice sends a recommend_server event with content "https://nostr-ts-relay.wlvs.space" + Then Alice receives a recommend_server event from Alice with content "https://nostr-ts-relay.wlvs.space" + + Scenario: Alice can't post a text_note event with an invalid signature + Given someone is Alice + When Alice sends a text_note event with invalid signature + Then Alice receives a notice with invalid signature + + Scenario: Alice and Bob exchange text_note events + Given someone is Alice + And someone is Bob + And Alice subscribes to author Bob + And Bob subscribes to author Alice + When Bob sends a text_note event with content "hello alice" + Then Alice receives a text_note event from Bob with content "hello alice" + When Alice sends a text_note event with content "hello bob" + Then Bob receives a text_note event from Alice with content "hello bob" + + Scenario: Alice is interested in text_note events + Given someone is Alice + And someone is Bob + And Alice subscribes to text_note events + When Bob sends a text_note event with content "hello nostr" + Then Alice receives a text_note event from Bob with content "hello nostr" + + Scenario: Alice is interested in the #NostrNovember hashtag + Given someone is Alice + And someone is Bob + And Alice subscribes to tag t with "NostrNovember" + When Bob sends a text_note event with content "Nostr FTW!" and tag t containing "NostrNovember" + Then Alice receives a text_note event from Bob with content "Nostr FTW!" + + Scenario: Alice is interested in Bob's events from back in November + Given someone is Alice + And someone is Bob + When Bob sends a text_note event with content "What's up?" on 1668074223 + And Alice subscribes to any event since 1667275200 until 1669870799 + Then Alice receives a text_note event from Bob with content "What's up?" + + Scenario: Alice is interested Bob's in 2 past events + Given someone is Alice + And someone is Bob + Then Bob subscribes to author Bob + And Bob sends a text_note event with content "One" + And Bob receives a text_note event from Bob with content "One" + And Bob sends a text_note event with content "Two" + And Bob receives a text_note event from Bob with content "Two" + And Bob sends a text_note event with content "Three" + And Bob receives a text_note event from Bob with content "Three" + When Alice subscribes to author Bob with a limit of 2 + Then Alice receives 2 text_note events from Bob and EOSE + + diff --git a/test/integration/features/nip-01/nip-01.feature-step.ts b/test/integration/features/nip-01/nip-01.feature-step.ts index d19a136..c5f3062 100644 --- a/test/integration/features/nip-01/nip-01.feature-step.ts +++ b/test/integration/features/nip-01/nip-01.feature-step.ts @@ -13,6 +13,7 @@ import { createHmac } from 'crypto' import sinonChai from 'sinon-chai' import { Event } from '../../../../src/@types/event' +import { getDbClient } from '../../../../src/database/client' import { MessageType } from '../../../../src/@types/messages' import { serializeEvent } from '../../../../src/utils/event' import { SubscriptionFilter } from '../../../../src/@types/subscription' @@ -21,42 +22,82 @@ chai.use(sinonChai) const { expect } = chai Before(async function () { - const ws = new WebSocket('ws://localhost:8008') - this.parameters.ws = ws - await new Promise((resolve, reject) => { - ws - .once('open', resolve) - .once('error', reject) + this.parameters.identities = {} + this.parameters.subscriptions = {} + this.parameters.clients = {} +}) + +After(async function () { + this.parameters.subscriptions = {} + Object.values(this.parameters.clients).forEach((ws: WebSocket) => { + if (ws && ws.readyState === WebSocket.OPEN) { + ws.close() + } }) + this.parameters.clients = {} + + const dbClient = getDbClient() + await Promise.all( + Object.values(this.parameters.identities) + .map(async (identity: { pubkey: string }) => dbClient('events').where({ event_pubkey: Buffer.from(identity.pubkey, 'hex') }).del()) + ) + this.parameters.identities = {} }) -After(function () { - const ws = this.parameters.ws as WebSocket - if (ws && ws.readyState === WebSocket.OPEN) { - ws.close() - } +Given(/someone is (\w+)/, async function(name: string) { + const connection = connect(name) + this.parameters.identities[name] = this.parameters.identities[name] ?? createIdentity(name) + this.parameters.clients[name] = await connection + this.parameters.subscriptions[name] = [] }) -Given(/I am (\w+)/, function(name: string) { - this.parameters.authors = this.parameters.authors ?? {} - this.parameters.authors[name] = this.parameters.authors[name] ?? createIdentity(name) -}) - -When(/I subscribe to author (\w+)/, async function(this: World>, name: string) { - const ws = this.parameters.ws as WebSocket - const pubkey = this.parameters.authors[name].pubkey - this.parameters.subscriptions = this.parameters.subscriptions ?? [] +When(/(\w+) subscribes to author (\w+)$/, async function(this: World>, from: string, to: string) { + const ws = this.parameters.clients[from] as WebSocket + const pubkey = this.parameters.identities[to].pubkey const subscription = { name: `test-${Math.random()}`, filters: [{ authors: [pubkey] }] } - this.parameters.subscriptions.push(subscription) + this.parameters.subscriptions[from].push(subscription) + + await createSubscription(ws, subscription.name, subscription.filters) +}) + +When(/(\w+) subscribes to author (\w+) with a limit of (\d+)/, async function(this: World>, from: string, to: string, limit: string) { + const ws = this.parameters.clients[from] as WebSocket + const pubkey = this.parameters.identities[to].pubkey + const subscription = { name: `test-${Math.random()}`, filters: [{ authors: [pubkey], limit: Number(limit) }] } + this.parameters.subscriptions[from].push(subscription) + + await createSubscription(ws, subscription.name, subscription.filters) +}) + +When(/(\w+) subscribes to text_note events/, async function(this: World>, name: string) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = { name: `test-${Math.random()}`, filters: [{ kinds: [1] }] } + this.parameters.subscriptions[name].push(subscription) + + await createSubscription(ws, subscription.name, subscription.filters) +}) + +When(/(\w+) subscribes to any event since (\d+) until (\d+)/, async function(this: World>, name: string, since: string, until: string) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = { name: `test-${Math.random()}`, filters: [{ since: Number(since), until: Number(until) }] } + this.parameters.subscriptions[name].push(subscription) + + await createSubscription(ws, subscription.name, subscription.filters) +}) + +When(/(\w+) subscribes to tag (\w) with "(.*?)"$/, async function(this: World>, name: string, tag: string, value: string) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = { name: `test-${Math.random()}`, filters: [{ [`#${tag}`]: [value] }] } + this.parameters.subscriptions[name].push(subscription) await createSubscription(ws, subscription.name, subscription.filters) await waitForEOSE(ws, subscription.name) }) -When(/I send a set_metadata event as (\w+)/, async function(name: string) { - const ws = this.parameters.ws as WebSocket - const { pubkey, privkey } = this.parameters.authors[name] +When(/(\w+) sends a set_metadata event/, async function(name: string) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] const content = JSON.stringify({ name }) const event: Event = await createEvent({ pubkey, kind: 0, content }, privkey) @@ -67,19 +108,156 @@ When(/I send a set_metadata event as (\w+)/, async function(name: string) { this.parameters.events.push(event) }) -Then(/I receive a set_metadata event from (\w+)/, async function(author: string) { - const expectedEvent = this.parameters.events.pop() - const subscription = this.parameters.subscriptions[this.parameters.subscriptions.length - 1] - const receivedEvent = await waitForNextEvent(this.parameters.ws, subscription.name) - expect(receivedEvent.pubkey).to.equal(this.parameters.authors[author].pubkey) - expect(receivedEvent).to.deep.equal(expectedEvent) +When(/^(\w+) sends a text_note event with content "([^"]+)"$/, async function(name: string, content: string) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] + + const event: Event = await createEvent({ pubkey, kind: 1, content }, privkey) + + await sendEvent(ws, event) }) +When(/^(\w+) sends a text_note event with content "([^"]+)" and tag (\w) containing "([^"]+)"$/, async function( + name: string, + content: string, + tag: string, + value: string, +) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] + + const event: Event = await createEvent({ pubkey, kind: 1, content, tags: [[tag, value]] }, privkey) + + await sendEvent(ws, event) +}) + +When(/^(\w+) sends a text_note event with content "([^"]+)" on (\d+)$/, async function( + name: string, + content: string, + createdAt: string, +) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] + + const event: Event = await createEvent({ pubkey, kind: 1, content, created_at: Number(createdAt) }, privkey) + + await sendEvent(ws, event) +}) + +When(/(\w+) sends a text_note event with invalid signature/, async function(name: string) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] + + const event: Event = await createEvent({ pubkey, kind: 1, content: "I'm cheating" }, privkey) + + event.sig = 'f'.repeat(128) + + await sendEvent(ws, event) +}) + +When(/(\w+) sends a recommend_server event with content "(.+?)"/, async function(name: string, content: string) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] + + const event: Event = await createEvent({ pubkey, kind: 2, content }, privkey) + + await sendEvent(ws, event) +}) + +Then(/(\w+) receives a set_metadata event from (\w+)/, async function(name: string, author: string) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1] + const receivedEvent = await waitForNextEvent(ws, subscription.name) + + expect(receivedEvent.kind).to.equal(0) + expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey) +}) + +Then(/(\w+) receives a text_note event from (\w+) with content "(.+?)"/, async function(name: string, author: string, content: string) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1] + const receivedEvent = await waitForNextEvent(ws, subscription.name) + + expect(receivedEvent.kind).to.equal(1) + expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey) + expect(receivedEvent.content).to.equal(content) +}) + +Then(/(\w+) receives a text_note event from (\w+) with content "(.+?)" on (\d+)/, async function( + name: string, + author: string, + content: string, + createdAt: string, +) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1] + const receivedEvent = await waitForNextEvent(ws, subscription.name) + + expect(receivedEvent.kind).to.equal(1) + expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey) + expect(receivedEvent.content).to.equal(content) + expect(receivedEvent.created_at).to.equal(Number(createdAt)) +}) + +Then(/(\w+) receives (\d+) text_note events from (\w+)/, async function( + name: string, + count: string, + author: string, +) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1] + const events = await waitForEventCount(ws, subscription.name, Number(count), true) + + expect(events.length).to.equal(2) + expect(events[0].kind).to.equal(1) + expect(events[1].kind).to.equal(1) + expect(events[0].pubkey).to.equal(this.parameters.identities[author].pubkey) + expect(events[1].pubkey).to.equal(this.parameters.identities[author].pubkey) +}) + +Then(/(\w+) receives a recommend_server event from (\w+) with content "(.+?)"/, async function(name: string, author: string, content: string) { + const ws = this.parameters.clients[name] as WebSocket + const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1] + const receivedEvent = await waitForNextEvent(ws, subscription.name) + + expect(receivedEvent.kind).to.equal(2) + expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey) + expect(receivedEvent.content).to.equal(content) +}) + +Then(/(\w+) receives a notice with (.*)/, async function(name: string, pattern: string) { + const ws = this.parameters.clients[name] as WebSocket + const actualNotice = await waitForNotice(ws) + + expect(actualNotice).to.contain(pattern) +}) + +async function connect(_name: string) { + const host = 'ws://localhost:8008' + const ws = new WebSocket(host) + await new Promise((resolve, reject) => { + ws + // .on('message', (data: RawData) => { + // console.log(`${name} received`, JSON.parse(data.toString('utf-8'))) + // }) + .once('open', () => { + resolve() + }) + .once('error', reject) + .once('close', () => { + ws.removeAllListeners() + }) + }) + return ws +} + +let eventCount = 0 + async function createEvent(input: Partial, privkey: any): Promise { const event: Event = { pubkey: input.pubkey, kind: input.kind, - created_at: input.created_at ?? Math.floor(Date.now() / 1000), + created_at: input.created_at ?? Math.floor(Date.now() / 1000) + eventCount++, content: input.content ?? '', tags: input.tags ?? [], } as any @@ -174,6 +352,82 @@ async function sendEvent(ws: WebSocket, event: Event) { async function waitForNextEvent(ws: WebSocket, subscription: string): Promise { return new Promise((resolve, reject) => { + ws.on('message', onMessage) + ws.once('error', onError) + + function cleanup() { + ws.removeListener('message', onMessage) + ws.removeListener('error', onError) + } + + function onError(error: Error) { + reject(error) + cleanup() + } + + function onMessage(raw: RawData) { + const message = JSON.parse(raw.toString('utf-8')) + if (message[0] === MessageType.EVENT && message[1] === subscription) { + resolve(message[2]) + cleanup() + } else if (message[0] === MessageType.NOTICE) { + reject(new Error(message[1])) + cleanup() + } + } + }) +} + +async function waitForEventCount( + ws: WebSocket, + subscription: string, + count = 1, + eose = false, +): Promise { + const events = [] + + return new Promise((resolve, reject) => { + ws.on('message', onMessage) + ws.once('error', onError) + function cleanup() { + ws.removeListener('message', onMessage) + ws.removeListener('error', onError) + } + + function onError(error: Error) { + reject(error) + cleanup() + } + function onMessage(raw: RawData) { + const message = JSON.parse(raw.toString('utf-8')) + if (message[0] === MessageType.EVENT && message[1] === subscription) { + events.push(message[2]) + if (!eose && events.length === count) { + resolve(events) + cleanup() + } else if (events.length > count) { + reject(new Error(`Expected ${count} but got ${events.length} events`)) + cleanup() + } + } else if (message[0] === MessageType.EOSE && message[1] === subscription) { + if (!eose) { + reject(new Error('Expected event but received EOSE')) + } else if (events.length !== count) { + reject(new Error(`Expected ${count} but got ${events.length} events before EOSE`)) + } else { + resolve(events) + } + cleanup() + } else if (message[0] === MessageType.NOTICE) { + reject(new Error(message[1])) + cleanup() + } + } + }) +} + +async function waitForNotice(ws: WebSocket): Promise { + return new Promise((resolve, reject) => { function cleanup() { ws.removeListener('message', onMessage) ws.removeListener('error', onError) @@ -186,16 +440,13 @@ async function waitForNextEvent(ws: WebSocket, subscription: string): Promise