feat: support db read replicas

Signed-off-by: Ricardo Arturo Cabral Mejía <me@ricardocabral.io>
This commit is contained in:
Ricardo Arturo Cabral Mejía 2023-01-25 00:13:01 -05:00
parent b24b0283b0
commit 09039791f1
No known key found for this signature in database
GPG Key ID: 5931EBF43A650245
13 changed files with 135 additions and 60 deletions

View File

@ -4,28 +4,37 @@
The following environment variables can be set:
| Name | Description | Default |
|-----------------------|--------------------------------|------------------------|
| RELAY_PORT | Relay's server port | 8008 |
| WORKER_COUNT | Number of workers override | No. of available CPUs |
| DB_HOST               | PostgreSQL Hostname            |                        |
| DB_PORT | PostgreSQL Port | 5432 |
| DB_USER | PostgreSQL Username | nostr_ts_relay |
| DB_PASSWORD | PostgreSQL Password | nostr_ts_relay |
| DB_NAME | PostgreSQL Database name | nostr_ts_relay |
| DB_MIN_POOL_SIZE | Min. connections per worker | 16 |
| DB_MAX_POOL_SIZE | Max. connections per worker | 32 |
| DB_ACQUIRE_CONNECTION_TIMEOUT | New connection timeout (ms) | 60000 |
| TOR_HOST | Tor Hostname | |
| TOR_CONTROL_PORT | Tor control Port | 9051 |
| TOR_PASSWORD | Tor control password | nostr_ts_relay |
| HIDDEN_SERVICE_PORT | Tor hidden service port | 80 |
| REDIS_HOST | | |
| REDIS_PORT | Redis Port | 6379 |
| REDIS_USER | Redis User | default |
| REDIS_PASSWORD | Redis Password | nostr_ts_relay |
| NOSTR_CONFIG_DIR | Configuration directory | <project_root>/.nostr/ |
| DEBUG | Debugging filter | |
| Name | Description | Default |
|----------------------------------|--------------------------------|------------------------|
| RELAY_PORT | Relay's server port | 8008 |
| WORKER_COUNT | Number of workers override | No. of available CPUs |
| DB_HOST                          | PostgreSQL Hostname            |                        |
| DB_PORT | PostgreSQL Port | 5432 |
| DB_USER | PostgreSQL Username | nostr_ts_relay |
| DB_PASSWORD | PostgreSQL Password | nostr_ts_relay |
| DB_NAME | PostgreSQL Database name | nostr_ts_relay |
| DB_MIN_POOL_SIZE | Min. connections per worker | 16 |
| DB_MAX_POOL_SIZE | Max. connections per worker | 32 |
| DB_ACQUIRE_CONNECTION_TIMEOUT | New connection timeout (ms) | 60000 |
| READ_REPLICA_ENABLED | Read Replica (RR) Toggle | 'false' |
| RR_DB_HOST                       | PostgreSQL Hostname (RR)       |                        |
| RR_DB_PORT | PostgreSQL Port (RR) | 5432 |
| RR_DB_USER | PostgreSQL Username (RR) | nostr_ts_relay |
| RR_DB_PASSWORD | PostgreSQL Password (RR) | nostr_ts_relay |
| RR_DB_NAME | PostgreSQL Database name (RR) | nostr_ts_relay |
| RR_DB_MIN_POOL_SIZE | Min. connections per worker (RR) | 16 |
| RR_DB_MAX_POOL_SIZE | Max. connections per worker (RR) | 32 |
| RR_DB_ACQUIRE_CONNECTION_TIMEOUT | New connection timeout (ms) (RR) | 60000 |
| TOR_HOST | Tor Hostname | |
| TOR_CONTROL_PORT | Tor control Port | 9051 |
| TOR_PASSWORD | Tor control password | nostr_ts_relay |
| HIDDEN_SERVICE_PORT | Tor hidden service port | 80 |
| REDIS_HOST | | |
| REDIS_PORT | Redis Port | 6379 |
| REDIS_USER | Redis User | default |
| REDIS_PASSWORD | Redis Password | nostr_ts_relay |
| NOSTR_CONFIG_DIR | Configuration directory | <project_root>/.nostr/ |
| DEBUG | Debugging filter | |
# Settings

View File

@ -63,7 +63,7 @@ NIPs with a relay-specific implementation are listed here.
## Requirements
### Standalone setup
- PostgreSQL 15.0
- PostgreSQL 14.0
- Redis
- Node v18
- Typescript

View File

@ -5,6 +5,7 @@ services:
environment:
RELAY_PORT: 8008
NOSTR_CONFIG_DIR: /home/node/
# Master
DB_HOST: db
DB_PORT: 5432
DB_USER: nostr_ts_relay
@ -13,6 +14,17 @@ services:
DB_MIN_POOL_SIZE: 16
DB_MAX_POOL_SIZE: 64
DB_ACQUIRE_CONNECTION_TIMEOUT: 60000
# Read Replica
READ_REPLICA_ENABLED: 'false'
RR_DB_HOST: db
RR_DB_PORT: 5432
RR_DB_USER: nostr_ts_relay
RR_DB_PASSWORD: nostr_ts_relay
RR_DB_NAME: nostr_ts_relay
RR_DB_MIN_POOL_SIZE: 16
RR_DB_MAX_POOL_SIZE: 64
RR_DB_ACQUIRE_CONNECTION_TIMEOUT: 60000
# Redis
REDIS_HOST: cache
REDIS_PORT: 6379
REDIS_USER: default
@ -42,7 +54,7 @@ services:
default:
ipv4_address: 10.10.10.2
db:
image: postgres:14
image: postgres
container_name: db
environment:
POSTGRES_DB: nostr_ts_relay

View File

@ -3,9 +3,7 @@ import 'pg-query-stream'
import knex, { Knex } from 'knex'
import { createLogger } from '../factories/logger-factory'
const debug = createLogger('database-client')
const createDbConfig = (): Knex.Config => ({
const getMasterConfig = (): Knex.Config => ({
client: 'pg',
connection: {
host: process.env.DB_HOST,
@ -28,14 +26,55 @@ const createDbConfig = (): Knex.Config => ({
: 60000,
})
let client: Knex
/**
 * Builds the knex configuration for the read-replica connection.
 * Every value is sourced from the RR_-prefixed environment variables;
 * pool bounds and acquire timeouts fall back to the same defaults used
 * for the master connection.
 */
const getReadReplicaConfig = (): Knex.Config => {
  const env = process.env
  // The same timeout drives both knex's acquireConnectionTimeout and the
  // pool's acquireTimeoutMillis, so compute it once.
  const acquireTimeout = env.RR_DB_ACQUIRE_CONNECTION_TIMEOUT
    ? Number(env.RR_DB_ACQUIRE_CONNECTION_TIMEOUT)
    : 60000

  return {
    client: 'pg',
    connection: {
      host: env.RR_DB_HOST,
      port: Number(env.RR_DB_PORT),
      user: env.RR_DB_USER,
      password: env.RR_DB_PASSWORD,
      database: env.RR_DB_NAME,
    },
    pool: {
      min: env.RR_DB_MIN_POOL_SIZE ? Number(env.RR_DB_MIN_POOL_SIZE) : 0,
      max: env.RR_DB_MAX_POOL_SIZE ? Number(env.RR_DB_MAX_POOL_SIZE) : 3,
      idleTimeoutMillis: 60000,
      // Replica may be briefly unreachable; do not poison the pool on a
      // failed connection attempt.
      propagateCreateError: false,
      acquireTimeoutMillis: acquireTimeout,
    },
    acquireConnectionTimeout: acquireTimeout,
  }
}
export const getDbClient = () => {
if (!client) {
const config = createDbConfig()
let writeClient: Knex
export const getMasterDbClient = () => {
const debug = createLogger('database-client:get-db-client')
if (!writeClient) {
const config = getMasterConfig()
debug('config: %o', config)
client = knex(config)
writeClient = knex(config)
}
return client
return writeClient
}
let readClient: Knex

/**
 * Returns the knex client to use for reads.
 *
 * When the READ_REPLICA_ENABLED environment variable is not exactly the
 * string 'true', replica usage is disabled and reads are served by the
 * master connection. Otherwise a singleton replica client is created
 * lazily from getReadReplicaConfig(), logging the config once.
 */
export const getReadReplicaDbClient = () => {
  // Guard clause: replica disabled -> fall back to the master client.
  if (process.env.READ_REPLICA_ENABLED !== 'true') {
    return getMasterDbClient()
  }

  const debug = createLogger('database-client:get-read-replica-db-client')
  if (readClient === undefined) {
    const config = getReadReplicaConfig()
    debug('config: %o', config)
    readClient = knex(config)
  }

  return readClient
}

View File

@ -2,16 +2,17 @@ import http from 'http'
import process from 'process'
import { WebSocketServer } from 'ws'
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
import { AppWorker } from '../app/worker'
import { createSettings } from '../factories/settings-factory'
import { EventRepository } from '../repositories/event-repository'
import { getDbClient } from '../database/client'
import { webSocketAdapterFactory } from './websocket-adapter-factory'
import { WebSocketServerAdapter } from '../adapters/web-socket-server-adapter'
export const workerFactory = (): AppWorker => {
const dbClient = getDbClient()
const eventRepository = new EventRepository(dbClient)
const dbClient = getMasterDbClient()
const readReplicaDbClient = getReadReplicaDbClient()
const eventRepository = new EventRepository(dbClient, readReplicaDbClient)
// deepcode ignore HttpToHttps: we use proxies
const server = http.createServer()

View File

@ -53,7 +53,10 @@ const groupByLengthSpec = groupBy(
const debug = createLogger('event-repository')
export class EventRepository implements IEventRepository {
public constructor(private readonly dbClient: DatabaseClient) { }
public constructor(
private readonly masterDbClient: DatabaseClient,
private readonly readReplicaDbClient: DatabaseClient,
) { }
public findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]> {
debug('querying for %o', filters)
@ -61,9 +64,9 @@ export class EventRepository implements IEventRepository {
throw new Error('Filters cannot be empty')
}
const queries = filters.map((currentFilter) => {
const builder = this.dbClient<DBEvent>('events')
const builder = this.readReplicaDbClient<DBEvent>('events')
forEachObjIndexed((tableFields: string[], filterName: string) => {
forEachObjIndexed((tableFields: string[], filterName: string | number) => {
builder.andWhere((bd) => {
cond([
[isEmpty, () => void bd.whereRaw('1 = 0')],
@ -179,7 +182,7 @@ export class EventRepository implements IEventRepository {
),
})(event)
return this.dbClient('events')
return this.masterDbClient('events')
.insert(row)
.onConflict()
.ignore()
@ -211,12 +214,12 @@ export class EventRepository implements IEventRepository {
),
})(event)
const query = this.dbClient('events')
const query = this.masterDbClient('events')
.insert(row)
// NIP-16: Replaceable Events
// NIP-33: Parameterized Replaceable Events
.onConflict(
this.dbClient.raw(
this.masterDbClient.raw(
'(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
)
)
@ -233,7 +236,7 @@ export class EventRepository implements IEventRepository {
public insertStubs(pubkey: string, eventIdsToDelete: EventId[]): Promise<number> {
debug('inserting stubs for %s: %o', pubkey, eventIdsToDelete)
const date = new Date()
return this.dbClient('events').insert(
return this.masterDbClient('events').insert(
eventIdsToDelete.map(
applySpec({
event_id: pipe(identity, toBuffer),
@ -256,12 +259,12 @@ export class EventRepository implements IEventRepository {
public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise<number> {
debug('deleting events from %s: %o', pubkey, eventIdsToDelete)
return this.dbClient('events')
return this.masterDbClient('events')
.where('event_pubkey', toBuffer(pubkey))
.whereIn('event_id', map(toBuffer)(eventIdsToDelete))
.whereNull('deleted_at')
.update({
deleted_at: this.dbClient.raw('now()'),
deleted_at: this.masterDbClient.raw('now()'),
})
}
}

View File

@ -18,7 +18,7 @@ import { CacheClient } from '../../../src/@types/cache'
import { DatabaseClient } from '../../../src/@types/base'
import { Event } from '../../../src/@types/event'
import { getCacheClient } from '../../../src/cache/client'
import { getDbClient } from '../../../src/database/client'
import { getMasterDbClient } from '../../../src/database/client'
import { SettingsStatic } from '../../../src/utils/settings'
import { workerFactory } from '../../../src/factories/worker-factory'
@ -34,7 +34,7 @@ export const streams = new WeakMap<WebSocket, Observable<unknown>>()
BeforeAll({ timeout: 1000 }, async function () {
process.env.RELAY_PORT = '18808'
cacheClient = getCacheClient()
dbClient = getDbClient()
dbClient = getMasterDbClient()
await dbClient.raw('SELECT 1=1')
await cacheClient.connect()
await cacheClient.ping()
@ -72,7 +72,7 @@ After(async function () {
}
this.parameters.clients = {}
const dbClient = getDbClient()
const dbClient = getMasterDbClient()
await dbClient('events')
.where({

View File

@ -7,14 +7,17 @@ import { AppWorker } from '../../../src/app/worker'
import { workerFactory } from '../../../src/factories/worker-factory'
describe('workerFactory', () => {
let getDbClientStub: Sinon.SinonStub
let getMasterDbClientStub: Sinon.SinonStub
let getReadReplicaDbClientStub: Sinon.SinonStub
beforeEach(() => {
getDbClientStub = Sinon.stub(databaseClientModule, 'getDbClient')
getMasterDbClientStub = Sinon.stub(databaseClientModule, 'getMasterDbClient')
getReadReplicaDbClientStub = Sinon.stub(databaseClientModule, 'getReadReplicaDbClient')
})
afterEach(() => {
getDbClientStub.restore()
getMasterDbClientStub.restore()
getReadReplicaDbClientStub.restore()
})
it('returns an AppWorker', () => {

View File

@ -39,8 +39,9 @@ describe('DefaultEventStrategy', () => {
webSocket = {
emit: webSocketEmitStub,
} as any
const client: DatabaseClient = {} as any
eventRepository = new EventRepository(client)
const masterClient: DatabaseClient = {} as any
const readReplicaClient: DatabaseClient = {} as any
eventRepository = new EventRepository(masterClient, readReplicaClient)
strategy = new DefaultEventStrategy(webSocket, eventRepository)
})

View File

@ -49,8 +49,9 @@ describe('DeleteEventStrategy', () => {
webSocket = {
emit: webSocketEmitStub,
} as any
const client: DatabaseClient = {} as any
eventRepository = new EventRepository(client)
const masterClient: DatabaseClient = {} as any
const readReplicaClient: DatabaseClient = {} as any
eventRepository = new EventRepository(masterClient, readReplicaClient)
strategy = new DeleteEventStrategy(webSocket, eventRepository)
})

View File

@ -43,8 +43,9 @@ describe('ParameterizedReplaceableEventStrategy', () => {
webSocket = {
emit: webSocketEmitStub,
} as any
const client: DatabaseClient = {} as any
eventRepository = new EventRepository(client)
const masterClient: DatabaseClient = {} as any
const readReplicaClient: DatabaseClient = {} as any
eventRepository = new EventRepository(masterClient, readReplicaClient)
strategy = new ParameterizedReplaceableEventStrategy(webSocket, eventRepository)
})

View File

@ -39,8 +39,9 @@ describe('ReplaceableEventStrategy', () => {
webSocket = {
emit: webSocketEmitStub,
} as any
const client: DatabaseClient = {} as any
eventRepository = new EventRepository(client)
const masterClient: DatabaseClient = {} as any
const readReplicaClient: DatabaseClient = {} as any
eventRepository = new EventRepository(masterClient, readReplicaClient)
strategy = new ReplaceableEventStrategy(webSocket, eventRepository)
})

View File

@ -19,6 +19,7 @@ describe('EventRepository', () => {
let repository: IEventRepository
let sandbox: sinon.SinonSandbox
let dbClient: DatabaseClient
let rrDbClient: DatabaseClient
beforeEach(() => {
sandbox = sinon.createSandbox()
@ -26,8 +27,11 @@ describe('EventRepository', () => {
dbClient = knex({
client: 'pg',
})
rrDbClient = knex({
client: 'pg',
})
repository = new EventRepository(dbClient)
repository = new EventRepository(dbClient, rrDbClient)
})
afterEach(() => {