From e45921c11a417422120b657fce2787efff212d00 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 2 Oct 2023 14:16:16 -0300 Subject: [PATCH] pool.SubMany(Eose)NonUnique() --- pool.go | 54 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/pool.go b/pool.go index 59f5fcd..98f49f2 100644 --- a/pool.go +++ b/pool.go @@ -57,8 +57,17 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) { // SubMany opens a subscription with the given filters to multiple relays // the subscriptions only end when the context is canceled -func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { - uniqueEvents := make(chan IncomingEvent) +func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { + return pool.subMany(ctx, urls, filters, true) +} + +// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays +func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { + return pool.subMany(ctx, urls, filters, false) +} + +func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { + events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() pending := xsync.NewCounter() @@ -77,27 +86,43 @@ func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filt } for evt := range sub.Events { - // dispatch unique events to client - if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok { - uniqueEvents <- IncomingEvent{Event: evt, Relay: relay} + stop := true + if unique { + _, stop = seenAlready.LoadOrStore(evt.ID, true) + } + if !stop { + select { + case events <- IncomingEvent{Event: evt, Relay: relay}: + case <-ctx.Done(): + return + } } } pending.Dec() if pending.Value() == 0 { - close(uniqueEvents) + close(events) } }(NormalizeURL(url)) } - return uniqueEvents + return events } // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { + return pool.subManyEose(ctx, urls, filters, true) +} + +// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays +func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { + return pool.subManyEose(ctx, urls, filters, false) +} + +func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { ctx, cancel := context.WithCancel(ctx) - uniqueEvents := make(chan IncomingEvent) + events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() wg := sync.WaitGroup{} wg.Add(len(urls)) @@ -106,7 +131,7 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters // this will happen when all subscriptions get an eose (or when they die) wg.Wait() cancel() - close(uniqueEvents) + close(events) }() for _, url := range urls { @@ -135,10 +160,13 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters return } - // dispatch unique events to client - if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok { + stop := true + if unique { + _, stop = seenAlready.LoadOrStore(evt.ID, true) + } + if !stop { select { - case uniqueEvents <- IncomingEvent{Event: evt, Relay: relay}: + case events <- IncomingEvent{Event: evt, Relay: relay}: case <-ctx.Done(): return } @@ -148,7 +176,7 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters }(NormalizeURL(url)) } - return uniqueEvents + return events } // QuerySingle returns the first event returned by the first relay, cancels everything else.