chore: refactor abortable requests

Signed-off-by: Ricardo Arturo Cabral Mejía <me@ricardocabral.io>
This commit is contained in:
Ricardo Arturo Cabral Mejía 2023-01-17 18:50:01 -05:00
parent a519f4f36d
commit 4f62475a1a

View File

@ -1,4 +1,5 @@
import { anyPass, equals, isNil, map, propSatisfies, uniqWith } from 'ramda'
import { addAbortSignal } from 'stream'
import { pipeline } from 'stream/promises'
import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
@ -57,17 +58,16 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
const findEvents = this.eventRepository.findByFilters(filters).stream()
const abortableFintEvents = addAbortSignal(this.abortController.signal, findEvents)
try {
await pipeline(
findEvents,
abortableFintEvents,
streamFilter(propSatisfies(isNil, 'deleted_at')),
streamMap(toNostrEvent),
streamFilter(isSubscribedToEvent),
streamEach(sendEvent),
streamEnd(sendEOSE),
{
signal: this.abortController.signal,
}
)
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
@ -92,14 +92,14 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
return `Duplicate subscription ${subscriptionId}: Ignorning`
}
const maxSubscriptions = this.settings().limits.client.subscription.maxSubscriptions
const maxSubscriptions = this.settings().limits?.client?.subscription?.maxSubscriptions ?? 0
if (maxSubscriptions > 0
&& !existingSubscription?.length && subscriptions.size + 1 > maxSubscriptions
) {
return `Too many subscriptions: Number of subscriptions must be less than or equal to ${maxSubscriptions}`
}
const maxFilters = this.settings().limits.client.subscription.maxFilters
const maxFilters = this.settings().limits?.client?.subscription?.maxFilters ?? 0
if (maxFilters > 0) {
if (filters.length > maxFilters) {
return `Too many filters: Number of filters per susbscription must be less then or equal to ${maxFilters}`