From 0765f7b91b64727ec715ac129afee2cad6838931 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 16 Mar 2023 14:15:16 -0300 Subject: [PATCH] context cancelation for relay connections and subscriptions. --- relay.go | 15 +++++++++++---- subscription.go | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/relay.go b/relay.go index 228d23f..3a08caa 100644 --- a/relay.go +++ b/relay.go @@ -43,9 +43,10 @@ type Relay struct { Connection *Connection subscriptions s.MapOf[string, *Subscription] - Challenges chan string // NIP-42 Challenges - Notices chan string - ConnectionError chan error + Challenges chan string // NIP-42 Challenges + Notices chan string + ConnectionError chan error + ConnectionContext context.Context // will be canceled when the connection closes okCallbacks s.MapOf[string, func(bool)] @@ -72,7 +73,11 @@ func (r *Relay) String() string { // Once successfully connected, context expiration has no effect: call r.Close // to close the connection. func (r *Relay) Connect(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + r.ConnectionContext = ctx + if r.URL == "" { + cancel() return fmt.Errorf("invalid relay URL '%s'", r.URL) } @@ -85,6 +90,7 @@ func (r *Relay) Connect(ctx context.Context) error { socket, _, err := websocket.DefaultDialer.DialContext(ctx, r.URL, r.RequestHeader) if err != nil { + cancel() return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err) } @@ -198,6 +204,8 @@ func (r *Relay) Connect(ctx context.Context) error { } } } + + cancel() }() return nil @@ -360,7 +368,6 @@ func (r *Relay) PrepareSubscription() *Subscription { random := make([]byte, 7) rand.Read(random) id := hex.EncodeToString(random) - return r.prepareSubscription(id) } diff --git a/subscription.go b/subscription.go index 7f29ee9..8d2b494 100644 --- a/subscription.go +++ b/subscription.go @@ -14,6 +14,7 @@ type Subscription struct { Filters Filters Events chan *Event EndOfStoredEvents chan struct{} + Context context.Context stopped bool emitEose sync.Once @@ -46,6 +47,9 @@ func (sub *Subscription) Sub(ctx context.Context, filters Filters) { // Fire sends the "REQ" command to the relay. // When ctx is cancelled, sub.Unsub() is called, closing the subscription. func (sub *Subscription) Fire(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + sub.Context = ctx + message := []interface{}{"REQ", sub.id} for _, filter := range sub.Filters { message = append(message, filter) @@ -58,4 +62,16 @@ func (sub *Subscription) Fire(ctx context.Context) { <-ctx.Done() sub.Unsub() }() + + // or when the relay connection is closed + go func() { + <-sub.Relay.ConnectionContext.Done() + + // this will close the Events channel, + // which can be used by an external reader to learn the subscription has stopped + sub.Unsub() + + // we also cancel the context + cancel() + }() }