mirror of
https://github.com/Cameri/nostream.git
synced 2025-06-02 11:10:24 +02:00
chore: add more intg tests & logging
This commit is contained in:
parent
77847627ed
commit
55df03df73
32
package-lock.json
generated
32
package-lock.json
generated
@ -10,6 +10,7 @@
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@noble/secp256k1": "1.7.0",
|
||||
"debug": "4.3.4",
|
||||
"joi": "17.6.1",
|
||||
"knex": "2.3.0",
|
||||
"pg": "8.8.0",
|
||||
@ -22,6 +23,7 @@
|
||||
"@cucumber/pretty-formatter": "1.0.0",
|
||||
"@types/chai": "^4.3.1",
|
||||
"@types/chai-as-promised": "^7.1.5",
|
||||
"@types/debug": "4.1.7",
|
||||
"@types/mocha": "^9.1.1",
|
||||
"@types/node": "^17.0.24",
|
||||
"@types/pg": "^8.6.5",
|
||||
@ -1077,6 +1079,15 @@
|
||||
"@types/chai": "*"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/debug": {
|
||||
"version": "4.1.7",
|
||||
"resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.7.tgz",
|
||||
"integrity": "sha512-9AonUzyTjXXhEOa0DnqpzZi6VHlqKMswga9EXjpXnnqxwLtdvPPtlO8evrI5D9S6asFRCQ6v+wpiUKbw+vKqyg==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"@types/ms": "*"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/json-schema": {
|
||||
"version": "7.0.11",
|
||||
"resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.11.tgz",
|
||||
@ -1095,6 +1106,12 @@
|
||||
"integrity": "sha512-Z61JK7DKDtdKTWwLeElSEBcWGRLY8g95ic5FoQqI9CMx0ns/Ghep3B4DfcEimiKMvtamNVULVNKEsiwV3aQmXw==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/@types/ms": {
|
||||
"version": "0.7.31",
|
||||
"resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz",
|
||||
"integrity": "sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/@types/node": {
|
||||
"version": "17.0.24",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.24.tgz",
|
||||
@ -6507,6 +6524,15 @@
|
||||
"@types/chai": "*"
|
||||
}
|
||||
},
|
||||
"@types/debug": {
|
||||
"version": "4.1.7",
|
||||
"resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.7.tgz",
|
||||
"integrity": "sha512-9AonUzyTjXXhEOa0DnqpzZi6VHlqKMswga9EXjpXnnqxwLtdvPPtlO8evrI5D9S6asFRCQ6v+wpiUKbw+vKqyg==",
|
||||
"dev": true,
|
||||
"requires": {
|
||||
"@types/ms": "*"
|
||||
}
|
||||
},
|
||||
"@types/json-schema": {
|
||||
"version": "7.0.11",
|
||||
"resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.11.tgz",
|
||||
@ -6525,6 +6551,12 @@
|
||||
"integrity": "sha512-Z61JK7DKDtdKTWwLeElSEBcWGRLY8g95ic5FoQqI9CMx0ns/Ghep3B4DfcEimiKMvtamNVULVNKEsiwV3aQmXw==",
|
||||
"dev": true
|
||||
},
|
||||
"@types/ms": {
|
||||
"version": "0.7.31",
|
||||
"resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz",
|
||||
"integrity": "sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==",
|
||||
"dev": true
|
||||
},
|
||||
"@types/node": {
|
||||
"version": "17.0.24",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.24.tgz",
|
||||
|
@ -64,6 +64,7 @@
|
||||
"@cucumber/pretty-formatter": "1.0.0",
|
||||
"@types/chai": "^4.3.1",
|
||||
"@types/chai-as-promised": "^7.1.5",
|
||||
"@types/debug": "4.1.7",
|
||||
"@types/mocha": "^9.1.1",
|
||||
"@types/node": "^17.0.24",
|
||||
"@types/pg": "^8.6.5",
|
||||
@ -89,8 +90,9 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@noble/secp256k1": "1.7.0",
|
||||
"debug": "4.3.4",
|
||||
"joi": "17.6.1",
|
||||
"knex": "^2.3.0",
|
||||
"knex": "2.3.0",
|
||||
"pg": "8.8.0",
|
||||
"pg-query-stream": "4.2.3",
|
||||
"ramda": "0.28.0",
|
||||
|
@ -12,5 +12,6 @@ export interface IWebServerAdapter extends EventEmitter {
|
||||
|
||||
|
||||
export type IWebSocketAdapter = EventEmitter & {
|
||||
getClientId(): string
|
||||
getSubscriptions(): Map<string, SubscriptionFilter[]>
|
||||
}
|
||||
|
@ -1,27 +1,39 @@
|
||||
import { Duplex, EventEmitter } from 'stream'
|
||||
import { IncomingMessage, Server, ServerResponse } from 'http'
|
||||
|
||||
import packageJson from '../../package.json'
|
||||
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { ISettings } from '../@types/settings'
|
||||
import { IWebServerAdapter } from '../@types/adapters'
|
||||
|
||||
export class WebServerAdapter extends EventEmitter implements IWebServerAdapter {
|
||||
const debug = createLogger('web-server-adapter')
|
||||
|
||||
export class WebServerAdapter extends EventEmitter implements IWebServerAdapter {
|
||||
public constructor(
|
||||
protected readonly webServer: Server,
|
||||
private readonly settings: () => ISettings,
|
||||
) {
|
||||
debug('web server starting')
|
||||
super()
|
||||
this.webServer.on('request', this.onWebServerRequest.bind(this))
|
||||
.on('clientError', this.onWebServerSocketError.bind(this))
|
||||
.on('close', this.onClose.bind(this))
|
||||
this.webServer
|
||||
.on('request', this.onRequest.bind(this))
|
||||
.on('clientError', this.onError.bind(this))
|
||||
.once('close', this.onClose.bind(this))
|
||||
.once('listening', this.onListening.bind(this))
|
||||
}
|
||||
|
||||
public listen(port: number): void {
|
||||
debug('attempt to listen on port %d', port)
|
||||
this.webServer.listen(port)
|
||||
}
|
||||
|
||||
private onWebServerRequest(request: IncomingMessage, response: ServerResponse) {
|
||||
private onListening() {
|
||||
debug('listening for incoming connections')
|
||||
}
|
||||
|
||||
private onRequest(request: IncomingMessage, response: ServerResponse) {
|
||||
debug('request received: %o', request)
|
||||
if (request.method === 'GET' && request.headers['accept'] === 'application/nostr+json') {
|
||||
const {
|
||||
info: { name, description, pubkey, contact },
|
||||
@ -38,14 +50,17 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter
|
||||
}
|
||||
|
||||
response.setHeader('content-type', 'application/nostr+json')
|
||||
response.end(JSON.stringify(relayInformationDocument))
|
||||
const body = JSON.stringify(relayInformationDocument)
|
||||
response.end(body)
|
||||
} else {
|
||||
response.setHeader('content-type', 'application/text')
|
||||
response.end('Please use a Nostr client to connect.')
|
||||
}
|
||||
debug('send response: %o', response)
|
||||
}
|
||||
|
||||
private onWebServerSocketError(error: Error, socket: Duplex) {
|
||||
private onError(error: Error, socket: Duplex) {
|
||||
debug('socket error: %o', error)
|
||||
if (error['code'] === 'ECONNRESET' || !socket.writable) {
|
||||
return
|
||||
}
|
||||
@ -53,7 +68,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter
|
||||
}
|
||||
|
||||
protected onClose() {
|
||||
console.log(`worker ${process.pid} web server closing`)
|
||||
debug('stopped listening to incoming connections')
|
||||
this.webServer.removeAllListeners()
|
||||
}
|
||||
}
|
||||
|
@ -10,20 +10,20 @@ import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
|
||||
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
|
||||
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
|
||||
import { attemptValidation } from '../utils/validation'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { Event } from '../@types/event'
|
||||
import { Factory } from '../@types/base'
|
||||
import { isEventMatchingFilter } from '../utils/event'
|
||||
import { messageSchema } from '../schemas/message-schema'
|
||||
|
||||
const debug = createLogger('web-socket-adapter')
|
||||
|
||||
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
|
||||
private id: string
|
||||
private clientAddress: string
|
||||
public clientId: string
|
||||
// private clientAddress: string
|
||||
private alive: boolean
|
||||
private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>
|
||||
|
||||
private sent = 0
|
||||
private received = 0
|
||||
|
||||
public constructor(
|
||||
private readonly client: WebSocket,
|
||||
private readonly request: IncomingHttpMessage,
|
||||
@ -34,8 +34,8 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
this.alive = true
|
||||
this.subscriptions = new Map()
|
||||
|
||||
this.id = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
|
||||
this.clientAddress = this.request.headers['x-forwarded-for'] as string
|
||||
this.clientId = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
|
||||
// this.clientAddress = this.request.headers['x-forwarded-for'] as string
|
||||
|
||||
this.client
|
||||
.on('message', this.onClientMessage.bind(this))
|
||||
@ -48,20 +48,30 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
.on(WebSocketAdapterEvent.Unsubscribe, this.onUnsubscribed.bind(this))
|
||||
.on(WebSocketAdapterEvent.Event, this.onSendEvent.bind(this))
|
||||
.on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this))
|
||||
.on(WebSocketAdapterEvent.Message, this.onSendMessage.bind(this))
|
||||
.on(WebSocketAdapterEvent.Message, this.sendMessage.bind(this))
|
||||
|
||||
debug('client %s connected', this.clientId)
|
||||
}
|
||||
|
||||
public getClientId(): string {
|
||||
return this.clientId
|
||||
}
|
||||
|
||||
public onUnsubscribed(subscriptionId: string): void {
|
||||
debug('client %s unsubscribed %s', this.clientId, subscriptionId)
|
||||
this.subscriptions.delete(subscriptionId)
|
||||
}
|
||||
|
||||
public onSubscribed(subscriptionId: string, filters: SubscriptionFilter[]): void {
|
||||
debug('client %s subscribed %s to %o', this.clientId, subscriptionId, filters)
|
||||
this.subscriptions.set(subscriptionId, filters)
|
||||
}
|
||||
|
||||
public onBroadcast(event: Event): void {
|
||||
debug('client %s broadcast event: %o', this.clientId, event)
|
||||
this.webSocketServer.emit(WebSocketServerAdapterEvent.Broadcast, event)
|
||||
if (cluster.isWorker) {
|
||||
debug('client %s broadcast event to primary: %o', this.clientId, event)
|
||||
process.send({
|
||||
eventName: WebSocketServerAdapterEvent.Broadcast,
|
||||
event,
|
||||
@ -80,22 +90,20 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
}
|
||||
|
||||
private sendMessage(message: OutgoingMessage): void {
|
||||
this.sent++
|
||||
debug('sending message to client %s: %o', this.clientId, message)
|
||||
this.client.send(JSON.stringify(message))
|
||||
}
|
||||
|
||||
private onSendMessage(message: OutgoingMessage): void {
|
||||
this.sendMessage(message)
|
||||
}
|
||||
|
||||
public onHeartbeat(): void {
|
||||
if (!this.alive) {
|
||||
debug('client %s pong timed out', this.clientId)
|
||||
this.terminate()
|
||||
return
|
||||
}
|
||||
|
||||
this.alive = false
|
||||
this.client.ping()
|
||||
debug('client %s ping', this.clientId)
|
||||
}
|
||||
|
||||
public getSubscriptions(): Map<string, SubscriptionFilter[]> {
|
||||
@ -103,12 +111,13 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
}
|
||||
|
||||
private terminate(): void {
|
||||
console.debug(`worker ${process.pid} - terminating client`)
|
||||
debug('terminating client %s', this.clientId)
|
||||
this.client.terminate()
|
||||
debug('client %s terminated', this.clientId)
|
||||
}
|
||||
|
||||
private async onClientMessage(raw: Buffer) {
|
||||
let abort
|
||||
let abort: () => void
|
||||
try {
|
||||
const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf-8')))
|
||||
|
||||
@ -118,17 +127,15 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
this.client.prependOnceListener('close', abort)
|
||||
}
|
||||
|
||||
this.received++
|
||||
|
||||
await messageHandler?.handleMessage(message)
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.name === 'AbortError') {
|
||||
console.error(`worker ${process.pid} - message handler aborted`)
|
||||
debug('message handler aborted')
|
||||
} else if (error instanceof Error && error.name === 'ValidationError') {
|
||||
console.error(`worker ${process.pid} - invalid message`, (error as any).annotate())
|
||||
debug('invalid message: %o', (error as any).annotate())
|
||||
this.sendMessage(createNoticeMessage(`Invalid message: ${error.message}`))
|
||||
} else {
|
||||
console.error(`worker ${process.pid} - unable to handle message: ${error.message}`)
|
||||
debug('unable to handle message: %o', error)
|
||||
}
|
||||
} finally {
|
||||
if (abort) {
|
||||
@ -138,13 +145,17 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
}
|
||||
|
||||
private onClientPong() {
|
||||
debug('client %s pong', this.clientId)
|
||||
this.alive = true
|
||||
}
|
||||
|
||||
private onClientClose() {
|
||||
debug('client %s closing', this.clientId)
|
||||
this.alive = false
|
||||
|
||||
this.removeAllListeners()
|
||||
this.client.removeAllListeners()
|
||||
|
||||
debug('client %s closed', this.clientId)
|
||||
}
|
||||
}
|
||||
|
@ -3,12 +3,14 @@ import WebSocket, { OPEN, WebSocketServer } from 'ws'
|
||||
|
||||
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
|
||||
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { Event } from '../@types/event'
|
||||
import { Factory } from '../@types/base'
|
||||
import { ISettings } from '../@types/settings'
|
||||
import { propEq } from 'ramda'
|
||||
import { WebServerAdapter } from './web-server-adapter'
|
||||
|
||||
const debug = createLogger('web-socket-server-adapter')
|
||||
|
||||
const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000
|
||||
|
||||
@ -36,9 +38,9 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
|
||||
this.webSocketServer
|
||||
.on(WebSocketServerAdapterEvent.Close, this.onClose.bind(this))
|
||||
.on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this))
|
||||
.on('error', (err) => {
|
||||
console.error('error', err)
|
||||
throw err
|
||||
.on('error', (error) => {
|
||||
debug('error: %o', error)
|
||||
throw error
|
||||
})
|
||||
this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
|
||||
}
|
||||
@ -55,8 +57,9 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
|
||||
if (!propEq('readyState', OPEN)(webSocket)) {
|
||||
return
|
||||
}
|
||||
|
||||
this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Event, event)
|
||||
const webSocketAdapter = this.webSocketsAdapters.get(webSocket)
|
||||
debug('broadcasting event to client %s: %o', webSocketAdapter.getClientId(), event)
|
||||
webSocketAdapter.emit(WebSocketAdapterEvent.Event, event)
|
||||
})
|
||||
}
|
||||
|
||||
@ -65,6 +68,7 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
|
||||
}
|
||||
|
||||
private onConnection(client: WebSocket, req: IncomingMessage) {
|
||||
debug('client connected: %o', req.headers)
|
||||
this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this]))
|
||||
}
|
||||
|
||||
@ -75,13 +79,15 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
|
||||
}
|
||||
|
||||
protected onClose() {
|
||||
this.webSocketServer.clients.forEach((webSocket: WebSocket) =>
|
||||
webSocket.terminate()
|
||||
)
|
||||
console.debug(`worker ${process.pid} - websocket server closing`)
|
||||
debug('closing')
|
||||
clearInterval(this.heartbeatInterval)
|
||||
this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
|
||||
debug('terminating client')
|
||||
webSocket.terminate()
|
||||
})
|
||||
this.removeAllListeners()
|
||||
this.webSocketServer.removeAllListeners()
|
||||
super.onClose()
|
||||
debug('closed')
|
||||
}
|
||||
}
|
||||
|
@ -1,58 +1,68 @@
|
||||
import { Cluster, Worker } from 'cluster'
|
||||
import { cpus } from 'os'
|
||||
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { IRunnable } from '../@types/base'
|
||||
import { ISettings } from '../@types/settings'
|
||||
import packageJson from '../../package.json'
|
||||
import { Serializable } from 'child_process'
|
||||
|
||||
const debug = createLogger('app-primary')
|
||||
|
||||
export class App implements IRunnable {
|
||||
public constructor(
|
||||
private readonly process: NodeJS.Process,
|
||||
private readonly cluster: Cluster,
|
||||
private readonly settingsFactory: () => ISettings,
|
||||
) {
|
||||
debug('starting')
|
||||
|
||||
this.cluster
|
||||
.on('message', this.onClusterMessage.bind(this))
|
||||
.on('exit', this.onClusterExit.bind(this))
|
||||
|
||||
this.process
|
||||
.on('SIGTERM', this.onExit.bind(this))
|
||||
|
||||
debug('started')
|
||||
}
|
||||
|
||||
public run(): void {
|
||||
console.log(`${packageJson.name}@${packageJson.version}`)
|
||||
console.log(`supported NIPs: ${packageJson.supportedNips}`)
|
||||
console.log(`primary ${this.process.pid} - running`)
|
||||
debug('running %s version %s', packageJson.name, packageJson.version)
|
||||
debug('supported NIPs: %o', packageJson.supportedNips)
|
||||
|
||||
const workerCount = this.settingsFactory().workers?.count || cpus().length
|
||||
|
||||
for (let i = 0; i < workerCount; i++) {
|
||||
debug('starting worker')
|
||||
this.cluster.fork()
|
||||
}
|
||||
}
|
||||
|
||||
private onClusterMessage(source: Worker, message: Serializable) {
|
||||
debug('message received from worker %s: %o', source.process.pid, message)
|
||||
for (const worker of Object.values(this.cluster.workers)) {
|
||||
if (source.id === worker.id) {
|
||||
continue
|
||||
}
|
||||
|
||||
debug('sending message to worker %s: %o', worker.process.pid, message)
|
||||
worker.send(message)
|
||||
}
|
||||
}
|
||||
|
||||
private onClusterExit(deadWorker: Worker, code: number, signal: string) {
|
||||
console.log(`worker ${deadWorker.process.pid} - exiting`)
|
||||
if (code === 0 || signal === 'SIGINT') {
|
||||
return
|
||||
}
|
||||
|
||||
this.cluster.fork()
|
||||
debug('worker %s died', deadWorker.process.pid)
|
||||
if (code === 0 || signal === 'SIGINT') {
|
||||
return
|
||||
}
|
||||
debug('starting worker')
|
||||
const newWorker = this.cluster.fork()
|
||||
debug('started worker %s', newWorker.process.pid)
|
||||
}
|
||||
|
||||
private onExit() {
|
||||
console.log('exiting...')
|
||||
debug('exiting')
|
||||
this.process.exit(0)
|
||||
}
|
||||
}
|
@ -1,6 +1,9 @@
|
||||
import { IRunnable } from '../@types/base'
|
||||
import { IWebSocketServerAdapter } from '../@types/adapters'
|
||||
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
|
||||
const debug = createLogger('app-worker')
|
||||
export class AppWorker implements IRunnable {
|
||||
public constructor(
|
||||
private readonly process: NodeJS.Process,
|
||||
@ -19,27 +22,27 @@ export class AppWorker implements IRunnable {
|
||||
const port = Number(process.env.SERVER_PORT) || 8008
|
||||
|
||||
this.adapter.listen(port)
|
||||
|
||||
console.log(`worker ${process.pid} - listening on port ${port}`)
|
||||
}
|
||||
|
||||
private onMessage(message: { eventName: string, event: unknown }): void {
|
||||
debug('broadcast message received: %o', message)
|
||||
this.adapter.emit(message.eventName, message.event)
|
||||
}
|
||||
|
||||
private onError(error: Error) {
|
||||
console.error(`worker ${process.pid} - error`, error)
|
||||
debug('error: %o', error)
|
||||
throw error
|
||||
}
|
||||
|
||||
private onExit() {
|
||||
console.log(`worker ${process.pid} - exiting`)
|
||||
debug('exiting')
|
||||
this.close(() => {
|
||||
this.process.exit(0)
|
||||
})
|
||||
}
|
||||
|
||||
public close(callback?: () => void) {
|
||||
debug('closing')
|
||||
this.adapter.close(callback)
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
import 'pg'
|
||||
import 'pg-query-stream'
|
||||
import knex, { Knex } from 'knex'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
|
||||
const debug = createLogger('database-client')
|
||||
|
||||
const createDbConfig = (): Knex.Config => ({
|
||||
client: 'pg',
|
||||
@ -20,9 +23,12 @@ const createDbConfig = (): Knex.Config => ({
|
||||
})
|
||||
|
||||
let client: Knex
|
||||
|
||||
export const getDbClient = () => {
|
||||
if (!client) {
|
||||
client = knex(createDbConfig())
|
||||
const config = createDbConfig()
|
||||
debug('config: %o', config)
|
||||
client = knex(config)
|
||||
}
|
||||
|
||||
return client
|
||||
|
19
src/factories/logger-factory.ts
Normal file
19
src/factories/logger-factory.ts
Normal file
@ -0,0 +1,19 @@
|
||||
import cluster from 'cluster'
|
||||
import debug from 'debug'
|
||||
|
||||
export const createLogger = (
|
||||
namespace: string,
|
||||
options: { enabled?: boolean; stdout?: boolean } = { enabled: false, stdout: false }
|
||||
) => {
|
||||
const prefix = cluster.isWorker ? 'worker' : 'primary'
|
||||
const instance = debug(prefix)
|
||||
if (options.enabled) {
|
||||
debug.enable(`${prefix}:${namespace}:*`)
|
||||
}
|
||||
if (options.stdout) {
|
||||
instance.log = console.log.bind(console)
|
||||
}
|
||||
const fn = instance.extend(namespace)
|
||||
|
||||
return fn
|
||||
}
|
@ -2,6 +2,7 @@ import { mergeDeepLeft } from 'ramda'
|
||||
|
||||
import { DelegatedEvent, Event } from '../@types/event'
|
||||
import { EventDelegatorMetadataKey, EventTags } from '../constants/base'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { createNoticeMessage } from '../utils/messages'
|
||||
import { EventMessageHandler } from './event-message-handler'
|
||||
import { IMessageHandler } from '../@types/message-handlers'
|
||||
@ -9,20 +10,23 @@ import { IncomingEventMessage } from '../@types/messages'
|
||||
import { isDelegatedEventValid } from '../utils/event'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
const debug = createLogger('delegated-event-message-handler')
|
||||
|
||||
export class DelegatedEventMessageHandler extends EventMessageHandler implements IMessageHandler {
|
||||
public async handleMessage(message: IncomingEventMessage): Promise<void> {
|
||||
const [, event] = message
|
||||
|
||||
let reason = this.canAcceptEvent(event)
|
||||
if (reason) {
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
console.warn(`Event ${event.id} rejected. Reason: ${reason}`)
|
||||
return
|
||||
}
|
||||
|
||||
reason = await this.isEventValid(event)
|
||||
if (reason) {
|
||||
console.warn(`Event ${event.id} rejected. Reason: ${reason}`)
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
return
|
||||
}
|
||||
|
||||
@ -43,7 +47,7 @@ export class DelegatedEventMessageHandler extends EventMessageHandler implements
|
||||
try {
|
||||
await strategy.execute(delegatedEvent)
|
||||
} catch (error) {
|
||||
console.error('Error handling message:', message, error)
|
||||
debug('error handling message %o: %o', message, error)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
import { EventKindsRange, ISettings } from '../@types/settings'
|
||||
import { getEventProofOfWork, getPubkeyProofOfWork, isEventIdValid, isEventSignatureValid } from '../utils/event'
|
||||
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { createNoticeMessage } from '../utils/messages'
|
||||
import { Event } from '../@types/event'
|
||||
import { EventKinds } from '../constants/base'
|
||||
@ -9,6 +10,8 @@ import { IncomingEventMessage } from '../@types/messages'
|
||||
import { IWebSocketAdapter } from '../@types/adapters'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
const debug = createLogger('event-message-handler')
|
||||
|
||||
export class EventMessageHandler implements IMessageHandler {
|
||||
public constructor(
|
||||
protected readonly webSocket: IWebSocketAdapter,
|
||||
@ -17,18 +20,20 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
) { }
|
||||
|
||||
public async handleMessage(message: IncomingEventMessage): Promise<void> {
|
||||
debug('received message: %o', message)
|
||||
const [, event] = message
|
||||
|
||||
let reason = await this.isEventValid(event)
|
||||
if (reason) {
|
||||
console.warn(`Event ${event.id} rejected. Reason: ${reason}`)
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
return
|
||||
}
|
||||
|
||||
reason = this.canAcceptEvent(event)
|
||||
if (reason) {
|
||||
debug('event %s rejected: %s', event.id, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
|
||||
console.warn(`Event ${event.id} rejected. Reason: ${reason}`)
|
||||
return
|
||||
}
|
||||
|
||||
@ -41,7 +46,7 @@ export class EventMessageHandler implements IMessageHandler {
|
||||
try {
|
||||
await strategy.execute(event)
|
||||
} catch (error) {
|
||||
console.error('Error handling message:', message, error)
|
||||
debug('error handling message %o: %o', message, error)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,11 @@
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { IEventRepository } from '../../@types/repositories'
|
||||
import { IEventStrategy } from '../../@types/message-handlers'
|
||||
import { IWebSocketAdapter } from '../../@types/adapters'
|
||||
import { WebSocketAdapterEvent } from '../../constants/adapter'
|
||||
|
||||
const debug = createLogger('default-event-strategy')
|
||||
|
||||
export class DefaultEventStrategy implements IEventStrategy<Event, Promise<void>> {
|
||||
public constructor(
|
||||
@ -12,6 +14,7 @@ export class DefaultEventStrategy implements IEventStrategy<Event, Promise<void>
|
||||
) { }
|
||||
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
const count = await this.eventRepository.create(event)
|
||||
if (!count) {
|
||||
return
|
||||
|
@ -1,3 +1,4 @@
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { EventTags } from '../../constants/base'
|
||||
import { IEventRepository } from '../../@types/repositories'
|
||||
@ -5,6 +6,7 @@ import { IEventStrategy } from '../../@types/message-handlers'
|
||||
import { IWebSocketAdapter } from '../../@types/adapters'
|
||||
import { WebSocketAdapterEvent } from '../../constants/adapter'
|
||||
|
||||
const debug = createLogger('delete-event-strategy')
|
||||
|
||||
export class DeleteEventStrategy implements IEventStrategy<Event, Promise<void>> {
|
||||
public constructor(
|
||||
@ -13,6 +15,7 @@ export class DeleteEventStrategy implements IEventStrategy<Event, Promise<void>>
|
||||
) { }
|
||||
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
await this.eventRepository.create(event)
|
||||
|
||||
const eTags = event.tags.filter((tag) => tag[0] === EventTags.Event)
|
||||
|
@ -1,8 +1,10 @@
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { IEventStrategy } from '../../@types/message-handlers'
|
||||
import { IWebSocketAdapter } from '../../@types/adapters'
|
||||
import { WebSocketAdapterEvent } from '../../constants/adapter'
|
||||
|
||||
const debug = createLogger('ephemeral-event-strategy')
|
||||
|
||||
export class EphemeralEventStrategy implements IEventStrategy<Event, Promise<void>> {
|
||||
public constructor(
|
||||
@ -10,6 +12,7 @@ export class EphemeralEventStrategy implements IEventStrategy<Event, Promise<voi
|
||||
) { }
|
||||
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,11 @@
|
||||
import { createLogger } from '../../factories/logger-factory'
|
||||
import { Event } from '../../@types/event'
|
||||
import { IEventRepository } from '../../@types/repositories'
|
||||
import { IEventStrategy } from '../../@types/message-handlers'
|
||||
import { IWebSocketAdapter } from '../../@types/adapters'
|
||||
import { WebSocketAdapterEvent } from '../../constants/adapter'
|
||||
|
||||
const debug = createLogger('replaceable-event-strategy')
|
||||
|
||||
export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<void>> {
|
||||
public constructor(
|
||||
@ -12,6 +14,7 @@ export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<v
|
||||
) { }
|
||||
|
||||
public async execute(event: Event): Promise<void> {
|
||||
debug('received event: %o', event)
|
||||
const count = await this.eventRepository.upsert(event)
|
||||
if (!count) {
|
||||
return
|
||||
|
@ -6,6 +6,7 @@ import { IAbortable, IMessageHandler } from '../@types/message-handlers'
|
||||
import { isEventMatchingFilter, toNostrEvent } from '../utils/event'
|
||||
import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream'
|
||||
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { Event } from '../@types/event'
|
||||
import { IEventRepository } from '../@types/repositories'
|
||||
import { ISettings } from '../@types/settings'
|
||||
@ -13,6 +14,8 @@ import { IWebSocketAdapter } from '../@types/adapters'
|
||||
import { SubscribeMessage } from '../@types/messages'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
const debug = createLogger('subscribe-message-handler')
|
||||
|
||||
export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
private readonly abortController: AbortController
|
||||
|
||||
@ -29,21 +32,24 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
}
|
||||
|
||||
public async handleMessage(message: SubscribeMessage): Promise<void> {
|
||||
debug('received message: %o', message)
|
||||
const subscriptionId = message[1] as SubscriptionId
|
||||
const filters = uniqWith(equals, message.slice(2)) as SubscriptionFilter[]
|
||||
|
||||
const reason = this.canSubscribe(subscriptionId, filters)
|
||||
if (reason) {
|
||||
debug('subscription %s with %o rejected: %s', subscriptionId, filters, reason)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Subscription request rejected: ${reason}`))
|
||||
return
|
||||
}
|
||||
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Subscribe, subscriptionId, filters)
|
||||
|
||||
return this.fetchAndSend(subscriptionId, filters)
|
||||
await this.fetchAndSend(subscriptionId, filters)
|
||||
}
|
||||
|
||||
private async fetchAndSend(subscriptionId: string, filters: SubscriptionFilter[]): Promise<void> {
|
||||
debug('fetching events for subscription %s with %o', subscriptionId, filters)
|
||||
const sendEvent = (event: Event) =>
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createOutgoingEventMessage(subscriptionId, event))
|
||||
const sendEOSE = () =>
|
||||
@ -65,7 +71,10 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
)
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.name === 'AbortError') {
|
||||
debug('aborted: %o', error)
|
||||
findEvents.end()
|
||||
} else {
|
||||
debug('error streaming events: %o', error)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
@ -1,9 +1,10 @@
|
||||
import { IWebSocketAdapter } from '../@types/adapters'
|
||||
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { IMessageHandler } from '../@types/message-handlers'
|
||||
import { IWebSocketAdapter } from '../@types/adapters'
|
||||
import { UnsubscribeMessage } from '../@types/messages'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
const debug = createLogger('unsubscribe-message-handler')
|
||||
|
||||
export class UnsubscribeMessageHandler implements IMessageHandler {
|
||||
public constructor(
|
||||
@ -11,6 +12,7 @@ export class UnsubscribeMessageHandler implements IMessageHandler {
|
||||
) { }
|
||||
|
||||
public async handleMessage(message: UnsubscribeMessage): Promise<void> {
|
||||
debug('received message: %o', message)
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Unsubscribe, message[1])
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import { DatabaseClient, EventId } from '../@types/base'
|
||||
import { DBEvent, Event } from '../@types/event'
|
||||
import { IEventRepository, IQueryResult } from '../@types/repositories'
|
||||
import { toBuffer, toJSON } from '../utils/transform'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { EventDelegatorMetadataKey } from '../constants/base'
|
||||
import { isGenericTagQuery } from '../utils/filter'
|
||||
import { SubscriptionFilter } from '../@types/subscription'
|
||||
@ -47,10 +48,13 @@ const groupByLengthSpec = groupBy(
|
||||
)
|
||||
)
|
||||
|
||||
const debug = createLogger('event-repository')
|
||||
|
||||
export class EventRepository implements IEventRepository {
|
||||
public constructor(private readonly dbClient: DatabaseClient) { }
|
||||
|
||||
public findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]> {
|
||||
debug('querying for %o', filters)
|
||||
if (!Array.isArray(filters) || !filters.length) {
|
||||
throw new Error('Filters cannot be empty')
|
||||
}
|
||||
@ -153,6 +157,7 @@ export class EventRepository implements IEventRepository {
|
||||
}
|
||||
|
||||
public async create(event: Event): Promise<number> {
|
||||
debug('creating event: %o', event)
|
||||
return this.insert(event).then(prop('rowCount') as () => number)
|
||||
}
|
||||
|
||||
@ -180,6 +185,7 @@ export class EventRepository implements IEventRepository {
|
||||
|
||||
|
||||
public upsert(event: Event): Promise<number> {
|
||||
debug('upserting event: %o', event)
|
||||
const toJSON = (input: any) => JSON.stringify(input)
|
||||
|
||||
const row = applySpec({
|
||||
@ -212,6 +218,7 @@ export class EventRepository implements IEventRepository {
|
||||
}
|
||||
|
||||
public deleteByPubkeyAndIds(pubkey: string, ids: EventId[]): Promise<number> {
|
||||
debug('deleting events from %s: %o', pubkey, ids)
|
||||
return this.dbClient('events')
|
||||
.where({
|
||||
event_pubkey: toBuffer(pubkey),
|
||||
|
@ -3,9 +3,11 @@ import { homedir } from 'os'
|
||||
import { join } from 'path'
|
||||
import { mergeDeepRight } from 'ramda'
|
||||
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { ISettings } from '../@types/settings'
|
||||
import packageJson from '../../package.json'
|
||||
|
||||
const debug = createLogger('settings')
|
||||
export class SettingsStatic {
|
||||
static _settings: ISettings
|
||||
|
||||
@ -58,6 +60,7 @@ export class SettingsStatic {
|
||||
}
|
||||
|
||||
public static loadSettings(path: string) {
|
||||
debug('loading settings from %s', path)
|
||||
return JSON.parse(
|
||||
fs.readFileSync(
|
||||
path,
|
||||
@ -67,6 +70,7 @@ export class SettingsStatic {
|
||||
}
|
||||
|
||||
public static createSettings(): ISettings {
|
||||
debug('creating settings')
|
||||
if (SettingsStatic._settings) {
|
||||
return SettingsStatic._settings
|
||||
}
|
||||
@ -86,13 +90,14 @@ export class SettingsStatic {
|
||||
|
||||
return SettingsStatic._settings
|
||||
} catch (error) {
|
||||
console.error('Unable to read config file. Reason: %s', error.message)
|
||||
debug('error reading config file at %s: %o', path, error)
|
||||
|
||||
return defaults
|
||||
}
|
||||
}
|
||||
|
||||
public static saveSettings(path: string, settings: ISettings) {
|
||||
debug('saving settings to %s: %o', path, settings)
|
||||
return fs.writeFileSync(
|
||||
path,
|
||||
JSON.stringify(settings, null, 2),
|
||||
|
@ -1,6 +1,69 @@
|
||||
Feature: NIP-01
|
||||
Scenario: Alice posts set_metadata event
|
||||
Given I am Alice
|
||||
And I subscribe to author Alice
|
||||
When I send a set_metadata event as Alice
|
||||
Then I receive a set_metadata event from Alice
|
||||
Scenario: Alice posts a set_metadata event
|
||||
Given someone is Alice
|
||||
And Alice subscribes to author Alice
|
||||
When Alice sends a set_metadata event
|
||||
Then Alice receives a set_metadata event from Alice
|
||||
|
||||
Scenario: Alice posts a text_note event
|
||||
Given someone is Alice
|
||||
And Alice subscribes to author Alice
|
||||
When Alice sends a text_note event with content "hello world"
|
||||
Then Alice receives a text_note event from Alice with content "hello world"
|
||||
|
||||
Scenario: Alice posts a recommend_server event
|
||||
Given someone is Alice
|
||||
And Alice subscribes to author Alice
|
||||
When Alice sends a recommend_server event with content "https://nostr-ts-relay.wlvs.space"
|
||||
Then Alice receives a recommend_server event from Alice with content "https://nostr-ts-relay.wlvs.space"
|
||||
|
||||
Scenario: Alice can't post a text_note event with an invalid signature
|
||||
Given someone is Alice
|
||||
When Alice sends a text_note event with invalid signature
|
||||
Then Alice receives a notice with invalid signature
|
||||
|
||||
Scenario: Alice and Bob exchange text_note events
|
||||
Given someone is Alice
|
||||
And someone is Bob
|
||||
And Alice subscribes to author Bob
|
||||
And Bob subscribes to author Alice
|
||||
When Bob sends a text_note event with content "hello alice"
|
||||
Then Alice receives a text_note event from Bob with content "hello alice"
|
||||
When Alice sends a text_note event with content "hello bob"
|
||||
Then Bob receives a text_note event from Alice with content "hello bob"
|
||||
|
||||
Scenario: Alice is interested in text_note events
|
||||
Given someone is Alice
|
||||
And someone is Bob
|
||||
And Alice subscribes to text_note events
|
||||
When Bob sends a text_note event with content "hello nostr"
|
||||
Then Alice receives a text_note event from Bob with content "hello nostr"
|
||||
|
||||
Scenario: Alice is interested in the #NostrNovember hashtag
|
||||
Given someone is Alice
|
||||
And someone is Bob
|
||||
And Alice subscribes to tag t with "NostrNovember"
|
||||
When Bob sends a text_note event with content "Nostr FTW!" and tag t containing "NostrNovember"
|
||||
Then Alice receives a text_note event from Bob with content "Nostr FTW!"
|
||||
|
||||
Scenario: Alice is interested in Bob's events from back in November
|
||||
Given someone is Alice
|
||||
And someone is Bob
|
||||
When Bob sends a text_note event with content "What's up?" on 1668074223
|
||||
And Alice subscribes to any event since 1667275200 until 1669870799
|
||||
Then Alice receives a text_note event from Bob with content "What's up?"
|
||||
|
||||
Scenario: Alice is interested Bob's in 2 past events
|
||||
Given someone is Alice
|
||||
And someone is Bob
|
||||
Then Bob subscribes to author Bob
|
||||
And Bob sends a text_note event with content "One"
|
||||
And Bob receives a text_note event from Bob with content "One"
|
||||
And Bob sends a text_note event with content "Two"
|
||||
And Bob receives a text_note event from Bob with content "Two"
|
||||
And Bob sends a text_note event with content "Three"
|
||||
And Bob receives a text_note event from Bob with content "Three"
|
||||
When Alice subscribes to author Bob with a limit of 2
|
||||
Then Alice receives 2 text_note events from Bob and EOSE
|
||||
|
||||
|
||||
|
@ -13,6 +13,7 @@ import { createHmac } from 'crypto'
|
||||
import sinonChai from 'sinon-chai'
|
||||
|
||||
import { Event } from '../../../../src/@types/event'
|
||||
import { getDbClient } from '../../../../src/database/client'
|
||||
import { MessageType } from '../../../../src/@types/messages'
|
||||
import { serializeEvent } from '../../../../src/utils/event'
|
||||
import { SubscriptionFilter } from '../../../../src/@types/subscription'
|
||||
@ -21,42 +22,82 @@ chai.use(sinonChai)
|
||||
const { expect } = chai
|
||||
|
||||
Before(async function () {
|
||||
const ws = new WebSocket('ws://localhost:8008')
|
||||
this.parameters.ws = ws
|
||||
await new Promise((resolve, reject) => {
|
||||
ws
|
||||
.once('open', resolve)
|
||||
.once('error', reject)
|
||||
this.parameters.identities = {}
|
||||
this.parameters.subscriptions = {}
|
||||
this.parameters.clients = {}
|
||||
})
|
||||
|
||||
After(async function () {
|
||||
this.parameters.subscriptions = {}
|
||||
Object.values(this.parameters.clients).forEach((ws: WebSocket) => {
|
||||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||||
ws.close()
|
||||
}
|
||||
})
|
||||
this.parameters.clients = {}
|
||||
|
||||
const dbClient = getDbClient()
|
||||
await Promise.all(
|
||||
Object.values(this.parameters.identities)
|
||||
.map(async (identity: { pubkey: string }) => dbClient('events').where({ event_pubkey: Buffer.from(identity.pubkey, 'hex') }).del())
|
||||
)
|
||||
this.parameters.identities = {}
|
||||
})
|
||||
|
||||
After(function () {
|
||||
const ws = this.parameters.ws as WebSocket
|
||||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||||
ws.close()
|
||||
}
|
||||
Given(/someone is (\w+)/, async function(name: string) {
|
||||
const connection = connect(name)
|
||||
this.parameters.identities[name] = this.parameters.identities[name] ?? createIdentity(name)
|
||||
this.parameters.clients[name] = await connection
|
||||
this.parameters.subscriptions[name] = []
|
||||
})
|
||||
|
||||
Given(/I am (\w+)/, function(name: string) {
|
||||
this.parameters.authors = this.parameters.authors ?? {}
|
||||
this.parameters.authors[name] = this.parameters.authors[name] ?? createIdentity(name)
|
||||
})
|
||||
|
||||
When(/I subscribe to author (\w+)/, async function(this: World<Record<string, any>>, name: string) {
|
||||
const ws = this.parameters.ws as WebSocket
|
||||
const pubkey = this.parameters.authors[name].pubkey
|
||||
this.parameters.subscriptions = this.parameters.subscriptions ?? []
|
||||
When(/(\w+) subscribes to author (\w+)$/, async function(this: World<Record<string, any>>, from: string, to: string) {
|
||||
const ws = this.parameters.clients[from] as WebSocket
|
||||
const pubkey = this.parameters.identities[to].pubkey
|
||||
const subscription = { name: `test-${Math.random()}`, filters: [{ authors: [pubkey] }] }
|
||||
this.parameters.subscriptions.push(subscription)
|
||||
this.parameters.subscriptions[from].push(subscription)
|
||||
|
||||
await createSubscription(ws, subscription.name, subscription.filters)
|
||||
})
|
||||
|
||||
When(/(\w+) subscribes to author (\w+) with a limit of (\d+)/, async function(this: World<Record<string, any>>, from: string, to: string, limit: string) {
|
||||
const ws = this.parameters.clients[from] as WebSocket
|
||||
const pubkey = this.parameters.identities[to].pubkey
|
||||
const subscription = { name: `test-${Math.random()}`, filters: [{ authors: [pubkey], limit: Number(limit) }] }
|
||||
this.parameters.subscriptions[from].push(subscription)
|
||||
|
||||
await createSubscription(ws, subscription.name, subscription.filters)
|
||||
})
|
||||
|
||||
When(/(\w+) subscribes to text_note events/, async function(this: World<Record<string, any>>, name: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = { name: `test-${Math.random()}`, filters: [{ kinds: [1] }] }
|
||||
this.parameters.subscriptions[name].push(subscription)
|
||||
|
||||
await createSubscription(ws, subscription.name, subscription.filters)
|
||||
})
|
||||
|
||||
When(/(\w+) subscribes to any event since (\d+) until (\d+)/, async function(this: World<Record<string, any>>, name: string, since: string, until: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = { name: `test-${Math.random()}`, filters: [{ since: Number(since), until: Number(until) }] }
|
||||
this.parameters.subscriptions[name].push(subscription)
|
||||
|
||||
await createSubscription(ws, subscription.name, subscription.filters)
|
||||
})
|
||||
|
||||
When(/(\w+) subscribes to tag (\w) with "(.*?)"$/, async function(this: World<Record<string, any>>, name: string, tag: string, value: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = { name: `test-${Math.random()}`, filters: [{ [`#${tag}`]: [value] }] }
|
||||
this.parameters.subscriptions[name].push(subscription)
|
||||
|
||||
await createSubscription(ws, subscription.name, subscription.filters)
|
||||
|
||||
await waitForEOSE(ws, subscription.name)
|
||||
})
|
||||
|
||||
When(/I send a set_metadata event as (\w+)/, async function(name: string) {
|
||||
const ws = this.parameters.ws as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.authors[name]
|
||||
When(/(\w+) sends a set_metadata event/, async function(name: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.identities[name]
|
||||
|
||||
const content = JSON.stringify({ name })
|
||||
const event: Event = await createEvent({ pubkey, kind: 0, content }, privkey)
|
||||
@ -67,19 +108,156 @@ When(/I send a set_metadata event as (\w+)/, async function(name: string) {
|
||||
this.parameters.events.push(event)
|
||||
})
|
||||
|
||||
Then(/I receive a set_metadata event from (\w+)/, async function(author: string) {
|
||||
const expectedEvent = this.parameters.events.pop()
|
||||
const subscription = this.parameters.subscriptions[this.parameters.subscriptions.length - 1]
|
||||
const receivedEvent = await waitForNextEvent(this.parameters.ws, subscription.name)
|
||||
expect(receivedEvent.pubkey).to.equal(this.parameters.authors[author].pubkey)
|
||||
expect(receivedEvent).to.deep.equal(expectedEvent)
|
||||
When(/^(\w+) sends a text_note event with content "([^"]+)"$/, async function(name: string, content: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.identities[name]
|
||||
|
||||
const event: Event = await createEvent({ pubkey, kind: 1, content }, privkey)
|
||||
|
||||
await sendEvent(ws, event)
|
||||
})
|
||||
|
||||
When(/^(\w+) sends a text_note event with content "([^"]+)" and tag (\w) containing "([^"]+)"$/, async function(
|
||||
name: string,
|
||||
content: string,
|
||||
tag: string,
|
||||
value: string,
|
||||
) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.identities[name]
|
||||
|
||||
const event: Event = await createEvent({ pubkey, kind: 1, content, tags: [[tag, value]] }, privkey)
|
||||
|
||||
await sendEvent(ws, event)
|
||||
})
|
||||
|
||||
When(/^(\w+) sends a text_note event with content "([^"]+)" on (\d+)$/, async function(
|
||||
name: string,
|
||||
content: string,
|
||||
createdAt: string,
|
||||
) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.identities[name]
|
||||
|
||||
const event: Event = await createEvent({ pubkey, kind: 1, content, created_at: Number(createdAt) }, privkey)
|
||||
|
||||
await sendEvent(ws, event)
|
||||
})
|
||||
|
||||
When(/(\w+) sends a text_note event with invalid signature/, async function(name: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.identities[name]
|
||||
|
||||
const event: Event = await createEvent({ pubkey, kind: 1, content: "I'm cheating" }, privkey)
|
||||
|
||||
event.sig = 'f'.repeat(128)
|
||||
|
||||
await sendEvent(ws, event)
|
||||
})
|
||||
|
||||
When(/(\w+) sends a recommend_server event with content "(.+?)"/, async function(name: string, content: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const { pubkey, privkey } = this.parameters.identities[name]
|
||||
|
||||
const event: Event = await createEvent({ pubkey, kind: 2, content }, privkey)
|
||||
|
||||
await sendEvent(ws, event)
|
||||
})
|
||||
|
||||
Then(/(\w+) receives a set_metadata event from (\w+)/, async function(name: string, author: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
|
||||
const receivedEvent = await waitForNextEvent(ws, subscription.name)
|
||||
|
||||
expect(receivedEvent.kind).to.equal(0)
|
||||
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
|
||||
})
|
||||
|
||||
Then(/(\w+) receives a text_note event from (\w+) with content "(.+?)"/, async function(name: string, author: string, content: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
|
||||
const receivedEvent = await waitForNextEvent(ws, subscription.name)
|
||||
|
||||
expect(receivedEvent.kind).to.equal(1)
|
||||
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
|
||||
expect(receivedEvent.content).to.equal(content)
|
||||
})
|
||||
|
||||
Then(/(\w+) receives a text_note event from (\w+) with content "(.+?)" on (\d+)/, async function(
|
||||
name: string,
|
||||
author: string,
|
||||
content: string,
|
||||
createdAt: string,
|
||||
) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
|
||||
const receivedEvent = await waitForNextEvent(ws, subscription.name)
|
||||
|
||||
expect(receivedEvent.kind).to.equal(1)
|
||||
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
|
||||
expect(receivedEvent.content).to.equal(content)
|
||||
expect(receivedEvent.created_at).to.equal(Number(createdAt))
|
||||
})
|
||||
|
||||
Then(/(\w+) receives (\d+) text_note events from (\w+)/, async function(
|
||||
name: string,
|
||||
count: string,
|
||||
author: string,
|
||||
) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
|
||||
const events = await waitForEventCount(ws, subscription.name, Number(count), true)
|
||||
|
||||
expect(events.length).to.equal(2)
|
||||
expect(events[0].kind).to.equal(1)
|
||||
expect(events[1].kind).to.equal(1)
|
||||
expect(events[0].pubkey).to.equal(this.parameters.identities[author].pubkey)
|
||||
expect(events[1].pubkey).to.equal(this.parameters.identities[author].pubkey)
|
||||
})
|
||||
|
||||
Then(/(\w+) receives a recommend_server event from (\w+) with content "(.+?)"/, async function(name: string, author: string, content: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
|
||||
const receivedEvent = await waitForNextEvent(ws, subscription.name)
|
||||
|
||||
expect(receivedEvent.kind).to.equal(2)
|
||||
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
|
||||
expect(receivedEvent.content).to.equal(content)
|
||||
})
|
||||
|
||||
Then(/(\w+) receives a notice with (.*)/, async function(name: string, pattern: string) {
|
||||
const ws = this.parameters.clients[name] as WebSocket
|
||||
const actualNotice = await waitForNotice(ws)
|
||||
|
||||
expect(actualNotice).to.contain(pattern)
|
||||
})
|
||||
|
||||
async function connect(_name: string) {
|
||||
const host = 'ws://localhost:8008'
|
||||
const ws = new WebSocket(host)
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
ws
|
||||
// .on('message', (data: RawData) => {
|
||||
// console.log(`${name} received`, JSON.parse(data.toString('utf-8')))
|
||||
// })
|
||||
.once('open', () => {
|
||||
resolve()
|
||||
})
|
||||
.once('error', reject)
|
||||
.once('close', () => {
|
||||
ws.removeAllListeners()
|
||||
})
|
||||
})
|
||||
return ws
|
||||
}
|
||||
|
||||
let eventCount = 0
|
||||
|
||||
async function createEvent(input: Partial<Event>, privkey: any): Promise<Event> {
|
||||
const event: Event = {
|
||||
pubkey: input.pubkey,
|
||||
kind: input.kind,
|
||||
created_at: input.created_at ?? Math.floor(Date.now() / 1000),
|
||||
created_at: input.created_at ?? Math.floor(Date.now() / 1000) + eventCount++,
|
||||
content: input.content ?? '',
|
||||
tags: input.tags ?? [],
|
||||
} as any
|
||||
@ -174,6 +352,82 @@ async function sendEvent(ws: WebSocket, event: Event) {
|
||||
|
||||
async function waitForNextEvent(ws: WebSocket, subscription: string): Promise<Event> {
|
||||
return new Promise((resolve, reject) => {
|
||||
ws.on('message', onMessage)
|
||||
ws.once('error', onError)
|
||||
|
||||
function cleanup() {
|
||||
ws.removeListener('message', onMessage)
|
||||
ws.removeListener('error', onError)
|
||||
}
|
||||
|
||||
function onError(error: Error) {
|
||||
reject(error)
|
||||
cleanup()
|
||||
}
|
||||
|
||||
function onMessage(raw: RawData) {
|
||||
const message = JSON.parse(raw.toString('utf-8'))
|
||||
if (message[0] === MessageType.EVENT && message[1] === subscription) {
|
||||
resolve(message[2])
|
||||
cleanup()
|
||||
} else if (message[0] === MessageType.NOTICE) {
|
||||
reject(new Error(message[1]))
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function waitForEventCount(
|
||||
ws: WebSocket,
|
||||
subscription: string,
|
||||
count = 1,
|
||||
eose = false,
|
||||
): Promise<Event[]> {
|
||||
const events = []
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
ws.on('message', onMessage)
|
||||
ws.once('error', onError)
|
||||
function cleanup() {
|
||||
ws.removeListener('message', onMessage)
|
||||
ws.removeListener('error', onError)
|
||||
}
|
||||
|
||||
function onError(error: Error) {
|
||||
reject(error)
|
||||
cleanup()
|
||||
}
|
||||
function onMessage(raw: RawData) {
|
||||
const message = JSON.parse(raw.toString('utf-8'))
|
||||
if (message[0] === MessageType.EVENT && message[1] === subscription) {
|
||||
events.push(message[2])
|
||||
if (!eose && events.length === count) {
|
||||
resolve(events)
|
||||
cleanup()
|
||||
} else if (events.length > count) {
|
||||
reject(new Error(`Expected ${count} but got ${events.length} events`))
|
||||
cleanup()
|
||||
}
|
||||
} else if (message[0] === MessageType.EOSE && message[1] === subscription) {
|
||||
if (!eose) {
|
||||
reject(new Error('Expected event but received EOSE'))
|
||||
} else if (events.length !== count) {
|
||||
reject(new Error(`Expected ${count} but got ${events.length} events before EOSE`))
|
||||
} else {
|
||||
resolve(events)
|
||||
}
|
||||
cleanup()
|
||||
} else if (message[0] === MessageType.NOTICE) {
|
||||
reject(new Error(message[1]))
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function waitForNotice(ws: WebSocket): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
function cleanup() {
|
||||
ws.removeListener('message', onMessage)
|
||||
ws.removeListener('error', onError)
|
||||
@ -186,16 +440,13 @@ async function waitForNextEvent(ws: WebSocket, subscription: string): Promise<Ev
|
||||
ws.once('error', onError)
|
||||
|
||||
function onMessage(raw: RawData) {
|
||||
ws.removeListener('error', onError)
|
||||
const message = JSON.parse(raw.toString('utf-8'))
|
||||
if (message[0] === MessageType.EVENT && message[1] === subscription) {
|
||||
resolve(message[2])
|
||||
cleanup()
|
||||
} else if (message[0] === MessageType.NOTICE) {
|
||||
reject(new Error(message[1]))
|
||||
if (message[0] === MessageType.NOTICE) {
|
||||
resolve(message[1])
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
ws.on('message', onMessage)
|
||||
})
|
||||
}
|
@ -3,6 +3,7 @@ import { AfterAll, BeforeAll } from '@cucumber/cucumber'
|
||||
import { AppWorker } from '../../../src/app/worker'
|
||||
import { DatabaseClient } from '../../../src/@types/base'
|
||||
import { getDbClient } from '../../../src/database/client'
|
||||
import { SettingsStatic } from '../../../src/utils/settings'
|
||||
import { workerFactory } from '../../../src/factories/worker-factory'
|
||||
|
||||
let worker: AppWorker
|
||||
@ -12,6 +13,10 @@ let dbClient: DatabaseClient
|
||||
BeforeAll({ timeout: 6000 }, async function () {
|
||||
dbClient = getDbClient()
|
||||
await dbClient.raw('SELECT 1=1')
|
||||
|
||||
const limits = SettingsStatic.createSettings().limits
|
||||
limits.event.createdAt.maxPositiveDelta = 0
|
||||
|
||||
worker = workerFactory()
|
||||
worker.run()
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user