mirror of
https://github.com/Cameri/nostream.git
synced 2025-06-08 22:19:11 +02:00
308 lines
9.7 KiB
TypeScript
308 lines
9.7 KiB
TypeScript
import { always } from 'ramda'
|
|
import chai from 'chai'
|
|
import chaiAsPromised from 'chai-as-promised'
|
|
import EventEmitter from 'events'
|
|
import Sinon from 'sinon'
|
|
|
|
import { IAbortable, IMessageHandler } from '../../../src/@types/message-handlers'
|
|
import { MessageType, SubscribeMessage } from '../../../src/@types/messages'
|
|
import { SubscriptionFilter, SubscriptionId } from '../../../src/@types/subscription'
|
|
import { Event } from '../../../src/@types/event'
|
|
import { IEventRepository } from '../../../src/@types/repositories'
|
|
import { IWebSocketAdapter } from '../../../src/@types/adapters'
|
|
import { PassThrough } from 'stream'
|
|
import { SubscribeMessageHandler } from '../../../src/handlers/subscribe-message-handler'
|
|
import { WebSocketAdapterEvent } from '../../../src/constants/adapter'
|
|
|
|
chai.use(chaiAsPromised)
|
|
const { expect } = chai
|
|
|
|
const toDbEvent = (event: Event) => ({
|
|
event_id: Buffer.from(event.id, 'hex'),
|
|
event_kind: event.kind,
|
|
event_pubkey: Buffer.from(event.pubkey, 'hex'),
|
|
event_created_at: event.created_at,
|
|
event_content: event.content,
|
|
event_tags: event.tags,
|
|
event_signature: Buffer.from(event.sig, 'hex'),
|
|
})
|
|
|
|
describe('SubscribeMessageHandler', () => {
|
|
const subscriptionId: SubscriptionId = 'subscriptionId'
|
|
let filters: SubscriptionFilter[]
|
|
let subscriptions: Map<SubscriptionId, SubscriptionFilter[]>
|
|
let handler: IMessageHandler & IAbortable
|
|
let webSocket: IWebSocketAdapter
|
|
let eventRepository: IEventRepository
|
|
let message: SubscribeMessage
|
|
let stream: PassThrough
|
|
let settingsFactory: Sinon.SinonStub
|
|
let webSocketGetSubscriptionsStub: Sinon.SinonStub
|
|
let eventRepositoryFindByFiltersStub: Sinon.SinonSpy
|
|
|
|
let sandbox: Sinon.SinonSandbox
|
|
|
|
beforeEach(() => {
|
|
sandbox = Sinon.createSandbox()
|
|
filters = [{}]
|
|
subscriptions = new Map()
|
|
webSocket = new EventEmitter() as any
|
|
webSocketGetSubscriptionsStub = sandbox.stub().returns(subscriptions)
|
|
webSocket.getSubscriptions = webSocketGetSubscriptionsStub
|
|
settingsFactory = sandbox.stub()
|
|
stream = new PassThrough({
|
|
objectMode: true,
|
|
})
|
|
eventRepositoryFindByFiltersStub = sandbox.fake.returns({
|
|
stream: () => stream,
|
|
})
|
|
eventRepository = {
|
|
findByFilters: eventRepositoryFindByFiltersStub,
|
|
} as any
|
|
handler = new SubscribeMessageHandler(
|
|
webSocket,
|
|
eventRepository,
|
|
settingsFactory,
|
|
)
|
|
})
|
|
|
|
afterEach(() => {
|
|
sandbox.restore()
|
|
webSocket.removeAllListeners()
|
|
})
|
|
|
|
describe('#handleMessage', () => {
|
|
let webSocketOnMessageStub: Sinon.SinonStub
|
|
let webSocketOnSubscribeStub: Sinon.SinonStub
|
|
let canSubscribeStub: Sinon.SinonStub
|
|
let fetchAndSendStub: Sinon.SinonStub
|
|
|
|
beforeEach(() => {
|
|
webSocketOnMessageStub = sandbox.stub()
|
|
webSocketOnSubscribeStub = sandbox.stub()
|
|
webSocket.on(WebSocketAdapterEvent.Message, webSocketOnMessageStub)
|
|
webSocket.on(WebSocketAdapterEvent.Subscribe, webSocketOnSubscribeStub)
|
|
|
|
fetchAndSendStub = sandbox.stub(SubscribeMessageHandler.prototype, 'fetchAndSend' as any)
|
|
canSubscribeStub = sandbox.stub(SubscribeMessageHandler.prototype, 'canSubscribe' as any)
|
|
})
|
|
|
|
it('emits notice message if subscription is rejected', async () => {
|
|
canSubscribeStub.returns('reason')
|
|
message = [MessageType.REQ, subscriptionId, ...filters] as any
|
|
|
|
await handler.handleMessage(message)
|
|
|
|
expect(webSocketOnMessageStub).to.have.been.calledOnceWithExactly(
|
|
['NOTICE', 'Subscription rejected: reason']
|
|
)
|
|
})
|
|
|
|
it('emits subscribe event if subscription is accepted', async () => {
|
|
canSubscribeStub.returns(undefined)
|
|
message = [MessageType.REQ, subscriptionId, ...filters] as any
|
|
|
|
await handler.handleMessage(message)
|
|
|
|
expect(webSocketOnSubscribeStub).to.have.been.calledOnceWith(subscriptionId)
|
|
expect(fetchAndSendStub).to.have.been.calledOnceWithExactly(subscriptionId, filters)
|
|
})
|
|
|
|
})
|
|
|
|
describe('#fetchAndSend', () => {
|
|
let event: Event
|
|
let webSocketOnMessageStub: Sinon.SinonStub
|
|
let webSocketOnSubscribeStub: Sinon.SinonStub
|
|
let isClientSubscribedToEventStub: Sinon.SinonStub
|
|
|
|
beforeEach(() => {
|
|
event = {
|
|
'id': 'b1601d26958e6508b7b9df0af609c652346c09392b6534d93aead9819a51b4ef',
|
|
'pubkey': '22e804d26ed16b68db5259e78449e96dab5d464c8f470bda3eb1a70467f2c793',
|
|
'created_at': 1648339664,
|
|
'kind': 1,
|
|
'tags': [],
|
|
'content': 'learning terraform rn!',
|
|
'sig': 'ec8b2bc640c8c7e92fbc0e0a6f539da2635068a99809186f15106174d727456132977c78f3371d0ab01c108173df75750f33d8e04c4d7980bbb3fb70ba1e3848',
|
|
}
|
|
|
|
isClientSubscribedToEventStub = sandbox.stub(SubscribeMessageHandler, 'isClientSubscribedToEvent' as any)
|
|
|
|
webSocketOnMessageStub = sandbox.stub()
|
|
webSocketOnSubscribeStub = sandbox.stub()
|
|
webSocket.on(WebSocketAdapterEvent.Message, webSocketOnMessageStub)
|
|
webSocket.on(WebSocketAdapterEvent.Subscribe, webSocketOnSubscribeStub)
|
|
//streamEndSpy = sandbox.spy(Stream, '_end' as any)
|
|
})
|
|
|
|
it('does not send event if client is not subscribed to it', async () => {
|
|
isClientSubscribedToEventStub.returns(always(false))
|
|
|
|
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
|
|
|
|
stream.write(toDbEvent(event))
|
|
stream.end()
|
|
|
|
await promise
|
|
|
|
expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters)
|
|
})
|
|
|
|
it('sends event if client is subscribed', async () => {
|
|
isClientSubscribedToEventStub.returns(always(true))
|
|
|
|
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
|
|
|
|
stream.write(toDbEvent(event))
|
|
stream.end()
|
|
|
|
await promise
|
|
|
|
expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters)
|
|
expect(webSocketOnMessageStub).to.have.been.calledWithExactly(
|
|
['EVENT', subscriptionId, event],
|
|
)
|
|
})
|
|
|
|
it('sends EOSE', async () => {
|
|
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
|
|
|
|
stream.end()
|
|
|
|
await promise
|
|
|
|
expect(webSocketOnMessageStub).to.have.been.calledWithExactly(
|
|
['EOSE', subscriptionId],
|
|
)
|
|
})
|
|
|
|
it('ends event stream if error occurs', async () => {
|
|
const error = new Error('mistakes were made')
|
|
isClientSubscribedToEventStub.returns(always(true))
|
|
|
|
const fetch = () => (handler as any).fetchAndSend(subscriptionId, filters)
|
|
|
|
const promise = fetch()
|
|
|
|
stream.emit('error', error)
|
|
|
|
const closeSpy = sandbox.spy()
|
|
stream.once('close', closeSpy)
|
|
|
|
await expect(promise).to.eventually.be.rejectedWith(error)
|
|
expect(closeSpy).to.have.been.called
|
|
})
|
|
})
|
|
|
|
describe('.isClientSubscribedToEvent', () => {
|
|
it('returns false if event matches no filter', () => {
|
|
const filters: SubscriptionFilter[] = [{ ids: ['aa'] }]
|
|
const event: Event = { id: 'bb' } as any
|
|
|
|
expect((SubscribeMessageHandler as any).isClientSubscribedToEvent(filters)(event)).to.be.false
|
|
})
|
|
|
|
it('returns true if event matches filter', () => {
|
|
const filters: SubscriptionFilter[] = [{ ids: ['aa'] }]
|
|
const event: Event = { id: 'aa' } as any
|
|
|
|
expect((SubscribeMessageHandler as any).isClientSubscribedToEvent(filters)(event)).to.be.true
|
|
})
|
|
})
|
|
|
|
describe('#canSubscribe', () => {
|
|
it('returns undefined if subscription & filter count are allowed', () => {
|
|
settingsFactory.returns({
|
|
limits: {
|
|
client: {
|
|
subscription: {
|
|
maxSubscriptions: 1,
|
|
maxFilters: 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
expect((handler as any).canSubscribe(subscriptionId, filters)).to.be.undefined
|
|
})
|
|
|
|
it('returns undefined if max subscription limit is disabled', () => {
|
|
settingsFactory.returns({
|
|
limits: {
|
|
client: {
|
|
subscription: {
|
|
maxSubscriptions: 0,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
expect((handler as any).canSubscribe(subscriptionId, filters)).to.be.undefined
|
|
})
|
|
|
|
it('returns undefined if filters limit is disabled', () => {
|
|
settingsFactory.returns({
|
|
limits: {
|
|
client: {
|
|
subscription: {
|
|
maxFilters: 0,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
filters = [{}]
|
|
|
|
expect((handler as any).canSubscribe(subscriptionId, filters)).to.be.undefined
|
|
})
|
|
|
|
it('returns reason if client is sending a duplicate subscription', () => {
|
|
settingsFactory.returns({
|
|
limits: {
|
|
client: {
|
|
subscription: {
|
|
maxSubscriptions: 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
filters = [{ authors: ['aa'] }]
|
|
subscriptions.set(subscriptionId, filters)
|
|
|
|
expect((handler as any).canSubscribe(subscriptionId, filters))
|
|
.to.equal('Duplicate subscription subscriptionId: Ignorning')
|
|
})
|
|
|
|
it('returns reason if client subscriptions exceed limits', () => {
|
|
settingsFactory.returns({
|
|
limits: {
|
|
client: {
|
|
subscription: {
|
|
maxSubscriptions: 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
subscriptions.set('other-sub', [])
|
|
|
|
expect((handler as any).canSubscribe(subscriptionId, filters)).to.equal('Too many subscriptions: Number of subscriptions must be less than or equal to 1')
|
|
})
|
|
|
|
it('returns reason if filter count exceeds limit', () => {
|
|
settingsFactory.returns({
|
|
limits: {
|
|
client: {
|
|
subscription: {
|
|
maxFilters: 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
filters = [
|
|
{}, {},
|
|
]
|
|
|
|
expect((handler as any).canSubscribe(subscriptionId, filters)).to.equal('Too many filters: Number of filters per susbscription must be less then or equal to 1')
|
|
})
|
|
})
|
|
}) |