diff --git a/pool.go b/pool.go index d0c1b46..f10ea7c 100644 --- a/pool.go +++ b/pool.go @@ -72,14 +72,15 @@ func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, fil } func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { + ctx, cancel := context.WithCancel(ctx) + _ = cancel // do this so `go vet` will stop complaining events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[Timestamp]() ticker := time.NewTicker(seenAlreadyDropTick) eose := false pending := xsync.NewCounter() - initial := len(urls) - pending.Add(int64(initial)) + pending.Add(int64(len(urls))) for _, url := range urls { go func(nm string) { relay, err := pool.EnsureRelay(nm) @@ -92,23 +93,38 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt return } - for evt := range sub.Events { - if unique { - if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { - continue - } + defer func() { + pending.Dec() + if pending.Value() == 0 { + close(events) } + cancel() + }() + for { select { + case evt, more := <-sub.Events: + if !more { + return + } + if unique { + if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { + continue + } + } + select { + case events <- IncomingEvent{Event: evt, Relay: relay}: + case <-ctx.Done(): + } case <-sub.EndOfStoredEvents: eose = true case <-ticker.C: if eose { del := map[string]struct{}{} old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) - seenAlready.Range(func(key string, value Timestamp) bool { + seenAlready.Range(func(id string, value Timestamp) bool { if value < old { - del[evt.ID] = struct{}{} + del[id] = struct{}{} } return true }) @@ -116,7 +132,6 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt seenAlready.Delete(k) } } - case events <- IncomingEvent{Event: evt, Relay: relay}: case reason := <-sub.ClosedReason: log.Printf("CLOSED from %s: '%s'\n", nm, reason) return @@ -124,11 +139,6 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt return } } - - pending.Dec() - if pending.Value() == 0 { - close(events) - } }(NormalizeURL(url)) }