bring back unique events.

This commit is contained in:
fiatjaf
2022-11-13 19:33:48 -03:00
parent 2641327c28
commit 37235a1394
2 changed files with 9 additions and 4 deletions

View File

@@ -24,7 +24,7 @@ for notice := range pool.Notices {
### Listening for events ### Listening for events
```go ```go
sub := pool.Sub(nostr.Filters{ subId, allEvents, uniqueEvents := pool.Sub(nostr.Filters{
{ {
Authors: []string{"0ded86bf80c76847320b16f22b7451c08169434837a51ad5fe3b178af6c35f5d"}, Authors: []string{"0ded86bf80c76847320b16f22b7451c08169434837a51ad5fe3b178af6c35f5d"},
Kinds: []int{nostr.KindTextNote}, // or {1} Kinds: []int{nostr.KindTextNote}, // or {1}
@@ -32,7 +32,7 @@ sub := pool.Sub(nostr.Filters{
}) })
go func() { go func() {
for event := range sub.UniqueEvents { for event := range uniqueEvents {
log.Print(event) log.Print(event)
} }
}() }()

View File

@@ -97,7 +97,7 @@ func (r *RelayPool) Remove(url string) {
} }
} }
func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage, chan Event) {
random := make([]byte, 7) random := make([]byte, 7)
rand.Read(random) rand.Read(random)
id := hex.EncodeToString(random) id := hex.EncodeToString(random)
@@ -105,6 +105,8 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) {
r.subscriptions.Store(id, filters) r.subscriptions.Store(id, filters)
eventStream := make(chan EventMessage) eventStream := make(chan EventMessage)
r.eventStreams.Store(id, eventStream) r.eventStreams.Store(id, eventStream)
uniqueEvents := make(chan Event)
emittedAlready := s.MapOf[string, struct{}]{}
r.Relays.Range(func(_ string, relay *Relay) bool { r.Relays.Range(func(_ string, relay *Relay) bool {
sub := relay.subscribe(id, filters) sub := relay.subscribe(id, filters)
@@ -112,13 +114,16 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) {
go func(sub *Subscription) { go func(sub *Subscription) {
for evt := range sub.Events { for evt := range sub.Events {
eventStream <- EventMessage{Relay: relay.URL, Event: evt} eventStream <- EventMessage{Relay: relay.URL, Event: evt}
if _, ok := emittedAlready.LoadOrStore(evt.ID, struct{}{}); !ok {
uniqueEvents <- evt
}
} }
}(sub) }(sub)
return true return true
}) })
return id, eventStream return id, eventStream, uniqueEvents
} }
func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) { func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) {