diff --git a/relay.go b/relay.go index 85e8f12..ba42a53 100644 --- a/relay.go +++ b/relay.go @@ -299,14 +299,7 @@ func (r *Relay) Connect(ctx context.Context) error { } case *EOSEEnvelope: if subscription, ok := r.Subscriptions.Load(string(*env)); ok { - // implementation adapted from the naïve/incorrect implementation of sync.Once - // (which is ok for this use case) - if subscription.eosed.CompareAndSwap(false, true) { - go func() { - time.Sleep(time.Millisecond) // this basically ensures the EndOfStoredEvents call happens after the last EVENT - close(subscription.EndOfStoredEvents) - }() - } + subscription.dispatchEose() } case *CountEnvelope: if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { diff --git a/subscription.go b/subscription.go index 1d33799..79f0e71 100644 --- a/subscription.go +++ b/subscription.go @@ -6,6 +6,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" ) type Subscription struct { @@ -32,6 +33,10 @@ type Subscription struct { live atomic.Bool eosed atomic.Bool cancel context.CancelFunc + + // this keeps track of the events we've received before the EOSE that we must dispatch before + // closing the EndOfStoredEvents channel + storedwg sync.WaitGroup } type EventMessage struct { @@ -67,14 +72,20 @@ func (sub *Subscription) start() { case event := <-sub.events: // this is guarded such that it will only fire until the .Events channel is closed go func() { + if !sub.eosed.Load() { + sub.storedwg.Add(1) + defer sub.storedwg.Done() + } + mu.Lock() + defer mu.Unlock() + if sub.live.Load() { select { case sub.Events <- event: case <-sub.Context.Done(): } } - mu.Unlock() }() case <-sub.Context.Done(): // the subscription ends once the context is canceled (if not already) @@ -90,6 +101,16 @@ func (sub *Subscription) start() { } } +func (sub *Subscription) dispatchEose() { + time.Sleep(time.Millisecond) + if sub.eosed.CompareAndSwap(false, true) { + go func() { + sub.storedwg.Wait() + close(sub.EndOfStoredEvents) + }() + } +} + // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub() also closes the channel sub.Events and makes a new one. func (sub *Subscription) Unsub() {