pool.SubManyNotifyEOSE()

This commit is contained in:
fiatjaf 2025-01-28 16:12:53 -03:00
parent 7eba27f026
commit edb2f782cf

41
pool.go
View File

@ -234,6 +234,28 @@ func (pool *SimplePool) SubMany(
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, nil, opts...)
}
// SubManyNotifyEOSE is like SubMany, but takes a channel that is closed when
// all subscriptions have received an EOSE
func (pool *SimplePool) SubManyNotifyEOSE(
ctx context.Context,
urls []string,
filters Filters,
eoseChan chan struct{},
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, eoseChan, opts...)
}
func (pool *SimplePool) subMany(
ctx context.Context,
urls []string,
filters Filters,
eoseChan chan struct{},
opts ...SubscriptionOption,
) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining
@ -242,6 +264,14 @@ func (pool *SimplePool) SubMany(
ticker := time.NewTicker(seenAlreadyDropTick)
eose := false
eoseWg := sync.WaitGroup{}
eoseWg.Add(len(urls))
if eoseChan != nil {
go func() {
eoseWg.Wait()
close(eoseChan)
}()
}
pending := xsync.NewCounter()
pending.Add(int64(len(urls)))
@ -250,6 +280,7 @@ func (pool *SimplePool) SubMany(
urls[i] = url
if idx := slices.Index(urls, url); idx != i {
// skip duplicate relays in the list
eoseWg.Done()
continue
}
@ -305,7 +336,12 @@ func (pool *SimplePool) SubMany(
go func() {
<-sub.EndOfStoredEvents
eose = true
// guard here otherwise a resubscription will trigger a duplicate call to eoseWg.Done()
if !eose {
eose = true
eoseWg.Done()
}
}()
// reset interval when we get a good subscription
@ -359,6 +395,9 @@ func (pool *SimplePool) SubMany(
} else {
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
}
eoseWg.Done()
return
case <-ctx.Done():
return