feat: reply with stored events, send eose

This commit is contained in:
Ricardo Arturo Cabral Mejia 2022-08-07 07:38:18 +00:00
parent d8cd553e24
commit 301dd9d052
No known key found for this signature in database
GPG Key ID: 5931EBF43A650245
12 changed files with 323 additions and 174 deletions

View File

@ -14,16 +14,29 @@ NIPs with a relay-specific implementation are listed here.
- [ ] NIP-03: OpenTimestams Attestations for Events
- [ ] NIP-04: Encrypted Direct Message
- [ ] NIP-05: Mapping Nostr keys to DNS identifiers
- [ ] NIP-06: Basic key derivation from mnemonic seed phrase
- [ ] NIP-09: Event deletion
- [ ] NIP-11: Relay information document
- [ ] NIP-12: Generic tag queries
- [ ] NIP-13: Alias (_experimental_)
- [ ] NIP-14: Reputation scheme (_experimental_)
- [ ] NIP-15: Relating events to resources (_experimental_)
- [ ] NIP-13: Proof of Work
- [ ] NIP-14: Subject tag in text events.
- [x] NIP-15: End of Stored Events Notice
- [ ] NIP-16: Event Treatment
- [ ] NIP-25: Reactions
## Quick Start
TBD
Install dependencies:
```
npm install
```
To start in development mode:
```
npm run dev
```
## Configuration

View File

@ -16,14 +16,13 @@ const createDbConfig = (
min: 2,
max: 3,
idleTimeoutMillis: 10000,
afterCreate: function (conn, done) {
conn.query('LISTEN event_added', function (err) {
if (err) {
done(err, conn)
}
conn.on('notification', onNotificationCallback)
done(err, conn)
afterCreate: function (connection, callback) {
connection.on('error', function (error) {
console.error('PG error', error)
})
connection.query('LISTEN event_added')
connection.on('notification', onNotificationCallback)
callback(null, connection)
},
},
acquireConnectionTimeout: 2000,

View File

@ -2,8 +2,12 @@ import * as http from 'http'
import { WebSocket, WebSocketServer } from 'ws'
import { applySpec, prop, pipe } from 'ramda'
import Joi from 'joi'
import util from 'util'
import { createNotice, createOutgoingEventMessage } from './messages'
import {
createEndOfStoredEventsNoticeMessage,
createOutgoingEventMessage,
} from './messages'
import packageJson from '../package.json'
import { Settings } from './settings'
import { Message, MessageType } from './types/messages'
@ -11,16 +15,22 @@ import { SubscriptionFilter, SubscriptionId } from './types/subscription'
import { getDbClient } from './database/client'
import { messageSchema } from './schemas/message-schema'
import { Event } from './types/event'
import { isEventMatchingFilter } from './event'
import { EventRepository } from './repositories/event-repository'
const inspect = (myObject) =>
util.inspect(myObject, { showHidden: false, depth: null, colors: true })
const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000
const server = http.createServer()
const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 })
const dbClient = getDbClient()
const eventRepository = new EventRepository(dbClient)
dbClient.raw('SELECT 1=1').then(() => void 0)
const stripEscape = (flerp) => flerp.slice(3)
const stripEscape = (flerp) => flerp.slice(2)
const createEventFromDb = applySpec({
id: pipe(prop('event_id'), stripEscape),
@ -35,21 +45,29 @@ const createEventFromDb = applySpec({
dbClient.on('event_added', (event) => {
const nostrEvent = createEventFromDb(event) as Event
wss.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
wss.clients.forEach((ws) => {
if (ws.readyState !== WebSocket.OPEN) {
return
}
console.log(
`broadcasting to client with subscriptions`,
(client as any).subscriptions,
nostrEvent,
)
Object.entries(
(client as any).subscriptions as {
[subscriptionId: SubscriptionId]: SubscriptionFilter
(ws as any).subscriptions as {
[subscriptionId: SubscriptionId]: SubscriptionFilter[]
},
).forEach(([subscriptionId]) => {
client.send(
).forEach(([subscriptionId, filters]) => {
if (
!filters
.map(isEventMatchingFilter)
.some((isMatch) => isMatch(nostrEvent))
) {
return
}
console.log(
`Broadcasting to client with subscription ${subscriptionId}`,
inspect(filters),
inspect(nostrEvent),
)
ws.send(
JSON.stringify(createOutgoingEventMessage(subscriptionId, nostrEvent)),
)
})
@ -60,6 +78,113 @@ function heartbeat() {
this.isAlive = true
}
wss.on('connection', function (ws, _req) {
ws['subscriptions'] = {}
ws['isAlive'] = true
ws.on('message', function onMessage(raw) {
let message: Message
try {
message = Joi.attempt(JSON.parse(raw.toString('utf8')), messageSchema, {
stripUnknown: true,
abortEarly: true,
}) as Message
} catch (error) {
console.error('Invalid message', error, JSON.stringify(raw))
return
}
const command = message[0]
switch (command) {
case MessageType.EVENT:
{
if (message[1] === null || typeof message[1] !== 'object') {
// ws.send(JSON.stringify(createNotice(`Invalid event`)))
return
}
eventRepository.create(message[1]).catch((error) => {
console.error(`Unable to add event. Reason: ${error.message}`)
})
}
break
case MessageType.REQ:
{
const subscriptionId = message[1] as SubscriptionId
const filters = message.slice(2) as SubscriptionFilter[]
const exists = subscriptionId in ws['subscriptions']
ws['subscriptions'][subscriptionId] = filters
console.log(
`Subscription ${subscriptionId} ${
exists ? 'updated' : 'created'
} with filters:`,
inspect(filters),
)
// TODO: search for matching events on the DB, then send ESOE
eventRepository.findByfilters(filters).then(
(events) => {
events.forEach((event) => {
ws.send(
JSON.stringify(
createOutgoingEventMessage(subscriptionId, event),
),
)
})
ws.send(
JSON.stringify(
createEndOfStoredEventsNoticeMessage(subscriptionId),
),
)
console.log(`Found ${events.length} events matching filter.`)
},
(error) => {
console.error('Unable to find by filters: ', error)
},
)
}
break
case MessageType.CLOSE:
{
const subscriptionId = message[1] as SubscriptionId
delete ws['subscriptions'][subscriptionId]
}
break
}
})
ws.on('pong', heartbeat)
ws.on('close', function onClose(code) {
Object.keys(ws['subscriptions']).forEach(
(subscriptionId) => delete ws['subscriptions'][subscriptionId],
)
delete ws['subscriptions']
// TODO: Clean up subscriptions
console.log('disconnected %s', code)
})
})
const heartbeatInterval = setInterval(function ping() {
wss.clients.forEach(function each(ws) {
if (!ws['isAlive']) {
return ws.terminate()
}
ws['isAlive'] = false
ws.ping()
})
}, WSS_CLIENT_HEALTH_PROBE_INTERVAL)
wss.on('close', function close() {
clearInterval(heartbeatInterval)
})
server.on('request', async (req, res) => {
if (req.headers['accept'] === 'application/nostr+json') {
const {
@ -83,142 +208,6 @@ server.on('request', async (req, res) => {
}
})
wss.on('connection', (ws, _req) => {
ws['subscriptions'] = {}
ws['isAlive'] = true
ws.on('message', function onMessage(raw) {
let message: Message
try {
message = Joi.attempt(JSON.parse(raw.toString('utf8')), messageSchema, {
stripUnknown: true,
abortEarly: true,
}) as Message
} catch (error) {
console.error('Invalid message', error)
ws.send(
JSON.stringify(createNotice('Message does not match any known schema')),
)
return
}
const command = message[0]
switch (command) {
case MessageType.EVENT:
{
if (message[1] === null || typeof message[1] !== 'object') {
ws.send(JSON.stringify(createNotice(`Invalid event`)))
return
}
const toJSON = (input) => JSON.stringify(input)
const toBuffer = (input) => Buffer.from(input, 'hex')
const row = applySpec({
event_id: pipe(prop('id'), toBuffer),
event_pubkey: pipe(prop('pubkey'), toBuffer),
event_created_at: prop('created_at'),
event_kind: prop('kind'),
event_tags: pipe(prop('tags'), toJSON),
event_content: prop('content'),
event_signature: pipe(prop('sig'), toBuffer),
})(message[1])
dbClient('events')
.insert(row)
.onConflict('event_id')
.ignore()
.asCallback(function (error, rows) {
if (error) {
console.log('Unable to add event', error)
return
}
console.log(`Added ${rows.length} events.`)
})
}
break
case MessageType.REQ:
{
const subscriptionId = message[1] as SubscriptionId
const filter = message[2] as SubscriptionFilter
const exists = subscriptionId in ws['subscriptions']
ws['subscriptions'][subscriptionId] = filter
console.log(
`Subscription ${subscriptionId} ${
exists ? 'updated' : 'created'
} with filters ${JSON.stringify(filter)}`,
)
// TODO: search for matching events on the DB, then send ESOE
// ws.send(
// JSON.stringify(
// createNotice(
// `Subscription ${subscriptionId} ${
// exists ? 'updated' : 'created'
// } with filters ${JSON.stringify(filter)}`,
// ),
// ),
// )
}
break
case MessageType.CLOSE:
{
const subscriptionId = message[1] as SubscriptionId
const exists = subscriptionId in ws['subscriptions']
if (!exists) {
ws.send(
JSON.stringify(
createNotice(`Subscription ${subscriptionId} not found`),
),
)
return
}
delete ws['subscriptions'][subscriptionId]
ws.send(
JSON.stringify(
createNotice(`Subscription ${subscriptionId} closed`),
),
)
}
break
}
})
ws.on('pong', heartbeat)
ws.on('close', function onClose(code) {
if (this.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(createNotice('Goodbye')))
}
console.log('disconnected %s', code)
})
ws.send(JSON.stringify(createNotice('Howdy!')))
})
const heartbeatInterval = setInterval(function ping() {
wss.clients.forEach(function each(ws) {
if (!ws['isAlive']) {
return ws.terminate()
}
ws['isAlive'] = false
ws.ping()
})
}, WSS_CLIENT_HEALTH_PROBE_INTERVAL)
wss.on('close', function close() {
clearInterval(heartbeatInterval)
})
server.on('clientError', (err, socket) => {
if (err['code'] === 'ECONNRESET' || !socket.writable) {
return
@ -229,3 +218,9 @@ server.on('clientError', (err, socket) => {
const port = process.env.SERVER_PORT ?? 8008
console.log(`Listening on port: ${port}`)
server.listen(port)
process.on('SIGINT', function () {
console.log('Caught interrupt signal')
server.close()
process.exit()
})

View File

@ -1,6 +1,11 @@
import { Event } from './types/event'
import { SubscriptionId } from './types/subscription'
import { MessageType, Notice, OutgoingEventMessage } from './types/messages'
import {
EndOfStoredEventsNotice,
MessageType,
Notice,
OutgoingEventMessage,
} from './types/messages'
export const createNotice = (notice: string): Notice => {
return [MessageType.NOTICE, notice]
@ -12,3 +17,9 @@ export const createOutgoingEventMessage = (
): OutgoingEventMessage => {
return [MessageType.EVENT, subscriptionId, event]
}
export const createEndOfStoredEventsNoticeMessage = (
subscriptionId: SubscriptionId,
): EndOfStoredEventsNotice => {
return [MessageType.EOSE, subscriptionId]
}

View File

@ -0,0 +1,94 @@
import { Knex } from 'knex'
import { applySpec, pipe, prop } from 'ramda'
import { DBEvent, Event } from '../types/event'
import { IEventRepository } from '../types/repositories'
import { SubscriptionFilter } from '../types/subscription'
const toBuffer = (input: any) => Buffer.from(input, 'hex')
const fromBuffer = (input: Buffer) => input.toString('hex')
export class EventRepository implements IEventRepository {
public constructor(private readonly dbClient: Knex) {}
public async findByfilters(filters: SubscriptionFilter[]): Promise<Event[]> {
const queries = filters.map((filter) => {
const builder = this.dbClient<DBEvent>('events')
if (Array.isArray(filter.authors)) {
builder.whereIn('event_pubkey', filter.authors.map(toBuffer))
}
if (Array.isArray(filter.ids)) {
builder.whereIn('event_id', filter.ids.map(toBuffer))
}
if (Array.isArray(filter.kinds)) {
builder.whereIn('event_kind', filter.kinds)
}
if (typeof filter.since === 'number') {
builder.where('event_created_at', '>=', filter.since)
}
if (typeof filter.until === 'number') {
builder.where('event_created_at', '<=', filter.until)
}
if (typeof filter.limit === 'number') {
builder.limit(filter.limit).orderBy('event_created_at', 'DESC')
} else {
builder.orderBy('event_created_at', 'asc')
}
return builder
})
const [query, ...subqueries] = queries
if (subqueries.length) {
query.union(subqueries)
}
console.log('Query', query.toString())
return query.then((rows) =>
rows.map(
(row) =>
applySpec({
id: pipe(prop('event_id'), fromBuffer),
kind: prop('event_kind'),
pubkey: pipe(prop('event_pubkey'), fromBuffer),
created_at: prop('event_created_at'),
content: prop('event_content'),
tags: prop('event_tags'),
sig: pipe(prop('event_signature'), fromBuffer),
})(row) as Event,
),
)
}
public async create(event: Event): Promise<void> {
console.log('Creating event', event)
const toJSON = (input: any) => JSON.stringify(input)
const row = applySpec({
event_id: pipe(prop('id'), toBuffer),
event_pubkey: pipe(prop('pubkey'), toBuffer),
event_created_at: prop('created_at'),
event_kind: prop('kind'),
event_tags: pipe(prop('tags'), toJSON),
event_content: prop('content'),
event_signature: pipe(prop('sig'), toBuffer),
})(event)
return void this.dbClient('events')
.insert(row)
.onConflict('event_id')
.ignore()
.then((number) => {
console.log(`Rows added`, (number as any).rowCount)
})
}
}

View File

@ -8,11 +8,9 @@ export const eventMessageSchema = Schema.array().ordered(
eventSchema.required(),
)
export const reqMessageSchema = Schema.array().ordered(
Schema.string().valid('REQ').required(),
Schema.string().required(),
filterSchema.required(),
)
export const reqMessageSchema = Schema.array()
.ordered(Schema.string().valid('REQ').required(), Schema.string().required())
.items(filterSchema.required())
export const closeMessageSchema = Schema.array().ordered(
Schema.string().valid('CLOSE').required(),

View File

@ -6,7 +6,5 @@ export const filterSchema = Schema.object({
kinds: Schema.array().items(Schema.number().min(0)),
since: Schema.number().min(0).multiple(1),
until: Schema.number().min(0).multiple(1),
limit: Schema.number().min(1).multiple(1).max(500),
'#e': Schema.array().items(Schema.string()),
'#p': Schema.array().items(Schema.string()),
})
limit: Schema.number().min(1).multiple(1).max(10000),
}).pattern(/^#[a-z]$/, Schema.array().items(Schema.string()))

View File

@ -1,3 +1,15 @@
export type Pubkey = string
export type TagName = string
export type Signature = string
type Enumerate<
N extends number,
Acc extends number[] = [],
> = Acc['length'] extends N
? Acc[number]
: Enumerate<N, [...Acc, Acc['length']]>
export type Range<F extends number, T extends number> = Exclude<
Enumerate<T>,
Enumerate<F>
>

View File

@ -18,6 +18,18 @@ export interface Event {
content: string
}
export interface DBEvent {
id: string
event_id: Buffer
event_pubkey: Buffer
event_kind: number
event_created_at: number
event_content: string
event_tags: Tag[]
event_signature: Buffer
first_seen: Date
}
export interface CanonicalEvent {
0: 0
1: string

View File

@ -1,3 +1,4 @@
import { Range } from './base'
import { Event } from './event'
import { SubscriptionId, SubscriptionFilter } from './subscription'
@ -6,6 +7,7 @@ export enum MessageType {
EVENT = 'EVENT',
CLOSE = 'CLOSE',
NOTICE = 'NOTICE',
EOSE = 'EOSE',
}
export type Message =
@ -13,12 +15,14 @@ export type Message =
| IncomingEventMessage
| UnsubscribeMessage
| Notice
| EndOfStoredEventsNotice
export interface SubscriptionMessage {
export type SubscriptionMessage = {
[index in Range<2, 100>]: SubscriptionFilter
} & {
0: MessageType.REQ
1: SubscriptionId
2: SubscriptionFilter
}
} & Array<SubscriptionFilter>
export interface IncomingEventMessage {
0: MessageType.EVENT
@ -40,3 +44,8 @@ export interface Notice {
0: MessageType.NOTICE
1: string
}
export interface EndOfStoredEventsNotice {
0: MessageType.EOSE
1: SubscriptionId
}

View File

@ -0,0 +1,7 @@
import { Event } from './event'
import { SubscriptionFilter } from './subscription'
export interface IEventRepository {
create(event: Event): Promise<void>
findByfilters(filters: SubscriptionFilter[]): Promise<Event[]>
}

View File

@ -1,4 +1,4 @@
import { Pubkey } from 'types'
import { Pubkey } from './base'
import { EventId } from './event'
export type SubscriptionId = string
@ -9,5 +9,6 @@ export interface SubscriptionFilter {
since?: number
until?: number
authors?: Pubkey[]
limit?: number
[key: `#${string}`]: string[]
}