mirror of
https://github.com/Cameri/nostream.git
synced 2025-03-17 13:21:45 +01:00
feat: clustering
This commit is contained in:
parent
fc38d9a472
commit
58c8a1371a
@ -33,7 +33,8 @@
|
||||
],
|
||||
"exclude": [
|
||||
"src/@types",
|
||||
"src/constants"
|
||||
"src/constants",
|
||||
"src/database"
|
||||
],
|
||||
"require": [
|
||||
"ts-node/register",
|
||||
|
@ -7,7 +7,7 @@ module.exports = {
|
||||
password: process.env.DB_PASSWORD ?? 'postgres',
|
||||
database: process.env.DB_NAME ?? 'nostr-ts-relay',
|
||||
},
|
||||
pool: { min: 0, max: 7 },
|
||||
pool: { min: 4, max: 16 },
|
||||
seeds: {
|
||||
directory: './seeds',
|
||||
},
|
||||
|
@ -4,6 +4,7 @@ import { WebSocket } from 'ws'
|
||||
export interface IWebSocketServerAdapter extends EventEmitter {
|
||||
getConnectedClients(): number
|
||||
getClients(): Set<WebSocket>
|
||||
terminate(): Promise<void>
|
||||
}
|
||||
|
||||
export interface IWebServerAdapter extends EventEmitter {
|
||||
|
14
src/@types/runes.ts
Normal file
14
src/@types/runes.ts
Normal file
@ -0,0 +1,14 @@
|
||||
export interface IAlternative {
|
||||
test(values: Record<string, any>): string | undefined
|
||||
encode(): string
|
||||
}
|
||||
|
||||
export interface IRestriction {
|
||||
test(values: Record<string, any>): string | undefined
|
||||
encode(): string
|
||||
}
|
||||
|
||||
export interface IRuneLike {
|
||||
test(values: Record<string, unknown>): [boolean, string]
|
||||
encode(): string
|
||||
}
|
@ -6,13 +6,13 @@ import { IAbortable, IMessageHandler } from '../@types/message-handlers'
|
||||
import { IncomingMessage, OutgoingMessage } from '../@types/messages'
|
||||
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
|
||||
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
|
||||
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
|
||||
import { attemptValidation } from '../utils/validation'
|
||||
import { createOutgoingEventMessage } from '../utils/messages'
|
||||
import { Event } from '../@types/event'
|
||||
import { Factory } from '../@types/base'
|
||||
import { isEventMatchingFilter } from '../utils/event'
|
||||
import { messageSchema } from '../schemas/message-schema'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
|
||||
private id: string
|
||||
@ -39,11 +39,12 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
.on('pong', this.onClientPong.bind(this))
|
||||
|
||||
this
|
||||
.on('heartbeat', this.onHeartbeat.bind(this))
|
||||
.on('subscribe', this.onSubscribed.bind(this))
|
||||
.on('unsubscribe', this.onUnsubscribed.bind(this))
|
||||
.on(WebSocketAdapterEvent.Send, this.onSend.bind(this))
|
||||
.on(WebSocketAdapterEvent.Heartbeat, this.onHeartbeat.bind(this))
|
||||
.on(WebSocketAdapterEvent.Subscribe, this.onSubscribed.bind(this))
|
||||
.on(WebSocketAdapterEvent.Unsubscribe, this.onUnsubscribed.bind(this))
|
||||
.on(WebSocketAdapterEvent.Event, this.onSendEvent.bind(this))
|
||||
.on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this))
|
||||
.on(WebSocketAdapterEvent.Message, this.onSendMessage.bind(this))
|
||||
}
|
||||
|
||||
public onUnsubscribed(subscriptionId: string): void {
|
||||
@ -55,10 +56,10 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
}
|
||||
|
||||
public onBroadcast(event: Event): void {
|
||||
this.webSocketServer.emit('broadcast', event)
|
||||
this.webSocketServer.emit(WebSocketServerAdapterEvent.Broadcast, event)
|
||||
}
|
||||
|
||||
public onSend(event: Event): void {
|
||||
public onSendEvent(event: Event): void {
|
||||
this.subscriptions.forEach((filters, subscriptionId) => {
|
||||
if (
|
||||
Array.from(filters).map(isEventMatchingFilter).some((Matches) => Matches(event))
|
||||
@ -68,10 +69,14 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
|
||||
})
|
||||
}
|
||||
|
||||
public sendMessage(message: OutgoingMessage): void {
|
||||
private sendMessage(message: OutgoingMessage): void {
|
||||
this.client.send(JSON.stringify(message))
|
||||
}
|
||||
|
||||
private onSendMessage(message: OutgoingMessage): void {
|
||||
this.sendMessage(message)
|
||||
}
|
||||
|
||||
public onHeartbeat(): void {
|
||||
if (!this.alive) {
|
||||
this.terminate()
|
||||
|
@ -36,13 +36,23 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
|
||||
this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
|
||||
}
|
||||
|
||||
public async terminate(): Promise<void> {
|
||||
return void Promise.all(
|
||||
[
|
||||
...Array.from(this.webSocketServer.clients).map((webSocket: WebSocket) =>
|
||||
webSocket.terminate()
|
||||
),
|
||||
],
|
||||
)
|
||||
}
|
||||
|
||||
private onBroadcast(event: Event) {
|
||||
this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
|
||||
if (!propEq('readyState', OPEN)(webSocket)) {
|
||||
return
|
||||
}
|
||||
|
||||
this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Send, event)
|
||||
this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Event, event)
|
||||
})
|
||||
}
|
||||
|
||||
@ -62,14 +72,19 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
|
||||
|
||||
private onHeartbeat() {
|
||||
console.debug(`heartbeat - ${this.getConnectedClients()} connected / ${this.webSocketServer.clients.size} total`)
|
||||
this.webSocketServer.clients.forEach((webSocket) => this.webSocketsAdapters.get(webSocket).emit('heartbeat'))
|
||||
this.webSocketServer.clients.forEach((webSocket) =>
|
||||
this.webSocketsAdapters.get(webSocket).emit(WebSocketAdapterEvent.Heartbeat)
|
||||
)
|
||||
}
|
||||
|
||||
protected onClose() {
|
||||
this.webSocketServer.clients.forEach((webSocket: WebSocket) =>
|
||||
|
||||
webSocket.terminate()
|
||||
)
|
||||
console.debug('websocket server closing')
|
||||
clearInterval(this.heartbeatInterval)
|
||||
this.webSocketServer.removeAllListeners()
|
||||
super.onClose()
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,10 @@
|
||||
export enum WebSocketAdapterEvent {
|
||||
Send = 'send',
|
||||
Broadcast = 'broadcast'
|
||||
Event = 'event',
|
||||
Message = 'message',
|
||||
Broadcast = 'broadcast',
|
||||
Subscribe = 'subscribe',
|
||||
Unsubscribe = 'unsubscribe',
|
||||
Heartbeat = 'heartbeat'
|
||||
}
|
||||
|
||||
export enum WebSocketServerAdapterEvent {
|
||||
|
@ -10,6 +10,7 @@ import { Event } from '../@types/event'
|
||||
import { IEventRepository } from '../@types/repositories'
|
||||
import { IWebSocketAdapter } from '../@types/adapters'
|
||||
import { SubscribeMessage } from '../@types/messages'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
|
||||
export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
@ -30,10 +31,12 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
const subscriptionId = message[1] as SubscriptionId
|
||||
const filters = message.slice(2) as SubscriptionFilter[]
|
||||
|
||||
this.webSocket.emit('subscribe', subscriptionId, new Set(filters))
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Subscribe, subscriptionId, new Set(filters))
|
||||
|
||||
const sendEvent = (event: Event) => this.webSocket.sendMessage(createOutgoingEventMessage(subscriptionId, event))
|
||||
const sendEOSE = () => this.webSocket.sendMessage(createEndOfStoredEventsNoticeMessage(subscriptionId))
|
||||
const sendEvent = (event: Event) =>
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createOutgoingEventMessage(subscriptionId, event))
|
||||
const sendEOSE = () =>
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId))
|
||||
|
||||
const findEvents = this.eventRepository.findByFilters(filters).stream()
|
||||
try {
|
||||
|
@ -2,6 +2,7 @@ import { IWebSocketAdapter } from '../@types/adapters'
|
||||
|
||||
import { IMessageHandler } from '../@types/message-handlers'
|
||||
import { UnsubscribeMessage } from '../@types/messages'
|
||||
import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
|
||||
|
||||
export class UnsubscribeMessageHandler implements IMessageHandler {
|
||||
@ -10,6 +11,6 @@ export class UnsubscribeMessageHandler implements IMessageHandler {
|
||||
) { }
|
||||
|
||||
public async handleMessage(message: UnsubscribeMessage): Promise<void> {
|
||||
this.webSocket.emit('unsubscribe', message[1])
|
||||
this.webSocket.emit(WebSocketAdapterEvent.Unsubscribe, message[1])
|
||||
}
|
||||
}
|
||||
|
125
src/index.ts
125
src/index.ts
@ -1,4 +1,7 @@
|
||||
import * as http from 'http'
|
||||
import cluster, { Worker } from 'cluster'
|
||||
import { cpus } from 'os'
|
||||
import http from 'http'
|
||||
import process from 'process'
|
||||
import { WebSocketServer } from 'ws'
|
||||
|
||||
import { EventRepository } from './repositories/event-repository'
|
||||
@ -6,29 +9,109 @@ import { getDbClient } from './database/client'
|
||||
import { webSocketAdapterFactory } from './factories/websocket-adapter-factory'
|
||||
import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter'
|
||||
|
||||
const server = http.createServer()
|
||||
const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 })
|
||||
const dbClient = getDbClient()
|
||||
const eventRepository = new EventRepository(dbClient)
|
||||
|
||||
const adapter = new WebSocketServerAdapter(
|
||||
server,
|
||||
wss,
|
||||
webSocketAdapterFactory(eventRepository)
|
||||
)
|
||||
|
||||
const numCpus = cpus().length
|
||||
const port = Number(process.env.SERVER_PORT) || 8008
|
||||
adapter.listen(port)
|
||||
|
||||
process.on('SIGINT', async function () {
|
||||
console.log('\rCaught interrupt signal')
|
||||
wss.clients.forEach((client) => client.terminate())
|
||||
await new Promise((resolve, reject) =>
|
||||
wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
|
||||
|
||||
const newWorker = (): Worker => {
|
||||
let timeout
|
||||
const worker = cluster.fork()
|
||||
worker
|
||||
.on('listening', () => {
|
||||
console.log(`worker ${worker.process.pid} listening`)
|
||||
// worker.send('shutdown')
|
||||
// worker.disconnect()
|
||||
// timeout = setTimeout(() => {
|
||||
// worker.kill()
|
||||
// }, 5000)
|
||||
})
|
||||
.on('disconnect', () => {
|
||||
console.log(`worker ${worker.process.pid} disconnect`)
|
||||
clearTimeout(timeout)
|
||||
})
|
||||
.on('exit', (code, signal) => {
|
||||
console.log(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`)
|
||||
})
|
||||
|
||||
return worker
|
||||
}
|
||||
|
||||
if (cluster.isPrimary) {
|
||||
console.log(`Primary ${process.pid} is running`)
|
||||
|
||||
for (let i = 0; i < numCpus; i++) {
|
||||
newWorker()
|
||||
}
|
||||
|
||||
cluster.on('exit', (deadWorker) => {
|
||||
const worker = newWorker()
|
||||
|
||||
// Note the process IDs
|
||||
const newPID = worker.process.pid
|
||||
const oldPID = deadWorker.process.pid
|
||||
|
||||
// Log the event
|
||||
console.log('worker ' + oldPID + ' died.')
|
||||
console.log('worker ' + newPID + ' born.')
|
||||
})
|
||||
|
||||
process.on('SIGINT', function () {
|
||||
console.log('\rCaught interrupt signal')
|
||||
|
||||
//await Promise.all(apps.map((app) => app.terminate()))
|
||||
// for (const id in cluster.workers) {
|
||||
// apps.get(cluster.workers[id])
|
||||
// }
|
||||
|
||||
// await new Promise((resolve, reject) =>
|
||||
// wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
|
||||
// )
|
||||
// await new Promise((resolve, reject) =>
|
||||
// server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
|
||||
// )
|
||||
|
||||
for (const id in cluster.workers) {
|
||||
console.log('id', id)
|
||||
console.log(`shutting down worker ${cluster.workers[id].process.pid}`)
|
||||
cluster.workers[id].send('shutdown')
|
||||
}
|
||||
|
||||
console.log('Disconnecting from db')
|
||||
dbClient.destroy(() => {
|
||||
console.log('Exiting')
|
||||
process.exit()
|
||||
})
|
||||
})
|
||||
} else if (cluster.isWorker) {
|
||||
|
||||
const server = http.createServer()
|
||||
const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 })
|
||||
const adapter = new WebSocketServerAdapter(
|
||||
server,
|
||||
wss,
|
||||
webSocketAdapterFactory(eventRepository)
|
||||
)
|
||||
await new Promise((resolve, reject) =>
|
||||
server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
|
||||
)
|
||||
dbClient.destroy()
|
||||
process.exit()
|
||||
})
|
||||
|
||||
adapter.listen(port)
|
||||
|
||||
process.on('message', async (msg) => {
|
||||
console.log('worker received', msg)
|
||||
if (msg === 'shutdown') {
|
||||
console.log('disconnecting all clients')
|
||||
wss.clients.forEach((client) => client.terminate())
|
||||
wss.close()
|
||||
// server.close()
|
||||
// await new Promise((resolve, reject) =>
|
||||
// wss.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
|
||||
// )
|
||||
// await new Promise((resolve, reject) =>
|
||||
// server.close((error?: Error) => void (error instanceof Error) ? reject(error) : resolve(undefined))
|
||||
// )
|
||||
}
|
||||
})
|
||||
|
||||
console.log(`Worker ${process.pid} started and listening on port ${port}`)
|
||||
}
|
||||
|
@ -172,10 +172,6 @@ export const isEphemeralEvent = (event: Event): boolean => {
|
||||
return event.kind >= 20000 && event.kind < 30000
|
||||
}
|
||||
|
||||
export const isNullEvent = (event: Event): boolean => {
|
||||
return event.kind === Number.MAX_SAFE_INTEGER
|
||||
}
|
||||
|
||||
export const isDeleteEvent = (event: Event): boolean => {
|
||||
return event.kind === EventKinds.DELETE
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
import { IAlternative } from '../../@types/runes'
|
||||
|
||||
const punctuations = /[!"#\$%&'()*+-.\/:;<=>?@\[\\\]^`{|}~]/
|
||||
|
||||
@ -5,7 +6,7 @@ const hasPunctuation = (input) => punctuations.test(input)
|
||||
|
||||
// Reference: https://github.com/rustyrussell/runes/blob/master/runes/runes.py
|
||||
|
||||
export class Alternative {
|
||||
export class Alternative implements IAlternative {
|
||||
public constructor(
|
||||
private readonly field: string,
|
||||
private readonly cond: string,
|
||||
@ -78,7 +79,7 @@ export class Alternative {
|
||||
return `${this.field}${this.cond}${this.value.replace(/[\\|&]/g, '\\$&')}`
|
||||
}
|
||||
|
||||
public static decode(encodedStr: string): [Alternative, string] {
|
||||
public static decode(encodedStr: string): [IAlternative, string] {
|
||||
let cond = undefined
|
||||
let endOff = 0
|
||||
|
||||
@ -118,7 +119,7 @@ export class Alternative {
|
||||
return [new Alternative(field, cond, value), encodedStr.slice(endOff)]
|
||||
}
|
||||
|
||||
public static from(encodedStr: string): Alternative {
|
||||
public static from(encodedStr: string): IAlternative {
|
||||
const [field, cond, value] = encodedStr.replace(/\s+/g, '').split(new RegExp(`(${punctuations.source})`, 'g'))
|
||||
|
||||
return new Alternative(field, cond, value)
|
||||
|
@ -1,9 +1,10 @@
|
||||
import { IAlternative, IRestriction } from '../../@types/runes'
|
||||
import { Alternative } from './alternative'
|
||||
|
||||
|
||||
export class Restriction {
|
||||
export class Restriction implements IRestriction {
|
||||
public constructor(
|
||||
private readonly alternatives: Alternative[]
|
||||
private readonly alternatives: IAlternative[]
|
||||
) {
|
||||
if (!alternatives.length) {
|
||||
throw new Error('Restriction must have some alternatives')
|
||||
@ -27,10 +28,10 @@ export class Restriction {
|
||||
return this.alternatives.map((alternative) => alternative.encode()).join('|')
|
||||
}
|
||||
|
||||
public static decode(encodedStr: string): [Restriction, string] {
|
||||
public static decode(encodedStr: string): [IRestriction, string] {
|
||||
let encStr = encodedStr
|
||||
let alternative: Alternative
|
||||
const alternatives: Alternative[] = []
|
||||
let alternative: IAlternative
|
||||
const alternatives: IAlternative[] = []
|
||||
while (encStr.length) {
|
||||
if (encStr.startsWith('&')) {
|
||||
encStr = encStr.slice(1)
|
||||
|
@ -1,9 +1,10 @@
|
||||
import { IRestriction, IRuneLike } from '../../@types/runes'
|
||||
import { Restriction } from './restriction'
|
||||
|
||||
|
||||
export class RuneLike {
|
||||
export class RuneLike implements IRuneLike {
|
||||
public constructor(
|
||||
private readonly restrictions: Restriction[] = []
|
||||
private readonly restrictions: IRestriction[]
|
||||
) { }
|
||||
|
||||
public test(values: Record<string, unknown>): [boolean, string] {
|
||||
@ -17,13 +18,13 @@ export class RuneLike {
|
||||
return [true, '']
|
||||
}
|
||||
|
||||
public encode() {
|
||||
public encode(): string {
|
||||
return this.restrictions.map((restriction) => restriction.encode()).join('&')
|
||||
}
|
||||
|
||||
public static from(encodedStr: string): RuneLike {
|
||||
const restrictions: Restriction[] = []
|
||||
let restriction: Restriction
|
||||
public static from(encodedStr: string): IRuneLike {
|
||||
const restrictions: IRestriction[] = []
|
||||
let restriction: IRestriction
|
||||
let encStr = encodedStr.replace(/\s+/g, '')
|
||||
|
||||
while (encStr.length) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user