From 446b104990c90879450c75f50e882f507f32bfcd Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 22 Aug 2023 10:41:58 -0300 Subject: [PATCH] simplify subscription closing. --- relay.go | 17 ++++++++--------- subscription.go | 30 ++++++++++++++---------------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/relay.go b/relay.go index cb4c2b8..7e918c1 100644 --- a/relay.go +++ b/relay.go @@ -482,15 +482,14 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . ctx, cancel := context.WithCancel(ctx) sub := &Subscription{ - Relay: r, - Context: ctx, - cancel: cancel, - counter: int(current), - Events: make(chan *Event), - events: make(chan *Event), - EndOfStoredEvents: make(chan struct{}), - Filters: filters, - closeEventsChannel: make(chan struct{}), + Relay: r, + Context: ctx, + cancel: cancel, + counter: int(current), + Events: make(chan *Event), + events: make(chan *Event), + EndOfStoredEvents: make(chan struct{}), + Filters: filters, } for _, opt := range opts { diff --git a/subscription.go b/subscription.go index 12026b8..1d33799 100644 --- a/subscription.go +++ b/subscription.go @@ -29,10 +29,9 @@ type Subscription struct { // Context will be .Done() when the subscription ends Context context.Context - live atomic.Bool - eosed atomic.Bool - closeEventsChannel chan struct{} - cancel context.CancelFunc + live atomic.Bool + eosed atomic.Bool + cancel context.CancelFunc } type EventMessage struct { @@ -78,14 +77,14 @@ func (sub *Subscription) start() { mu.Unlock() }() case <-sub.Context.Done(): - // the subscription ends once the context is canceled - sub.Unsub() - return - case <-sub.closeEventsChannel: - // this is called only once on .Unsub() and closes the .Events channel + // the subscription ends once the context is canceled (if not already) + sub.Unsub() // this will set sub.live to false + + // do this so we don't have the possibility of closing the Events channel and then trying to send to it mu.Lock() close(sub.Events) mu.Unlock() + return } } @@ -94,17 +93,16 @@ func (sub *Subscription) start() { // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub() also closes the channel sub.Events and makes a new one. func (sub *Subscription) Unsub() { + // cancel the context (if it's not canceled already) sub.cancel() - // naïve sync.Once implementation: + // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation) if sub.live.CompareAndSwap(true, false) { - go sub.Close() - id := sub.GetID() - sub.Relay.Subscriptions.Delete(id) - - // do this so we don't have the possibility of closing the Events channel and then trying to send to it - close(sub.closeEventsChannel) + sub.Close() } + + // remove subscription from our map + sub.Relay.Subscriptions.Delete(sub.GetID()) } // Close just sends a CLOSE message. You probably want Unsub() instead.