diff --git a/relay.go b/relay.go index e1ed345..2b4ce47 100644 --- a/relay.go +++ b/relay.go @@ -125,7 +125,7 @@ func (r *Relay) Connect() error { } // check if the event matches the desired filter, ignore otherwise - if !subscription.filters.Match(&event) { + if !subscription.Filters.Match(&event) { continue } @@ -212,24 +212,30 @@ func (r *Relay) Subscribe(filters Filters) *Subscription { panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) } + sub := r.PrepareSubscription() + sub.Filters = filters + sub.Fire() + return sub +} + +func (r *Relay) PrepareSubscription() *Subscription { random := make([]byte, 7) rand.Read(random) id := hex.EncodeToString(random) - return r.subscribe(id, filters) + + return r.prepareSubscription(id) } -func (r *Relay) subscribe(id string, filters Filters) *Subscription { - sub := Subscription{ +func (r *Relay) prepareSubscription(id string) *Subscription { + sub := &Subscription{ conn: r.Connection, id: id, Events: make(chan Event), EndOfStoredEvents: make(chan struct{}, 1), } - r.subscriptions.Store(sub.id, &sub) - - sub.Sub(filters) - return &sub + r.subscriptions.Store(sub.id, sub) + return sub } func (r *Relay) Close() error { diff --git a/relaypool.go b/relaypool.go index daea525..3c4edd4 100644 --- a/relaypool.go +++ b/relaypool.go @@ -78,7 +78,8 @@ func (r *RelayPool) Add(url string, policy RelayPoolPolicy) chan error { r.Relays.Store(relay.URL, relay) r.subscriptions.Range(func(id string, filters Filters) bool { - sub := relay.subscribe(id, filters) + sub := relay.prepareSubscription(id) + sub.Sub(filters) eventStream, _ := r.eventStreams.Load(id) go func(sub *Subscription) { @@ -119,7 +120,8 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) { r.eventStreams.Store(id, eventStream) r.Relays.Range(func(_ string, relay *Relay) bool { - sub := relay.subscribe(id, filters) + sub := relay.prepareSubscription(id) + sub.Sub(filters) go func(sub *Subscription) { for evt := range sub.Events { diff --git a/subscription.go b/subscription.go index 402b0f6..ad4dd83 100644 --- a/subscription.go +++ b/subscription.go @@ -6,7 +6,7 @@ type Subscription struct { id string conn *Connection - filters Filters + Filters Filters Events chan Event EndOfStoredEvents chan struct{} @@ -19,10 +19,6 @@ type EventMessage struct { Relay string } -func (sub Subscription) GetFilters() Filters { - return sub.filters -} - func (sub Subscription) Unsub() { sub.conn.WriteJSON([]interface{}{"CLOSE", sub.id}) @@ -33,10 +29,13 @@ func (sub Subscription) Unsub() { } func (sub *Subscription) Sub(filters Filters) { - sub.filters = filters + sub.Filters = filters + sub.Fire() +} +func (sub *Subscription) Fire() { message := []interface{}{"REQ", sub.id} - for _, filter := range sub.filters { + for _, filter := range sub.Filters { message = append(message, filter) }