mirror of
https://github.com/Cameri/nostream.git
synced 2025-04-03 09:28:03 +02:00
feat: add redis-adapter
This commit is contained in:
parent
35915c3ecf
commit
15bf0cf6ee
@ -15,3 +15,10 @@ export type IWebSocketAdapter = EventEmitter & {
|
||||
getClientId(): string
|
||||
getSubscriptions(): Map<string, SubscriptionFilter[]>
|
||||
}
|
||||
|
||||
export interface ICacheAdapter {
|
||||
addToSortedSet(key: string, set: Record<string, string> | Record<string, string>[]): Promise<number>
|
||||
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
|
||||
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>
|
||||
setKeyExpiry(key: string, expiry: number): Promise<void>
|
||||
}
|
||||
|
8
src/@types/cache.ts
Normal file
8
src/@types/cache.ts
Normal file
@ -0,0 +1,8 @@
|
||||
import {
|
||||
RedisClientType,
|
||||
RedisFunctions,
|
||||
RedisModules,
|
||||
RedisScripts,
|
||||
} from 'redis'
|
||||
|
||||
export type Cache = RedisClientType<RedisModules, RedisFunctions, RedisScripts>
|
80
src/adapters/redis-adapter.ts
Normal file
80
src/adapters/redis-adapter.ts
Normal file
@ -0,0 +1,80 @@
|
||||
import { Cache } from '../@types/cache'
|
||||
import { createLogger } from '../factories/logger-factory'
|
||||
import { ICacheAdapter } from '../@types/adapters'
|
||||
|
||||
const debug = createLogger('redis-adapter')
|
||||
|
||||
export class RedisAdapter implements ICacheAdapter {
|
||||
private connection: Promise<void>
|
||||
|
||||
public constructor(private readonly client: Cache) {
|
||||
this.connection = client.connect()
|
||||
|
||||
this.connection.catch((error) => this.onClientError(error))
|
||||
|
||||
this.client
|
||||
.on('connect', () => debug('connecting'))
|
||||
.on('ready', () => debug('connected'))
|
||||
.on('error', (error) => this.onClientError(error))
|
||||
.on('reconnecting', () => {
|
||||
debug('reconnecting')
|
||||
this.connection = new Promise((resolve, reject) => {
|
||||
const cleanup = () => {
|
||||
this.client.removeListener('ready', onReady)
|
||||
this.client.removeListener('error', onError)
|
||||
}
|
||||
|
||||
const onError = (error: Error) => {
|
||||
cleanup()
|
||||
reject(error)
|
||||
}
|
||||
|
||||
const onReady = () => {
|
||||
cleanup()
|
||||
resolve()
|
||||
}
|
||||
|
||||
this.client.once('ready', onReady)
|
||||
|
||||
this.client.once('error', onError)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
private onClientError(error: Error) {
|
||||
console.error('Unable to connect to Redis.', error)
|
||||
// throw error
|
||||
}
|
||||
|
||||
public async removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number> {
|
||||
await this.connection
|
||||
debug('remove %d..%d range from sorted set %s', min, max, key)
|
||||
return this.client.zRemRangeByScore(key, min, max)
|
||||
}
|
||||
|
||||
public async getRangeFromSortedSet(key: string, min: number, max: number): Promise<string[]> {
|
||||
await this.connection
|
||||
debug('get %d..%d range from sorted set %s', min, max, key)
|
||||
return this.client.zRange(key, min, max)
|
||||
}
|
||||
|
||||
public async setKeyExpiry(key: string, expiry: number): Promise<void> {
|
||||
await this.connection
|
||||
debug('expire at %d from sorted set %s', expiry, key)
|
||||
await this.client.expire(key, expiry)
|
||||
}
|
||||
|
||||
public async addToSortedSet(
|
||||
key: string,
|
||||
set: Record<string, string>
|
||||
): Promise<number> {
|
||||
await this.connection
|
||||
debug('add %o to sorted set %s', set, key)
|
||||
const members = Object
|
||||
.entries(set)
|
||||
.map(([value, score]) => ({ score: Number(score), value }))
|
||||
|
||||
return this.client.zAdd(key, members)
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user