Subscription.Fire() can error, so Relay.Subscribe() must also.

This commit is contained in:
fiatjaf 2023-04-06 16:21:25 -03:00
parent 4c9db5928a
commit ef428ff39f
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
2 changed files with 22 additions and 10 deletions

View File

@ -301,7 +301,11 @@ func (r *Relay) Publish(ctx context.Context, event Event) (Status, error) {
return status, err return status, err
} }
sub := r.Subscribe(ctx, Filters{Filter{IDs: []string{event.ID}}}) sub, err := r.Subscribe(ctx, Filters{Filter{IDs: []string{event.ID}}})
if err != nil {
return status, fmt.Errorf("failed to subscribe to just published event %s at %s: %w", event.ID, r.URL, err)
}
for { for {
select { select {
case receivedEvent := <-sub.Events: case receivedEvent := <-sub.Events:
@ -389,20 +393,27 @@ func (r *Relay) Auth(ctx context.Context, event Event) (Status, error) {
// Subscribe sends a "REQ" command to the relay r as in NIP-01. // Subscribe sends a "REQ" command to the relay r as in NIP-01.
// Events are returned through the channel sub.Events. // Events are returned through the channel sub.Events.
// The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01). // The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription { func (r *Relay) Subscribe(ctx context.Context, filters Filters) (*Subscription, error) {
if r.Connection == nil { if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
} }
sub := r.PrepareSubscription(ctx) sub := r.PrepareSubscription(ctx)
sub.Filters = filters sub.Filters = filters
sub.Fire()
return sub if err := sub.Fire(); err != nil {
return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filters, r.URL, err)
}
return sub, nil
} }
func (r *Relay) QuerySync(ctx context.Context, filter Filter) []*Event { func (r *Relay) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) {
sub := r.Subscribe(ctx, Filters{filter}) sub, err := r.Subscribe(ctx, Filters{filter})
if err != nil {
return nil, err
}
defer sub.Unsub() defer sub.Unsub()
if _, ok := ctx.Deadline(); !ok { if _, ok := ctx.Deadline(); !ok {
@ -418,13 +429,13 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter) []*Event {
case evt := <-sub.Events: case evt := <-sub.Events:
if evt == nil { if evt == nil {
// channel is closed // channel is closed
return events return events, nil
} }
events = append(events, evt) events = append(events, evt)
case <-sub.EndOfStoredEvents: case <-sub.EndOfStoredEvents:
return events return events, nil
case <-ctx.Done(): case <-ctx.Done():
return events return events, nil
} }
} }
} }

View File

@ -2,6 +2,7 @@ package nostr
import ( import (
"context" "context"
"fmt"
"strconv" "strconv"
"sync" "sync"
) )
@ -70,7 +71,7 @@ func (sub *Subscription) Fire() error {
err := sub.conn.WriteJSON(message) err := sub.conn.WriteJSON(message)
if err != nil { if err != nil {
sub.cancel() sub.cancel()
return err return fmt.Errorf("failed to write: %w", err)
} }
// the subscription ends once the context is canceled // the subscription ends once the context is canceled