added sub.mutex handling in the relay Connect() function (#37)

This commit is contained in:
barkyq 2023-01-15 07:19:00 -05:00 committed by GitHub
parent 635c1b0132
commit a37ffacc74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 8 deletions

View File

@ -43,7 +43,8 @@ if _, v, err := nip19.Decode(npub); err == nil {
panic(err) panic(err)
} }
sub := relay.Subscribe(context.Background(), filters) ctx, cancel := context.WithCancel(context.Background())
sub := relay.Subscribe(ctx, filters)
go func() { go func() {
<-sub.EndOfStoredEvents <-sub.EndOfStoredEvents
@ -52,7 +53,9 @@ go func() {
for ev := range sub.Events { for ev := range sub.Events {
// handle returned event. // handle returned event.
// channel will stay open until sub.Unsub() is called // channel will stay open until the ctx is cancelled (in this case, by calling cancel())
fmt.Println(ev.ID)
} }
``` ```

View File

@ -46,6 +46,9 @@ type Relay struct {
okCallbacks s.MapOf[string, func(bool)] okCallbacks s.MapOf[string, func(bool)]
} }
// RelayConnect returns a relay object connected to url
// Once successfully connected, cancelling ctx has no effect
// To close the connection, call r.Close()
func RelayConnect(ctx context.Context, url string) (*Relay, error) { func RelayConnect(ctx context.Context, url string) (*Relay, error) {
r := &Relay{URL: NormalizeURL(url)} r := &Relay{URL: NormalizeURL(url)}
err := r.Connect(ctx) err := r.Connect(ctx)
@ -141,13 +144,14 @@ func (r *Relay) Connect(ctx context.Context) error {
} }
// check if the event matches the desired filter, ignore otherwise // check if the event matches the desired filter, ignore otherwise
if !subscription.Filters.Match(&event) { func() {
continue subscription.mutex.Lock()
} defer subscription.mutex.Unlock()
if !subscription.Filters.Match(&event) || subscription.stopped {
if !subscription.stopped { return
}
subscription.Events <- event subscription.Events <- event
} }()
} }
case "EOSE": case "EOSE":
if len(jsonMessage) < 2 { if len(jsonMessage) < 2 {
@ -181,6 +185,8 @@ func (r *Relay) Connect(ctx context.Context) error {
return nil return nil
} }
// Publish sends an "EVENT" command to the relay r as in NIP-01
// status can be: success, failed, or sent (no response from relay before ctx times out)
func (r *Relay) Publish(ctx context.Context, event Event) Status { func (r *Relay) Publish(ctx context.Context, event Event) Status {
status := PublishStatusFailed status := PublishStatusFailed
@ -236,6 +242,9 @@ func (r *Relay) Publish(ctx context.Context, event Event) Status {
} }
} }
// Subscribe sends a "REQ" command to the relay r as in NIP-01
// Events are returned through the channel sub.Events
// the subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01)
func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription { func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription {
if r.Connection == nil { if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))

View File

@ -24,6 +24,8 @@ type EventMessage struct {
Relay string Relay string
} }
// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01
// Unsub() also closes the channel sub.Events
func (sub *Subscription) Unsub() { func (sub *Subscription) Unsub() {
sub.mutex.Lock() sub.mutex.Lock()
defer sub.mutex.Unlock() defer sub.mutex.Unlock()
@ -35,11 +37,14 @@ func (sub *Subscription) Unsub() {
sub.stopped = true sub.stopped = true
} }
// Sub sets sub.Filters and then calls sub.Fire(ctx)
func (sub *Subscription) Sub(ctx context.Context, filters Filters) { func (sub *Subscription) Sub(ctx context.Context, filters Filters) {
sub.Filters = filters sub.Filters = filters
sub.Fire(ctx) sub.Fire(ctx)
} }
// Fire sends the "REQ" command to the relay
// when ctx is cancelled, sub.Unsub() is called, closing the subscription
func (sub *Subscription) Fire(ctx context.Context) { func (sub *Subscription) Fire(ctx context.Context) {
message := []interface{}{"REQ", sub.id} message := []interface{}{"REQ", sub.id}
for _, filter := range sub.Filters { for _, filter := range sub.Filters {