go-nostr/relay.go

436 lines
11 KiB
Go
Raw Permalink Normal View History

package nostr
import (
"context"
"fmt"
"net/http"
"sync"
"time"
2023-05-30 13:52:14 -03:00
"github.com/puzpuzpuz/xsync"
)
type Status int
const (
PublishStatusSent Status = 0
PublishStatusFailed Status = -1
PublishStatusSucceeded Status = 1
)
2023-05-30 14:01:07 -03:00
var subscriptionIdCounter xsync.Counter
func (s Status) String() string {
switch s {
case PublishStatusSent:
return "sent"
case PublishStatusFailed:
return "failed"
case PublishStatusSucceeded:
return "success"
}
return "unknown"
}
type Relay struct {
URL string
RequestHeader http.Header // e.g. for origin header
2023-05-05 22:00:25 +02:00
Connection *Connection
Subscriptions *xsync.MapOf[string, *Subscription]
Challenges chan string // NIP-42 Challenges
Notices chan string
ConnectionError error
connectionContext context.Context // will be canceled when the connection closes
connectionContextCancel context.CancelFunc
2023-05-30 13:52:14 -03:00
okCallbacks *xsync.MapOf[string, func(bool, string)]
mutex sync.RWMutex
2023-03-14 17:07:22 -03:00
// custom things that aren't often used
//
AssumeValid bool // this will skip verifying signatures for events received from this relay
}
// NewRelay returns a new relay. The relay connection will be closed when the context is canceled.
func NewRelay(ctx context.Context, url string) *Relay {
2023-05-30 13:52:14 -03:00
return &Relay{
URL: NormalizeURL(url),
connectionContext: ctx,
Subscriptions: xsync.NewMapOf[*Subscription](),
2023-05-30 13:52:14 -03:00
okCallbacks: xsync.NewMapOf[func(bool, string)](),
}
}
// RelayConnect returns a relay object connected to url.
// Once successfully connected, cancelling ctx has no effect.
// To close the connection, call r.Close().
2023-01-01 20:22:40 -03:00
func RelayConnect(ctx context.Context, url string) (*Relay, error) {
r := NewRelay(context.Background(), url)
2023-01-01 20:22:40 -03:00
err := r.Connect(ctx)
return r, err
}
2022-11-26 09:25:31 -03:00
func (r *Relay) String() string {
return r.URL
}
// Context retrieves the context that is associated with this relay connection.
func (r *Relay) Context() context.Context { return r.connectionContext }
2023-01-01 20:22:40 -03:00
// Connect tries to establish a websocket connection to r.URL.
// If the context expires before the connection is complete, an error is returned.
// Once successfully connected, context expiration has no effect: call r.Close
// to close the connection.
//
// The underlying relay connection will use a background context. If you want to
// pass a custom context to the underlying relay connection, use NewRelay() and
// then Relay.Connect().
2023-01-01 20:22:40 -03:00
func (r *Relay) Connect(ctx context.Context) error {
if r.connectionContext == nil {
connectionContext, cancel := context.WithCancel(context.Background())
r.connectionContext = connectionContext
r.connectionContextCancel = cancel
}
if r.URL == "" {
return fmt.Errorf("invalid relay URL '%s'", r.URL)
}
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
defer cancel()
}
2023-05-05 22:00:25 +02:00
conn, err := NewConnection(ctx, r.URL, r.RequestHeader)
if err != nil {
return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
}
2023-05-04 23:51:15 +02:00
r.Connection = conn
r.Challenges = make(chan string)
r.Notices = make(chan string)
2023-03-29 14:55:52 -03:00
// close these channels when the connection is dropped
go func() {
<-r.connectionContext.Done()
r.mutex.Lock()
2023-03-29 14:55:52 -03:00
close(r.Challenges)
close(r.Notices)
r.mutex.Unlock()
2023-03-29 14:55:52 -03:00
}()
// ping every 29 seconds
go func() {
2023-03-29 14:55:52 -03:00
ticker := time.NewTicker(29 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
2023-05-04 23:51:15 +02:00
err := conn.Ping()
if err != nil {
InfoLogger.Printf("{%s} error writing ping: %v; closing websocket", r.URL, err)
return
}
}
}
}()
// handling received messages
2022-11-14 19:48:02 -03:00
go func() {
for {
message, err := conn.ReadMessage(r.connectionContext)
2022-11-14 19:48:02 -03:00
if err != nil {
r.ConnectionError = err
break
2022-11-14 19:48:02 -03:00
}
envelope := ParseMessage(message)
if envelope == nil {
2022-11-14 19:48:02 -03:00
continue
}
switch env := envelope.(type) {
case *NoticeEnvelope:
debugLog("{%s} %v\n", r.URL, message)
// TODO: improve this, otherwise if the application doesn't read the notices
// we'll consume ever more memory with each new notice
2023-02-05 20:03:00 -03:00
go func() {
r.mutex.RLock()
if r.connectionContext.Err() == nil {
r.Notices <- string(*env)
2023-03-29 14:55:52 -03:00
}
r.mutex.RUnlock()
2023-02-05 20:03:00 -03:00
}()
case *AuthEnvelope:
debugLog("{%s} %v\n", r.URL, message)
if env.Challenge == nil {
continue
}
// TODO: same as with NoticeEnvelope
go func() {
r.mutex.RLock()
if r.connectionContext.Err() == nil {
r.Challenges <- *env.Challenge
2023-03-29 14:55:52 -03:00
}
r.mutex.RUnlock()
}()
case *EventEnvelope:
debugLog("{%s} %v\n", r.URL, message)
if env.SubscriptionID == nil {
continue
}
if subscription, ok := r.Subscriptions.Load(*env.SubscriptionID); !ok {
InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID)
continue
} else {
func() {
// check if the event matches the desired filter, ignore otherwise
if !subscription.Filters.Match(&env.Event) {
InfoLogger.Printf("{%s} filter does not match: %v ~ %v\n", r.URL, subscription.Filters[0], env.Event)
return
2022-11-14 19:48:02 -03:00
}
subscription.mutex.Lock()
defer subscription.mutex.Unlock()
if subscription.stopped {
return
}
// check signature, ignore invalid, except from trusted (AssumeValid) relays
if !r.AssumeValid {
if ok, err := env.Event.CheckSignature(); !ok {
errmsg := ""
if err != nil {
errmsg = err.Error()
}
InfoLogger.Printf("{%s} bad signature: %s\n", r.URL, errmsg)
return
}
}
subscription.Events <- &env.Event
}()
2022-11-14 19:48:02 -03:00
}
case *EOSEEnvelope:
debugLog("{%s} %v\n", r.URL, message)
if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
subscription.emitEose.Do(func() {
subscription.EndOfStoredEvents <- struct{}{}
})
}
case *OKEnvelope:
if okCallback, exist := r.okCallbacks.Load(env.EventID); exist {
okCallback(env.OK, *env.Reason)
}
}
}
2022-11-14 19:48:02 -03:00
}()
return nil
}
// Publish sends an "EVENT" command to the relay r as in NIP-01.
// Status can be: success, failed, or sent (no response from relay before ctx times out).
2023-03-16 14:27:33 -03:00
func (r *Relay) Publish(ctx context.Context, event Event) (Status, error) {
status := PublishStatusFailed
2023-03-16 14:27:33 -03:00
var err error
// data races on status variable without this mutex
var mu sync.Mutex
2023-01-01 20:22:40 -03:00
if _, ok := ctx.Deadline(); !ok {
2023-04-11 15:33:29 -03:00
// if no timeout is set, force it to 7 seconds
2023-01-01 20:22:40 -03:00
var cancel context.CancelFunc
2023-04-11 15:33:29 -03:00
ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
2023-01-01 20:22:40 -03:00
defer cancel()
}
// make it cancellable so we can stop everything upon receiving an "OK"
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
// listen for an OK callback
2023-03-16 14:27:33 -03:00
okCallback := func(ok bool, msg string) {
mu.Lock()
defer mu.Unlock()
2023-01-01 20:22:40 -03:00
if ok {
status = PublishStatusSucceeded
} else {
status = PublishStatusFailed
2023-03-16 14:27:33 -03:00
err = fmt.Errorf("msg: %s", msg)
}
2023-01-01 20:22:40 -03:00
cancel()
}
r.okCallbacks.Store(event.ID, okCallback)
defer r.okCallbacks.Delete(event.ID)
2023-01-01 20:22:40 -03:00
// publish event
envb, _ := EventEnvelope{Event: event}.MarshalJSON()
debugLog("{%s} sending %v\n", r.URL, envb)
status = PublishStatusSent
if err := r.Connection.WriteMessage(envb); err != nil {
status = PublishStatusFailed
2023-03-16 14:27:33 -03:00
return status, err
2023-01-01 20:22:40 -03:00
}
for {
select {
case <-ctx.Done(): // this will be called when we get an OK
// proceed to return status as it is
// e.g. if this happens because of the timeout then status will probably be "failed"
// but if it happens because okCallback was called then it might be "succeeded"
// do not return if okCallback is in process
return status, err
case <-r.connectionContext.Done():
// same as above, but when the relay loses connectivity entirely
return status, err
}
2023-01-01 20:22:40 -03:00
}
}
// Auth sends an "AUTH" command client -> relay as in NIP-42.
// Status can be: success, failed, or sent (no response from relay before ctx times out).
2023-03-16 14:27:33 -03:00
func (r *Relay) Auth(ctx context.Context, event Event) (Status, error) {
status := PublishStatusFailed
2023-03-16 14:27:33 -03:00
var err error
// data races on status variable without this mutex
var mu sync.Mutex
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
defer cancel()
}
// make it cancellable so we can stop everything upon receiving an "OK"
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
// listen for an OK callback
2023-03-16 14:27:33 -03:00
okCallback := func(ok bool, msg string) {
mu.Lock()
if ok {
status = PublishStatusSucceeded
} else {
status = PublishStatusFailed
2023-03-16 14:27:33 -03:00
err = fmt.Errorf("msg: %s", msg)
}
mu.Unlock()
cancel()
}
r.okCallbacks.Store(event.ID, okCallback)
defer r.okCallbacks.Delete(event.ID)
// send AUTH
authResponse, _ := AuthEnvelope{Event: event}.MarshalJSON()
debugLog("{%s} sending %v\n", r.URL, authResponse)
if err := r.Connection.WriteMessage(authResponse); err != nil {
// status will be "failed"
2023-03-16 14:27:33 -03:00
return status, err
}
// use mu.Lock() just in case the okCallback got called, extremely unlikely.
mu.Lock()
status = PublishStatusSent
mu.Unlock()
// the context either times out, and the status is "sent"
// or the okCallback is called and the status is set to "succeeded" or "failed"
// NIP-42 does not mandate an "OK" reply to an "AUTH" message
<-ctx.Done()
mu.Lock()
defer mu.Unlock()
2023-03-16 14:27:33 -03:00
return status, err
}
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
// Events are returned through the channel sub.Events.
// The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
func (r *Relay) Subscribe(ctx context.Context, filters Filters) (*Subscription, error) {
2022-11-14 19:48:02 -03:00
if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
}
sub := r.PrepareSubscription(ctx)
sub.Filters = filters
2023-01-01 20:22:40 -03:00
if err := sub.Fire(); err != nil {
return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filters, r.URL, err)
}
return sub, nil
}
func (r *Relay) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) {
sub, err := r.Subscribe(ctx, Filters{filter})
if err != nil {
return nil, err
}
2023-01-01 20:22:40 -03:00
defer sub.Unsub()
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
2023-01-01 20:22:40 -03:00
defer cancel()
}
2023-01-26 09:04:27 -03:00
var events []*Event
2022-11-26 19:32:03 -03:00
for {
select {
case evt := <-sub.Events:
if evt == nil {
// channel is closed
return events, nil
}
2022-11-26 19:32:03 -03:00
events = append(events, evt)
case <-sub.EndOfStoredEvents:
return events, nil
2023-01-01 20:22:40 -03:00
case <-ctx.Done():
return events, nil
2022-11-26 19:32:03 -03:00
}
}
}
func (r *Relay) PrepareSubscription(ctx context.Context) *Subscription {
2023-05-30 14:01:07 -03:00
current := subscriptionIdCounter.Value()
subscriptionIdCounter.Inc()
ctx, cancel := context.WithCancel(ctx)
go func() {
// ensure the subscription dies if the relay connection dies
<-r.connectionContext.Done()
cancel()
}()
2023-03-18 15:09:49 -03:00
return &Subscription{
Relay: r,
Context: ctx,
cancel: cancel,
2022-11-14 19:48:02 -03:00
conn: r.Connection,
2023-03-18 15:09:49 -03:00
counter: current,
2023-01-26 09:04:27 -03:00
Events: make(chan *Event),
2022-11-16 10:05:28 -03:00
EndOfStoredEvents: make(chan struct{}, 1),
2022-11-14 19:48:02 -03:00
}
}
func (r *Relay) Close() error {
if r.connectionContextCancel == nil {
return fmt.Errorf("relay not connected")
}
r.connectionContextCancel()
r.connectionContextCancel = nil
return r.Connection.Close()
}