From 54d3de49084a9f4c45858d0c79eeb5349496530c Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sun, 25 Jun 2023 00:17:39 -0300 Subject: [PATCH] refactor some things, add back one mutex for each subscription for dispatching events, but in a cleaner way. --- relay.go | 57 ++++++++++++++++++------------------------------- subscription.go | 54 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 65 insertions(+), 46 deletions(-) diff --git a/relay.go b/relay.go index fb302b4..be8d097 100644 --- a/relay.go +++ b/relay.go @@ -186,9 +186,6 @@ func (r *Relay) Connect(ctx context.Context) error { // ping every 29 seconds ticker := time.NewTicker(29 * time.Second) - // this ensures we don't send an event to the Events channel after closing it - eventsChannelCloserMutex := &sync.Mutex{} - // to be used when the connection is closed go func() { <-r.connectionContext.Done() @@ -230,16 +227,6 @@ func (r *Relay) Connect(ctx context.Context) error { } }() - // every time a subscription ends we use this queue to close its .Events channel - go func() { - for toClose := range r.subscriptionChannelCloseQueue { - eventsChannelCloserMutex.Lock() - close(toClose.Events) - toClose.Events = make(chan *Event) - eventsChannelCloserMutex.Unlock() - } - }() - // general message reader loop go func() { for { @@ -298,19 +285,19 @@ func (r *Relay) Connect(ctx context.Context) error { } } - go func() { - eventsChannelCloserMutex.Lock() - if subscription.live { - subscription.Events <- &env.Event - } - eventsChannelCloserMutex.Unlock() - }() + // dispatch this to the internal .events channel of the subscription + subscription.events <- &env.Event } case *EOSEEnvelope: if subscription, ok := r.Subscriptions.Load(string(*env)); ok { - subscription.emitEose.Do(func() { - close(subscription.EndOfStoredEvents) - }) + // implementation adapted from the naïve/incorrect implementation of sync.Once + // (which is ok for this use case) + if subscription.eosed.CompareAndSwap(false, true) { + go func() { + time.Sleep(time.Millisecond) // this basically ensures the EndOfStoredEvents call happens after the last EVENT + close(subscription.EndOfStoredEvents) + }() + } } case *OKEnvelope: if okCallback, exist := r.okCallbacks.Load(env.EventID); exist { @@ -470,13 +457,15 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . ctx, cancel := context.WithCancel(ctx) sub := &Subscription{ - Relay: r, - Context: ctx, - cancel: cancel, - counter: int(current), - Events: make(chan *Event), - EndOfStoredEvents: make(chan struct{}), - Filters: filters, + Relay: r, + Context: ctx, + cancel: cancel, + counter: int(current), + Events: make(chan *Event), + events: make(chan *Event), + EndOfStoredEvents: make(chan struct{}), + Filters: filters, + closeEventsChannel: make(chan struct{}), } for _, opt := range opts { @@ -487,14 +476,10 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . } id := sub.GetID() - r.Subscriptions.Store(id, sub) - // the subscription ends once the context is canceled - go func() { - <-sub.Context.Done() - sub.Unsub() - }() + // start handling events, eose, unsub etc: + go sub.start() return sub } diff --git a/subscription.go b/subscription.go index 0726bdd..48f86ae 100644 --- a/subscription.go +++ b/subscription.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "sync" + "sync/atomic" ) type Subscription struct { @@ -17,6 +18,7 @@ type Subscription struct { // the Events channel emits all EVENTs that come in a Subscription // will be closed when the subscription ends Events chan *Event + events chan *Event // underlines the above, this one is never closed // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription EndOfStoredEvents chan struct{} @@ -24,9 +26,10 @@ type Subscription struct { // Context will be .Done() when the subscription ends Context context.Context - live bool - cancel context.CancelFunc - emitEose sync.Once + live atomic.Bool + eosed atomic.Bool + closeEventsChannel chan struct{} + cancel context.CancelFunc } type EventMessage struct { @@ -54,17 +57,48 @@ func (sub *Subscription) GetID() string { return sub.label + ":" + strconv.Itoa(sub.counter) } +func (sub *Subscription) start() { + var mu sync.Mutex + + for { + select { + case event := <-sub.events: + // this is guarded such that it will only fire until the .Events channel is closed + go func() { + mu.Lock() + if sub.live.Load() { + sub.Events <- event + } + mu.Unlock() + }() + case <-sub.Context.Done(): + // the subscription ends once the context is canceled + sub.Unsub() + return + case <-sub.closeEventsChannel: + // this is called only once on .Unsub() and closes the .Events channel + mu.Lock() + close(sub.Events) + mu.Unlock() + return + } + } +} + // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub() also closes the channel sub.Events and makes a new one. func (sub *Subscription) Unsub() { - go sub.Close() + sub.cancel() - sub.live = false - id := sub.GetID() - sub.Relay.Subscriptions.Delete(id) + // naïve sync.Once implementation: + if sub.live.CompareAndSwap(true, false) { + go sub.Close() + id := sub.GetID() + sub.Relay.Subscriptions.Delete(id) - // do this so we don't have the possibility of closing the Events channel and then trying to send to it - sub.Relay.subscriptionChannelCloseQueue <- sub + // do this so we don't have the possibility of closing the Events channel and then trying to send to it + close(sub.closeEventsChannel) + } } // Close just sends a CLOSE message. You probably want Unsub() instead. @@ -92,7 +126,7 @@ func (sub *Subscription) Fire() error { reqb, _ := ReqEnvelope{id, sub.Filters}.MarshalJSON() debugLog("{%s} sending %v", sub.Relay.URL, reqb) - sub.live = true + sub.live.Store(true) if err := <-sub.Relay.Write(reqb); err != nil { sub.cancel() return fmt.Errorf("failed to write: %w", err)