mirror of
https://github.com/Cameri/nostream.git
synced 2025-04-12 05:39:06 +02:00
chore: enforce max subscription & filter limits
This commit is contained in:
parent
9f8ccf21c9
commit
525630a49c
@ -1,4 +1,5 @@
|
||||
import { EventEmitter } from 'node:stream'
|
||||
import { SubscriptionFilter } from './subscription'
|
||||
|
||||
export interface IWebSocketServerAdapter extends EventEmitter {
|
||||
getConnectedClients(): number
|
||||
@ -10,4 +11,6 @@ export interface IWebServerAdapter extends EventEmitter {
|
||||
}
|
||||
|
||||
|
||||
export type IWebSocketAdapter = EventEmitter
|
||||
export type IWebSocketAdapter = EventEmitter & {
|
||||
getSubscriptions(): Map<string, Set<SubscriptionFilter>>
|
||||
}
|
||||
|
@ -14,11 +14,12 @@ export type IncomingMessage =
|
||||
| SubscribeMessage
|
||||
| IncomingEventMessage
|
||||
| UnsubscribeMessage
|
||||
| Notice
|
||||
|
||||
|
||||
export type OutgoingMessage =
|
||||
| OutgoingEventMessage
|
||||
| EndOfStoredEventsNotice
|
||||
| NoticeMessage
|
||||
|
||||
export type SubscribeMessage = {
|
||||
[index in Range<2, 100>]: SubscriptionFilter
|
||||
@ -44,7 +45,7 @@ export interface UnsubscribeMessage {
|
||||
1: SubscriptionId
|
||||
}
|
||||
|
||||
export interface Notice {
|
||||
export interface NoticeMessage {
|
||||
0: MessageType.NOTICE
|
||||
1: string
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
import { anyPass, map } from 'ramda'
|
||||
import { anyPass, equals, map, uniqWith } from 'ramda'
|
||||
import { pipeline } from 'stream/promises'
|
||||
|
||||
import { createEndOfStoredEventsNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
|
||||
import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
|
||||
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
|
||||
import { isEventMatchingFilter, toNostrEvent } from '../utils/event'
|
||||
import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream'
|
||||
@ -9,10 +9,10 @@ import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
|
||||
import { Event } from '../@types/event'
|
||||
import { IEventRepository } from '../@types/repositories'
|
||||
import { IWebSocketAdapter } from '../@types/adapters'
|
||||
import { Settings } from '../utils/settings'
|
||||
import { SubscribeMessage } from '../@types/messages'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
|
||||
export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
private readonly abortController: AbortController
|
||||
|
||||
@ -29,7 +29,13 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
|
||||
public async handleMessage(message: SubscribeMessage): Promise<void> {
|
||||
const subscriptionId = message[1] as SubscriptionId
|
||||
const filters = message.slice(2) as SubscriptionFilter[]
|
||||
const filters = uniqWith(equals, message.slice(2)) as SubscriptionFilter[]
|
||||
|
||||
const reason = this.canSubscribe(subscriptionId, filters)
|
||||
if (reason) {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Subscription request rejected: ${reason}`))
|
||||
return
|
||||
}
|
||||
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Subscribe, subscriptionId, new Set(filters))
|
||||
|
||||
@ -59,4 +65,20 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
}
|
||||
}
|
||||
|
||||
private canSubscribe(subscriptionId: string, filters: SubscriptionFilter[]): string | undefined {
|
||||
const maxSubscriptions = Settings.limits.client.subscription.maxSubscriptions
|
||||
if (maxSubscriptions > 0) {
|
||||
const subscriptions = this.webSocket.getSubscriptions()
|
||||
if (!subscriptions.has(subscriptionId) && subscriptions.size + 1 > maxSubscriptions) {
|
||||
return `Too many subscriptions: Number of subscriptions must be less than ${maxSubscriptions}`
|
||||
}
|
||||
}
|
||||
|
||||
const maxFilters = Settings.limits.client.subscription.maxFilters
|
||||
if (maxFilters > 0) {
|
||||
if (filters.length > maxFilters) {
|
||||
return `Too many filters: Number of filters per susbscription must be less or equal to ${maxFilters}`
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
import {
|
||||
EndOfStoredEventsNotice,
|
||||
MessageType,
|
||||
Notice,
|
||||
NoticeMessage,
|
||||
OutgoingMessage,
|
||||
} from '../@types/messages'
|
||||
import { Event } from '../@types/event'
|
||||
import { SubscriptionId } from '../@types/subscription'
|
||||
|
||||
export const createNotice = (notice: string): Notice => {
|
||||
export const createNoticeMessage = (notice: string): NoticeMessage => {
|
||||
return [MessageType.NOTICE, notice]
|
||||
}
|
||||
|
||||
|
@ -1,12 +1,12 @@
|
||||
import { expect } from 'chai'
|
||||
|
||||
import { createEndOfStoredEventsNoticeMessage, createNotice, createOutgoingEventMessage } from '../../../src/utils/messages'
|
||||
import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../../../src/utils/messages'
|
||||
import { Event } from '../../../src/@types/event'
|
||||
import { MessageType } from '../../../src/@types/messages'
|
||||
|
||||
describe('createNotice', () => {
|
||||
it('returns a notice message', () => {
|
||||
expect(createNotice('some notice')).to.deep.equal([MessageType.NOTICE, 'some notice'])
|
||||
expect(createNoticeMessage('some notice')).to.deep.equal([MessageType.NOTICE, 'some notice'])
|
||||
})
|
||||
})
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user