context cancelation for relay connections and subscriptions.

This commit is contained in:
fiatjaf 2023-03-16 14:15:16 -03:00
parent 5e24b24442
commit 0765f7b91b
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
2 changed files with 27 additions and 4 deletions

View File

@ -43,9 +43,10 @@ type Relay struct {
Connection *Connection
subscriptions s.MapOf[string, *Subscription]
Challenges chan string // NIP-42 Challenges
Notices chan string
ConnectionError chan error
Challenges chan string // NIP-42 Challenges
Notices chan string
ConnectionError chan error
ConnectionContext context.Context // will be canceled when the connection closes
okCallbacks s.MapOf[string, func(bool)]
@ -72,7 +73,11 @@ func (r *Relay) String() string {
// Once successfully connected, context expiration has no effect: call r.Close
// to close the connection.
func (r *Relay) Connect(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
r.ConnectionContext = ctx
if r.URL == "" {
cancel()
return fmt.Errorf("invalid relay URL '%s'", r.URL)
}
@ -85,6 +90,7 @@ func (r *Relay) Connect(ctx context.Context) error {
socket, _, err := websocket.DefaultDialer.DialContext(ctx, r.URL, r.RequestHeader)
if err != nil {
cancel()
return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
}
@ -198,6 +204,8 @@ func (r *Relay) Connect(ctx context.Context) error {
}
}
}
cancel()
}()
return nil
@ -360,7 +368,6 @@ func (r *Relay) PrepareSubscription() *Subscription {
random := make([]byte, 7)
rand.Read(random)
id := hex.EncodeToString(random)
return r.prepareSubscription(id)
}

View File

@ -14,6 +14,7 @@ type Subscription struct {
Filters Filters
Events chan *Event
EndOfStoredEvents chan struct{}
Context context.Context
stopped bool
emitEose sync.Once
@ -46,6 +47,9 @@ func (sub *Subscription) Sub(ctx context.Context, filters Filters) {
// Fire sends the "REQ" command to the relay.
// When ctx is cancelled, sub.Unsub() is called, closing the subscription.
func (sub *Subscription) Fire(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
sub.Context = ctx
message := []interface{}{"REQ", sub.id}
for _, filter := range sub.Filters {
message = append(message, filter)
@ -58,4 +62,16 @@ func (sub *Subscription) Fire(ctx context.Context) {
<-ctx.Done()
sub.Unsub()
}()
// or when the relay connection is closed
go func() {
<-sub.Relay.ConnectionContext.Done()
// this will close the Events channel,
// which can be used by an external reader to learn the subscription has stopped
sub.Unsub()
// we also cancel the context
cancel()
}()
}