diff --git a/src/@types/repositories.ts b/src/@types/repositories.ts index 85fdb43..47a0747 100644 --- a/src/@types/repositories.ts +++ b/src/@types/repositories.ts @@ -1,9 +1,15 @@ import { PassThrough } from 'stream' -import { Event } from './event' +import { DBEvent, Event } from './event' import { SubscriptionFilter } from './subscription' +export type ExposedPromiseKeys = 'then' | 'catch' | 'finally' + +export interface IQueryResult extends Pick, keyof Promise & ExposedPromiseKeys> { + stream(): PassThrough & AsyncIterable +} + export interface IEventRepository { create(event: Event): Promise upsert(event: Event): Promise - findByfilters(filters: SubscriptionFilter[]): PassThrough + findByFilters(filters: SubscriptionFilter[]): IQueryResult } diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index bce824c..c9f9674 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -27,7 +27,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter this.client .on('message', this.onClientMessage.bind(this)) - .once('close', this.onClientClose.bind(this)) + .on('close', this.onClientClose.bind(this)) .on('pong', this.onClientPong.bind(this)) this @@ -91,7 +91,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter const messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable if (typeof messageHandler.abort === 'function') { abort = messageHandler.abort.bind(messageHandler) - this.client.once('close', abort) + this.client.prependOnceListener('close', abort) } await messageHandler?.handleMessage(message) @@ -105,7 +105,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } } finally { if (abort) { - this.client.removeEventListener('close', abort) + this.client.removeListener('close', abort) } } } @@ -120,5 +120,6 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter console.debug(`client disconnected code ${code} - ${connected}/${this.webSocketServer.getClients().size} clients connected`) this.removeAllListeners() + this.client.removeAllListeners() } } diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index cb9786b..ecbf0d2 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -34,7 +34,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { const sendEvent = (event: Event) => this.webSocket.sendMessage(createOutgoingEventMessage(subscriptionId, event)) const sendEOSE = () => this.webSocket.sendMessage(createEndOfStoredEventsNoticeMessage(subscriptionId)) - const findEvents = this.eventRepository.findByfilters(filters) + const findEvents = this.eventRepository.findByFilters(filters).stream() try { await pipeline( findEvents, diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index 6a3b2ee..1b41854 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -3,7 +3,7 @@ import { applySpec, omit, pipe, prop } from 'ramda' import { PassThrough } from 'stream' import { DBEvent, Event } from '../@types/event' -import { IEventRepository } from '../@types/repositories' +import { IEventRepository, IQueryResult } from '../@types/repositories' import { SubscriptionFilter } from '../@types/subscription' import { isGenericTagQuery } from '../utils/filter' import { toBuffer, toJSON } from '../utils/transform' @@ -14,7 +14,10 @@ const evenLengthTruncate = (input: string) => input.substring(0, input.length >> export class EventRepository implements IEventRepository { public constructor(private readonly dbClient: Knex) {} - public findByfilters(filters: SubscriptionFilter[]): PassThrough { + public findByFilters(filters: SubscriptionFilter[]): IQueryResult { + if (!Array.isArray(filters) || !filters.length) { + throw new Error('Filters cannot be empty') + } const queries = filters.map((filter) => { const builder = this.dbClient('events') @@ -89,7 +92,7 @@ export class EventRepository implements IEventRepository { console.log('query', query.toString()) - return query.stream() + return query } public async create(event: Event): Promise { diff --git a/src/schemas/base-schema.ts b/src/schemas/base-schema.ts index 84ed177..7486029 100644 --- a/src/schemas/base-schema.ts +++ b/src/schemas/base-schema.ts @@ -1,19 +1,20 @@ import Schema from 'joi' -export const prefixSchema = Schema.string().case('lower').hex().min(1).max(64) +export const prefixSchema = Schema.string().case('lower').hex().min(1).max(64).label('prefix') -export const idSchema = Schema.string().case('lower').hex().length(64) +export const idSchema = Schema.string().case('lower').hex().length(64).label('id') -export const pubkeySchema = Schema.string().case('lower').hex().length(64) +export const pubkeySchema = Schema.string().case('lower').hex().length(64).label('pubkey') -export const kindSchema = Schema.number().min(0).multiple(1) +export const kindSchema = Schema.number().min(0).multiple(1).label('kind') -export const signatureSchema = Schema.string().case('lower').hex().length(128) +export const signatureSchema = Schema.string().case('lower').hex().length(128).label('sig') -export const subscriptionSchema = Schema.string().min(1).max(255) +export const subscriptionSchema = Schema.string().min(1).max(255).label('subscriptionId') // [, 0..*] export const tagSchema = Schema.array() - .ordered(Schema.string().max(255).required()) - .items(Schema.string().allow('').max(1024)) + .ordered(Schema.string().max(255).required().label('identifier')) + .items(Schema.string().allow('').max(1024).label('value')) .max(10) + .label('tag') diff --git a/src/schemas/filter-schema.ts b/src/schemas/filter-schema.ts index 082706a..6257f60 100644 --- a/src/schemas/filter-schema.ts +++ b/src/schemas/filter-schema.ts @@ -3,8 +3,8 @@ import Schema from 'joi' import { kindSchema, prefixSchema } from './base-schema' export const filterSchema = Schema.object({ - ids: Schema.array().items(prefixSchema).max(256), - authors: Schema.array().items(prefixSchema).max(256), + ids: Schema.array().items(prefixSchema.label('prefixOrId')).max(256), + authors: Schema.array().items(prefixSchema.label('prefixOrAuthor')).max(256), kinds: Schema.array().items(kindSchema).max(20), since: Schema.number().min(0).multiple(1), until: Schema.number().min(0).multiple(1), diff --git a/src/schemas/message-schema.ts b/src/schemas/message-schema.ts index 58ed08c..25b3803 100644 --- a/src/schemas/message-schema.ts +++ b/src/schemas/message-schema.ts @@ -21,7 +21,6 @@ export const closeMessageSchema = Schema.array().ordered( subscriptionSchema.required().label('subscriptionId'), ).label('CLOSE message') - export const messageSchema = Schema.alternatives() .conditional(Joi.ref('.'), { switch: [ diff --git a/src/utils/event.ts b/src/utils/event.ts index b08e369..833ab9d 100644 --- a/src/utils/event.ts +++ b/src/utils/event.ts @@ -4,7 +4,7 @@ import { applySpec, pipe, prop } from 'ramda' import { CanonicalEvent, Event } from '../@types/event' import { SubscriptionFilter } from '../@types/subscription' import { isGenericTagQuery } from './filter' -import { fromBuffer } from './stream' +import { fromBuffer } from './transform' export const serializeEvent = (event: Partial): CanonicalEvent => [ 0, diff --git a/test/unit/utils/stream.spec.ts b/test/unit/utils/stream.spec.ts index 98d1faa..15912a9 100644 --- a/test/unit/utils/stream.spec.ts +++ b/test/unit/utils/stream.spec.ts @@ -1,7 +1,6 @@ import * as chai from 'chai' import * as sinon from 'sinon' import sinonChai from 'sinon-chai' -import { PassThrough } from 'stream' import { streamEach, streamEnd, streamMap } from '../../../src/utils/stream'