diff --git a/README.md b/README.md index a868d3f..d4bee2b 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,8 @@ if _, v, err := nip19.Decode(npub); err == nil { panic(err) } -sub := relay.Subscribe(context.Background(), filters) +ctx, cancel := context.WithCancel(context.Background()) +sub := relay.Subscribe(ctx, filters) go func() { <-sub.EndOfStoredEvents @@ -52,7 +53,9 @@ go func() { for ev := range sub.Events { // handle returned event. - // channel will stay open until sub.Unsub() is called + // channel will stay open until the ctx is cancelled (in this case, by calling cancel()) + + fmt.Println(ev.ID) } ``` diff --git a/relay.go b/relay.go index 444df4e..651d874 100644 --- a/relay.go +++ b/relay.go @@ -46,6 +46,9 @@ type Relay struct { okCallbacks s.MapOf[string, func(bool)] } +// RelayConnect returns a relay object connected to url +// Once successfully connected, cancelling ctx has no effect +// To close the connection, call r.Close() func RelayConnect(ctx context.Context, url string) (*Relay, error) { r := &Relay{URL: NormalizeURL(url)} err := r.Connect(ctx) @@ -141,13 +144,14 @@ func (r *Relay) Connect(ctx context.Context) error { } // check if the event matches the desired filter, ignore otherwise - if !subscription.Filters.Match(&event) { - continue - } - - if !subscription.stopped { + func() { + subscription.mutex.Lock() + defer subscription.mutex.Unlock() + if !subscription.Filters.Match(&event) || subscription.stopped { + return + } subscription.Events <- event - } + }() } case "EOSE": if len(jsonMessage) < 2 { @@ -181,6 +185,8 @@ func (r *Relay) Connect(ctx context.Context) error { return nil } +// Publish sends an "EVENT" command to the relay r as in NIP-01 +// status can be: success, failed, or sent (no response from relay before ctx times out) func (r *Relay) Publish(ctx context.Context, event Event) Status { status := PublishStatusFailed @@ -236,6 +242,9 @@ func (r *Relay) Publish(ctx context.Context, event Event) Status { } } +// Subscribe sends a "REQ" command to the relay r as in NIP-01 +// Events are returned through the channel sub.Events +// the subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01) func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription { if r.Connection == nil { panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) diff --git a/subscription.go b/subscription.go index 8ae66e5..55d294f 100644 --- a/subscription.go +++ b/subscription.go @@ -24,6 +24,8 @@ type EventMessage struct { Relay string } +// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01 +// Unsub() also closes the channel sub.Events func (sub *Subscription) Unsub() { sub.mutex.Lock() defer sub.mutex.Unlock() @@ -35,11 +37,14 @@ func (sub *Subscription) Unsub() { sub.stopped = true } +// Sub sets sub.Filters and then calls sub.Fire(ctx) func (sub *Subscription) Sub(ctx context.Context, filters Filters) { sub.Filters = filters sub.Fire(ctx) } +// Fire sends the "REQ" command to the relay +// when ctx is cancelled, sub.Unsub() is called, closing the subscription func (sub *Subscription) Fire(ctx context.Context) { message := []interface{}{"REQ", sub.id} for _, filter := range sub.Filters {