feat: rate limit msgs/events, send command results

This commit is contained in:
Ricardo Arturo Cabral Mejía
2022-11-14 23:06:52 -05:00
parent a46fcc64ce
commit ff9b87f8d6
12 changed files with 184 additions and 68 deletions

View File

@@ -23,7 +23,7 @@ export type Range<F extends number, T extends number> = Exclude<
Enumerate<F>
>
export type Factory<TOutput = any, TInput = any> = (input: TInput) => TOutput
export type Factory<TOutput = any, TInput = void> = (input: TInput) => TOutput
export type DatabaseClient = Knex

View File

@@ -1,6 +1,6 @@
import { EventId, Range } from './base'
import { SubscriptionFilter, SubscriptionId } from './subscription'
import { Event } from './event'
import { Range } from './base'
export enum MessageType {
REQ = 'REQ',
@@ -8,6 +8,7 @@ export enum MessageType {
CLOSE = 'CLOSE',
NOTICE = 'NOTICE',
EOSE = 'EOSE',
OK = 'OK'
}
export type IncomingMessage =
@@ -20,6 +21,7 @@ export type OutgoingMessage =
| OutgoingEventMessage
| EndOfStoredEventsNotice
| NoticeMessage
| CommandResult
export type SubscribeMessage = {
[index in Range<2, 100>]: SubscriptionFilter
@@ -51,6 +53,13 @@ export interface NoticeMessage {
1: string
}
export interface CommandResult {
0: MessageType.OK
1: EventId
2: boolean
3: string
}
export interface EndOfStoredEventsNotice {
0: MessageType.EOSE
1: SubscriptionId

View File

@@ -13,6 +13,8 @@ import { attemptValidation } from '../utils/validation'
import { createLogger } from '../factories/logger-factory'
import { Event } from '../@types/event'
import { Factory } from '../@types/base'
import { IRateLimiter } from '../@types/utils'
import { ISettings } from '../@types/settings'
import { isEventMatchingFilter } from '../utils/event'
import { messageSchema } from '../schemas/message-schema'
@@ -21,7 +23,7 @@ const debugHeartbeat = debug.extend('heartbeat')
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
public clientId: string
// private clientAddress: string
private clientAddress: string
private alive: boolean
private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>
@@ -30,13 +32,17 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private readonly request: IncomingHttpMessage,
private readonly webSocketServer: IWebSocketServerAdapter,
private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
private readonly settingsFactory: Factory<ISettings>,
) {
super()
this.alive = true
this.subscriptions = new Map()
this.clientId = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
// this.clientAddress = this.request.headers['x-forwarded-for'] as string
this.clientAddress = (this.request.headers['x-forwarded-for'] ?? this.request.socket.remoteAddress) as string
debug('client %s from address %s', this.clientId, this.clientAddress)
this.client
.on('message', this.onClientMessage.bind(this))
@@ -120,10 +126,15 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private async onClientMessage(raw: Buffer) {
let abort: () => void
try {
if (await this.isRateLimited(this.clientAddress)) {
this.sendMessage(createNoticeMessage('rate limited'))
return
}
const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf8')))
const messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
if (typeof messageHandler.abort === 'function') {
if (typeof messageHandler?.abort === 'function') {
abort = messageHandler.abort.bind(messageHandler)
this.client.prependOnceListener('close', abort)
}
@@ -145,6 +156,36 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
}
}
private async isRateLimited(client: string): Promise<boolean> {
const {
rateLimits,
ipWhitelist = [],
} = this.settingsFactory().limits?.message ?? {}
if (ipWhitelist.includes(client)) {
debug('rate limit check %s: skipped', client)
return false
}
const rateLimiter = this.slidingWindowRateLimiter()
const hit = (period: number, rate: number) =>
rateLimiter.hit(
`${client}:message:${period}`,
1,
{ period: period, rate: rate },
)
const hits = await Promise.all(
rateLimits
.map(({ period, rate }) => hit(period, rate))
)
debug('rate limit check %s: %o = %o', client, rateLimits.map(({ period }) => period), hits)
return hits.some((thresholdCrossed) => thresholdCrossed)
}
private onClientPong() {
debugHeartbeat('client %s pong', this.clientId)
this.alive = true

View File

@@ -1,9 +1,11 @@
import { IncomingMessage } from 'http'
import { WebSocket } from 'ws'
import { createSettings } from './settings-factory'
import { IEventRepository } from '../@types/repositories'
import { IWebSocketServerAdapter } from '../@types/adapters'
import { messageHandlerFactory } from './message-handler-factory'
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
import { WebSocketAdapter } from '../adapters/web-socket-adapter'
@@ -14,5 +16,7 @@ export const webSocketAdapterFactory = (
client,
request,
webSocketServerAdapter,
messageHandlerFactory(eventRepository)
messageHandlerFactory(eventRepository),
slidingWindowRateLimiterFactory,
createSettings,
)

View File

@@ -1,9 +1,7 @@
import { mergeDeepLeft } from 'ramda'
import { DelegatedEvent, Event } from '../@types/event'
import { EventDelegatorMetadataKey, EventTags } from '../constants/base'
import { createCommandResult } from '../utils/messages'
import { createLogger } from '../factories/logger-factory'
import { createNoticeMessage } from '../utils/messages'
import { DelegatedEvent } from '../@types/event'
import { EventMessageHandler } from './event-message-handler'
import { IMessageHandler } from '../@types/message-handlers'
import { IncomingEventMessage } from '../@types/messages'
@@ -14,33 +12,39 @@ const debug = createLogger('delegated-event-message-handler')
export class DelegatedEventMessageHandler extends EventMessageHandler implements IMessageHandler {
public async handleMessage(message: IncomingEventMessage): Promise<void> {
debug('received message: %o', message)
const [, event] = message
let reason = this.canAcceptEvent(event)
let reason = await this.isEventValid(event)
if (reason) {
debug('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
return
}
reason = await this.isEventValid(event)
if (await this.isRateLimited(event)) {
debug('event %s rejected: rate-limited')
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'rate-limited: slow down'))
return
}
reason = this.canAcceptEvent(event)
if (reason) {
debug('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
return
}
const [, delegator] = event.tags.find((tag) => tag.length === 4 && tag[0] === EventTags.Delegation)
const delegatedEvent: DelegatedEvent = mergeDeepLeft(
event,
{
const delegatedEvent: DelegatedEvent = {
...event,
[EventDelegatorMetadataKey]: delegator,
}
)
}
const strategy = this.strategyFactory([delegatedEvent, this.webSocket])
if (typeof strategy?.execute !== 'function') {
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: event not supported'))
return
}
@@ -48,16 +52,18 @@ export class DelegatedEventMessageHandler extends EventMessageHandler implements
await strategy.execute(delegatedEvent)
} catch (error) {
debug('error handling message %o: %o', message, error)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
}
}
protected async isEventValid(event: Event): Promise<string | undefined> {
protected async isEventValid(event: DelegatedEvent): Promise<string | undefined> {
const reason = await super.isEventValid(event)
if (reason) {
return reason
}
if (!await isDelegatedEventValid(event)) {
return `Event with id ${event.id} from ${event.pubkey} is invalid delegated event`
return 'invalid: delegation verification failed'
}
}
}

View File

@@ -1,12 +1,13 @@
import { EventKindsRange, ISettings } from '../@types/settings'
import { EventKindsRange, EventRateLimit, ISettings } from '../@types/settings'
import { getEventProofOfWork, getPubkeyProofOfWork, isEventIdValid, isEventSignatureValid } from '../utils/event'
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
import { createCommandResult } from '../utils/messages'
import { createLogger } from '../factories/logger-factory'
import { createNoticeMessage } from '../utils/messages'
import { Event } from '../@types/event'
import { EventKinds } from '../constants/base'
import { Factory } from '../@types/base'
import { IncomingEventMessage } from '../@types/messages'
import { IRateLimiter } from '../@types/utils'
import { IWebSocketAdapter } from '../@types/adapters'
import { WebSocketAdapterEvent } from '../constants/adapter'
@@ -16,7 +17,8 @@ export class EventMessageHandler implements IMessageHandler {
public constructor(
protected readonly webSocket: IWebSocketAdapter,
protected readonly strategyFactory: Factory<IEventStrategy<Event, Promise<void>>, [Event, IWebSocketAdapter]>,
private readonly settings: () => ISettings
private readonly settings: () => ISettings,
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
) { }
public async handleMessage(message: IncomingEventMessage): Promise<void> {
@@ -26,20 +28,27 @@ export class EventMessageHandler implements IMessageHandler {
let reason = await this.isEventValid(event)
if (reason) {
debug('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
return
}
if (await this.isRateLimited(event)) {
debug('event %s rejected: rate-limited')
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'rate-limited: slow down'))
return
}
reason = this.canAcceptEvent(event)
if (reason) {
debug('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
return
}
const strategy = this.strategyFactory([event, this.webSocket])
if (typeof strategy?.execute !== 'function') {
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: event not supported'))
return
}
@@ -47,6 +56,7 @@ export class EventMessageHandler implements IMessageHandler {
await strategy.execute(event)
} catch (error) {
debug('error handling message %o: %o', message, error)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
}
}
@@ -55,37 +65,39 @@ export class EventMessageHandler implements IMessageHandler {
const limits = this.settings().limits.event
if (limits.createdAt.maxPositiveDelta > 0) {
if (event.created_at > now + limits.createdAt.maxPositiveDelta) {
return `created_at is more than ${limits.createdAt.maxPositiveDelta} seconds in the future`
return `rejected: created_at is more than ${limits.createdAt.maxPositiveDelta} seconds in the future`
}
}
if (limits.createdAt.maxNegativeDelta > 0) {
if (event.created_at < now - limits.createdAt.maxNegativeDelta) {
return `created_at is more than ${limits.createdAt.maxNegativeDelta} seconds in the past`
return `rejected: created_at is more than ${limits.createdAt.maxNegativeDelta} seconds in the past`
}
}
if (limits.eventId.minLeadingZeroBits > 0) {
if (getEventProofOfWork(event) < limits.eventId.minLeadingZeroBits) {
return `insufficient proof of work: event Id has less than ${limits.eventId.minLeadingZeroBits} leading zero bits`
const pow = getEventProofOfWork(event.id)
if (pow < limits.eventId.minLeadingZeroBits) {
return `pow: difficulty ${pow}<${limits.eventId.minLeadingZeroBits}`
}
}
if (limits.pubkey.minLeadingZeroBits > 0) {
if (getPubkeyProofOfWork(event.pubkey) < limits.pubkey.minLeadingZeroBits) {
return `insufficient proof of work: pubkey has less than ${limits.pubkey.minLeadingZeroBits} leading zero bits`
const pow = getPubkeyProofOfWork(event.pubkey)
if (pow < limits.pubkey.minLeadingZeroBits) {
return `pow: pubkey difficulty ${pow}<${limits.pubkey.minLeadingZeroBits}`
}
}
if (limits.pubkey.whitelist.length > 0) {
if (!limits.pubkey.whitelist.some((prefix) => event.pubkey.startsWith(prefix))) {
return `pubkey ${event.pubkey} is not allowed`
return 'blocked: pubkey not allowed'
}
}
if (limits.pubkey.blacklist.length > 0) {
if (limits.pubkey.blacklist.some((prefix) => event.pubkey.startsWith(prefix))) {
return `pubkey ${event.pubkey} is not allowed`
return 'blocked: pubkey not allowed'
}
}
@@ -96,23 +108,57 @@ export class EventMessageHandler implements IMessageHandler {
if (limits.kind.whitelist.length > 0) {
if (!limits.kind.whitelist.some(isEventKindMatch)) {
return `event kind ${event.kind} is not allowed`
return `blocked: event kind ${event.kind} not allowed`
}
}
if (limits.kind.blacklist.length > 0) {
if (limits.kind.blacklist.some(isEventKindMatch)) {
return `event kind ${event.kind} is not allowed`
return `blocked: event kind ${event.kind} not allowed`
}
}
}
protected async isEventValid(event: Event): Promise<string | undefined> {
if (!await isEventIdValid(event)) {
return `Event with id ${event.id} from ${event.pubkey} is not valid`
return 'invalid: event id does not match'
}
if (!await isEventSignatureValid(event)) {
return `Event with id ${event.id} from ${event.pubkey} has invalid signature`
return 'invalid: event signature verification failed'
}
}
protected async isRateLimited(event: Event): Promise<boolean> {
const rateLimits = this.settings().limits.event?.rateLimits
if (!rateLimits || !rateLimits.length) {
return
}
const rateLimiter = this.slidingWindowRateLimiter()
const toString = (input: any | any[]): string => {
return Array.isArray(input) ? `[${input.map(toString)}]` : input.toString()
}
const hit = ({ period, rate, kinds = undefined }: EventRateLimit) => {
const key = Array.isArray(kinds)
? `${event.pubkey}:events:${period}:${toString(kinds)}`
: `${event.pubkey}:events:${period}`
return rateLimiter.hit(
key,
1,
{ period, rate },
)
}
const hits = await Promise.all(
rateLimits
.map(async (rateLimit) => ({ ...rateLimit, active: await hit(rateLimit) }))
)
debug('rate limit check %s: %o', event.pubkey, hits)
return hits.some(({ active }) => active)
}
}

View File

@@ -1,3 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { IEventRepository } from '../../@types/repositories'
@@ -16,9 +17,10 @@ export class DefaultEventStrategy implements IEventStrategy<Event, Promise<void>
public async execute(event: Event): Promise<void> {
debug('received event: %o', event)
const count = await this.eventRepository.create(event)
if (!count) {
return
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
}

View File

@@ -1,3 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { EventTags } from '../../constants/base'
@@ -16,17 +17,25 @@ export class DeleteEventStrategy implements IEventStrategy<Event, Promise<void>>
public async execute(event: Event): Promise<void> {
debug('received event: %o', event)
await this.eventRepository.create(event)
const count = await this.eventRepository.create(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event)
const ids = event.tags.reduce(
(eventIds, tag) => (tag.length >= 2 && tag[0] === EventTags.Event && tag[1].length === 64)
? [...eventIds, tag[1]]
: eventIds,
[] as string[]
)
if (eTags.length) {
if (ids.length) {
await this.eventRepository.deleteByPubkeyAndIds(
event.pubkey,
eTags.map((tag) => tag[1])
ids
)
}
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
}
}

View File

@@ -1,5 +1,6 @@
import { Event, ParameterizedReplaceableEvent } from '../../@types/event'
import { EventDeduplicationMetadataKey, EventTags } from '../../constants/base'
import { createCommandResult } from '../../utils/messages'
import { createLogger } from '../../factories/logger-factory'
import { IEventRepository } from '../../@types/repositories'
import { IEventStrategy } from '../../@types/message-handlers'
@@ -8,7 +9,8 @@ import { WebSocketAdapterEvent } from '../../constants/adapter'
const debug = createLogger('parameterized-replaceable-event-strategy')
export class ParameterizedReplaceableEventStrategy implements IEventStrategy<Event, Promise<void>> {
export class ParameterizedReplaceableEventStrategy
implements IEventStrategy<Event, Promise<void>> {
public constructor(
private readonly webSocket: IWebSocketAdapter,
private readonly eventRepository: IEventRepository,
@@ -25,10 +27,10 @@ export class ParameterizedReplaceableEventStrategy implements IEventStrategy<Eve
}
const count = await this.eventRepository.upsert(parameterizedReplaceableEvent)
if (!count) {
return
}
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
}
}

View File

@@ -1,3 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { IEventRepository } from '../../@types/repositories'
@@ -16,10 +17,9 @@ export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<v
public async execute(event: Event): Promise<void> {
debug('received event: %o', event)
const count = await this.eventRepository.upsert(event)
if (!count) {
return
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
}

View File

@@ -2,11 +2,11 @@ import * as secp256k1 from '@noble/secp256k1'
import { applySpec, converge, curry, mergeLeft, nth, omit, pipe, prop, reduceBy } from 'ramda'
import { CanonicalEvent, Event } from '../@types/event'
import { EventId, Pubkey } from '../@types/base'
import { EventKinds, EventTags } from '../constants/base'
import { fromBuffer } from './transform'
import { getLeadingZeroBits } from './proof-of-work'
import { isGenericTagQuery } from './filter'
import { Pubkey } from '../@types/base'
import { RuneLike } from './runes/rune-like'
import { SubscriptionFilter } from '../@types/subscription'
@@ -143,16 +143,6 @@ export const isDelegatedEventValid = async (event: Event): Promise<boolean> => {
const token = await secp256k1.utils.sha256(Buffer.from(serializedDelegationTag))
// Token generation to be decided:
// const serializedDelegationTag = [
// delegation[0], // 'delegation'
// delegation[1], // <delegator>
// event.pubkey, // <delegatee>
// delegation[2], // <rules>
// ]
// const token = await secp256k1.utils.sha256(Buffer.from(JSON.stringify(serializedDelegationTag)))
// Validate delegation signature
return secp256k1.schnorr.verify(delegation[3], token, delegation[1])
}
@@ -185,8 +175,8 @@ export const isDeleteEvent = (event: Event): boolean => {
return event.kind === EventKinds.DELETE
}
export const getEventProofOfWork = (event: Event): number => {
return getLeadingZeroBits(Buffer.from(event.id, 'hex'))
export const getEventProofOfWork = (eventId: EventId): number => {
return getLeadingZeroBits(Buffer.from(eventId, 'hex'))
}
export const getPubkeyProofOfWork = (pubkey: Pubkey): number => {

View File

@@ -5,6 +5,7 @@ import {
OutgoingMessage,
} from '../@types/messages'
import { Event } from '../@types/event'
import { EventId } from '../@types/base'
import { SubscriptionId } from '../@types/subscription'
export const createNoticeMessage = (notice: string): NoticeMessage => {
@@ -18,8 +19,14 @@ export const createOutgoingEventMessage = (
return [MessageType.EVENT, subscriptionId, event]
}
// NIP-15
export const createEndOfStoredEventsNoticeMessage = (
subscriptionId: SubscriptionId,
): EndOfStoredEventsNotice => {
return [MessageType.EOSE, subscriptionId]
}
// NIP-20
export const createCommandResult = (eventId: EventId, successful: boolean, message = '') => {
return [MessageType.OK, eventId, successful, message]
}