contexts everywhere.

This commit is contained in:
fiatjaf
2023-01-01 20:22:40 -03:00
parent c18de89dd3
commit 4a62a753e6
3 changed files with 89 additions and 93 deletions

130
relay.go
View File

@@ -43,19 +43,12 @@ type Relay struct {
Notices chan string
ConnectionError chan error
statusChans s.MapOf[string, chan Status]
okCallbacks s.MapOf[string, func(bool)]
}
// RelayConnect forwards calls to RelayConnectContext with a background context.
func RelayConnect(url string) (*Relay, error) {
return RelayConnectContext(context.Background(), url)
}
// RelayConnectContext creates a new relay client and connects to a canonical
// URL using Relay.ConnectContext, passing ctx as is.
func RelayConnectContext(ctx context.Context, url string) (*Relay, error) {
func RelayConnect(ctx context.Context, url string) (*Relay, error) {
r := &Relay{URL: NormalizeURL(url)}
err := r.ConnectContext(ctx)
err := r.Connect(ctx)
return r, err
}
@@ -63,16 +56,11 @@ func (r *Relay) String() string {
return r.URL
}
// Connect calls ConnectContext with a background context.
func (r *Relay) Connect() error {
return r.ConnectContext(context.Background())
}
// ConnectContext tries to establish a websocket connection to r.URL.
// Connect tries to establish a websocket connection to r.URL.
// If the context expires before the connection is complete, an error is returned.
// Once successfully connected, context expiration has no effect: call r.Close
// to close the connection.
func (r *Relay) ConnectContext(ctx context.Context) error {
func (r *Relay) Connect(ctx context.Context) error {
if r.URL == "" {
return fmt.Errorf("invalid relay URL '%s'", r.URL)
}
@@ -176,12 +164,8 @@ func (r *Relay) ConnectContext(ctx context.Context) error {
json.Unmarshal(jsonMessage[1], &eventId)
json.Unmarshal(jsonMessage[2], &ok)
if statusChan, exist := r.statusChans.Load(eventId); exist {
if ok {
statusChan <- PublishStatusSucceeded
} else {
statusChan <- PublishStatusFailed
}
if okCallback, exist := r.okCallbacks.Load(eventId); exist {
okCallback(ok)
}
}
}
@@ -190,60 +174,84 @@ func (r *Relay) ConnectContext(ctx context.Context) error {
return nil
}
func (r *Relay) Publish(event Event) chan Status {
statusChan := make(chan Status, 4)
func (r *Relay) Publish(ctx context.Context, event Event) Status {
status := PublishStatusFailed
go func() {
// we keep track of this so the OK message can be used to close it
r.statusChans.Store(event.ID, statusChan)
defer r.statusChans.Delete(event.ID)
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
defer cancel()
}
err := r.Connection.WriteJSON([]interface{}{"EVENT", event})
if err != nil {
statusChan <- PublishStatusFailed
close(statusChan)
return
// make it cancellable so we can stop everything upon receiving an "OK"
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
// listen for an OK callback
okCallback := func(ok bool) {
if ok {
status = PublishStatusSucceeded
} else {
status = PublishStatusFailed
}
statusChan <- PublishStatusSent
cancel()
}
r.okCallbacks.Store(event.ID, okCallback)
defer r.okCallbacks.Delete(event.ID)
// TODO: there's no reason to sub if the relay supports nip-20 (command results).
// in fact, subscribing here with nip20-enabled relays makes it send PublishStatusSucceded
// twice: once here, and the other on "OK" command result.
sub := r.Subscribe(Filters{Filter{IDs: []string{event.ID}}})
for {
select {
case receivedEvent := <-sub.Events:
if receivedEvent.ID == event.ID {
statusChan <- PublishStatusSucceeded
close(statusChan)
break
} else {
continue
}
case <-time.After(5 * time.Second):
close(statusChan)
break
// publish event
err := r.Connection.WriteJSON([]interface{}{"EVENT", event})
if err != nil {
return status
}
// update status (this will be returned later)
status = PublishStatusSent
sub := r.Subscribe(ctx, Filters{Filter{IDs: []string{event.ID}}})
for {
select {
case receivedEvent := <-sub.Events:
if receivedEvent.ID == event.ID {
// we got a success, so update our status and proceed to return
status = PublishStatusSucceeded
return status
}
break
case <-ctx.Done():
// return status as it was
// will proceed to return status as it is
// e.g. if this happens because of the timeout then status will probably be "failed"
// but if it happens because okCallback was called then it might be "succeeded"
return status
}
}()
return statusChan
}
}
func (r *Relay) Subscribe(filters Filters) *Subscription {
func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription {
if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
}
sub := r.PrepareSubscription()
sub.Filters = filters
sub.Fire()
sub.Fire(ctx)
return sub
}
func (r *Relay) QuerySync(filter Filter, timeout time.Duration) []Event {
sub := r.Subscribe(Filters{filter})
func (r *Relay) QuerySync(ctx context.Context, filter Filter) []Event {
sub := r.Subscribe(ctx, Filters{filter})
defer sub.Unsub()
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
defer cancel()
}
var events []Event
for {
select {
@@ -251,7 +259,7 @@ func (r *Relay) QuerySync(filter Filter, timeout time.Duration) []Event {
events = append(events, evt)
case <-sub.EndOfStoredEvents:
return events
case <-time.After(timeout):
case <-ctx.Done():
return events
}
}