From 1f605f3629d16c9be0a1671e34d6bc61c9183842 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 4 Sep 2023 08:57:51 -0300 Subject: [PATCH] remove intermediary .events channel and hacky ms sleep. --- relay.go | 6 +---- subscription.go | 64 +++++++++++++++++++++++-------------------------- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/relay.go b/relay.go index ba42a53..0c506a9 100644 --- a/relay.go +++ b/relay.go @@ -292,10 +292,7 @@ func (r *Relay) Connect(ctx context.Context) error { } // dispatch this to the internal .events channel of the subscription - select { - case subscription.events <- &env.Event: - case <-subscription.Context.Done(): - } + subscription.dispatchEvent(&env.Event) } case *EOSEEnvelope: if subscription, ok := r.Subscriptions.Load(string(*env)); ok { @@ -486,7 +483,6 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . cancel: cancel, counter: int(current), Events: make(chan *Event), - events: make(chan *Event), EndOfStoredEvents: make(chan struct{}), Filters: filters, } diff --git a/subscription.go b/subscription.go index 79f0e71..9b25814 100644 --- a/subscription.go +++ b/subscription.go @@ -6,7 +6,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" ) type Subscription struct { @@ -22,7 +21,7 @@ type Subscription struct { // the Events channel emits all EVENTs that come in a Subscription // will be closed when the subscription ends Events chan *Event - events chan *Event // underlines the above, this one is never closed + mu sync.Mutex // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription EndOfStoredEvents chan struct{} @@ -65,44 +64,41 @@ func (sub *Subscription) GetID() string { } func (sub *Subscription) start() { - var mu sync.Mutex + <-sub.Context.Done() + // the subscription ends once the context is canceled (if not already) + sub.Unsub() // this will set sub.live to false - for { - select { - case event := <-sub.events: - // this is guarded such that it will only fire until the .Events channel is closed - go func() { - if !sub.eosed.Load() { - sub.storedwg.Add(1) - defer sub.storedwg.Done() - } + // do this so we don't have the possibility of closing the Events channel and then trying to send to it + sub.mu.Lock() + close(sub.Events) + sub.mu.Unlock() +} - mu.Lock() - defer mu.Unlock() - - if sub.live.Load() { - select { - case sub.Events <- event: - case <-sub.Context.Done(): - } - } - }() - case <-sub.Context.Done(): - // the subscription ends once the context is canceled (if not already) - sub.Unsub() // this will set sub.live to false - - // do this so we don't have the possibility of closing the Events channel and then trying to send to it - mu.Lock() - close(sub.Events) - mu.Unlock() - - return - } +func (sub *Subscription) dispatchEvent(evt *Event) { + added := false + if !sub.eosed.Load() { + sub.storedwg.Add(1) + added = true } + + go func() { + sub.mu.Lock() + defer sub.mu.Unlock() + + if sub.live.Load() { + select { + case sub.Events <- evt: + case <-sub.Context.Done(): + } + } + + if added { + sub.storedwg.Done() + } + }() } func (sub *Subscription) dispatchEose() { - time.Sleep(time.Millisecond) if sub.eosed.CompareAndSwap(false, true) { go func() { sub.storedwg.Wait()