From 4b1f69ec279e618b9a5d723495578e874d6e3629 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 21 Mar 2023 14:50:34 -0300 Subject: [PATCH] subscriptions receive their context on Prepare(). --- relay.go | 13 ++++++++----- subscription.go | 15 ++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/relay.go b/relay.go index 2b97fce..60c5e56 100644 --- a/relay.go +++ b/relay.go @@ -161,14 +161,13 @@ func (r *Relay) Connect(ctx context.Context) error { // check if the event matches the desired filter, ignore otherwise if !subscription.Filters.Match(&event) { - log.Printf("filter does not match\n") + log.Printf("filter does not match: %v ~ %v\n", subscription.Filters[0], event) return } subscription.mutex.Lock() defer subscription.mutex.Unlock() if subscription.stopped { - log.Printf("subscription '%s' is stopped\n", subId) return } @@ -360,9 +359,9 @@ func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription { panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) } - sub := r.PrepareSubscription() + sub := r.PrepareSubscription(ctx) sub.Filters = filters - sub.Fire(ctx) + sub.Fire() return sub } @@ -395,12 +394,16 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter) []*Event { } } -func (r *Relay) PrepareSubscription() *Subscription { +func (r *Relay) PrepareSubscription(ctx context.Context) *Subscription { current := subscriptionIdCounter subscriptionIdCounter++ + ctx, cancel := context.WithCancel(ctx) + return &Subscription{ Relay: r, + Context: ctx, + cancel: cancel, conn: r.Connection, counter: current, Events: make(chan *Event), diff --git a/subscription.go b/subscription.go index 71326b4..d4ac5a9 100644 --- a/subscription.go +++ b/subscription.go @@ -17,6 +17,7 @@ type Subscription struct { Events chan *Event EndOfStoredEvents chan struct{} Context context.Context + cancel context.CancelFunc stopped bool emitEose sync.Once @@ -54,17 +55,13 @@ func (sub *Subscription) Unsub() { // Sub sets sub.Filters and then calls sub.Fire(ctx). func (sub *Subscription) Sub(ctx context.Context, filters Filters) { sub.Filters = filters - sub.Fire(ctx) + sub.Fire() } // Fire sends the "REQ" command to the relay. -// When ctx is cancelled, sub.Unsub() is called, closing the subscription. -func (sub *Subscription) Fire(ctx context.Context) error { +func (sub *Subscription) Fire() error { sub.Relay.subscriptions.Store(sub.GetID(), sub) - ctx, cancel := context.WithCancel(ctx) - sub.Context = ctx - message := []interface{}{"REQ", sub.GetID()} for _, filter := range sub.Filters { message = append(message, filter) @@ -72,13 +69,13 @@ func (sub *Subscription) Fire(ctx context.Context) error { err := sub.conn.WriteJSON(message) if err != nil { - cancel() + sub.cancel() return err } // the subscription ends once the context is canceled go func() { - <-ctx.Done() + <-sub.Context.Done() sub.Unsub() }() @@ -87,7 +84,7 @@ func (sub *Subscription) Fire(ctx context.Context) error { <-sub.Relay.ConnectionContext.Done() // cancel the context -- this will cause the other context cancelation cause above to be called - cancel() + sub.cancel() }() return nil