refactor some things, add back one mutex for each subscription for dispatching events, but in a cleaner way.

This commit is contained in:
fiatjaf 2023-06-25 00:17:39 -03:00
parent a7964ae675
commit 54d3de4908
2 changed files with 65 additions and 46 deletions

View File

@ -186,9 +186,6 @@ func (r *Relay) Connect(ctx context.Context) error {
// ping every 29 seconds // ping every 29 seconds
ticker := time.NewTicker(29 * time.Second) ticker := time.NewTicker(29 * time.Second)
// this ensures we don't send an event to the Events channel after closing it
eventsChannelCloserMutex := &sync.Mutex{}
// to be used when the connection is closed // to be used when the connection is closed
go func() { go func() {
<-r.connectionContext.Done() <-r.connectionContext.Done()
@ -230,16 +227,6 @@ func (r *Relay) Connect(ctx context.Context) error {
} }
}() }()
// every time a subscription ends we use this queue to close its .Events channel
go func() {
for toClose := range r.subscriptionChannelCloseQueue {
eventsChannelCloserMutex.Lock()
close(toClose.Events)
toClose.Events = make(chan *Event)
eventsChannelCloserMutex.Unlock()
}
}()
// general message reader loop // general message reader loop
go func() { go func() {
for { for {
@ -298,19 +285,19 @@ func (r *Relay) Connect(ctx context.Context) error {
} }
} }
go func() { // dispatch this to the internal .events channel of the subscription
eventsChannelCloserMutex.Lock() subscription.events <- &env.Event
if subscription.live {
subscription.Events <- &env.Event
}
eventsChannelCloserMutex.Unlock()
}()
} }
case *EOSEEnvelope: case *EOSEEnvelope:
if subscription, ok := r.Subscriptions.Load(string(*env)); ok { if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
subscription.emitEose.Do(func() { // implementation adapted from the naïve/incorrect implementation of sync.Once
close(subscription.EndOfStoredEvents) // (which is ok for this use case)
}) if subscription.eosed.CompareAndSwap(false, true) {
go func() {
time.Sleep(time.Millisecond) // this basically ensures the EndOfStoredEvents call happens after the last EVENT
close(subscription.EndOfStoredEvents)
}()
}
} }
case *OKEnvelope: case *OKEnvelope:
if okCallback, exist := r.okCallbacks.Load(env.EventID); exist { if okCallback, exist := r.okCallbacks.Load(env.EventID); exist {
@ -470,13 +457,15 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
sub := &Subscription{ sub := &Subscription{
Relay: r, Relay: r,
Context: ctx, Context: ctx,
cancel: cancel, cancel: cancel,
counter: int(current), counter: int(current),
Events: make(chan *Event), Events: make(chan *Event),
EndOfStoredEvents: make(chan struct{}), events: make(chan *Event),
Filters: filters, EndOfStoredEvents: make(chan struct{}),
Filters: filters,
closeEventsChannel: make(chan struct{}),
} }
for _, opt := range opts { for _, opt := range opts {
@ -487,14 +476,10 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
} }
id := sub.GetID() id := sub.GetID()
r.Subscriptions.Store(id, sub) r.Subscriptions.Store(id, sub)
// the subscription ends once the context is canceled // start handling events, eose, unsub etc:
go func() { go sub.start()
<-sub.Context.Done()
sub.Unsub()
}()
return sub return sub
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
) )
type Subscription struct { type Subscription struct {
@ -17,6 +18,7 @@ type Subscription struct {
// the Events channel emits all EVENTs that come in a Subscription // the Events channel emits all EVENTs that come in a Subscription
// will be closed when the subscription ends // will be closed when the subscription ends
Events chan *Event Events chan *Event
events chan *Event // underlines the above, this one is never closed
// the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription
EndOfStoredEvents chan struct{} EndOfStoredEvents chan struct{}
@ -24,9 +26,10 @@ type Subscription struct {
// Context will be .Done() when the subscription ends // Context will be .Done() when the subscription ends
Context context.Context Context context.Context
live bool live atomic.Bool
cancel context.CancelFunc eosed atomic.Bool
emitEose sync.Once closeEventsChannel chan struct{}
cancel context.CancelFunc
} }
type EventMessage struct { type EventMessage struct {
@ -54,17 +57,48 @@ func (sub *Subscription) GetID() string {
return sub.label + ":" + strconv.Itoa(sub.counter) return sub.label + ":" + strconv.Itoa(sub.counter)
} }
func (sub *Subscription) start() {
var mu sync.Mutex
for {
select {
case event := <-sub.events:
// this is guarded such that it will only fire until the .Events channel is closed
go func() {
mu.Lock()
if sub.live.Load() {
sub.Events <- event
}
mu.Unlock()
}()
case <-sub.Context.Done():
// the subscription ends once the context is canceled
sub.Unsub()
return
case <-sub.closeEventsChannel:
// this is called only once on .Unsub() and closes the .Events channel
mu.Lock()
close(sub.Events)
mu.Unlock()
return
}
}
}
// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
// Unsub() also closes the channel sub.Events and makes a new one. // Unsub() also closes the channel sub.Events and makes a new one.
func (sub *Subscription) Unsub() { func (sub *Subscription) Unsub() {
go sub.Close() sub.cancel()
sub.live = false // naïve sync.Once implementation:
id := sub.GetID() if sub.live.CompareAndSwap(true, false) {
sub.Relay.Subscriptions.Delete(id) go sub.Close()
id := sub.GetID()
sub.Relay.Subscriptions.Delete(id)
// do this so we don't have the possibility of closing the Events channel and then trying to send to it // do this so we don't have the possibility of closing the Events channel and then trying to send to it
sub.Relay.subscriptionChannelCloseQueue <- sub close(sub.closeEventsChannel)
}
} }
// Close just sends a CLOSE message. You probably want Unsub() instead. // Close just sends a CLOSE message. You probably want Unsub() instead.
@ -92,7 +126,7 @@ func (sub *Subscription) Fire() error {
reqb, _ := ReqEnvelope{id, sub.Filters}.MarshalJSON() reqb, _ := ReqEnvelope{id, sub.Filters}.MarshalJSON()
debugLog("{%s} sending %v", sub.Relay.URL, reqb) debugLog("{%s} sending %v", sub.Relay.URL, reqb)
sub.live = true sub.live.Store(true)
if err := <-sub.Relay.Write(reqb); err != nil { if err := <-sub.Relay.Write(reqb); err != nil {
sub.cancel() sub.cancel()
return fmt.Errorf("failed to write: %w", err) return fmt.Errorf("failed to write: %w", err)