chore: stream events

This commit is contained in:
Ricardo Arturo Cabral Mejia
2022-08-11 23:13:28 +00:00
parent e2a74fb2c0
commit a7d8561bea
36 changed files with 517 additions and 225 deletions

View File

@@ -1,7 +1,7 @@
exports.up = function (knex) {
return knex.schema.createTable('events', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('uuid_generate_v4()'))
table.binary('event_id').unique().notNullable()
table.binary('event_id').unique().notNullable().index()
table.binary('event_pubkey').notNullable().index()
table.integer('event_kind').unsigned().notNullable().index()
table.integer('event_created_at').unsigned().notNullable().index()

View File

@@ -0,0 +1,10 @@
exports.up = function (knex) {
// NIP-16: Replaceable Events
return knex.raw(
'CREATE UNIQUE INDEX replaceable_events_idx ON events ( event_pubkey, event_kind ) WHERE event_kind = 0 OR event_kind >= 10000 AND event_kind < 20000;',
)
}
exports.down = function (knex) {
return knex.raw('DROP INDEX IF EXISTS replaceable_events_idx')
}

31
package-lock.json generated
View File

@@ -13,6 +13,7 @@
"joi": "^17.6.0",
"knex": "^2.0.0",
"pg": "^8.7.3",
"pg-query-stream": "4.2.3",
"ramda": "^0.28.0",
"ws": "^8.5.0"
},
@@ -2180,6 +2181,14 @@
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz",
"integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ=="
},
"node_modules/pg-cursor": {
"version": "2.7.3",
"resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.7.3.tgz",
"integrity": "sha512-vmjXRMD4jZK/oHaaYk6clTypgHNlzCCAqyLCO5d/UeI42egJVE5H4ZfZWACub3jzkHUXXyvibH207zAJg9iBOw==",
"peerDependencies": {
"pg": "^8"
}
},
"node_modules/pg-int8": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
@@ -2201,6 +2210,14 @@
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz",
"integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ=="
},
"node_modules/pg-query-stream": {
"version": "4.2.3",
"resolved": "https://registry.npmjs.org/pg-query-stream/-/pg-query-stream-4.2.3.tgz",
"integrity": "sha512-3mrOzffAoGGi2EqsfTdKanKn444ZB+E+Gbz/EJL3rd0thlXD3kb3ZBrwX42bRnQssrEd7/kVFM1FbiIMSQ5ung==",
"dependencies": {
"pg-cursor": "^2.7.3"
}
},
"node_modules/pg-types": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz",
@@ -4694,6 +4711,12 @@
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz",
"integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ=="
},
"pg-cursor": {
"version": "2.7.3",
"resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.7.3.tgz",
"integrity": "sha512-vmjXRMD4jZK/oHaaYk6clTypgHNlzCCAqyLCO5d/UeI42egJVE5H4ZfZWACub3jzkHUXXyvibH207zAJg9iBOw==",
"requires": {}
},
"pg-int8": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
@@ -4710,6 +4733,14 @@
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz",
"integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ=="
},
"pg-query-stream": {
"version": "4.2.3",
"resolved": "https://registry.npmjs.org/pg-query-stream/-/pg-query-stream-4.2.3.tgz",
"integrity": "sha512-3mrOzffAoGGi2EqsfTdKanKn444ZB+E+Gbz/EJL3rd0thlXD3kb3ZBrwX42bRnQssrEd7/kVFM1FbiIMSQ5ung==",
"requires": {
"pg-cursor": "^2.7.3"
}
},
"pg-types": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz",

View File

@@ -56,6 +56,7 @@
"joi": "^17.6.0",
"knex": "^2.0.0",
"pg": "^8.7.3",
"pg-query-stream": "4.2.3",
"ramda": "^0.28.0",
"ws": "^8.5.0"
}

View File

@@ -13,3 +13,5 @@ export type Range<F extends number, T extends number> = Exclude<
Enumerate<T>,
Enumerate<F>
>
export type Factory<TOutput = any, TInput = any> = (input: TInput) => TOutput

View File

@@ -1,5 +1,5 @@
import { EventKinds } from '../constants/base'
import { Pubkey, TagName } from '../types/base'
import { Pubkey, TagName } from './base'
export type EventId = string

View File

@@ -0,0 +1,15 @@
import { WebSocket } from 'ws'
import { Message } from './messages'
export interface IMessageHandler {
handleMessage(message: Message, client: WebSocket): Promise<void>
}
export interface IAbortable {
abort(): void
}
export interface IEventStrategy<TInput, TOutput> {
execute(args: TInput): TOutput
}

View File

@@ -24,7 +24,8 @@ export type SubscribeMessage = {
1: SubscriptionId
} & Array<SubscriptionFilter>
export interface IncomingEventMessage {
export type IncomingEventMessage = EventMessage & []
export interface EventMessage {
0: MessageType.EVENT
1: Event
}

View File

@@ -1,7 +1,9 @@
import { PassThrough } from 'stream'
import { Event } from './event'
import { SubscriptionFilter } from './subscription'
export interface IEventRepository {
create(event: Event): Promise<number>
findByfilters(filters: SubscriptionFilter[]): Promise<Event[]>
upsert(event: Event): Promise<number>
findByfilters(filters: SubscriptionFilter[]): PassThrough
}

View File

@@ -1,14 +1,13 @@
import { EventEmitter } from 'node:stream'
import { WebSocket } from 'ws'
import { Event } from './event'
import { IMessageHandler } from './message-handlers'
import { SubscriptionFilter, SubscriptionId } from './subscription'
export interface IWebSocketServerAdapter {
addMessageHandler(messageHandler: IMessageHandler): void
getSubscriptions(client: WebSocket): Map<SubscriptionId, SubscriptionFilter[]> | undefined
broadcastEvent(event: Event): Promise<void>
}
export interface IWebServerAdapter {
export interface IWebServerAdapter extends EventEmitter {
listen(port: number)
}
}

61
src/@types/settings.ts Normal file
View File

@@ -0,0 +1,61 @@
import { EventKinds } from '../constants/base'
import { Pubkey } from './base'
interface Info {
relay_url?: string
name?: string
description?: string
pubkey?: string
contact?: string
}
interface EventIdLimits {
minimumZeroBits?: number
}
interface PubkeyLimits {
whitelist?: Pubkey[]
blacklist?: Pubkey[]
}
interface KindLimits {
whitelist?: EventKinds[]
blacklist?: EventKinds[]
}
interface CreatedAtLimits {
/**
* Maximum number of seconds allowed before the current unix timestamp
*/
maximumNegativeSkew?: number
/**
* Maximum number of seconds allowed after the current unix timestamp
*/
maximumPositiveSkew?: number
}
interface EventLimits {
eventId?: EventIdLimits
pubkey?: PubkeyLimits
kind?: KindLimits
createdAt?: CreatedAtLimits
}
interface ClientSubscriptionLimits {
maximumCount?: number
maximumFilters?: number
}
interface ClientLimits {
subscription?: ClientSubscriptionLimits
}
interface Limits {
client?: ClientLimits
event?: EventLimits
}
export interface ISettings {
info: Info
limits?: Limits
}

View File

@@ -1,15 +1,16 @@
import { IncomingMessage, Server, ServerResponse } from 'http'
import { Duplex } from 'stream'
import { Duplex, EventEmitter } from 'stream'
import packageJson from '../../package.json'
import { Settings } from '../settings'
import { IWebServerAdapter } from '../types/servers'
import { IWebServerAdapter } from '../@types/servers'
export class WebServerAdapter implements IWebServerAdapter {
export class WebServerAdapter extends EventEmitter implements IWebServerAdapter {
public constructor(
private readonly webServer: Server,
) {
super()
this.webServer.on('request', this.onWebServerRequest.bind(this))
this.webServer.on('clientError', this.onWebServerSocketError.bind(this))
}
@@ -20,7 +21,7 @@ export class WebServerAdapter implements IWebServerAdapter {
}
private onWebServerRequest(request: IncomingMessage, response: ServerResponse) {
if (request.headers['accept'] === 'application/nostr+json') {
if (request.method === 'GET' && request.headers['accept'] === 'application/nostr+json') {
const {
info: { name, description, pubkey, contact },
} = Settings
@@ -30,7 +31,7 @@ export class WebServerAdapter implements IWebServerAdapter {
description,
pubkey,
contact,
supported_nips: [11, 12, 15],
supported_nips: [11, 12, 15, 16],
software: packageJson.repository.url,
version: packageJson.version,
}
@@ -38,7 +39,8 @@ export class WebServerAdapter implements IWebServerAdapter {
response.setHeader('content-type', 'application/nostr+json')
response.end(JSON.stringify(relayInformationDocument))
} else {
response.end()
response.setHeader('content-type', 'application/text')
response.end('Please use a Nostr client to connect.')
}
}
@@ -48,4 +50,4 @@ export class WebServerAdapter implements IWebServerAdapter {
}
socket.end('HTTP/1.1 400 Bad Request\r\n\r\n')
}
}
}

View File

@@ -3,12 +3,13 @@ import WebSocket, { WebSocketServer } from 'ws'
import { isEventMatchingFilter } from '../utils/event'
import { createOutgoingEventMessage } from '../messages'
import { Event } from '../types/event'
import { IMessageHandler } from '../types/message-handlers'
import { Message } from '../types/messages'
import { IWebSocketServerAdapter } from '../types/servers'
import { SubscriptionId, SubscriptionFilter } from '../types/subscription'
import { Event } from '../@types/event'
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
import { Message } from '../@types/messages'
import { IWebSocketServerAdapter } from '../@types/servers'
import { SubscriptionId, SubscriptionFilter } from '../@types/subscription'
import { WebServerAdapter } from './web-server-adapter'
import { Factory } from '../@types/base'
const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000
@@ -26,6 +27,7 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
public constructor(
webServer: Server,
private readonly webSocketServer: WebSocketServer,
private readonly messageHandlerFactory: Factory<IMessageHandler, [Message, IWebSocketServerAdapter]>,
) {
super(webServer)
@@ -38,10 +40,6 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
this.heartbeatInterval = setInterval(this.onWebSocketServerHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
}
public addMessageHandler(messageHandler: IMessageHandler): void {
this.handlers.push(messageHandler)
}
public getSubscriptions(client: WebSocket): Map<string, SubscriptionFilter[]> | undefined {
return this.subscriptions.get(client)
}
@@ -71,34 +69,34 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
this.heartbeats.set(client, true)
this.subscriptions.set(client, new Map())
client.on('message', (raw: WebSocket.RawData) => {
try {
const message = JSON.parse(raw.toString('utf-8'))
this.onWebSocketClientMessage(client, message)
} catch (error) {
console.error('Unable to parse message', raw.toString('utf-8'))
}
})
client.on('message', (raw: WebSocket.RawData) => this.onWebSocketClientMessage(client, raw))
client.on('close', (_code: number) => {
this.onWebSocketClientClose(client)
})
client.on('close', (code: number) => this.onWebSocketClientClose(client, code))
client.on('pong', () => this.onWebSocketClientPong.call(this, client))
client.on('pong', (data: Buffer) => this.onWebSocketClientPong(client, data))
}
private async onWebSocketClientMessage(client: WebSocket, message: Message) {
for (const handler of this.handlers) {
if (handler.canHandleMessageType(message[0])) {
const handled = await handler.handleMessage(message, client, this)
if (handled) {
break
}
private async onWebSocketClientMessage(client: WebSocket, raw: WebSocket.RawData) {
let abort
try {
const message = JSON.parse(raw.toString('utf-8'))
const messageHandler = this.messageHandlerFactory([message, this]) as IMessageHandler & IAbortable
if (typeof messageHandler.abort === 'function') {
abort = messageHandler.abort.bind(messageHandler)
client.once('close', abort)
}
await messageHandler?.handleMessage(message, client)
} catch (error) {
console.error(`Unable to handle message: ${error.message}`)
} finally {
if (abort) {
client.removeEventListener('close', abort)
}
}
}
private onWebSocketClientPong(client: WebSocket) {
private onWebSocketClientPong(client: WebSocket, _data: Buffer) {
this.heartbeats.set(client, true)
}
@@ -116,12 +114,14 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
}
private onWebSocketServerClose() {
console.debug('websocket server closing')
clearInterval(this.heartbeatInterval)
}
private onWebSocketClientClose(client: WebSocket) {
private onWebSocketClientClose(client: WebSocket, code: number) {
console.debug('client closing', code)
this.subscriptions.delete(client)
client.removeAllListeners()
}
}
}

View File

@@ -1,23 +0,0 @@
import { WebSocket } from 'ws'
import { IClient } from './types/clients'
import { Message } from './types/messages'
export class Client implements IClient {
public constructor(
private readonly websocket: WebSocket
) { }
public from(websocket: WebSocket): IClient {
return new Client(websocket)
}
public isConnected(): boolean {
return this.websocket.readyState === WebSocket.OPEN
}
public send(message: Message): void {
this.websocket.send(JSON.stringify(message))
}
}

View File

@@ -1,4 +1,5 @@
import 'pg'
import 'pg-query-stream'
import knex, { Knex } from 'knex'
const createDbConfig = (): Knex.Config => ({

View File

@@ -0,0 +1,27 @@
import { WebSocket } from 'ws'
import { DefaultEventStrategy } from '../handlers/event-strategies/default-event-strategy'
import { EphemeralEventStrategy } from '../handlers/event-strategies/ephemeral-event-strategy'
import { NullEventStrategy } from '../handlers/event-strategies/null-event-strategy copy'
import { ReplaceableEventStrategy } from '../handlers/event-strategies/replaceable-event-strategy'
import { Factory } from '../@types/base'
import { Event } from '../@types/event'
import { IEventStrategy } from '../@types/message-handlers'
import { IEventRepository } from '../@types/repositories'
import { IWebSocketServerAdapter } from '../@types/servers'
import { isEphemeralEvent, isNullEvent, isReplaceableEvent } from '../utils/event'
export const createEventStrategyFactory = (
adapter: IWebSocketServerAdapter,
eventRepository: IEventRepository,
): Factory<IEventStrategy<[Event, WebSocket], Promise<boolean>>, Event> => (event: Event) => {
if (isReplaceableEvent(event)) {
return new ReplaceableEventStrategy(adapter, eventRepository)
} else if (isEphemeralEvent(event)) {
return new EphemeralEventStrategy(adapter)
} else if (isNullEvent(event)) {
return new NullEventStrategy()
}
return new DefaultEventStrategy(adapter, eventRepository)
}

View File

@@ -0,0 +1,24 @@
import { EventMessageHandler } from '../handlers/event-message-handler'
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'
import { Message, MessageType } from '../@types/messages'
import { IEventRepository } from '../@types/repositories'
import { IWebSocketServerAdapter } from '../@types/servers'
import { createEventStrategyFactory } from './event-strategy-factory'
export const createMessageHandlerFactory = (
eventRepository: IEventRepository,
) => ([message, adapter]: [Message, IWebSocketServerAdapter]) => {
console.debug('Received message', message)
switch (message[0]) {
case MessageType.EVENT:
return new EventMessageHandler(createEventStrategyFactory(adapter, eventRepository))
case MessageType.REQ:
return new SubscribeMessageHandler(adapter, eventRepository)
case MessageType.CLOSE:
return new UnsubscribeMessageHandler(adapter)
default:
throw new Error(`Unknown message type: ${String(message[0])}`)
}
}

View File

@@ -1,38 +1,32 @@
import { IMessageHandler } from '../types/message-handlers'
import { MessageType, IncomingEventMessage } from '../types/messages'
import { IWebSocketServerAdapter } from '../types/servers'
import { IEventRepository } from '../types/repositories'
import { isEventSignatureValid } from '../utils/event'
import { IMessageHandler, IEventStrategy } from '../@types/message-handlers'
import { IncomingEventMessage } from '../@types/messages'
import { WebSocket } from 'ws'
import { Event } from '../@types/event'
import { Factory } from '../@types/base'
import { isEventSignatureValid } from '../utils/event'
export class EventMessageHandler implements IMessageHandler {
public constructor(
private readonly eventRepository: IEventRepository,
private readonly strategy: Factory<IEventStrategy<[Event, WebSocket], Promise<boolean>>, Event>
) { }
public canHandleMessageType(messageType: MessageType): boolean {
return messageType === MessageType.EVENT
}
public async handleMessage(message: IncomingEventMessage, client: WebSocket): Promise<void> {
const [, event] = message
if (!await isEventSignatureValid(event)) {
console.warn(`Event ${event.id} from ${event.pubkey} with signature ${event.sig} is not valid`)
return
}
public async handleMessage(message: IncomingEventMessage, _client: WebSocket, adapter: IWebSocketServerAdapter): Promise<boolean> {
if (!await isEventSignatureValid(message[1])) {
console.warn(`Event ${message[1].id} from ${message[1].pubkey} with signature ${message[1].sig} is not valid`)
const strategy = this.strategy(event)
if (typeof strategy?.execute !== 'function') {
return
}
try {
const count = await this.eventRepository.create(message[1])
if (!count) {
return true
}
await adapter.broadcastEvent(message[1])
return true
await strategy.execute([event, client])
} catch (error) {
console.error(`Unable to add event. Reason: ${error.message}`)
return false
console.error('Error handling message:', message, error)
}
}
}
}

View File

@@ -0,0 +1,31 @@
import { WebSocket } from 'ws'
import { Event } from '../../@types/event'
import { IEventStrategy } from '../../@types/message-handlers'
import { IEventRepository } from '../../@types/repositories'
import { IWebSocketServerAdapter } from '../../@types/servers'
export class DefaultEventStrategy implements IEventStrategy<[Event, WebSocket], Promise<boolean>> {
public constructor(
private readonly adapter: IWebSocketServerAdapter,
private readonly eventRepository: IEventRepository,
) { }
public async execute([event,]: [Event, WebSocket]): Promise<boolean> {
try {
const count = await this.eventRepository.create(event)
if (!count) {
return true
}
await this.adapter.broadcastEvent(event)
return true
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
}
}
}

View File

@@ -0,0 +1,22 @@
import { WebSocket } from 'ws'
import { Event } from '../../@types/event'
import { IEventStrategy } from '../../@types/message-handlers'
import { IWebSocketServerAdapter } from '../../@types/servers'
export class EphemeralEventStrategy implements IEventStrategy<[Event, WebSocket], Promise<boolean>> {
public constructor(
private readonly adapter: IWebSocketServerAdapter,
) { }
public async execute([event,]: [Event, WebSocket]): Promise<boolean> {
console.log('Ephemeral event')
try {
await this.adapter.broadcastEvent(event)
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
}
}
}

View File

@@ -0,0 +1,10 @@
import { IEventStrategy } from '../../@types/message-handlers'
/**
* An event strategy that refuses to do anything useful
*/
export class NullEventStrategy implements IEventStrategy<void, Promise<boolean>> {
public async execute(): Promise<boolean> {
return true
}
}

View File

@@ -0,0 +1,31 @@
import { WebSocket } from 'ws'
import { Event } from '../../@types/event'
import { IEventStrategy } from '../../@types/message-handlers'
import { IEventRepository } from '../../@types/repositories'
import { IWebSocketServerAdapter } from '../../@types/servers'
export class ReplaceableEventStrategy implements IEventStrategy<[Event, WebSocket], Promise<boolean>> {
public constructor(
private readonly adapter: IWebSocketServerAdapter,
private readonly eventRepository: IEventRepository,
) { }
public async execute([event,]: [Event, WebSocket]): Promise<boolean> {
console.log('Replaceable event')
try {
const count = await this.eventRepository.upsert(event)
if (!count) {
return true
}
await this.adapter.broadcastEvent(event)
return true
} catch (error) {
console.error('Unable to handle event. Reason:', error)
return false
}
}
}

View File

@@ -1,30 +1,39 @@
import { pipeline } from 'node:stream/promises'
import { inspect } from 'util'
import { WebSocket } from 'ws'
import { createOutgoingEventMessage, createEndOfStoredEventsNoticeMessage } from '../messages'
import { IMessageHandler } from '../types/message-handlers'
import { MessageType, SubscribeMessage } from '../types/messages'
import { IWebSocketServerAdapter } from '../types/servers'
import { IEventRepository } from '../types/repositories'
import { SubscriptionId, SubscriptionFilter } from '../types/subscription'
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
import { SubscribeMessage } from '../@types/messages'
import { IWebSocketServerAdapter } from '../@types/servers'
import { IEventRepository } from '../@types/repositories'
import { SubscriptionId, SubscriptionFilter } from '../@types/subscription'
import { toNostrEvent } from '../utils/event'
import { streamEach, streamMap } from '../utils/transforms'
import { Event } from '../@types/event'
export class SubscribeMessageHandler implements IMessageHandler {
export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
private readonly abortController: AbortController
public constructor(
private readonly adapter: IWebSocketServerAdapter,
private readonly eventRepository: IEventRepository,
) { }
public canHandleMessageType(messageType: MessageType): boolean {
return messageType === MessageType.REQ
) {
this.abortController = new AbortController()
}
public async handleMessage(message: SubscribeMessage, client: WebSocket, adapter: IWebSocketServerAdapter): Promise<boolean> {
public abort(): void {
this.abortController.abort()
}
public async handleMessage(message: SubscribeMessage, client: WebSocket): Promise<void> {
const subscriptionId = message[1] as SubscriptionId
const filters = message.slice(2) as SubscriptionFilter[]
const exists = adapter.getSubscriptions(client)?.get(subscriptionId)
const exists = this.adapter.getSubscriptions(client)?.get(subscriptionId)
adapter.getSubscriptions(client)?.set(subscriptionId, filters)
this.adapter.getSubscriptions(client)?.set(subscriptionId, filters)
console.log(
`Subscription ${subscriptionId} ${exists ? 'updated' : 'created'
@@ -32,31 +41,29 @@ export class SubscribeMessageHandler implements IMessageHandler {
inspect(filters)
)
// TODO: search for matching events on the DB, then send ESOE
const sendEvent = (event: Event) => client.send(JSON.stringify(createOutgoingEventMessage(subscriptionId, event)))
const sendEOSE = () => client.send(JSON.stringify(createEndOfStoredEventsNoticeMessage(subscriptionId)))
return this.eventRepository.findByfilters(filters).then(
(events) => {
events.forEach((event) => {
client.send(
JSON.stringify(
createOutgoingEventMessage(subscriptionId, event)
)
)
})
console.debug(`Sent ${events.length} events to:`, subscriptionId)
client.send(
JSON.stringify(
createEndOfStoredEventsNoticeMessage(subscriptionId)
)
)
console.debug('Sent EOSE to:', subscriptionId)
return true
},
(error) => {
console.error('Unable to find by filters: ', error)
return true
const findEvents = this.eventRepository.findByfilters(filters)
try {
await pipeline(
findEvents,
streamMap(toNostrEvent),
streamEach(
sendEvent,
sendEOSE, // NIP-15: End of Stored Events Notice
),
{
signal: this.abortController.signal,
},
)
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
console.log('AbortError when finding events')
findEvents.end()
}
)
throw error
}
}
}
}

View File

@@ -1,20 +1,18 @@
import { WebSocket } from 'ws'
import { IMessageHandler } from '../types/message-handlers'
import { MessageType, UnsubscribeMessage } from '../types/messages'
import { IWebSocketServerAdapter } from '../types/servers'
import { IMessageHandler } from '../@types/message-handlers'
import { UnsubscribeMessage } from '../@types/messages'
import { IWebSocketServerAdapter } from '../@types/servers'
export class UnsubscribeMessageHandler implements IMessageHandler {
public canHandleMessageType(messageType: MessageType): boolean {
return messageType === MessageType.CLOSE
}
public constructor(
private readonly adapter: IWebSocketServerAdapter,
) { }
public async handleMessage(message: UnsubscribeMessage, client: WebSocket, adapter: IWebSocketServerAdapter): Promise<boolean> {
public async handleMessage(message: UnsubscribeMessage, client: WebSocket): Promise<void> {
const subscriptionId = message[1]
adapter.getSubscriptions(client)?.delete(subscriptionId)
return true
this.adapter.getSubscriptions(client)?.delete(subscriptionId)
}
}
}

View File

@@ -4,9 +4,7 @@ import { WebSocketServer } from 'ws'
import { getDbClient } from './database/client'
import { EventRepository } from './repositories/event-repository'
import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter'
import { SubscribeMessageHandler } from './handlers/subscribe-message-handler'
import { UnsubscribeMessageHandler } from './handlers/unsubscribe-message-handler'
import { EventMessageHandler } from './handlers/event-message-handler'
import { createMessageHandlerFactory } from './factories/message-handler-factory'
const server = http.createServer()
const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 })
@@ -16,10 +14,8 @@ const eventRepository = new EventRepository(dbClient)
const adapter = new WebSocketServerAdapter(
server,
wss,
createMessageHandlerFactory(eventRepository)
)
adapter.addMessageHandler(new SubscribeMessageHandler(eventRepository))
adapter.addMessageHandler(new UnsubscribeMessageHandler())
adapter.addMessageHandler(new EventMessageHandler(eventRepository))
const port = Number(process.env.SERVER_PORT) || 8008
adapter.listen(port)

View File

@@ -1,11 +1,11 @@
import { Event } from './types/event'
import { SubscriptionId } from './types/subscription'
import { Event } from './@types/event'
import { SubscriptionId } from './@types/subscription'
import {
EndOfStoredEventsNotice,
MessageType,
Notice,
OutgoingEventMessage,
} from './types/messages'
} from './@types/messages'
export const createNotice = (notice: string): Notice => {
return [MessageType.NOTICE, notice]

View File

@@ -1,19 +1,18 @@
import { Knex } from 'knex'
import { applySpec, pipe, prop } from 'ramda'
import { applySpec, omit, pipe, prop } from 'ramda'
import { PassThrough } from 'stream'
import { DBEvent, Event } from '../types/event'
import { IEventRepository } from '../types/repositories'
import { SubscriptionFilter } from '../types/subscription'
import { DBEvent, Event } from '../@types/event'
import { IEventRepository } from '../@types/repositories'
import { SubscriptionFilter } from '../@types/subscription'
import { isGenericTagQuery } from '../utils/filter'
import { toBuffer, toJSON } from '../utils/transforms'
const toBuffer = (input: any) => Buffer.from(input, 'hex')
const fromBuffer = (input: Buffer) => input.toString('hex')
export class EventRepository implements IEventRepository {
public constructor(private readonly dbClient: Knex) {}
public async findByfilters(filters: SubscriptionFilter[]): Promise<Event[]> {
public findByfilters(filters: SubscriptionFilter[]): PassThrough {
const queries = filters.map((filter) => {
const builder = this.dbClient<DBEvent>('events')
@@ -53,7 +52,6 @@ export class EventRepository implements IEventRepository {
})
})
return builder
})
@@ -62,31 +60,30 @@ export class EventRepository implements IEventRepository {
query.union(subqueries, true)
}
console.log('Query', query.toString())
return query.then((rows) => {
const result = rows.map(
(row) =>
applySpec({
id: pipe(prop('event_id'), fromBuffer),
kind: prop('event_kind'),
pubkey: pipe(prop('event_pubkey'), fromBuffer),
created_at: prop('event_created_at'),
content: prop('event_content'),
tags: prop('event_tags'),
sig: pipe(prop('event_signature'), fromBuffer),
})(row) as Event,
)
console.debug('result', result[0])
return result
})
return query.stream()
}
public async create(event: Event): Promise<number> {
console.log('Creating event', event)
const row = applySpec({
event_id: pipe(prop('id'), toBuffer),
event_pubkey: pipe(prop('pubkey'), toBuffer),
event_created_at: prop('created_at'),
event_kind: prop('kind'),
event_tags: pipe(prop('tags'), toJSON),
event_content: prop('content'),
event_signature: pipe(prop('sig'), toBuffer),
})(event)
return this.dbClient('events')
.insert(row)
.onConflict()
.ignore()
.then(prop('rowCount') as () => number)
}
public async upsert(event: Event): Promise<number> {
const toJSON = (input: any) => JSON.stringify(input)
const row = applySpec({
@@ -101,10 +98,10 @@ export class EventRepository implements IEventRepository {
return this.dbClient('events')
.insert(row)
.onConflict('event_id')
.ignore()
.then(
(({ rowCount }: { rowCount: number }) => rowCount) as any,
)
// NIP-16: Replaceable Events
.onConflict(this.dbClient.raw('(event_pubkey, event_kind) WHERE event_kind = 0 OR event_kind >= 10000 AND event_kind < 2000'))
.merge(omit(['event_pubkey', 'event_kind'])(row))
.where('events.event_created_at', '<', row.event_created_at)
.then(prop('rowCount') as () => number)
}
}

View File

@@ -1,10 +1,10 @@
import Schema from 'joi'
export const pubkeySchema = Schema.string().length(64)
export const pubkeySchema = Schema.string().case('lower').hex().length(64)
export const kindSchema = Schema.number().min(0).multiple(1)
export const signatureSchema = Schema.string().length(128)
export const signatureSchema = Schema.string().case('lower').hex().length(128)
export const subscriptionSchema = Schema.string().min(1).max(255)

View File

@@ -1,22 +1,14 @@
import { readFileSync } from 'fs'
import { homedir } from 'os'
import { join } from 'path'
import { mergeDeepRight } from 'ramda'
import packageJson from '../package.json'
import { ISettings } from './@types/settings'
interface Info {
relay_url?: string
name?: string
description?: string
pubkey?: string
contact?: string
}
let _settings: ISettings
interface Settings {
info: Info
}
const getDefaultSettings = (): Settings => ({
const getDefaultSettings = (): ISettings => ({
info: {
relay_url: undefined,
name: `Unnamed ${packageJson.name}`,
@@ -24,9 +16,30 @@ const getDefaultSettings = (): Settings => ({
pubkey: undefined,
contact: undefined,
},
limits: {
event: {
eventId: {
minimumZeroBits: 0,
},
kind: {
whitelist: [],
blacklist: [],
},
pubkey: {
whitelist: [],
blacklist: [],
},
},
client: {
subscription: {
maximumCount: 10,
maximumFilters: 5,
},
},
},
})
const createSettingsFromFile = (defaults: Settings) => {
const createSettingsFromFile = (defaults: ISettings) => {
const contents = JSON.parse(
readFileSync(
join(
@@ -37,23 +50,22 @@ const createSettingsFromFile = (defaults: Settings) => {
),
)
return {
info: {
...defaults.info,
...contents.info,
},
}
return mergeDeepRight(defaults, contents)
}
const createSettings = (): Settings => {
const defaultSettings = getDefaultSettings()
const createSettings = (): ISettings => {
try {
return createSettingsFromFile(defaultSettings)
if (_settings) {
return _settings
}
_settings = createSettingsFromFile(getDefaultSettings())
return _settings
} catch (err) {
console.error('Unable to read config file. Reason: %s', err.message)
return defaultSettings
return getDefaultSettings()
}
}

View File

@@ -1,9 +0,0 @@
import { WebSocket } from 'ws'
import { Message, MessageType } from './messages'
import { IWebSocketServerAdapter } from './servers'
export interface IMessageHandler {
canHandleMessageType(messageType: MessageType): boolean
handleMessage(message: Message, client: WebSocket, adapter: IWebSocketServerAdapter): Promise<boolean>
}

View File

@@ -1,7 +1,9 @@
import * as secp256k1 from '@noble/secp256k1'
import { CanonicalEvent, Event } from '../types/event'
import { SubscriptionFilter } from '../types/subscription'
import { applySpec, pipe, prop } from 'ramda'
import { CanonicalEvent, Event } from '../@types/event'
import { SubscriptionFilter } from '../@types/subscription'
import { isGenericTagQuery } from './filter'
import { fromBuffer } from './transforms'
export const serializeEvent = (event: Partial<Event>): CanonicalEvent => [
0,
@@ -12,6 +14,16 @@ export const serializeEvent = (event: Partial<Event>): CanonicalEvent => [
event.content,
]
export const toNostrEvent = applySpec({
id: pipe(prop('event_id'), fromBuffer),
kind: prop('event_kind'),
pubkey: pipe(prop('event_pubkey'), fromBuffer),
created_at: prop('event_created_at'),
content: prop('event_content'),
tags: prop('event_tags'),
sig: pipe(prop('event_signature'), fromBuffer),
})
export const isEventMatchingFilter =
(filter: SubscriptionFilter) =>
(event: Event): boolean => {
@@ -62,3 +74,15 @@ export const isEventMatchingFilter =
export const isEventSignatureValid = async (event: Event): Promise<boolean> => {
return secp256k1.schnorr.verify(event.sig, event.id, event.pubkey)
}
export const isReplaceableEvent = (event: Event): boolean => {
return event.kind >= 10000 && event.kind < 20000
}
export const isEphemeralEvent = (event: Event): boolean => {
return event.kind >= 20000 && event.kind < 30000
}
export const isNullEvent = (event: Event): boolean => {
return event.kind === Number.MAX_SAFE_INTEGER
}

View File

@@ -1,6 +1,6 @@
import { createHash } from 'crypto'
import { Event } from '../types/event'
import { Event } from '../@types/event'
import { serializeEvent } from './serialize-event'
export const getEventHash = (event: Event) => {

View File

@@ -1,4 +1,4 @@
import { Event } from '../types/event'
import { Event } from '../@types/event'
export const serializeEvent = ({
pubkey,

26
src/utils/transforms.ts Normal file
View File

@@ -0,0 +1,26 @@
import { Transform, Writable } from 'stream'
export const toJSON = (input: any) => JSON.stringify(input)
export const toBuffer = (input: any) => Buffer.from(input, 'hex')
export const fromBuffer = (input: Buffer) => input.toString('hex')
export const streamMap = (fn: (chunk) => any) => new Transform({
objectMode: true,
transform(chunk, _encoding, callback) {
callback(null, fn(chunk))
}
})
export const streamEach = (writeFn: (chunk: any) => void, finalFn: () => void) => new Writable({
objectMode: true,
write(chunk, _encoding, callback) {
writeFn(chunk)
callback()
},
final(callback) {
finalFn()
callback()
},
})

View File

@@ -1,5 +1,5 @@
import { expect } from 'chai'
import { Event, CanonicalEvent } from '../../src/types/event'
import { Event, CanonicalEvent } from '../../src/@types/event'
import { isEventMatchingFilter, isEventSignatureValid, serializeEvent } from '../../src/utils/event'
import { EventKinds } from '../../src/constants/base'
@@ -227,4 +227,4 @@ describe('isEventSignatureValid', () => {
await isEventSignatureValid(event)
).to.be.false
})
})
})