diff --git a/relay.go b/relay.go index 5f6a391..11c3edd 100644 --- a/relay.go +++ b/relay.go @@ -38,7 +38,7 @@ type Relay struct { RequestHeader http.Header // e.g. for origin header Connection *Connection - subscriptions *xsync.MapOf[string, *Subscription] + Subscriptions *xsync.MapOf[string, *Subscription] Challenges chan string // NIP-42 Challenges Notices chan string @@ -59,7 +59,7 @@ func NewRelay(ctx context.Context, url string) *Relay { return &Relay{ URL: NormalizeURL(url), connectionContext: ctx, - subscriptions: xsync.NewMapOf[*Subscription](), + Subscriptions: xsync.NewMapOf[*Subscription](), okCallbacks: xsync.NewMapOf[func(bool, string)](), } } @@ -184,7 +184,7 @@ func (r *Relay) Connect(ctx context.Context) error { if env.SubscriptionID == nil { continue } - if subscription, ok := r.subscriptions.Load(*env.SubscriptionID); !ok { + if subscription, ok := r.Subscriptions.Load(*env.SubscriptionID); !ok { InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID) continue } else { @@ -218,7 +218,7 @@ func (r *Relay) Connect(ctx context.Context) error { } case *EOSEEnvelope: debugLog("{%s} %v\n", r.URL, message) - if subscription, ok := r.subscriptions.Load(string(*env)); ok { + if subscription, ok := r.Subscriptions.Load(string(*env)); ok { subscription.emitEose.Do(func() { subscription.EndOfStoredEvents <- struct{}{} }) diff --git a/subscription.go b/subscription.go index 3df922b..3c8db63 100644 --- a/subscription.go +++ b/subscription.go @@ -47,10 +47,12 @@ func (sub *Subscription) Unsub() { sub.mutex.Lock() defer sub.mutex.Unlock() - closeMsg := CloseEnvelope(sub.GetID()) + id := sub.GetID() + closeMsg := CloseEnvelope(id) closeb, _ := (&closeMsg).MarshalJSON() debugLog("{%s} sending %v", sub.Relay.URL, closeb) sub.conn.WriteMessage(closeb) + sub.Relay.Subscriptions.Delete(id) if sub.stopped == false && sub.Events != nil { close(sub.Events) @@ -59,6 +61,7 @@ func (sub *Subscription) Unsub() { } // Sub sets sub.Filters and then calls sub.Fire(ctx). +// The subscription will be closed if the context expires. func (sub *Subscription) Sub(ctx context.Context, filters Filters) { sub.Filters = filters sub.Fire() @@ -66,9 +69,10 @@ func (sub *Subscription) Sub(ctx context.Context, filters Filters) { // Fire sends the "REQ" command to the relay. func (sub *Subscription) Fire() error { - sub.Relay.subscriptions.Store(sub.GetID(), sub) + id := sub.GetID() + sub.Relay.Subscriptions.Store(id, sub) - reqb, _ := ReqEnvelope{sub.GetID(), sub.Filters}.MarshalJSON() + reqb, _ := ReqEnvelope{id, sub.Filters}.MarshalJSON() debugLog("{%s} sending %v", sub.Relay.URL, reqb) err := sub.conn.WriteMessage(reqb) if err != nil {