fix unique logic (move it to a helper function, we can't return two channels because that will break if the caller doesn't read from both.)

This commit is contained in:
fiatjaf
2022-11-15 16:40:17 -03:00
parent 2ec7957409
commit 14e81a756a

View File

@@ -60,33 +60,41 @@ func NewRelayPool() *RelayPool {
// Add adds a new relay to the pool, if policy is nil, it will be a simple // Add adds a new relay to the pool, if policy is nil, it will be a simple
// read+write policy. // read+write policy.
func (r *RelayPool) Add(url string, policy RelayPoolPolicy) error { func (r *RelayPool) Add(url string, policy RelayPoolPolicy) chan error {
if policy == nil { if policy == nil {
policy = SimplePolicy{Read: true, Write: true} policy = SimplePolicy{Read: true, Write: true}
} }
relay, err := RelayConnect(url) cherr := make(chan error)
if err != nil {
return err
}
r.Policies.Store(relay.URL, policy) go func() {
r.Relays.Store(relay.URL, relay) relay, err := RelayConnect(url)
if err != nil {
cherr <- fmt.Errorf("failed to connect to %s: %w", url, err)
return
}
r.subscriptions.Range(func(id string, filters Filters) bool { r.Policies.Store(relay.URL, policy)
sub := relay.subscribe(id, filters) r.Relays.Store(relay.URL, relay)
eventStream, _ := r.eventStreams.Load(id)
go func(sub *Subscription) { r.subscriptions.Range(func(id string, filters Filters) bool {
for evt := range sub.Events { sub := relay.subscribe(id, filters)
eventStream <- EventMessage{Relay: relay.URL, Event: evt} eventStream, _ := r.eventStreams.Load(id)
}
}(sub)
return true go func(sub *Subscription) {
}) for evt := range sub.Events {
eventStream <- EventMessage{Relay: relay.URL, Event: evt}
}
}(sub)
return nil return true
})
cherr <- nil
close(cherr)
}()
return cherr
} }
// Remove removes a relay from the pool. // Remove removes a relay from the pool.
@@ -101,7 +109,7 @@ func (r *RelayPool) Remove(url string) {
} }
} }
func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event) { func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) {
random := make([]byte, 7) random := make([]byte, 7)
rand.Read(random) rand.Read(random)
id := hex.EncodeToString(random) id := hex.EncodeToString(random)
@@ -109,8 +117,6 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event)
r.subscriptions.Store(id, filters) r.subscriptions.Store(id, filters)
eventStream := make(chan EventMessage) eventStream := make(chan EventMessage)
r.eventStreams.Store(id, eventStream) r.eventStreams.Store(id, eventStream)
uniqueEvents := make(chan Event)
emittedAlready := s.MapOf[string, struct{}]{}
r.Relays.Range(func(_ string, relay *Relay) bool { r.Relays.Range(func(_ string, relay *Relay) bool {
sub := relay.subscribe(id, filters) sub := relay.subscribe(id, filters)
@@ -118,16 +124,28 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event)
go func(sub *Subscription) { go func(sub *Subscription) {
for evt := range sub.Events { for evt := range sub.Events {
eventStream <- EventMessage{Relay: relay.URL, Event: evt} eventStream <- EventMessage{Relay: relay.URL, Event: evt}
if _, ok := emittedAlready.LoadOrStore(evt.ID, struct{}{}); !ok {
uniqueEvents <- evt
}
} }
}(sub) }(sub)
return true return true
}) })
return id, eventStream, uniqueEvents return id, eventStream
}
func Unique(all chan EventMessage) chan Event {
uniqueEvents := make(chan Event)
emittedAlready := s.MapOf[string, struct{}]{}
go func() {
for eventMessage := range all {
if _, ok := emittedAlready.LoadOrStore(eventMessage.Event.ID, struct{}{}); !ok {
uniqueEvents <- eventMessage.Event
}
}
}()
return uniqueEvents
} }
func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) { func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) {