diff --git a/pool.go b/pool.go index 3f56f0b..05d10b7 100644 --- a/pool.go +++ b/pool.go @@ -79,15 +79,6 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt ticker := time.NewTicker(seenAlreadyDropTick) eose := false - updateSince := func() { - // After reconnection, update the since in the filter so that - // old events are not retrieved. - now := Now() - for i := range filters { - filters[i].Since = &now - } - } - pending := xsync.NewCounter() pending.Add(int64(len(urls))) for _, url := range urls { @@ -107,26 +98,25 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt default: } + var sub *Subscription + relay, err := pool.EnsureRelay(nm) if err != nil { time.Sleep(3 * time.Second) - updateSince() - continue + goto reconnect } - sub, err := relay.Subscribe(ctx, filters) + sub, err = relay.Subscribe(ctx, filters) if err != nil { time.Sleep(3 * time.Second) - updateSince() - continue + goto reconnect } - loop: for { select { case evt, more := <-sub.Events: if !more { - break loop + goto reconnect } if unique { if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { @@ -156,7 +146,13 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt return } } - updateSince() + + reconnect: + // when attempting to reconnect update the `since` in filters so old events are not retrieved + now := Now() + for i := range filters { + filters[i].Since = &now + } } }(NormalizeURL(url)) }