fix atrocious bug on pool.subMany(): we were missing events because of a badly designed select{}

This commit is contained in:
fiatjaf
2023-12-01 20:49:45 -03:00
parent 2969449fb3
commit c606a43c07

40
pool.go
View File

@@ -72,14 +72,15 @@ func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, fil
}
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining
events := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[Timestamp]()
ticker := time.NewTicker(seenAlreadyDropTick)
eose := false
pending := xsync.NewCounter()
initial := len(urls)
pending.Add(int64(initial))
pending.Add(int64(len(urls)))
for _, url := range urls {
go func(nm string) {
relay, err := pool.EnsureRelay(nm)
@@ -92,23 +93,38 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
return
}
for evt := range sub.Events {
if unique {
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
continue
}
defer func() {
pending.Dec()
if pending.Value() == 0 {
close(events)
}
cancel()
}()
for {
select {
case evt, more := <-sub.Events:
if !more {
return
}
if unique {
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
continue
}
}
select {
case events <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
}
case <-sub.EndOfStoredEvents:
eose = true
case <-ticker.C:
if eose {
del := map[string]struct{}{}
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(key string, value Timestamp) bool {
seenAlready.Range(func(id string, value Timestamp) bool {
if value < old {
del[evt.ID] = struct{}{}
del[id] = struct{}{}
}
return true
})
@@ -116,7 +132,6 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
seenAlready.Delete(k)
}
}
case events <- IncomingEvent{Event: evt, Relay: relay}:
case reason := <-sub.ClosedReason:
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
return
@@ -124,11 +139,6 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
return
}
}
pending.Dec()
if pending.Value() == 0 {
close(events)
}
}(NormalizeURL(url))
}