From a7d8561bea9415e235d3e6fdc6f3015682155177 Mon Sep 17 00:00:00 2001 From: Ricardo Arturo Cabral Mejia Date: Thu, 11 Aug 2022 23:13:28 +0000 Subject: [PATCH] chore: stream events --- .../20220524_153400_create_events_table.js | 2 +- ...400_add_replaceable_events_unique_index.js | 10 +++ package-lock.json | 31 +++++++ package.json | 1 + src/{types => @types}/base.ts | 2 + src/{types => @types}/event.ts | 2 +- src/@types/message-handlers.ts | 15 ++++ src/{types => @types}/messages.ts | 3 +- src/{types => @types}/repositories.ts | 4 +- src/{types => @types}/servers.ts | 7 +- src/@types/settings.ts | 61 ++++++++++++++ src/{types => @types}/subscription.ts | 0 src/adapters/web-server-adapter.ts | 16 ++-- src/adapters/web-socket-server-adapter.ts | 62 +++++++------- src/client.ts | 23 ------ src/database/client.ts | 1 + src/factories/event-strategy-factory.ts | 27 +++++++ src/factories/message-handler-factory.ts | 24 ++++++ src/handlers/event-message-handler.ts | 42 +++++----- .../default-event-strategy.ts | 31 +++++++ .../ephemeral-event-strategy.ts | 22 +++++ .../null-event-strategy copy.ts | 10 +++ .../replaceable-event-strategy.ts | 31 +++++++ src/handlers/subscribe-message-handler.ts | 81 ++++++++++--------- src/handlers/unsubscribe-message-handler.ts | 20 +++-- src/index.ts | 8 +- src/messages.ts | 6 +- src/repositories/event-repository.ts | 67 ++++++++------- src/schemas/base-schema.ts | 4 +- src/settings.ts | 58 +++++++------ src/types/message-handlers.ts | 9 --- src/utils/event.ts | 28 ++++++- src/utils/hash-event.ts | 2 +- src/utils/serialize-event.ts | 2 +- src/utils/transforms.ts | 26 ++++++ test/unit/events.spec.ts | 4 +- 36 files changed, 517 insertions(+), 225 deletions(-) create mode 100644 migrations/20220809_190400_add_replaceable_events_unique_index.js rename src/{types => @types}/base.ts (81%) rename src/{types => @types}/event.ts (92%) create mode 100644 src/@types/message-handlers.ts rename src/{types => @types}/messages.ts (91%) rename src/{types => @types}/repositories.ts (54%) rename src/{types => @types}/servers.ts (69%) create mode 100644 src/@types/settings.ts rename src/{types => @types}/subscription.ts (100%) delete mode 100644 src/client.ts create mode 100644 src/factories/event-strategy-factory.ts create mode 100644 src/factories/message-handler-factory.ts create mode 100644 src/handlers/event-strategies/default-event-strategy.ts create mode 100644 src/handlers/event-strategies/ephemeral-event-strategy.ts create mode 100644 src/handlers/event-strategies/null-event-strategy copy.ts create mode 100644 src/handlers/event-strategies/replaceable-event-strategy.ts delete mode 100644 src/types/message-handlers.ts create mode 100644 src/utils/transforms.ts diff --git a/migrations/20220524_153400_create_events_table.js b/migrations/20220524_153400_create_events_table.js index bb6a7fc..c7bfdca 100644 --- a/migrations/20220524_153400_create_events_table.js +++ b/migrations/20220524_153400_create_events_table.js @@ -1,7 +1,7 @@ exports.up = function (knex) { return knex.schema.createTable('events', (table) => { table.uuid('id').primary().defaultTo(knex.raw('uuid_generate_v4()')) - table.binary('event_id').unique().notNullable() + table.binary('event_id').unique().notNullable().index() table.binary('event_pubkey').notNullable().index() table.integer('event_kind').unsigned().notNullable().index() table.integer('event_created_at').unsigned().notNullable().index() diff --git a/migrations/20220809_190400_add_replaceable_events_unique_index.js b/migrations/20220809_190400_add_replaceable_events_unique_index.js new file mode 100644 index 0000000..d0d1bf5 --- /dev/null +++ b/migrations/20220809_190400_add_replaceable_events_unique_index.js @@ -0,0 +1,10 @@ +exports.up = function (knex) { + // NIP-16: Replaceable Events + return knex.raw( + 'CREATE UNIQUE INDEX replaceable_events_idx ON events ( event_pubkey, event_kind ) WHERE event_kind = 0 OR event_kind >= 10000 AND event_kind < 20000;', + ) +} + +exports.down = function (knex) { + return knex.raw('DROP INDEX IF EXISTS replaceable_events_idx') +} diff --git a/package-lock.json b/package-lock.json index e34eae9..923e0dc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "joi": "^17.6.0", "knex": "^2.0.0", "pg": "^8.7.3", + "pg-query-stream": "4.2.3", "ramda": "^0.28.0", "ws": "^8.5.0" }, @@ -2180,6 +2181,14 @@ "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" }, + "node_modules/pg-cursor": { + "version": "2.7.3", + "resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.7.3.tgz", + "integrity": "sha512-vmjXRMD4jZK/oHaaYk6clTypgHNlzCCAqyLCO5d/UeI42egJVE5H4ZfZWACub3jzkHUXXyvibH207zAJg9iBOw==", + "peerDependencies": { + "pg": "^8" + } + }, "node_modules/pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", @@ -2201,6 +2210,14 @@ "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz", "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==" }, + "node_modules/pg-query-stream": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/pg-query-stream/-/pg-query-stream-4.2.3.tgz", + "integrity": "sha512-3mrOzffAoGGi2EqsfTdKanKn444ZB+E+Gbz/EJL3rd0thlXD3kb3ZBrwX42bRnQssrEd7/kVFM1FbiIMSQ5ung==", + "dependencies": { + "pg-cursor": "^2.7.3" + } + }, "node_modules/pg-types": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", @@ -4694,6 +4711,12 @@ "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" }, + "pg-cursor": { + "version": "2.7.3", + "resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.7.3.tgz", + "integrity": "sha512-vmjXRMD4jZK/oHaaYk6clTypgHNlzCCAqyLCO5d/UeI42egJVE5H4ZfZWACub3jzkHUXXyvibH207zAJg9iBOw==", + "requires": {} + }, "pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", @@ -4710,6 +4733,14 @@ "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz", "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==" }, + "pg-query-stream": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/pg-query-stream/-/pg-query-stream-4.2.3.tgz", + "integrity": "sha512-3mrOzffAoGGi2EqsfTdKanKn444ZB+E+Gbz/EJL3rd0thlXD3kb3ZBrwX42bRnQssrEd7/kVFM1FbiIMSQ5ung==", + "requires": { + "pg-cursor": "^2.7.3" + } + }, "pg-types": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", diff --git a/package.json b/package.json index 9688f7a..ef7ca24 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ "joi": "^17.6.0", "knex": "^2.0.0", "pg": "^8.7.3", + "pg-query-stream": "4.2.3", "ramda": "^0.28.0", "ws": "^8.5.0" } diff --git a/src/types/base.ts b/src/@types/base.ts similarity index 81% rename from src/types/base.ts rename to src/@types/base.ts index 5566e19..c1c4161 100644 --- a/src/types/base.ts +++ b/src/@types/base.ts @@ -13,3 +13,5 @@ export type Range = Exclude< Enumerate, Enumerate > + +export type Factory = (input: TInput) => TOutput \ No newline at end of file diff --git a/src/types/event.ts b/src/@types/event.ts similarity index 92% rename from src/types/event.ts rename to src/@types/event.ts index d874917..365975d 100644 --- a/src/types/event.ts +++ b/src/@types/event.ts @@ -1,5 +1,5 @@ import { EventKinds } from '../constants/base' -import { Pubkey, TagName } from '../types/base' +import { Pubkey, TagName } from './base' export type EventId = string diff --git a/src/@types/message-handlers.ts b/src/@types/message-handlers.ts new file mode 100644 index 0000000..e24a7a0 --- /dev/null +++ b/src/@types/message-handlers.ts @@ -0,0 +1,15 @@ +import { WebSocket } from 'ws' + +import { Message } from './messages' + +export interface IMessageHandler { + handleMessage(message: Message, client: WebSocket): Promise +} + +export interface IAbortable { + abort(): void +} + +export interface IEventStrategy { + execute(args: TInput): TOutput +} diff --git a/src/types/messages.ts b/src/@types/messages.ts similarity index 91% rename from src/types/messages.ts rename to src/@types/messages.ts index e74c95f..5a23837 100644 --- a/src/types/messages.ts +++ b/src/@types/messages.ts @@ -24,7 +24,8 @@ export type SubscribeMessage = { 1: SubscriptionId } & Array -export interface IncomingEventMessage { +export type IncomingEventMessage = EventMessage & [] +export interface EventMessage { 0: MessageType.EVENT 1: Event } diff --git a/src/types/repositories.ts b/src/@types/repositories.ts similarity index 54% rename from src/types/repositories.ts rename to src/@types/repositories.ts index 8294e81..85fdb43 100644 --- a/src/types/repositories.ts +++ b/src/@types/repositories.ts @@ -1,7 +1,9 @@ +import { PassThrough } from 'stream' import { Event } from './event' import { SubscriptionFilter } from './subscription' export interface IEventRepository { create(event: Event): Promise - findByfilters(filters: SubscriptionFilter[]): Promise + upsert(event: Event): Promise + findByfilters(filters: SubscriptionFilter[]): PassThrough } diff --git a/src/types/servers.ts b/src/@types/servers.ts similarity index 69% rename from src/types/servers.ts rename to src/@types/servers.ts index 7f1d051..0a2fb3e 100644 --- a/src/types/servers.ts +++ b/src/@types/servers.ts @@ -1,14 +1,13 @@ +import { EventEmitter } from 'node:stream' import { WebSocket } from 'ws' import { Event } from './event' -import { IMessageHandler } from './message-handlers' import { SubscriptionFilter, SubscriptionId } from './subscription' export interface IWebSocketServerAdapter { - addMessageHandler(messageHandler: IMessageHandler): void getSubscriptions(client: WebSocket): Map | undefined broadcastEvent(event: Event): Promise } -export interface IWebServerAdapter { +export interface IWebServerAdapter extends EventEmitter { listen(port: number) -} \ No newline at end of file +} diff --git a/src/@types/settings.ts b/src/@types/settings.ts new file mode 100644 index 0000000..857cffd --- /dev/null +++ b/src/@types/settings.ts @@ -0,0 +1,61 @@ +import { EventKinds } from '../constants/base' +import { Pubkey } from './base' + +interface Info { + relay_url?: string + name?: string + description?: string + pubkey?: string + contact?: string +} + +interface EventIdLimits { + minimumZeroBits?: number +} + +interface PubkeyLimits { + whitelist?: Pubkey[] + blacklist?: Pubkey[] +} + +interface KindLimits { + whitelist?: EventKinds[] + blacklist?: EventKinds[] +} + +interface CreatedAtLimits { + /** + * Maximum number of seconds allowed before the current unix timestamp + */ + maximumNegativeSkew?: number + /** + * Maximum number of seconds allowed after the current unix timestamp + */ + maximumPositiveSkew?: number +} + +interface EventLimits { + eventId?: EventIdLimits + pubkey?: PubkeyLimits + kind?: KindLimits + createdAt?: CreatedAtLimits +} + +interface ClientSubscriptionLimits { + maximumCount?: number + maximumFilters?: number +} + +interface ClientLimits { + subscription?: ClientSubscriptionLimits +} + +interface Limits { + client?: ClientLimits + event?: EventLimits +} + +export interface ISettings { + info: Info + limits?: Limits +} diff --git a/src/types/subscription.ts b/src/@types/subscription.ts similarity index 100% rename from src/types/subscription.ts rename to src/@types/subscription.ts diff --git a/src/adapters/web-server-adapter.ts b/src/adapters/web-server-adapter.ts index e72b96c..bb2756b 100644 --- a/src/adapters/web-server-adapter.ts +++ b/src/adapters/web-server-adapter.ts @@ -1,15 +1,16 @@ import { IncomingMessage, Server, ServerResponse } from 'http' -import { Duplex } from 'stream' +import { Duplex, EventEmitter } from 'stream' import packageJson from '../../package.json' import { Settings } from '../settings' -import { IWebServerAdapter } from '../types/servers' +import { IWebServerAdapter } from '../@types/servers' -export class WebServerAdapter implements IWebServerAdapter { +export class WebServerAdapter extends EventEmitter implements IWebServerAdapter { public constructor( private readonly webServer: Server, ) { + super() this.webServer.on('request', this.onWebServerRequest.bind(this)) this.webServer.on('clientError', this.onWebServerSocketError.bind(this)) } @@ -20,7 +21,7 @@ export class WebServerAdapter implements IWebServerAdapter { } private onWebServerRequest(request: IncomingMessage, response: ServerResponse) { - if (request.headers['accept'] === 'application/nostr+json') { + if (request.method === 'GET' && request.headers['accept'] === 'application/nostr+json') { const { info: { name, description, pubkey, contact }, } = Settings @@ -30,7 +31,7 @@ export class WebServerAdapter implements IWebServerAdapter { description, pubkey, contact, - supported_nips: [11, 12, 15], + supported_nips: [11, 12, 15, 16], software: packageJson.repository.url, version: packageJson.version, } @@ -38,7 +39,8 @@ export class WebServerAdapter implements IWebServerAdapter { response.setHeader('content-type', 'application/nostr+json') response.end(JSON.stringify(relayInformationDocument)) } else { - response.end() + response.setHeader('content-type', 'application/text') + response.end('Please use a Nostr client to connect.') } } @@ -48,4 +50,4 @@ export class WebServerAdapter implements IWebServerAdapter { } socket.end('HTTP/1.1 400 Bad Request\r\n\r\n') } -} \ No newline at end of file +} diff --git a/src/adapters/web-socket-server-adapter.ts b/src/adapters/web-socket-server-adapter.ts index a4ca4d6..088d47b 100644 --- a/src/adapters/web-socket-server-adapter.ts +++ b/src/adapters/web-socket-server-adapter.ts @@ -3,12 +3,13 @@ import WebSocket, { WebSocketServer } from 'ws' import { isEventMatchingFilter } from '../utils/event' import { createOutgoingEventMessage } from '../messages' -import { Event } from '../types/event' -import { IMessageHandler } from '../types/message-handlers' -import { Message } from '../types/messages' -import { IWebSocketServerAdapter } from '../types/servers' -import { SubscriptionId, SubscriptionFilter } from '../types/subscription' +import { Event } from '../@types/event' +import { IAbortable, IMessageHandler } from '../@types/message-handlers' +import { Message } from '../@types/messages' +import { IWebSocketServerAdapter } from '../@types/servers' +import { SubscriptionId, SubscriptionFilter } from '../@types/subscription' import { WebServerAdapter } from './web-server-adapter' +import { Factory } from '../@types/base' const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000 @@ -26,6 +27,7 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock public constructor( webServer: Server, private readonly webSocketServer: WebSocketServer, + private readonly messageHandlerFactory: Factory, ) { super(webServer) @@ -38,10 +40,6 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.heartbeatInterval = setInterval(this.onWebSocketServerHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) } - public addMessageHandler(messageHandler: IMessageHandler): void { - this.handlers.push(messageHandler) - } - public getSubscriptions(client: WebSocket): Map | undefined { return this.subscriptions.get(client) } @@ -71,34 +69,34 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.heartbeats.set(client, true) this.subscriptions.set(client, new Map()) - client.on('message', (raw: WebSocket.RawData) => { - try { - const message = JSON.parse(raw.toString('utf-8')) - this.onWebSocketClientMessage(client, message) - } catch (error) { - console.error('Unable to parse message', raw.toString('utf-8')) - } - }) + client.on('message', (raw: WebSocket.RawData) => this.onWebSocketClientMessage(client, raw)) - client.on('close', (_code: number) => { - this.onWebSocketClientClose(client) - }) + client.on('close', (code: number) => this.onWebSocketClientClose(client, code)) - client.on('pong', () => this.onWebSocketClientPong.call(this, client)) + client.on('pong', (data: Buffer) => this.onWebSocketClientPong(client, data)) } - private async onWebSocketClientMessage(client: WebSocket, message: Message) { - for (const handler of this.handlers) { - if (handler.canHandleMessageType(message[0])) { - const handled = await handler.handleMessage(message, client, this) - if (handled) { - break - } + private async onWebSocketClientMessage(client: WebSocket, raw: WebSocket.RawData) { + let abort + try { + const message = JSON.parse(raw.toString('utf-8')) + const messageHandler = this.messageHandlerFactory([message, this]) as IMessageHandler & IAbortable + if (typeof messageHandler.abort === 'function') { + abort = messageHandler.abort.bind(messageHandler) + client.once('close', abort) + } + + await messageHandler?.handleMessage(message, client) + } catch (error) { + console.error(`Unable to handle message: ${error.message}`) + } finally { + if (abort) { + client.removeEventListener('close', abort) } } } - private onWebSocketClientPong(client: WebSocket) { + private onWebSocketClientPong(client: WebSocket, _data: Buffer) { this.heartbeats.set(client, true) } @@ -116,12 +114,14 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock } private onWebSocketServerClose() { + console.debug('websocket server closing') clearInterval(this.heartbeatInterval) } - private onWebSocketClientClose(client: WebSocket) { + private onWebSocketClientClose(client: WebSocket, code: number) { + console.debug('client closing', code) this.subscriptions.delete(client) client.removeAllListeners() } -} \ No newline at end of file +} diff --git a/src/client.ts b/src/client.ts deleted file mode 100644 index 444bc58..0000000 --- a/src/client.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { WebSocket } from 'ws' - -import { IClient } from './types/clients' -import { Message } from './types/messages' - -export class Client implements IClient { - public constructor( - private readonly websocket: WebSocket - ) { } - - public from(websocket: WebSocket): IClient { - return new Client(websocket) - } - - public isConnected(): boolean { - return this.websocket.readyState === WebSocket.OPEN - } - - public send(message: Message): void { - this.websocket.send(JSON.stringify(message)) - } - -} \ No newline at end of file diff --git a/src/database/client.ts b/src/database/client.ts index f2f7425..eb94963 100644 --- a/src/database/client.ts +++ b/src/database/client.ts @@ -1,4 +1,5 @@ import 'pg' +import 'pg-query-stream' import knex, { Knex } from 'knex' const createDbConfig = (): Knex.Config => ({ diff --git a/src/factories/event-strategy-factory.ts b/src/factories/event-strategy-factory.ts new file mode 100644 index 0000000..f8d1ccb --- /dev/null +++ b/src/factories/event-strategy-factory.ts @@ -0,0 +1,27 @@ +import { WebSocket } from 'ws' +import { DefaultEventStrategy } from '../handlers/event-strategies/default-event-strategy' +import { EphemeralEventStrategy } from '../handlers/event-strategies/ephemeral-event-strategy' +import { NullEventStrategy } from '../handlers/event-strategies/null-event-strategy copy' +import { ReplaceableEventStrategy } from '../handlers/event-strategies/replaceable-event-strategy' +import { Factory } from '../@types/base' +import { Event } from '../@types/event' +import { IEventStrategy } from '../@types/message-handlers' +import { IEventRepository } from '../@types/repositories' +import { IWebSocketServerAdapter } from '../@types/servers' +import { isEphemeralEvent, isNullEvent, isReplaceableEvent } from '../utils/event' + + +export const createEventStrategyFactory = ( + adapter: IWebSocketServerAdapter, + eventRepository: IEventRepository, +): Factory>, Event> => (event: Event) => { + if (isReplaceableEvent(event)) { + return new ReplaceableEventStrategy(adapter, eventRepository) + } else if (isEphemeralEvent(event)) { + return new EphemeralEventStrategy(adapter) + } else if (isNullEvent(event)) { + return new NullEventStrategy() + } + + return new DefaultEventStrategy(adapter, eventRepository) +} diff --git a/src/factories/message-handler-factory.ts b/src/factories/message-handler-factory.ts new file mode 100644 index 0000000..c699001 --- /dev/null +++ b/src/factories/message-handler-factory.ts @@ -0,0 +1,24 @@ +import { EventMessageHandler } from '../handlers/event-message-handler' +import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler' +import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler' +import { Message, MessageType } from '../@types/messages' +import { IEventRepository } from '../@types/repositories' +import { IWebSocketServerAdapter } from '../@types/servers' +import { createEventStrategyFactory } from './event-strategy-factory' + + +export const createMessageHandlerFactory = ( + eventRepository: IEventRepository, +) => ([message, adapter]: [Message, IWebSocketServerAdapter]) => { + console.debug('Received message', message) + switch (message[0]) { + case MessageType.EVENT: + return new EventMessageHandler(createEventStrategyFactory(adapter, eventRepository)) + case MessageType.REQ: + return new SubscribeMessageHandler(adapter, eventRepository) + case MessageType.CLOSE: + return new UnsubscribeMessageHandler(adapter) + default: + throw new Error(`Unknown message type: ${String(message[0])}`) + } +} diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts index 8e0f5aa..b65033b 100644 --- a/src/handlers/event-message-handler.ts +++ b/src/handlers/event-message-handler.ts @@ -1,38 +1,32 @@ -import { IMessageHandler } from '../types/message-handlers' -import { MessageType, IncomingEventMessage } from '../types/messages' -import { IWebSocketServerAdapter } from '../types/servers' -import { IEventRepository } from '../types/repositories' -import { isEventSignatureValid } from '../utils/event' +import { IMessageHandler, IEventStrategy } from '../@types/message-handlers' +import { IncomingEventMessage } from '../@types/messages' import { WebSocket } from 'ws' +import { Event } from '../@types/event' +import { Factory } from '../@types/base' +import { isEventSignatureValid } from '../utils/event' export class EventMessageHandler implements IMessageHandler { public constructor( - private readonly eventRepository: IEventRepository, + private readonly strategy: Factory>, Event> ) { } - public canHandleMessageType(messageType: MessageType): boolean { - return messageType === MessageType.EVENT - } + public async handleMessage(message: IncomingEventMessage, client: WebSocket): Promise { + const [, event] = message + if (!await isEventSignatureValid(event)) { + console.warn(`Event ${event.id} from ${event.pubkey} with signature ${event.sig} is not valid`) + return + } - public async handleMessage(message: IncomingEventMessage, _client: WebSocket, adapter: IWebSocketServerAdapter): Promise { - if (!await isEventSignatureValid(message[1])) { - console.warn(`Event ${message[1].id} from ${message[1].pubkey} with signature ${message[1].sig} is not valid`) + const strategy = this.strategy(event) + + if (typeof strategy?.execute !== 'function') { return } try { - const count = await this.eventRepository.create(message[1]) - if (!count) { - return true - } - - await adapter.broadcastEvent(message[1]) - - return true + await strategy.execute([event, client]) } catch (error) { - console.error(`Unable to add event. Reason: ${error.message}`) - - return false + console.error('Error handling message:', message, error) } } -} \ No newline at end of file +} diff --git a/src/handlers/event-strategies/default-event-strategy.ts b/src/handlers/event-strategies/default-event-strategy.ts new file mode 100644 index 0000000..2896e7d --- /dev/null +++ b/src/handlers/event-strategies/default-event-strategy.ts @@ -0,0 +1,31 @@ +import { WebSocket } from 'ws' +import { Event } from '../../@types/event' +import { IEventStrategy } from '../../@types/message-handlers' +import { IEventRepository } from '../../@types/repositories' +import { IWebSocketServerAdapter } from '../../@types/servers' + + +export class DefaultEventStrategy implements IEventStrategy<[Event, WebSocket], Promise> { + public constructor( + private readonly adapter: IWebSocketServerAdapter, + private readonly eventRepository: IEventRepository, + ) { } + + public async execute([event,]: [Event, WebSocket]): Promise { + try { + const count = await this.eventRepository.create(event) + if (!count) { + return true + } + + await this.adapter.broadcastEvent(event) + + return true + } catch (error) { + console.error('Unable to handle event. Reason:', error) + + return false + } + } + +} diff --git a/src/handlers/event-strategies/ephemeral-event-strategy.ts b/src/handlers/event-strategies/ephemeral-event-strategy.ts new file mode 100644 index 0000000..b907836 --- /dev/null +++ b/src/handlers/event-strategies/ephemeral-event-strategy.ts @@ -0,0 +1,22 @@ +import { WebSocket } from 'ws' +import { Event } from '../../@types/event' +import { IEventStrategy } from '../../@types/message-handlers' +import { IWebSocketServerAdapter } from '../../@types/servers' + + +export class EphemeralEventStrategy implements IEventStrategy<[Event, WebSocket], Promise> { + public constructor( + private readonly adapter: IWebSocketServerAdapter, + ) { } + + public async execute([event,]: [Event, WebSocket]): Promise { + console.log('Ephemeral event') + try { + await this.adapter.broadcastEvent(event) + } catch (error) { + console.error('Unable to handle event. Reason:', error) + + return false + } + } +} diff --git a/src/handlers/event-strategies/null-event-strategy copy.ts b/src/handlers/event-strategies/null-event-strategy copy.ts new file mode 100644 index 0000000..121151d --- /dev/null +++ b/src/handlers/event-strategies/null-event-strategy copy.ts @@ -0,0 +1,10 @@ +import { IEventStrategy } from '../../@types/message-handlers' + +/** + * An event strategy that refuses to do anything useful + */ +export class NullEventStrategy implements IEventStrategy> { + public async execute(): Promise { + return true + } +} diff --git a/src/handlers/event-strategies/replaceable-event-strategy.ts b/src/handlers/event-strategies/replaceable-event-strategy.ts new file mode 100644 index 0000000..be603d1 --- /dev/null +++ b/src/handlers/event-strategies/replaceable-event-strategy.ts @@ -0,0 +1,31 @@ +import { WebSocket } from 'ws' +import { Event } from '../../@types/event' +import { IEventStrategy } from '../../@types/message-handlers' +import { IEventRepository } from '../../@types/repositories' +import { IWebSocketServerAdapter } from '../../@types/servers' + + +export class ReplaceableEventStrategy implements IEventStrategy<[Event, WebSocket], Promise> { + public constructor( + private readonly adapter: IWebSocketServerAdapter, + private readonly eventRepository: IEventRepository, + ) { } + + public async execute([event,]: [Event, WebSocket]): Promise { + console.log('Replaceable event') + try { + const count = await this.eventRepository.upsert(event) + if (!count) { + return true + } + + await this.adapter.broadcastEvent(event) + + return true + } catch (error) { + console.error('Unable to handle event. Reason:', error) + + return false + } + } +} diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index 8abb2eb..2360883 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -1,30 +1,39 @@ +import { pipeline } from 'node:stream/promises' import { inspect } from 'util' import { WebSocket } from 'ws' import { createOutgoingEventMessage, createEndOfStoredEventsNoticeMessage } from '../messages' -import { IMessageHandler } from '../types/message-handlers' -import { MessageType, SubscribeMessage } from '../types/messages' -import { IWebSocketServerAdapter } from '../types/servers' -import { IEventRepository } from '../types/repositories' -import { SubscriptionId, SubscriptionFilter } from '../types/subscription' +import { IAbortable, IMessageHandler } from '../@types/message-handlers' +import { SubscribeMessage } from '../@types/messages' +import { IWebSocketServerAdapter } from '../@types/servers' +import { IEventRepository } from '../@types/repositories' +import { SubscriptionId, SubscriptionFilter } from '../@types/subscription' +import { toNostrEvent } from '../utils/event' +import { streamEach, streamMap } from '../utils/transforms' +import { Event } from '../@types/event' -export class SubscribeMessageHandler implements IMessageHandler { +export class SubscribeMessageHandler implements IMessageHandler, IAbortable { + private readonly abortController: AbortController + public constructor( + private readonly adapter: IWebSocketServerAdapter, private readonly eventRepository: IEventRepository, - ) { } - - public canHandleMessageType(messageType: MessageType): boolean { - return messageType === MessageType.REQ + ) { + this.abortController = new AbortController() } - public async handleMessage(message: SubscribeMessage, client: WebSocket, adapter: IWebSocketServerAdapter): Promise { + public abort(): void { + this.abortController.abort() + } + + public async handleMessage(message: SubscribeMessage, client: WebSocket): Promise { const subscriptionId = message[1] as SubscriptionId const filters = message.slice(2) as SubscriptionFilter[] - const exists = adapter.getSubscriptions(client)?.get(subscriptionId) + const exists = this.adapter.getSubscriptions(client)?.get(subscriptionId) - adapter.getSubscriptions(client)?.set(subscriptionId, filters) + this.adapter.getSubscriptions(client)?.set(subscriptionId, filters) console.log( `Subscription ${subscriptionId} ${exists ? 'updated' : 'created' @@ -32,31 +41,29 @@ export class SubscribeMessageHandler implements IMessageHandler { inspect(filters) ) - // TODO: search for matching events on the DB, then send ESOE + const sendEvent = (event: Event) => client.send(JSON.stringify(createOutgoingEventMessage(subscriptionId, event))) + const sendEOSE = () => client.send(JSON.stringify(createEndOfStoredEventsNoticeMessage(subscriptionId))) - return this.eventRepository.findByfilters(filters).then( - (events) => { - events.forEach((event) => { - client.send( - JSON.stringify( - createOutgoingEventMessage(subscriptionId, event) - ) - ) - }) - console.debug(`Sent ${events.length} events to:`, subscriptionId) - client.send( - JSON.stringify( - createEndOfStoredEventsNoticeMessage(subscriptionId) - ) - ) - console.debug('Sent EOSE to:', subscriptionId) - return true - }, - (error) => { - console.error('Unable to find by filters: ', error) - return true + const findEvents = this.eventRepository.findByfilters(filters) + try { + await pipeline( + findEvents, + streamMap(toNostrEvent), + streamEach( + sendEvent, + sendEOSE, // NIP-15: End of Stored Events Notice + ), + { + signal: this.abortController.signal, + }, + ) + } catch (error) { + if (error instanceof Error && error.name === 'AbortError') { + console.log('AbortError when finding events') + findEvents.end() } - ) + throw error + } } -} \ No newline at end of file +} diff --git a/src/handlers/unsubscribe-message-handler.ts b/src/handlers/unsubscribe-message-handler.ts index 72dfa96..0358591 100644 --- a/src/handlers/unsubscribe-message-handler.ts +++ b/src/handlers/unsubscribe-message-handler.ts @@ -1,20 +1,18 @@ import { WebSocket } from 'ws' -import { IMessageHandler } from '../types/message-handlers' -import { MessageType, UnsubscribeMessage } from '../types/messages' -import { IWebSocketServerAdapter } from '../types/servers' +import { IMessageHandler } from '../@types/message-handlers' +import { UnsubscribeMessage } from '../@types/messages' +import { IWebSocketServerAdapter } from '../@types/servers' export class UnsubscribeMessageHandler implements IMessageHandler { - public canHandleMessageType(messageType: MessageType): boolean { - return messageType === MessageType.CLOSE - } + public constructor( + private readonly adapter: IWebSocketServerAdapter, + ) { } - public async handleMessage(message: UnsubscribeMessage, client: WebSocket, adapter: IWebSocketServerAdapter): Promise { + public async handleMessage(message: UnsubscribeMessage, client: WebSocket): Promise { const subscriptionId = message[1] - adapter.getSubscriptions(client)?.delete(subscriptionId) - - return true + this.adapter.getSubscriptions(client)?.delete(subscriptionId) } -} \ No newline at end of file +} diff --git a/src/index.ts b/src/index.ts index da0a852..91ec03b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,9 +4,7 @@ import { WebSocketServer } from 'ws' import { getDbClient } from './database/client' import { EventRepository } from './repositories/event-repository' import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter' -import { SubscribeMessageHandler } from './handlers/subscribe-message-handler' -import { UnsubscribeMessageHandler } from './handlers/unsubscribe-message-handler' -import { EventMessageHandler } from './handlers/event-message-handler' +import { createMessageHandlerFactory } from './factories/message-handler-factory' const server = http.createServer() const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 }) @@ -16,10 +14,8 @@ const eventRepository = new EventRepository(dbClient) const adapter = new WebSocketServerAdapter( server, wss, + createMessageHandlerFactory(eventRepository) ) -adapter.addMessageHandler(new SubscribeMessageHandler(eventRepository)) -adapter.addMessageHandler(new UnsubscribeMessageHandler()) -adapter.addMessageHandler(new EventMessageHandler(eventRepository)) const port = Number(process.env.SERVER_PORT) || 8008 adapter.listen(port) diff --git a/src/messages.ts b/src/messages.ts index 42a79f6..406f842 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -1,11 +1,11 @@ -import { Event } from './types/event' -import { SubscriptionId } from './types/subscription' +import { Event } from './@types/event' +import { SubscriptionId } from './@types/subscription' import { EndOfStoredEventsNotice, MessageType, Notice, OutgoingEventMessage, -} from './types/messages' +} from './@types/messages' export const createNotice = (notice: string): Notice => { return [MessageType.NOTICE, notice] diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index baeb809..c7da83b 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -1,19 +1,18 @@ import { Knex } from 'knex' -import { applySpec, pipe, prop } from 'ramda' +import { applySpec, omit, pipe, prop } from 'ramda' +import { PassThrough } from 'stream' -import { DBEvent, Event } from '../types/event' -import { IEventRepository } from '../types/repositories' -import { SubscriptionFilter } from '../types/subscription' +import { DBEvent, Event } from '../@types/event' +import { IEventRepository } from '../@types/repositories' +import { SubscriptionFilter } from '../@types/subscription' import { isGenericTagQuery } from '../utils/filter' +import { toBuffer, toJSON } from '../utils/transforms' -const toBuffer = (input: any) => Buffer.from(input, 'hex') - -const fromBuffer = (input: Buffer) => input.toString('hex') export class EventRepository implements IEventRepository { public constructor(private readonly dbClient: Knex) {} - public async findByfilters(filters: SubscriptionFilter[]): Promise { + public findByfilters(filters: SubscriptionFilter[]): PassThrough { const queries = filters.map((filter) => { const builder = this.dbClient('events') @@ -53,7 +52,6 @@ export class EventRepository implements IEventRepository { }) }) - return builder }) @@ -62,31 +60,30 @@ export class EventRepository implements IEventRepository { query.union(subqueries, true) } - console.log('Query', query.toString()) - - return query.then((rows) => { - const result = rows.map( - (row) => - applySpec({ - id: pipe(prop('event_id'), fromBuffer), - kind: prop('event_kind'), - pubkey: pipe(prop('event_pubkey'), fromBuffer), - created_at: prop('event_created_at'), - content: prop('event_content'), - tags: prop('event_tags'), - sig: pipe(prop('event_signature'), fromBuffer), - })(row) as Event, - ) - - console.debug('result', result[0]) - - return result - }) + return query.stream() } public async create(event: Event): Promise { - console.log('Creating event', event) + const row = applySpec({ + event_id: pipe(prop('id'), toBuffer), + event_pubkey: pipe(prop('pubkey'), toBuffer), + event_created_at: prop('created_at'), + event_kind: prop('kind'), + event_tags: pipe(prop('tags'), toJSON), + event_content: prop('content'), + event_signature: pipe(prop('sig'), toBuffer), + })(event) + + return this.dbClient('events') + .insert(row) + .onConflict() + .ignore() + .then(prop('rowCount') as () => number) + } + + + public async upsert(event: Event): Promise { const toJSON = (input: any) => JSON.stringify(input) const row = applySpec({ @@ -101,10 +98,10 @@ export class EventRepository implements IEventRepository { return this.dbClient('events') .insert(row) - .onConflict('event_id') - .ignore() - .then( - (({ rowCount }: { rowCount: number }) => rowCount) as any, - ) + // NIP-16: Replaceable Events + .onConflict(this.dbClient.raw('(event_pubkey, event_kind) WHERE event_kind = 0 OR event_kind >= 10000 AND event_kind < 2000')) + .merge(omit(['event_pubkey', 'event_kind'])(row)) + .where('events.event_created_at', '<', row.event_created_at) + .then(prop('rowCount') as () => number) } } diff --git a/src/schemas/base-schema.ts b/src/schemas/base-schema.ts index 05e8ce3..e176a3a 100644 --- a/src/schemas/base-schema.ts +++ b/src/schemas/base-schema.ts @@ -1,10 +1,10 @@ import Schema from 'joi' -export const pubkeySchema = Schema.string().length(64) +export const pubkeySchema = Schema.string().case('lower').hex().length(64) export const kindSchema = Schema.number().min(0).multiple(1) -export const signatureSchema = Schema.string().length(128) +export const signatureSchema = Schema.string().case('lower').hex().length(128) export const subscriptionSchema = Schema.string().min(1).max(255) diff --git a/src/settings.ts b/src/settings.ts index 9650b74..c24da90 100644 --- a/src/settings.ts +++ b/src/settings.ts @@ -1,22 +1,14 @@ import { readFileSync } from 'fs' import { homedir } from 'os' import { join } from 'path' +import { mergeDeepRight } from 'ramda' import packageJson from '../package.json' +import { ISettings } from './@types/settings' -interface Info { - relay_url?: string - name?: string - description?: string - pubkey?: string - contact?: string -} +let _settings: ISettings -interface Settings { - info: Info -} - -const getDefaultSettings = (): Settings => ({ +const getDefaultSettings = (): ISettings => ({ info: { relay_url: undefined, name: `Unnamed ${packageJson.name}`, @@ -24,9 +16,30 @@ const getDefaultSettings = (): Settings => ({ pubkey: undefined, contact: undefined, }, + limits: { + event: { + eventId: { + minimumZeroBits: 0, + }, + kind: { + whitelist: [], + blacklist: [], + }, + pubkey: { + whitelist: [], + blacklist: [], + }, + }, + client: { + subscription: { + maximumCount: 10, + maximumFilters: 5, + }, + }, + }, }) -const createSettingsFromFile = (defaults: Settings) => { +const createSettingsFromFile = (defaults: ISettings) => { const contents = JSON.parse( readFileSync( join( @@ -37,23 +50,22 @@ const createSettingsFromFile = (defaults: Settings) => { ), ) - return { - info: { - ...defaults.info, - ...contents.info, - }, - } + return mergeDeepRight(defaults, contents) } -const createSettings = (): Settings => { - const defaultSettings = getDefaultSettings() +const createSettings = (): ISettings => { try { - return createSettingsFromFile(defaultSettings) + if (_settings) { + return _settings + } + _settings = createSettingsFromFile(getDefaultSettings()) + + return _settings } catch (err) { console.error('Unable to read config file. Reason: %s', err.message) - return defaultSettings + return getDefaultSettings() } } diff --git a/src/types/message-handlers.ts b/src/types/message-handlers.ts deleted file mode 100644 index 1ec0bac..0000000 --- a/src/types/message-handlers.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { WebSocket } from 'ws' - -import { Message, MessageType } from './messages' -import { IWebSocketServerAdapter } from './servers' - -export interface IMessageHandler { - canHandleMessageType(messageType: MessageType): boolean - handleMessage(message: Message, client: WebSocket, adapter: IWebSocketServerAdapter): Promise -} diff --git a/src/utils/event.ts b/src/utils/event.ts index 3d789a9..bcc9490 100644 --- a/src/utils/event.ts +++ b/src/utils/event.ts @@ -1,7 +1,9 @@ import * as secp256k1 from '@noble/secp256k1' -import { CanonicalEvent, Event } from '../types/event' -import { SubscriptionFilter } from '../types/subscription' +import { applySpec, pipe, prop } from 'ramda' +import { CanonicalEvent, Event } from '../@types/event' +import { SubscriptionFilter } from '../@types/subscription' import { isGenericTagQuery } from './filter' +import { fromBuffer } from './transforms' export const serializeEvent = (event: Partial): CanonicalEvent => [ 0, @@ -12,6 +14,16 @@ export const serializeEvent = (event: Partial): CanonicalEvent => [ event.content, ] +export const toNostrEvent = applySpec({ + id: pipe(prop('event_id'), fromBuffer), + kind: prop('event_kind'), + pubkey: pipe(prop('event_pubkey'), fromBuffer), + created_at: prop('event_created_at'), + content: prop('event_content'), + tags: prop('event_tags'), + sig: pipe(prop('event_signature'), fromBuffer), +}) + export const isEventMatchingFilter = (filter: SubscriptionFilter) => (event: Event): boolean => { @@ -62,3 +74,15 @@ export const isEventMatchingFilter = export const isEventSignatureValid = async (event: Event): Promise => { return secp256k1.schnorr.verify(event.sig, event.id, event.pubkey) } + +export const isReplaceableEvent = (event: Event): boolean => { + return event.kind >= 10000 && event.kind < 20000 +} + +export const isEphemeralEvent = (event: Event): boolean => { + return event.kind >= 20000 && event.kind < 30000 +} + +export const isNullEvent = (event: Event): boolean => { + return event.kind === Number.MAX_SAFE_INTEGER +} diff --git a/src/utils/hash-event.ts b/src/utils/hash-event.ts index 4a0ee83..88f6d2e 100644 --- a/src/utils/hash-event.ts +++ b/src/utils/hash-event.ts @@ -1,6 +1,6 @@ import { createHash } from 'crypto' -import { Event } from '../types/event' +import { Event } from '../@types/event' import { serializeEvent } from './serialize-event' export const getEventHash = (event: Event) => { diff --git a/src/utils/serialize-event.ts b/src/utils/serialize-event.ts index 3732b4a..13e7d22 100644 --- a/src/utils/serialize-event.ts +++ b/src/utils/serialize-event.ts @@ -1,4 +1,4 @@ -import { Event } from '../types/event' +import { Event } from '../@types/event' export const serializeEvent = ({ pubkey, diff --git a/src/utils/transforms.ts b/src/utils/transforms.ts new file mode 100644 index 0000000..6b099e9 --- /dev/null +++ b/src/utils/transforms.ts @@ -0,0 +1,26 @@ +import { Transform, Writable } from 'stream' + +export const toJSON = (input: any) => JSON.stringify(input) + +export const toBuffer = (input: any) => Buffer.from(input, 'hex') + +export const fromBuffer = (input: Buffer) => input.toString('hex') + +export const streamMap = (fn: (chunk) => any) => new Transform({ + objectMode: true, + transform(chunk, _encoding, callback) { + callback(null, fn(chunk)) + } +}) + +export const streamEach = (writeFn: (chunk: any) => void, finalFn: () => void) => new Writable({ + objectMode: true, + write(chunk, _encoding, callback) { + writeFn(chunk) + callback() + }, + final(callback) { + finalFn() + callback() + }, +}) diff --git a/test/unit/events.spec.ts b/test/unit/events.spec.ts index 0207507..c6ff696 100644 --- a/test/unit/events.spec.ts +++ b/test/unit/events.spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai' -import { Event, CanonicalEvent } from '../../src/types/event' +import { Event, CanonicalEvent } from '../../src/@types/event' import { isEventMatchingFilter, isEventSignatureValid, serializeEvent } from '../../src/utils/event' import { EventKinds } from '../../src/constants/base' @@ -227,4 +227,4 @@ describe('isEventSignatureValid', () => { await isEventSignatureValid(event) ).to.be.false }) -}) \ No newline at end of file +})