remove intermediary .events channel and hacky ms sleep.

This commit is contained in:
fiatjaf 2023-09-04 08:57:51 -03:00
parent 6d1875de46
commit 1f605f3629
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
2 changed files with 31 additions and 39 deletions

View File

@ -292,10 +292,7 @@ func (r *Relay) Connect(ctx context.Context) error {
} }
// dispatch this to the internal .events channel of the subscription // dispatch this to the internal .events channel of the subscription
select { subscription.dispatchEvent(&env.Event)
case subscription.events <- &env.Event:
case <-subscription.Context.Done():
}
} }
case *EOSEEnvelope: case *EOSEEnvelope:
if subscription, ok := r.Subscriptions.Load(string(*env)); ok { if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
@ -486,7 +483,6 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
cancel: cancel, cancel: cancel,
counter: int(current), counter: int(current),
Events: make(chan *Event), Events: make(chan *Event),
events: make(chan *Event),
EndOfStoredEvents: make(chan struct{}), EndOfStoredEvents: make(chan struct{}),
Filters: filters, Filters: filters,
} }

View File

@ -6,7 +6,6 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
type Subscription struct { type Subscription struct {
@ -22,7 +21,7 @@ type Subscription struct {
// the Events channel emits all EVENTs that come in a Subscription // the Events channel emits all EVENTs that come in a Subscription
// will be closed when the subscription ends // will be closed when the subscription ends
Events chan *Event Events chan *Event
events chan *Event // underlines the above, this one is never closed mu sync.Mutex
// the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription
EndOfStoredEvents chan struct{} EndOfStoredEvents chan struct{}
@ -65,44 +64,41 @@ func (sub *Subscription) GetID() string {
} }
func (sub *Subscription) start() { func (sub *Subscription) start() {
var mu sync.Mutex <-sub.Context.Done()
// the subscription ends once the context is canceled (if not already)
sub.Unsub() // this will set sub.live to false
for { // do this so we don't have the possibility of closing the Events channel and then trying to send to it
select { sub.mu.Lock()
case event := <-sub.events: close(sub.Events)
// this is guarded such that it will only fire until the .Events channel is closed sub.mu.Unlock()
go func() { }
if !sub.eosed.Load() {
sub.storedwg.Add(1)
defer sub.storedwg.Done()
}
mu.Lock() func (sub *Subscription) dispatchEvent(evt *Event) {
defer mu.Unlock() added := false
if !sub.eosed.Load() {
if sub.live.Load() { sub.storedwg.Add(1)
select { added = true
case sub.Events <- event:
case <-sub.Context.Done():
}
}
}()
case <-sub.Context.Done():
// the subscription ends once the context is canceled (if not already)
sub.Unsub() // this will set sub.live to false
// do this so we don't have the possibility of closing the Events channel and then trying to send to it
mu.Lock()
close(sub.Events)
mu.Unlock()
return
}
} }
go func() {
sub.mu.Lock()
defer sub.mu.Unlock()
if sub.live.Load() {
select {
case sub.Events <- evt:
case <-sub.Context.Done():
}
}
if added {
sub.storedwg.Done()
}
}()
} }
func (sub *Subscription) dispatchEose() { func (sub *Subscription) dispatchEose() {
time.Sleep(time.Millisecond)
if sub.eosed.CompareAndSwap(false, true) { if sub.eosed.CompareAndSwap(false, true) {
go func() { go func() {
sub.storedwg.Wait() sub.storedwg.Wait()