mirror of
https://github.com/Cameri/nostream.git
synced 2025-03-17 21:31:48 +01:00
chore: disable aborting queries
This commit is contained in:
parent
9021aa6b11
commit
e89ec19604
@ -1,5 +1,5 @@
|
||||
import { anyPass, equals, isNil, map, propSatisfies, uniqWith } from 'ramda'
|
||||
import { addAbortSignal } from 'stream'
|
||||
// import { addAbortSignal } from 'stream'
|
||||
import { pipeline } from 'stream/promises'
|
||||
|
||||
import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
|
||||
@ -18,18 +18,18 @@ import { WebSocketAdapterEvent } from '../constants/adapter'
|
||||
const debug = createLogger('subscribe-message-handler')
|
||||
|
||||
export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
private readonly abortController: AbortController
|
||||
//private readonly abortController: AbortController
|
||||
|
||||
public constructor(
|
||||
private readonly webSocket: IWebSocketAdapter,
|
||||
private readonly eventRepository: IEventRepository,
|
||||
private readonly settings: () => Settings,
|
||||
) {
|
||||
this.abortController = new AbortController()
|
||||
//this.abortController = new AbortController()
|
||||
}
|
||||
|
||||
public abort(): void {
|
||||
this.abortController.abort()
|
||||
//this.abortController.abort()
|
||||
}
|
||||
|
||||
public async handleMessage(message: SubscribeMessage): Promise<void> {
|
||||
@ -58,11 +58,11 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
|
||||
const findEvents = this.eventRepository.findByFilters(filters).stream()
|
||||
|
||||
const abortableFindEvents = addAbortSignal(this.abortController.signal, findEvents)
|
||||
// const abortableFindEvents = addAbortSignal(this.abortController.signal, findEvents)
|
||||
|
||||
try {
|
||||
await pipeline(
|
||||
abortableFindEvents,
|
||||
findEvents,
|
||||
streamFilter(propSatisfies(isNil, 'deleted_at')),
|
||||
streamMap(toNostrEvent),
|
||||
streamFilter(isSubscribedToEvent),
|
||||
@ -72,7 +72,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.name === 'AbortError') {
|
||||
debug('subscription %s aborted: %o', subscriptionId, error)
|
||||
findEvents.destroy()
|
||||
findEvents.destroy()
|
||||
} else {
|
||||
debug('error streaming events: %o', error)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user