diff --git a/README.md b/README.md index 4950386..5a01682 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ for notice := range pool.Notices { ### Listening for events ```go -sub := pool.Sub(nostr.Filters{ +subId, allEvents, uniqueEvents := pool.Sub(nostr.Filters{ { Authors: []string{"0ded86bf80c76847320b16f22b7451c08169434837a51ad5fe3b178af6c35f5d"}, Kinds: []int{nostr.KindTextNote}, // or {1} @@ -32,7 +32,7 @@ sub := pool.Sub(nostr.Filters{ }) go func() { - for event := range sub.UniqueEvents { + for event := range uniqueEvents { log.Print(event) } }() diff --git a/relaypool.go b/relaypool.go index 99c3ec9..b615dc5 100644 --- a/relaypool.go +++ b/relaypool.go @@ -97,7 +97,7 @@ func (r *RelayPool) Remove(url string) { } } -func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { +func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event) { random := make([]byte, 7) rand.Read(random) id := hex.EncodeToString(random) @@ -105,6 +105,8 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { r.subscriptions.Store(id, filters) eventStream := make(chan EventMessage) r.eventStreams.Store(id, eventStream) + uniqueEvents := make(chan Event) + emittedAlready := s.MapOf[string, struct{}]{} r.Relays.Range(func(_ string, relay *Relay) bool { sub := relay.subscribe(id, filters) @@ -112,13 +114,16 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { go func(sub *Subscription) { for evt := range sub.Events { eventStream <- EventMessage{Relay: relay.URL, Event: evt} + if _, ok := emittedAlready.LoadOrStore(evt.ID, struct{}{}); !ok { + uniqueEvents <- evt + } } }(sub) return true }) - return id, eventStream + return id, eventStream, uniqueEvents } func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) {