stopping data races with sync.mutex to Publish() in relay.go

This commit is contained in:
Dylan Cant
2023-01-16 11:32:00 -05:00
parent 87b6280299
commit 67813257df

View File

@@ -199,6 +199,9 @@ func (r *Relay) Connect(ctx context.Context) error {
func (r *Relay) Publish(ctx context.Context, event Event) Status { func (r *Relay) Publish(ctx context.Context, event Event) Status {
status := PublishStatusFailed status := PublishStatusFailed
// data races on status variable without this mutex
var mu sync.Mutex
if _, ok := ctx.Deadline(); !ok { if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds // if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc var cancel context.CancelFunc
@@ -213,6 +216,8 @@ func (r *Relay) Publish(ctx context.Context, event Event) Status {
// listen for an OK callback // listen for an OK callback
okCallback := func(ok bool) { okCallback := func(ok bool) {
mu.Lock()
defer mu.Unlock()
if ok { if ok {
status = PublishStatusSucceeded status = PublishStatusSucceeded
} else { } else {
@@ -224,20 +229,23 @@ func (r *Relay) Publish(ctx context.Context, event Event) Status {
defer r.okCallbacks.Delete(event.ID) defer r.okCallbacks.Delete(event.ID)
// publish event // publish event
err := r.Connection.WriteJSON([]interface{}{"EVENT", event}) if err := r.Connection.WriteJSON([]interface{}{"EVENT", event}); err != nil {
if err != nil {
return status return status
} }
// update status (this will be returned later) // update status (this will be returned later)
mu.Lock()
status = PublishStatusSent status = PublishStatusSent
mu.Unlock()
sub := r.Subscribe(ctx, Filters{Filter{IDs: []string{event.ID}}}) sub := r.Subscribe(ctx, Filters{Filter{IDs: []string{event.ID}}})
defer mu.Unlock()
for { for {
select { select {
case receivedEvent := <-sub.Events: case receivedEvent := <-sub.Events:
if receivedEvent.ID == event.ID { if receivedEvent.ID == event.ID {
// we got a success, so update our status and proceed to return // we got a success, so update our status and proceed to return
mu.Lock()
status = PublishStatusSucceeded status = PublishStatusSucceeded
return status return status
} }
@@ -246,6 +254,8 @@ func (r *Relay) Publish(ctx context.Context, event Event) Status {
// will proceed to return status as it is // will proceed to return status as it is
// e.g. if this happens because of the timeout then status will probably be "failed" // e.g. if this happens because of the timeout then status will probably be "failed"
// but if it happens because okCallback was called then it might be "succeeded" // but if it happens because okCallback was called then it might be "succeeded"
// do not return if okCallback is in process
mu.Lock()
return status return status
} }
} }
@@ -289,12 +299,12 @@ func (r *Relay) Auth(ctx context.Context, event Event) Status {
if err := r.Connection.WriteJSON([]interface{}{"AUTH", event}); err != nil { if err := r.Connection.WriteJSON([]interface{}{"AUTH", event}); err != nil {
// status will be "failed" // status will be "failed"
return status return status
} else {
// use mu.Lock() just in case the okCallback got called, extremely unlikely.
mu.Lock()
status = PublishStatusSent
mu.Unlock()
} }
// use mu.Lock() just in case the okCallback got called, extremely unlikely.
mu.Lock()
status = PublishStatusSent
mu.Unlock()
// the context either times out, and the status is "sent" // the context either times out, and the status is "sent"
// or the okCallback is called and the status is set to "succeeded" or "failed" // or the okCallback is called and the status is set to "succeeded" or "failed"
// NIP-42 does not mandate an "OK" reply to an "AUTH" message // NIP-42 does not mandate an "OK" reply to an "AUTH" message