chore: refactor findByFilters

test: improve schema unit tests
chore: improve event listener cleanup
This commit is contained in:
Ricardo Arturo Cabral Mejia 2022-08-15 03:35:47 +00:00
parent fb2a2c98f1
commit 86f1382ed4
No known key found for this signature in database
GPG Key ID: 5931EBF43A650245
9 changed files with 31 additions and 22 deletions

View File

@ -1,9 +1,15 @@
import { PassThrough } from 'stream'
import { Event } from './event'
import { DBEvent, Event } from './event'
import { SubscriptionFilter } from './subscription'
export type ExposedPromiseKeys = 'then' | 'catch' | 'finally'
export interface IQueryResult<T> extends Pick<Promise<T>, keyof Promise<T> & ExposedPromiseKeys> {
stream(): PassThrough & AsyncIterable<T>
}
export interface IEventRepository {
create(event: Event): Promise<number>
upsert(event: Event): Promise<number>
findByfilters(filters: SubscriptionFilter[]): PassThrough
findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]>
}

View File

@ -27,7 +27,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
this.client
.on('message', this.onClientMessage.bind(this))
.once('close', this.onClientClose.bind(this))
.on('close', this.onClientClose.bind(this))
.on('pong', this.onClientPong.bind(this))
this
@ -91,7 +91,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
const messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
if (typeof messageHandler.abort === 'function') {
abort = messageHandler.abort.bind(messageHandler)
this.client.once('close', abort)
this.client.prependOnceListener('close', abort)
}
await messageHandler?.handleMessage(message)
@ -105,7 +105,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
}
} finally {
if (abort) {
this.client.removeEventListener('close', abort)
this.client.removeListener('close', abort)
}
}
}
@ -120,5 +120,6 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
console.debug(`client disconnected code ${code} - ${connected}/${this.webSocketServer.getClients().size} clients connected`)
this.removeAllListeners()
this.client.removeAllListeners()
}
}

View File

@ -34,7 +34,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
const sendEvent = (event: Event) => this.webSocket.sendMessage(createOutgoingEventMessage(subscriptionId, event))
const sendEOSE = () => this.webSocket.sendMessage(createEndOfStoredEventsNoticeMessage(subscriptionId))
const findEvents = this.eventRepository.findByfilters(filters)
const findEvents = this.eventRepository.findByFilters(filters).stream()
try {
await pipeline(
findEvents,

View File

@ -3,7 +3,7 @@ import { applySpec, omit, pipe, prop } from 'ramda'
import { PassThrough } from 'stream'
import { DBEvent, Event } from '../@types/event'
import { IEventRepository } from '../@types/repositories'
import { IEventRepository, IQueryResult } from '../@types/repositories'
import { SubscriptionFilter } from '../@types/subscription'
import { isGenericTagQuery } from '../utils/filter'
import { toBuffer, toJSON } from '../utils/transform'
@ -14,7 +14,10 @@ const evenLengthTruncate = (input: string) => input.substring(0, input.length >>
export class EventRepository implements IEventRepository {
public constructor(private readonly dbClient: Knex) {}
public findByfilters(filters: SubscriptionFilter[]): PassThrough {
public findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]> {
if (!Array.isArray(filters) || !filters.length) {
throw new Error('Filters cannot be empty')
}
const queries = filters.map((filter) => {
const builder = this.dbClient<DBEvent>('events')
@ -89,7 +92,7 @@ export class EventRepository implements IEventRepository {
console.log('query', query.toString())
return query.stream()
return query
}
public async create(event: Event): Promise<number> {

View File

@ -1,19 +1,20 @@
import Schema from 'joi'
export const prefixSchema = Schema.string().case('lower').hex().min(1).max(64)
export const prefixSchema = Schema.string().case('lower').hex().min(1).max(64).label('prefix')
export const idSchema = Schema.string().case('lower').hex().length(64)
export const idSchema = Schema.string().case('lower').hex().length(64).label('id')
export const pubkeySchema = Schema.string().case('lower').hex().length(64)
export const pubkeySchema = Schema.string().case('lower').hex().length(64).label('pubkey')
export const kindSchema = Schema.number().min(0).multiple(1)
export const kindSchema = Schema.number().min(0).multiple(1).label('kind')
export const signatureSchema = Schema.string().case('lower').hex().length(128)
export const signatureSchema = Schema.string().case('lower').hex().length(128).label('sig')
export const subscriptionSchema = Schema.string().min(1).max(255)
export const subscriptionSchema = Schema.string().min(1).max(255).label('subscriptionId')
// [<string>, <string> 0..*]
export const tagSchema = Schema.array()
.ordered(Schema.string().max(255).required())
.items(Schema.string().allow('').max(1024))
.ordered(Schema.string().max(255).required().label('identifier'))
.items(Schema.string().allow('').max(1024).label('value'))
.max(10)
.label('tag')

View File

@ -3,8 +3,8 @@ import Schema from 'joi'
import { kindSchema, prefixSchema } from './base-schema'
export const filterSchema = Schema.object({
ids: Schema.array().items(prefixSchema).max(256),
authors: Schema.array().items(prefixSchema).max(256),
ids: Schema.array().items(prefixSchema.label('prefixOrId')).max(256),
authors: Schema.array().items(prefixSchema.label('prefixOrAuthor')).max(256),
kinds: Schema.array().items(kindSchema).max(20),
since: Schema.number().min(0).multiple(1),
until: Schema.number().min(0).multiple(1),

View File

@ -21,7 +21,6 @@ export const closeMessageSchema = Schema.array().ordered(
subscriptionSchema.required().label('subscriptionId'),
).label('CLOSE message')
export const messageSchema = Schema.alternatives()
.conditional(Joi.ref('.'), {
switch: [

View File

@ -4,7 +4,7 @@ import { applySpec, pipe, prop } from 'ramda'
import { CanonicalEvent, Event } from '../@types/event'
import { SubscriptionFilter } from '../@types/subscription'
import { isGenericTagQuery } from './filter'
import { fromBuffer } from './stream'
import { fromBuffer } from './transform'
export const serializeEvent = (event: Partial<Event>): CanonicalEvent => [
0,

View File

@ -1,7 +1,6 @@
import * as chai from 'chai'
import * as sinon from 'sinon'
import sinonChai from 'sinon-chai'
import { PassThrough } from 'stream'
import { streamEach, streamEnd, streamMap } from '../../../src/utils/stream'