diff --git a/README.md b/README.md index 04333a5..386e7c4 100644 --- a/README.md +++ b/README.md @@ -14,16 +14,29 @@ NIPs with a relay-specific implementation are listed here. - [ ] NIP-03: OpenTimestams Attestations for Events - [ ] NIP-04: Encrypted Direct Message - [ ] NIP-05: Mapping Nostr keys to DNS identifiers +- [ ] NIP-06: Basic key derivation from mnemonic seed phrase - [ ] NIP-09: Event deletion - [ ] NIP-11: Relay information document - [ ] NIP-12: Generic tag queries -- [ ] NIP-13: Alias (_experimental_) -- [ ] NIP-14: Reputation scheme (_experimental_) -- [ ] NIP-15: Relating events to resources (_experimental_) +- [ ] NIP-13: Proof of Work +- [ ] NIP-14: Subject tag in text events. +- [x] NIP-15: End of Stored Events Notice +- [ ] NIP-16: Event Treatment +- [ ] NIP-25: Reactions ## Quick Start -TBD +Install dependencies: + + ``` + npm install + ``` + +To start in development mode: + + ``` + npm run dev + ``` ## Configuration diff --git a/src/database/client.ts b/src/database/client.ts index e1f3d62..94ef293 100644 --- a/src/database/client.ts +++ b/src/database/client.ts @@ -16,14 +16,13 @@ const createDbConfig = ( min: 2, max: 3, idleTimeoutMillis: 10000, - afterCreate: function (conn, done) { - conn.query('LISTEN event_added', function (err) { - if (err) { - done(err, conn) - } - conn.on('notification', onNotificationCallback) - done(err, conn) + afterCreate: function (connection, callback) { + connection.on('error', function (error) { + console.error('PG error', error) }) + connection.query('LISTEN event_added') + connection.on('notification', onNotificationCallback) + callback(null, connection) }, }, acquireConnectionTimeout: 2000, diff --git a/src/index.ts b/src/index.ts index 9155e8f..7feca2e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,8 +2,12 @@ import * as http from 'http' import { WebSocket, WebSocketServer } from 'ws' import { applySpec, prop, pipe } from 'ramda' import Joi from 'joi' +import util from 'util' -import { createNotice, createOutgoingEventMessage } from './messages' +import { + createEndOfStoredEventsNoticeMessage, + createOutgoingEventMessage, +} from './messages' import packageJson from '../package.json' import { Settings } from './settings' import { Message, MessageType } from './types/messages' @@ -11,16 +15,22 @@ import { SubscriptionFilter, SubscriptionId } from './types/subscription' import { getDbClient } from './database/client' import { messageSchema } from './schemas/message-schema' import { Event } from './types/event' +import { isEventMatchingFilter } from './event' +import { EventRepository } from './repositories/event-repository' + +const inspect = (myObject) => + util.inspect(myObject, { showHidden: false, depth: null, colors: true }) const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000 const server = http.createServer() const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 }) const dbClient = getDbClient() +const eventRepository = new EventRepository(dbClient) dbClient.raw('SELECT 1=1').then(() => void 0) -const stripEscape = (flerp) => flerp.slice(3) +const stripEscape = (flerp) => flerp.slice(2) const createEventFromDb = applySpec({ id: pipe(prop('event_id'), stripEscape), @@ -35,21 +45,29 @@ const createEventFromDb = applySpec({ dbClient.on('event_added', (event) => { const nostrEvent = createEventFromDb(event) as Event - wss.clients.forEach((client) => { - if (client.readyState !== WebSocket.OPEN) { + wss.clients.forEach((ws) => { + if (ws.readyState !== WebSocket.OPEN) { return } - console.log( - `broadcasting to client with subscriptions`, - (client as any).subscriptions, - nostrEvent, - ) Object.entries( - (client as any).subscriptions as { - [subscriptionId: SubscriptionId]: SubscriptionFilter + (ws as any).subscriptions as { + [subscriptionId: SubscriptionId]: SubscriptionFilter[] }, - ).forEach(([subscriptionId]) => { - client.send( + ).forEach(([subscriptionId, filters]) => { + if ( + !filters + .map(isEventMatchingFilter) + .some((isMatch) => isMatch(nostrEvent)) + ) { + return + } + console.log( + `Broadcasting to client with subscription ${subscriptionId}`, + inspect(filters), + inspect(nostrEvent), + ) + + ws.send( JSON.stringify(createOutgoingEventMessage(subscriptionId, nostrEvent)), ) }) @@ -60,6 +78,113 @@ function heartbeat() { this.isAlive = true } +wss.on('connection', function (ws, _req) { + ws['subscriptions'] = {} + ws['isAlive'] = true + + ws.on('message', function onMessage(raw) { + let message: Message + + try { + message = Joi.attempt(JSON.parse(raw.toString('utf8')), messageSchema, { + stripUnknown: true, + abortEarly: true, + }) as Message + } catch (error) { + console.error('Invalid message', error, JSON.stringify(raw)) + return + } + + const command = message[0] + switch (command) { + case MessageType.EVENT: + { + if (message[1] === null || typeof message[1] !== 'object') { + // ws.send(JSON.stringify(createNotice(`Invalid event`))) + return + } + + eventRepository.create(message[1]).catch((error) => { + console.error(`Unable to add event. Reason: ${error.message}`) + }) + } + break + case MessageType.REQ: + { + const subscriptionId = message[1] as SubscriptionId + const filters = message.slice(2) as SubscriptionFilter[] + + const exists = subscriptionId in ws['subscriptions'] + + ws['subscriptions'][subscriptionId] = filters + + console.log( + `Subscription ${subscriptionId} ${ + exists ? 'updated' : 'created' + } with filters:`, + inspect(filters), + ) + + // TODO: search for matching events on the DB, then send ESOE + + eventRepository.findByfilters(filters).then( + (events) => { + events.forEach((event) => { + ws.send( + JSON.stringify( + createOutgoingEventMessage(subscriptionId, event), + ), + ) + }) + ws.send( + JSON.stringify( + createEndOfStoredEventsNoticeMessage(subscriptionId), + ), + ) + console.log(`Found ${events.length} events matching filter.`) + }, + (error) => { + console.error('Unable to find by filters: ', error) + }, + ) + } + break + case MessageType.CLOSE: + { + const subscriptionId = message[1] as SubscriptionId + delete ws['subscriptions'][subscriptionId] + } + break + } + }) + + ws.on('pong', heartbeat) + + ws.on('close', function onClose(code) { + Object.keys(ws['subscriptions']).forEach( + (subscriptionId) => delete ws['subscriptions'][subscriptionId], + ) + delete ws['subscriptions'] + // TODO: Clean up subscriptions + console.log('disconnected %s', code) + }) +}) + +const heartbeatInterval = setInterval(function ping() { + wss.clients.forEach(function each(ws) { + if (!ws['isAlive']) { + return ws.terminate() + } + + ws['isAlive'] = false + ws.ping() + }) +}, WSS_CLIENT_HEALTH_PROBE_INTERVAL) + +wss.on('close', function close() { + clearInterval(heartbeatInterval) +}) + server.on('request', async (req, res) => { if (req.headers['accept'] === 'application/nostr+json') { const { @@ -83,142 +208,6 @@ server.on('request', async (req, res) => { } }) -wss.on('connection', (ws, _req) => { - ws['subscriptions'] = {} - ws['isAlive'] = true - - ws.on('message', function onMessage(raw) { - let message: Message - - try { - message = Joi.attempt(JSON.parse(raw.toString('utf8')), messageSchema, { - stripUnknown: true, - abortEarly: true, - }) as Message - } catch (error) { - console.error('Invalid message', error) - ws.send( - JSON.stringify(createNotice('Message does not match any known schema')), - ) - return - } - - const command = message[0] - switch (command) { - case MessageType.EVENT: - { - if (message[1] === null || typeof message[1] !== 'object') { - ws.send(JSON.stringify(createNotice(`Invalid event`))) - return - } - - const toJSON = (input) => JSON.stringify(input) - const toBuffer = (input) => Buffer.from(input, 'hex') - - const row = applySpec({ - event_id: pipe(prop('id'), toBuffer), - event_pubkey: pipe(prop('pubkey'), toBuffer), - event_created_at: prop('created_at'), - event_kind: prop('kind'), - event_tags: pipe(prop('tags'), toJSON), - event_content: prop('content'), - event_signature: pipe(prop('sig'), toBuffer), - })(message[1]) - - dbClient('events') - .insert(row) - .onConflict('event_id') - .ignore() - .asCallback(function (error, rows) { - if (error) { - console.log('Unable to add event', error) - return - } - console.log(`Added ${rows.length} events.`) - }) - } - break - case MessageType.REQ: - { - const subscriptionId = message[1] as SubscriptionId - const filter = message[2] as SubscriptionFilter - - const exists = subscriptionId in ws['subscriptions'] - - ws['subscriptions'][subscriptionId] = filter - - console.log( - `Subscription ${subscriptionId} ${ - exists ? 'updated' : 'created' - } with filters ${JSON.stringify(filter)}`, - ) - - // TODO: search for matching events on the DB, then send ESOE - - // ws.send( - // JSON.stringify( - // createNotice( - // `Subscription ${subscriptionId} ${ - // exists ? 'updated' : 'created' - // } with filters ${JSON.stringify(filter)}`, - // ), - // ), - // ) - } - break - case MessageType.CLOSE: - { - const subscriptionId = message[1] as SubscriptionId - - const exists = subscriptionId in ws['subscriptions'] - if (!exists) { - ws.send( - JSON.stringify( - createNotice(`Subscription ${subscriptionId} not found`), - ), - ) - return - } - - delete ws['subscriptions'][subscriptionId] - - ws.send( - JSON.stringify( - createNotice(`Subscription ${subscriptionId} closed`), - ), - ) - } - break - } - }) - - ws.on('pong', heartbeat) - - ws.on('close', function onClose(code) { - if (this.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify(createNotice('Goodbye'))) - } - console.log('disconnected %s', code) - }) - - ws.send(JSON.stringify(createNotice('Howdy!'))) -}) - -const heartbeatInterval = setInterval(function ping() { - wss.clients.forEach(function each(ws) { - if (!ws['isAlive']) { - return ws.terminate() - } - - ws['isAlive'] = false - ws.ping() - }) -}, WSS_CLIENT_HEALTH_PROBE_INTERVAL) - -wss.on('close', function close() { - clearInterval(heartbeatInterval) -}) - server.on('clientError', (err, socket) => { if (err['code'] === 'ECONNRESET' || !socket.writable) { return @@ -229,3 +218,9 @@ server.on('clientError', (err, socket) => { const port = process.env.SERVER_PORT ?? 8008 console.log(`Listening on port: ${port}`) server.listen(port) + +process.on('SIGINT', function () { + console.log('Caught interrupt signal') + server.close() + process.exit() +}) diff --git a/src/messages.ts b/src/messages.ts index 084c534..42a79f6 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -1,6 +1,11 @@ import { Event } from './types/event' import { SubscriptionId } from './types/subscription' -import { MessageType, Notice, OutgoingEventMessage } from './types/messages' +import { + EndOfStoredEventsNotice, + MessageType, + Notice, + OutgoingEventMessage, +} from './types/messages' export const createNotice = (notice: string): Notice => { return [MessageType.NOTICE, notice] @@ -12,3 +17,9 @@ export const createOutgoingEventMessage = ( ): OutgoingEventMessage => { return [MessageType.EVENT, subscriptionId, event] } + +export const createEndOfStoredEventsNoticeMessage = ( + subscriptionId: SubscriptionId, +): EndOfStoredEventsNotice => { + return [MessageType.EOSE, subscriptionId] +} diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts new file mode 100644 index 0000000..101c1b6 --- /dev/null +++ b/src/repositories/event-repository.ts @@ -0,0 +1,94 @@ +import { Knex } from 'knex' +import { applySpec, pipe, prop } from 'ramda' + +import { DBEvent, Event } from '../types/event' +import { IEventRepository } from '../types/repositories' +import { SubscriptionFilter } from '../types/subscription' + +const toBuffer = (input: any) => Buffer.from(input, 'hex') + +const fromBuffer = (input: Buffer) => input.toString('hex') + +export class EventRepository implements IEventRepository { + public constructor(private readonly dbClient: Knex) {} + + public async findByfilters(filters: SubscriptionFilter[]): Promise { + const queries = filters.map((filter) => { + const builder = this.dbClient('events') + + if (Array.isArray(filter.authors)) { + builder.whereIn('event_pubkey', filter.authors.map(toBuffer)) + } + + if (Array.isArray(filter.ids)) { + builder.whereIn('event_id', filter.ids.map(toBuffer)) + } + + if (Array.isArray(filter.kinds)) { + builder.whereIn('event_kind', filter.kinds) + } + + if (typeof filter.since === 'number') { + builder.where('event_created_at', '>=', filter.since) + } + + if (typeof filter.until === 'number') { + builder.where('event_created_at', '<=', filter.until) + } + + if (typeof filter.limit === 'number') { + builder.limit(filter.limit).orderBy('event_created_at', 'DESC') + } else { + builder.orderBy('event_created_at', 'asc') + } + + return builder + }) + + const [query, ...subqueries] = queries + if (subqueries.length) { + query.union(subqueries) + } + + console.log('Query', query.toString()) + + return query.then((rows) => + rows.map( + (row) => + applySpec({ + id: pipe(prop('event_id'), fromBuffer), + kind: prop('event_kind'), + pubkey: pipe(prop('event_pubkey'), fromBuffer), + created_at: prop('event_created_at'), + content: prop('event_content'), + tags: prop('event_tags'), + sig: pipe(prop('event_signature'), fromBuffer), + })(row) as Event, + ), + ) + } + + public async create(event: Event): Promise { + console.log('Creating event', event) + + const toJSON = (input: any) => JSON.stringify(input) + + const row = applySpec({ + event_id: pipe(prop('id'), toBuffer), + event_pubkey: pipe(prop('pubkey'), toBuffer), + event_created_at: prop('created_at'), + event_kind: prop('kind'), + event_tags: pipe(prop('tags'), toJSON), + event_content: prop('content'), + event_signature: pipe(prop('sig'), toBuffer), + })(event) + + return void this.dbClient('events') + .insert(row) + .onConflict('event_id') + .ignore() + .then((number) => { + console.log(`Rows added`, (number as any).rowCount) + }) + } +} diff --git a/src/schemas/message-schema.ts b/src/schemas/message-schema.ts index 1ce892b..c498e92 100644 --- a/src/schemas/message-schema.ts +++ b/src/schemas/message-schema.ts @@ -8,11 +8,9 @@ export const eventMessageSchema = Schema.array().ordered( eventSchema.required(), ) -export const reqMessageSchema = Schema.array().ordered( - Schema.string().valid('REQ').required(), - Schema.string().required(), - filterSchema.required(), -) +export const reqMessageSchema = Schema.array() + .ordered(Schema.string().valid('REQ').required(), Schema.string().required()) + .items(filterSchema.required()) export const closeMessageSchema = Schema.array().ordered( Schema.string().valid('CLOSE').required(), diff --git a/src/schemas/request-schema.ts b/src/schemas/request-schema.ts index ef55f08..af2523c 100644 --- a/src/schemas/request-schema.ts +++ b/src/schemas/request-schema.ts @@ -6,7 +6,5 @@ export const filterSchema = Schema.object({ kinds: Schema.array().items(Schema.number().min(0)), since: Schema.number().min(0).multiple(1), until: Schema.number().min(0).multiple(1), - limit: Schema.number().min(1).multiple(1).max(500), - '#e': Schema.array().items(Schema.string()), - '#p': Schema.array().items(Schema.string()), -}) + limit: Schema.number().min(1).multiple(1).max(10000), +}).pattern(/^#[a-z]$/, Schema.array().items(Schema.string())) diff --git a/src/types/base.ts b/src/types/base.ts index da9a9e9..5566e19 100644 --- a/src/types/base.ts +++ b/src/types/base.ts @@ -1,3 +1,15 @@ export type Pubkey = string export type TagName = string export type Signature = string + +type Enumerate< + N extends number, + Acc extends number[] = [], +> = Acc['length'] extends N + ? Acc[number] + : Enumerate + +export type Range = Exclude< + Enumerate, + Enumerate +> diff --git a/src/types/event.ts b/src/types/event.ts index 180ef0a..d874917 100644 --- a/src/types/event.ts +++ b/src/types/event.ts @@ -18,6 +18,18 @@ export interface Event { content: string } +export interface DBEvent { + id: string + event_id: Buffer + event_pubkey: Buffer + event_kind: number + event_created_at: number + event_content: string + event_tags: Tag[] + event_signature: Buffer + first_seen: Date +} + export interface CanonicalEvent { 0: 0 1: string diff --git a/src/types/messages.ts b/src/types/messages.ts index a7346b3..4c56539 100644 --- a/src/types/messages.ts +++ b/src/types/messages.ts @@ -1,3 +1,4 @@ +import { Range } from './base' import { Event } from './event' import { SubscriptionId, SubscriptionFilter } from './subscription' @@ -6,6 +7,7 @@ export enum MessageType { EVENT = 'EVENT', CLOSE = 'CLOSE', NOTICE = 'NOTICE', + EOSE = 'EOSE', } export type Message = @@ -13,12 +15,14 @@ export type Message = | IncomingEventMessage | UnsubscribeMessage | Notice + | EndOfStoredEventsNotice -export interface SubscriptionMessage { +export type SubscriptionMessage = { + [index in Range<2, 100>]: SubscriptionFilter +} & { 0: MessageType.REQ 1: SubscriptionId - 2: SubscriptionFilter -} +} & Array export interface IncomingEventMessage { 0: MessageType.EVENT @@ -40,3 +44,8 @@ export interface Notice { 0: MessageType.NOTICE 1: string } + +export interface EndOfStoredEventsNotice { + 0: MessageType.EOSE + 1: SubscriptionId +} diff --git a/src/types/repositories.ts b/src/types/repositories.ts new file mode 100644 index 0000000..1a24fa9 --- /dev/null +++ b/src/types/repositories.ts @@ -0,0 +1,7 @@ +import { Event } from './event' +import { SubscriptionFilter } from './subscription' + +export interface IEventRepository { + create(event: Event): Promise + findByfilters(filters: SubscriptionFilter[]): Promise +} diff --git a/src/types/subscription.ts b/src/types/subscription.ts index 43dd476..ae8e1a0 100644 --- a/src/types/subscription.ts +++ b/src/types/subscription.ts @@ -1,4 +1,4 @@ -import { Pubkey } from 'types' +import { Pubkey } from './base' import { EventId } from './event' export type SubscriptionId = string @@ -9,5 +9,6 @@ export interface SubscriptionFilter { since?: number until?: number authors?: Pubkey[] + limit?: number [key: `#${string}`]: string[] }