mirror of
https://github.com/Cameri/nostream.git
synced 2025-08-02 18:52:14 +02:00
feat: rate limit msgs/events, send command results
This commit is contained in:
@@ -23,7 +23,7 @@ export type Range<F extends number, T extends number> = Exclude<
|
||||
Enumerate<F>
|
||||
>
|
||||
|
||||
export type Factory<TOutput = any, TInput = any> = (input: TInput) => TOutput
|
||||
export type Factory<TOutput = any, TInput = void> = (input: TInput) => TOutput
|
||||
|
||||
export type DatabaseClient = Knex
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
import { EventId, Range } from './base'
|
||||
import { SubscriptionFilter, SubscriptionId } from './subscription'
|
||||
import { Event } from './event'
|
||||
import { Range } from './base'
|
||||
|
||||
export enum MessageType {
|
||||
REQ = 'REQ',
|
||||
@@ -8,6 +8,7 @@ export enum MessageType {
|
||||
CLOSE = 'CLOSE',
|
||||
NOTICE = 'NOTICE',
|
||||
EOSE = 'EOSE',
|
||||
OK = 'OK'
|
||||
}
|
||||
|
||||
export type IncomingMessage =
|
||||
@@ -20,6 +21,7 @@ export type OutgoingMessage =
|
||||
| OutgoingEventMessage
|
||||
| EndOfStoredEventsNotice
|
||||
| NoticeMessage
|
||||
| CommandResult
|
||||
|
||||
export type SubscribeMessage = {
|
||||
[index in Range<2, 100>]: SubscriptionFilter
|
||||
@@ -51,6 +53,13 @@ export interface NoticeMessage {
|
||||
1: string
|
||||
}
|
||||
|
||||
export interface CommandResult {
|
||||
0: MessageType.OK
|
||||
1: EventId
|
||||
2: boolean
|
||||
3: string
|
||||
}
|
||||
|
||||
export interface EndOfStoredEventsNotice {
|
||||
0: MessageType.EOSE
|
||||
1: SubscriptionId
|
||||
|
@@ -13,6 +13,8 @@ import { attemptValidation } from '../utils/validation'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { Event } from '../@types/event'
|
||||
import { Factory } from '../@types/base'
|
||||
import { IRateLimiter } from '../@types/utils'
|
||||
import { ISettings } from '../@types/settings'
|
||||
import { isEventMatchingFilter } from '../utils/event'
|
||||
import { messageSchema } from '../schemas/message-schema'
|
||||
|
||||
@@ -21,7 +23,7 @@ const debugHeartbeat = debug.extend('heartbeat')
|
||||
|
||||
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
|
||||
public clientId: string
|
||||
// private clientAddress: string
|
||||
private clientAddress: string
|
||||
private alive: boolean
|
||||
private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>
|
||||
|
||||
@@ -30,13 +32,17 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
private readonly request: IncomingHttpMessage,
|
||||
private readonly webSocketServer: IWebSocketServerAdapter,
|
||||
private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
|
||||
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
|
||||
private readonly settingsFactory: Factory<ISettings>,
|
||||
) {
|
||||
super()
|
||||
this.alive = true
|
||||
this.subscriptions = new Map()
|
||||
|
||||
this.clientId = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
|
||||
// this.clientAddress = this.request.headers['x-forwarded-for'] as string
|
||||
this.clientAddress = (this.request.headers['x-forwarded-for'] ?? this.request.socket.remoteAddress) as string
|
||||
|
||||
debug('client %s from address %s', this.clientId, this.clientAddress)
|
||||
|
||||
this.client
|
||||
.on('message', this.onClientMessage.bind(this))
|
||||
@@ -120,10 +126,15 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
private async onClientMessage(raw: Buffer) {
|
||||
let abort: () => void
|
||||
try {
|
||||
if (await this.isRateLimited(this.clientAddress)) {
|
||||
this.sendMessage(createNoticeMessage('rate limited'))
|
||||
return
|
||||
}
|
||||
|
||||
const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf8')))
|
||||
|
||||
const messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
|
||||
if (typeof messageHandler.abort === 'function') {
|
||||
if (typeof messageHandler?.abort === 'function') {
|
||||
abort = messageHandler.abort.bind(messageHandler)
|
||||
this.client.prependOnceListener('close', abort)
|
||||
}
|
||||
@@ -145,6 +156,36 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
}
|
||||
}
|
||||
|
||||
private async isRateLimited(client: string): Promise<boolean> {
|
||||
const {
|
||||
rateLimits,
|
||||
ipWhitelist = [],
|
||||
} = this.settingsFactory().limits?.message ?? {}
|
||||
|
||||
if (ipWhitelist.includes(client)) {
|
||||
debug('rate limit check %s: skipped', client)
|
||||
return false
|
||||
}
|
||||
|
||||
const rateLimiter = this.slidingWindowRateLimiter()
|
||||
|
||||
const hit = (period: number, rate: number) =>
|
||||
rateLimiter.hit(
|
||||
`${client}:message:${period}`,
|
||||
1,
|
||||
{ period: period, rate: rate },
|
||||
)
|
||||
|
||||
const hits = await Promise.all(
|
||||
rateLimits
|
||||
.map(({ period, rate }) => hit(period, rate))
|
||||
)
|
||||
|
||||
debug('rate limit check %s: %o = %o', client, rateLimits.map(({ period }) => period), hits)
|
||||
|
||||
return hits.some((thresholdCrossed) => thresholdCrossed)
|
||||
}
|
||||
|
||||
private onClientPong() {
|
||||
debugHeartbeat('client %s pong', this.clientId)
|
||||
this.alive = true
|
||||
|
@@ -1,9 +1,11 @@
|
||||
import { IncomingMessage } from 'http'
|
||||
import { WebSocket } from 'ws'
|
||||
|
||||
import { createSettings } from './settings-factory'
|
||||
import { IEventRepository } from '../@types/repositories'
|
||||
import { IWebSocketServerAdapter } from '../@types/adapters'
|
||||
import { messageHandlerFactory } from './message-handler-factory'
|
||||
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
|
||||
import { WebSocketAdapter } from '../adapters/web-socket-adapter'
|
||||
|
||||
|
||||
@@ -14,5 +16,7 @@ export const webSocketAdapterFactory = (
|
||||
client,
|
||||
request,
|
||||
webSocketServerAdapter,
|
||||
messageHandlerFactory(eventRepository)
|
||||
messageHandlerFactory(eventRepository),
|
||||
slidingWindowRateLimiterFactory,
|
||||
createSettings,
|
||||
)
|
||||
|
@@ -1,9 +1,7 @@
|
||||
import { mergeDeepLeft } from 'ramda'
|
||||
|
||||
import { DelegatedEvent, Event } from '../@types/event'
|
||||
import { EventDelegatorMetadataKey, EventTags } from '../constants/base'
|
||||
import { createCommandResult } from '../utils/messages'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { createNoticeMessage } from '../utils/messages'
|
||||
import { DelegatedEvent } from '../@types/event'
|
||||
import { EventMessageHandler } from './event-message-handler'
|
||||
import { IMessageHandler } from '../@types/message-handlers'
|
||||
import { IncomingEventMessage } from '../@types/messages'
|
||||
@@ -14,33 +12,39 @@ const debug = createLogger('delegated-event-message-handler')
|
||||
|
||||
export class DelegatedEventMessageHandler extends EventMessageHandler implements IMessageHandler {
|
||||
public async handleMessage(message: IncomingEventMessage): Promise<void> {
|
||||
debug('received message: %o', message)
|
||||
const [, event] = message
|
||||
|
||||
let reason = this.canAcceptEvent(event)
|
||||
let reason = await this.isEventValid(event)
|
||||
if (reason) {
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
|
||||
return
|
||||
}
|
||||
|
||||
reason = await this.isEventValid(event)
|
||||
if (await this.isRateLimited(event)) {
|
||||
debug('event %s rejected: rate-limited')
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'rate-limited: slow down'))
|
||||
return
|
||||
}
|
||||
|
||||
reason = this.canAcceptEvent(event)
|
||||
if (reason) {
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
|
||||
return
|
||||
}
|
||||
|
||||
const [, delegator] = event.tags.find((tag) => tag.length === 4 && tag[0] === EventTags.Delegation)
|
||||
const delegatedEvent: DelegatedEvent = mergeDeepLeft(
|
||||
event,
|
||||
{
|
||||
const delegatedEvent: DelegatedEvent = {
|
||||
...event,
|
||||
[EventDelegatorMetadataKey]: delegator,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
const strategy = this.strategyFactory([delegatedEvent, this.webSocket])
|
||||
|
||||
if (typeof strategy?.execute !== 'function') {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: event not supported'))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -48,16 +52,18 @@ export class DelegatedEventMessageHandler extends EventMessageHandler implements
|
||||
await strategy.execute(delegatedEvent)
|
||||
} catch (error) {
|
||||
debug('error handling message %o: %o', message, error)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
|
||||
}
|
||||
}
|
||||
|
||||
protected async isEventValid(event: Event): Promise<string | undefined> {
|
||||
protected async isEventValid(event: DelegatedEvent): Promise<string | undefined> {
|
||||
const reason = await super.isEventValid(event)
|
||||
if (reason) {
|
||||
return reason
|
||||
}
|
||||
|
||||
if (!await isDelegatedEventValid(event)) {
|
||||
return `Event with id ${event.id} from ${event.pubkey} is invalid delegated event`
|
||||
return 'invalid: delegation verification failed'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,12 +1,13 @@
|
||||
import { EventKindsRange, ISettings } from '../@types/settings'
|
||||
import { EventKindsRange, EventRateLimit, ISettings } from '../@types/settings'
|
||||
import { getEventProofOfWork, getPubkeyProofOfWork, isEventIdValid, isEventSignatureValid } from '../utils/event'
|
||||
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
|
||||
import { createCommandResult } from '../utils/messages'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { createNoticeMessage } from '../utils/messages'
|
||||
import { Event } from '../@types/event'
|
||||
import { EventKinds } from '../constants/base'
|
||||
import { Factory } from '../@types/base'
|
||||
import { IncomingEventMessage } from '../@types/messages'
|
||||
import { IRateLimiter } from '../@types/utils'
|
||||
import { IWebSocketAdapter } from '../@types/adapters'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
@@ -16,7 +17,8 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
public constructor(
|
||||
protected readonly webSocket: IWebSocketAdapter,
|
||||
protected readonly strategyFactory: Factory<IEventStrategy<Event, Promise<void>>, [Event, IWebSocketAdapter]>,
|
||||
private readonly settings: () => ISettings
|
||||
private readonly settings: () => ISettings,
|
||||
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
|
||||
) { }
|
||||
|
||||
public async handleMessage(message: IncomingEventMessage): Promise<void> {
|
||||
@@ -26,20 +28,27 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
let reason = await this.isEventValid(event)
|
||||
if (reason) {
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
|
||||
return
|
||||
}
|
||||
|
||||
if (await this.isRateLimited(event)) {
|
||||
debug('event %s rejected: rate-limited')
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'rate-limited: slow down'))
|
||||
return
|
||||
}
|
||||
|
||||
reason = this.canAcceptEvent(event)
|
||||
if (reason) {
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
|
||||
return
|
||||
}
|
||||
|
||||
const strategy = this.strategyFactory([event, this.webSocket])
|
||||
|
||||
if (typeof strategy?.execute !== 'function') {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: event not supported'))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -47,6 +56,7 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
await strategy.execute(event)
|
||||
} catch (error) {
|
||||
debug('error handling message %o: %o', message, error)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,37 +65,39 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
const limits = this.settings().limits.event
|
||||
if (limits.createdAt.maxPositiveDelta > 0) {
|
||||
if (event.created_at > now + limits.createdAt.maxPositiveDelta) {
|
||||
return `created_at is more than ${limits.createdAt.maxPositiveDelta} seconds in the future`
|
||||
return `rejected: created_at is more than ${limits.createdAt.maxPositiveDelta} seconds in the future`
|
||||
}
|
||||
}
|
||||
|
||||
if (limits.createdAt.maxNegativeDelta > 0) {
|
||||
if (event.created_at < now - limits.createdAt.maxNegativeDelta) {
|
||||
return `created_at is more than ${limits.createdAt.maxNegativeDelta} seconds in the past`
|
||||
return `rejected: created_at is more than ${limits.createdAt.maxNegativeDelta} seconds in the past`
|
||||
}
|
||||
}
|
||||
|
||||
if (limits.eventId.minLeadingZeroBits > 0) {
|
||||
if (getEventProofOfWork(event) < limits.eventId.minLeadingZeroBits) {
|
||||
return `insufficient proof of work: event Id has less than ${limits.eventId.minLeadingZeroBits} leading zero bits`
|
||||
const pow = getEventProofOfWork(event.id)
|
||||
if (pow < limits.eventId.minLeadingZeroBits) {
|
||||
return `pow: difficulty ${pow}<${limits.eventId.minLeadingZeroBits}`
|
||||
}
|
||||
}
|
||||
|
||||
if (limits.pubkey.minLeadingZeroBits > 0) {
|
||||
if (getPubkeyProofOfWork(event.pubkey) < limits.pubkey.minLeadingZeroBits) {
|
||||
return `insufficient proof of work: pubkey has less than ${limits.pubkey.minLeadingZeroBits} leading zero bits`
|
||||
const pow = getPubkeyProofOfWork(event.pubkey)
|
||||
if (pow < limits.pubkey.minLeadingZeroBits) {
|
||||
return `pow: pubkey difficulty ${pow}<${limits.pubkey.minLeadingZeroBits}`
|
||||
}
|
||||
}
|
||||
|
||||
if (limits.pubkey.whitelist.length > 0) {
|
||||
if (!limits.pubkey.whitelist.some((prefix) => event.pubkey.startsWith(prefix))) {
|
||||
return `pubkey ${event.pubkey} is not allowed`
|
||||
return 'blocked: pubkey not allowed'
|
||||
}
|
||||
}
|
||||
|
||||
if (limits.pubkey.blacklist.length > 0) {
|
||||
if (limits.pubkey.blacklist.some((prefix) => event.pubkey.startsWith(prefix))) {
|
||||
return `pubkey ${event.pubkey} is not allowed`
|
||||
return 'blocked: pubkey not allowed'
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,23 +108,57 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
|
||||
if (limits.kind.whitelist.length > 0) {
|
||||
if (!limits.kind.whitelist.some(isEventKindMatch)) {
|
||||
return `event kind ${event.kind} is not allowed`
|
||||
return `blocked: event kind ${event.kind} not allowed`
|
||||
}
|
||||
}
|
||||
|
||||
if (limits.kind.blacklist.length > 0) {
|
||||
if (limits.kind.blacklist.some(isEventKindMatch)) {
|
||||
return `event kind ${event.kind} is not allowed`
|
||||
return `blocked: event kind ${event.kind} not allowed`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected async isEventValid(event: Event): Promise<string | undefined> {
|
||||
if (!await isEventIdValid(event)) {
|
||||
return `Event with id ${event.id} from ${event.pubkey} is not valid`
|
||||
return 'invalid: event id does not match'
|
||||
}
|
||||
if (!await isEventSignatureValid(event)) {
|
||||
return `Event with id ${event.id} from ${event.pubkey} has invalid signature`
|
||||
return 'invalid: event signature verification failed'
|
||||
}
|
||||
}
|
||||
|
||||
protected async isRateLimited(event: Event): Promise<boolean> {
|
||||
const rateLimits = this.settings().limits.event?.rateLimits
|
||||
if (!rateLimits || !rateLimits.length) {
|
||||
return
|
||||
}
|
||||
|
||||
const rateLimiter = this.slidingWindowRateLimiter()
|
||||
|
||||
const toString = (input: any | any[]): string => {
|
||||
return Array.isArray(input) ? `[${input.map(toString)}]` : input.toString()
|
||||
}
|
||||
|
||||
const hit = ({ period, rate, kinds = undefined }: EventRateLimit) => {
|
||||
const key = Array.isArray(kinds)
|
||||
? `${event.pubkey}:events:${period}:${toString(kinds)}`
|
||||
: `${event.pubkey}:events:${period}`
|
||||
|
||||
return rateLimiter.hit(
|
||||
key,
|
||||
1,
|
||||
{ period, rate },
|
||||
)
|
||||
}
|
||||
|
||||
const hits = await Promise.all(
|
||||
rateLimits
|
||||
.map(async (rateLimit) => ({ ...rateLimit, active: await hit(rateLimit) }))
|
||||
)
|
||||
|
||||
debug('rate limit check %s: %o', event.pubkey, hits)
|
||||
|
||||
return hits.some(({ active }) => active)
|
||||
}
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import { createCommandResult } from '../../utils/messages'
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { IEventRepository } from '../../@types/repositories'
|
||||
@@ -16,9 +17,10 @@ export class DefaultEventStrategy implements IEventStrategy<Event, Promise<void>
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
const count = await this.eventRepository.create(event)
|
||||
if (!count) {
|
||||
return
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
|
||||
|
||||
if (count) {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import { createCommandResult } from '../../utils/messages'
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { EventTags } from '../../constants/base'
|
||||
@@ -16,17 +17,25 @@ export class DeleteEventStrategy implements IEventStrategy<Event, Promise<void>>
|
||||
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
await this.eventRepository.create(event)
|
||||
const count = await this.eventRepository.create(event)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
|
||||
|
||||
const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event)
|
||||
const ids = event.tags.reduce(
|
||||
(eventIds, tag) => (tag.length >= 2 && tag[0] === EventTags.Event && tag[1].length === 64)
|
||||
? [...eventIds, tag[1]]
|
||||
: eventIds,
|
||||
[] as string[]
|
||||
)
|
||||
|
||||
if (eTags.length) {
|
||||
if (ids.length) {
|
||||
await this.eventRepository.deleteByPubkeyAndIds(
|
||||
event.pubkey,
|
||||
eTags.map((tag) => tag[1])
|
||||
ids
|
||||
)
|
||||
}
|
||||
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
if (count) {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,5 +1,6 @@
|
||||
import { Event, ParameterizedReplaceableEvent } from '../../@types/event'
|
||||
import { EventDeduplicationMetadataKey, EventTags } from '../../constants/base'
|
||||
import { createCommandResult } from '../../utils/messages'
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { IEventRepository } from '../../@types/repositories'
|
||||
import { IEventStrategy } from '../../@types/message-handlers'
|
||||
@@ -8,7 +9,8 @@ import { WebSocketAdapterEvent } from '../../constants/adapter'
|
||||
|
||||
const debug = createLogger('parameterized-replaceable-event-strategy')
|
||||
|
||||
export class ParameterizedReplaceableEventStrategy implements IEventStrategy<Event, Promise<void>> {
|
||||
export class ParameterizedReplaceableEventStrategy
|
||||
implements IEventStrategy<Event, Promise<void>> {
|
||||
public constructor(
|
||||
private readonly webSocket: IWebSocketAdapter,
|
||||
private readonly eventRepository: IEventRepository,
|
||||
@@ -25,10 +27,10 @@ export class ParameterizedReplaceableEventStrategy implements IEventStrategy<Eve
|
||||
}
|
||||
|
||||
const count = await this.eventRepository.upsert(parameterizedReplaceableEvent)
|
||||
if (!count) {
|
||||
return
|
||||
}
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
|
||||
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
if (count) {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import { createCommandResult } from '../../utils/messages'
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { IEventRepository } from '../../@types/repositories'
|
||||
@@ -16,10 +17,9 @@ export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<v
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
const count = await this.eventRepository.upsert(event)
|
||||
if (!count) {
|
||||
return
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'))
|
||||
if (count) {
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
}
|
||||
|
@@ -2,11 +2,11 @@ import * as secp256k1 from '@noble/secp256k1'
|
||||
import { applySpec, converge, curry, mergeLeft, nth, omit, pipe, prop, reduceBy } from 'ramda'
|
||||
|
||||
import { CanonicalEvent, Event } from '../@types/event'
|
||||
import { EventId, Pubkey } from '../@types/base'
|
||||
import { EventKinds, EventTags } from '../constants/base'
|
||||
import { fromBuffer } from './transform'
|
||||
import { getLeadingZeroBits } from './proof-of-work'
|
||||
import { isGenericTagQuery } from './filter'
|
||||
import { Pubkey } from '../@types/base'
|
||||
import { RuneLike } from './runes/rune-like'
|
||||
import { SubscriptionFilter } from '../@types/subscription'
|
||||
|
||||
@@ -143,16 +143,6 @@ export const isDelegatedEventValid = async (event: Event): Promise<boolean> => {
|
||||
|
||||
const token = await secp256k1.utils.sha256(Buffer.from(serializedDelegationTag))
|
||||
|
||||
// Token generation to be decided:
|
||||
// const serializedDelegationTag = [
|
||||
// delegation[0], // 'delegation'
|
||||
// delegation[1], // <delegator>
|
||||
// event.pubkey, // <delegatee>
|
||||
// delegation[2], // <rules>
|
||||
// ]
|
||||
// const token = await secp256k1.utils.sha256(Buffer.from(JSON.stringify(serializedDelegationTag)))
|
||||
|
||||
// Validate delegation signature
|
||||
return secp256k1.schnorr.verify(delegation[3], token, delegation[1])
|
||||
}
|
||||
|
||||
@@ -185,8 +175,8 @@ export const isDeleteEvent = (event: Event): boolean => {
|
||||
return event.kind === EventKinds.DELETE
|
||||
}
|
||||
|
||||
export const getEventProofOfWork = (event: Event): number => {
|
||||
return getLeadingZeroBits(Buffer.from(event.id, 'hex'))
|
||||
export const getEventProofOfWork = (eventId: EventId): number => {
|
||||
return getLeadingZeroBits(Buffer.from(eventId, 'hex'))
|
||||
}
|
||||
|
||||
export const getPubkeyProofOfWork = (pubkey: Pubkey): number => {
|
||||
|
@@ -5,6 +5,7 @@ import {
|
||||
OutgoingMessage,
|
||||
} from '../@types/messages'
|
||||
import { Event } from '../@types/event'
|
||||
import { EventId } from '../@types/base'
|
||||
import { SubscriptionId } from '../@types/subscription'
|
||||
|
||||
export const createNoticeMessage = (notice: string): NoticeMessage => {
|
||||
@@ -18,8 +19,14 @@ export const createOutgoingEventMessage = (
|
||||
return [MessageType.EVENT, subscriptionId, event]
|
||||
}
|
||||
|
||||
// NIP-15
|
||||
export const createEndOfStoredEventsNoticeMessage = (
|
||||
subscriptionId: SubscriptionId,
|
||||
): EndOfStoredEventsNotice => {
|
||||
return [MessageType.EOSE, subscriptionId]
|
||||
}
|
||||
|
||||
// NIP-20
|
||||
export const createCommandResult = (eventId: EventId, successful: boolean, message = '') => {
|
||||
return [MessageType.OK, eventId, successful, message]
|
||||
}
|
||||
|
Reference in New Issue
Block a user