go-nostr/relay.go

378 lines
9.0 KiB
Go
Raw Permalink Normal View History

package nostr
import (
"context"
2022-12-19 22:40:24 -06:00
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
s "github.com/SaveTheRbtz/generic-sync-map-go"
"github.com/gorilla/websocket"
)
// Status is the outcome reported by Publish and Auth for a message sent to a
// relay.
type Status int

// The three outcomes a Status can take. The numeric values are -1, 0 and 1.
const (
	PublishStatusFailed    Status = iota - 1 // -1: the relay rejected the message or it could not be sent
	PublishStatusSent                        // 0: written to the relay, no "OK" seen yet
	PublishStatusSucceeded                   // 1: the relay accepted the message
)

// String returns the lowercase human-readable name of s, or "unknown" for a
// value that is not one of the declared constants.
func (s Status) String() string {
	name := "unknown"
	switch s {
	case PublishStatusFailed:
		name = "failed"
	case PublishStatusSent:
		name = "sent"
	case PublishStatusSucceeded:
		name = "success"
	}
	return name
}
type Relay struct {
URL string
RequestHeader http.Header // e.g. for origin header
Connection *Connection
subscriptions s.MapOf[string, *Subscription]
Challenges chan string // NIP-42 Challenges
Notices chan string
ConnectionError chan error
2023-01-01 20:22:40 -03:00
okCallbacks s.MapOf[string, func(bool)]
}
// RelayConnect returns a relay object connected to url.
// Once successfully connected, cancelling ctx has no effect.
// To close the connection, call r.Close().
2023-01-01 20:22:40 -03:00
func RelayConnect(ctx context.Context, url string) (*Relay, error) {
r := &Relay{URL: NormalizeURL(url)}
2023-01-01 20:22:40 -03:00
err := r.Connect(ctx)
return r, err
}
2022-11-26 09:25:31 -03:00
// String returns the relay's URL, so a *Relay satisfies fmt.Stringer.
func (r *Relay) String() string {
	url := r.URL
	return url
}
2023-01-01 20:22:40 -03:00
// Connect tries to establish a websocket connection to r.URL.
// If the context expires before the connection is complete, an error is returned.
// Once successfully connected, context expiration has no effect: call r.Close
// to close the connection.
2023-01-01 20:22:40 -03:00
func (r *Relay) Connect(ctx context.Context) error {
if r.URL == "" {
return fmt.Errorf("invalid relay URL '%s'", r.URL)
}
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
defer cancel()
}
socket, _, err := websocket.DefaultDialer.DialContext(ctx, r.URL, r.RequestHeader)
if err != nil {
return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
}
r.Challenges = make(chan string)
r.Notices = make(chan string)
r.ConnectionError = make(chan error)
conn := NewConnection(socket)
2022-11-14 19:48:02 -03:00
r.Connection = conn
2022-11-14 19:48:02 -03:00
go func() {
for {
typ, message, err := conn.socket.ReadMessage()
if err != nil {
r.ConnectionError <- err
break
2022-11-14 19:48:02 -03:00
}
2022-11-14 19:48:02 -03:00
if typ == websocket.PingMessage {
conn.WriteMessage(websocket.PongMessage, nil)
continue
}
2022-11-14 19:48:02 -03:00
if typ != websocket.TextMessage || len(message) == 0 || message[0] != '[' {
continue
}
2022-11-14 19:48:02 -03:00
var jsonMessage []json.RawMessage
err = json.Unmarshal(message, &jsonMessage)
if err != nil {
continue
}
2022-11-14 19:48:02 -03:00
if len(jsonMessage) < 2 {
continue
}
2022-11-14 19:48:02 -03:00
var label string
json.Unmarshal(jsonMessage[0], &label)
switch label {
case "NOTICE":
var content string
json.Unmarshal(jsonMessage[1], &content)
2023-02-05 20:03:00 -03:00
go func() {
r.Notices <- content
}()
case "AUTH":
var challenge string
json.Unmarshal(jsonMessage[1], &challenge)
go func() {
r.Challenges <- challenge
}()
2022-11-14 19:48:02 -03:00
case "EVENT":
if len(jsonMessage) < 3 {
continue
}
2022-11-14 19:48:02 -03:00
var channel string
json.Unmarshal(jsonMessage[1], &channel)
if subscription, ok := r.subscriptions.Load(channel); ok {
var event Event
json.Unmarshal(jsonMessage[2], &event)
// check signature of all received events, ignore invalid
ok, err := event.CheckSignature()
if !ok {
errmsg := ""
if err != nil {
errmsg = err.Error()
}
log.Printf("bad signature: %s", errmsg)
continue
}
// check if the event matches the desired filter, ignore otherwise
func() {
subscription.mutex.Lock()
defer subscription.mutex.Unlock()
if !subscription.Filters.Match(&event) || subscription.stopped {
return
}
2023-01-26 09:04:27 -03:00
subscription.Events <- &event
}()
2022-11-14 19:48:02 -03:00
}
case "EOSE":
if len(jsonMessage) < 2 {
continue
}
2022-11-14 19:48:02 -03:00
var channel string
json.Unmarshal(jsonMessage[1], &channel)
if subscription, ok := r.subscriptions.Load(channel); ok {
subscription.emitEose.Do(func() {
subscription.EndOfStoredEvents <- struct{}{}
})
}
2022-11-14 19:48:02 -03:00
case "OK":
if len(jsonMessage) < 3 {
continue
}
var (
eventId string
ok bool
)
json.Unmarshal(jsonMessage[1], &eventId)
json.Unmarshal(jsonMessage[2], &ok)
2023-01-01 20:22:40 -03:00
if okCallback, exist := r.okCallbacks.Load(eventId); exist {
okCallback(ok)
}
}
}
2022-11-14 19:48:02 -03:00
}()
return nil
}
// Publish sends an "EVENT" command to the relay r as in NIP-01.
// Status can be: success, failed, or sent (no response from relay before ctx times out).
2023-01-01 20:22:40 -03:00
func (r *Relay) Publish(ctx context.Context, event Event) Status {
2023-02-05 13:44:26 +00:00
status := PublishStatusSent
// data races on status variable without this mutex
var mu sync.Mutex
2023-01-01 20:22:40 -03:00
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
defer cancel()
}
// make it cancellable so we can stop everything upon receiving an "OK"
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
// listen for an OK callback
okCallback := func(ok bool) {
mu.Lock()
defer mu.Unlock()
2023-01-01 20:22:40 -03:00
if ok {
status = PublishStatusSucceeded
} else {
status = PublishStatusFailed
}
2023-01-01 20:22:40 -03:00
cancel()
}
r.okCallbacks.Store(event.ID, okCallback)
defer r.okCallbacks.Delete(event.ID)
2023-01-01 20:22:40 -03:00
// publish event
if err := r.Connection.WriteJSON([]interface{}{"EVENT", event}); err != nil {
2023-01-01 20:22:40 -03:00
return status
}
sub := r.Subscribe(ctx, Filters{Filter{IDs: []string{event.ID}}})
for {
select {
case receivedEvent := <-sub.Events:
if receivedEvent.ID == event.ID {
// we got a success, so update our status and proceed to return
mu.Lock()
2023-01-01 20:22:40 -03:00
status = PublishStatusSucceeded
mu.Unlock()
2023-01-01 20:22:40 -03:00
return status
}
2023-01-01 20:22:40 -03:00
case <-ctx.Done():
// return status as it was
// will proceed to return status as it is
// e.g. if this happens because of the timeout then status will probably be "failed"
// but if it happens because okCallback was called then it might be "succeeded"
// do not return if okCallback is in process
2023-01-01 20:22:40 -03:00
return status
}
2023-01-01 20:22:40 -03:00
}
}
// Auth sends an "AUTH" command client -> relay as in NIP-42.
// Status can be: success, failed, or sent (no response from relay before ctx times out).
//
// If ctx has no deadline a 3 second timeout is applied. Auth always waits for
// either an "OK" for event.ID or the context to end before returning, unless
// writing to the connection fails, in which case it returns failed at once.
func (r *Relay) Auth(ctx context.Context, event Event) Status {
	var (
		// mu guards status and answered: okCallback is invoked from the read
		// goroutine started by Connect.
		mu     sync.Mutex
		status = PublishStatusFailed
		// answered records that the relay's verdict is already stored in
		// status, so that it is not overwritten with "sent" below.
		answered bool
	)

	if _, ok := ctx.Deadline(); !ok {
		// if no timeout is set, force it to 3 seconds
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
		defer cancel()
	}

	// make it cancellable so we can stop everything upon receiving an "OK"
	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)
	defer cancel()

	// listen for an OK callback
	okCallback := func(ok bool) {
		mu.Lock()
		answered = true
		if ok {
			status = PublishStatusSucceeded
		} else {
			status = PublishStatusFailed
		}
		mu.Unlock()
		cancel()
	}
	r.okCallbacks.Store(event.ID, okCallback)
	defer r.okCallbacks.Delete(event.ID)

	// send AUTH
	if err := r.Connection.WriteJSON([]interface{}{"AUTH", event}); err != nil {
		return PublishStatusFailed
	}

	// The message is out. Record that, unless the relay was quick enough to
	// answer before we got here — its verdict must win over "sent".
	mu.Lock()
	if !answered {
		status = PublishStatusSent
	}
	mu.Unlock()

	// the context either times out, and the status is "sent"
	// or the okCallback is called and the status is set to "succeeded" or "failed"
	// NIP-42 does not mandate an "OK" reply to an "AUTH" message
	<-ctx.Done()

	mu.Lock()
	defer mu.Unlock()
	return status
}
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
// Events are returned through the channel sub.Events.
// The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
2023-01-01 20:22:40 -03:00
func (r *Relay) Subscribe(ctx context.Context, filters Filters) *Subscription {
2022-11-14 19:48:02 -03:00
if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
}
sub := r.PrepareSubscription()
sub.Filters = filters
2023-01-01 20:22:40 -03:00
sub.Fire(ctx)
return sub
}
2023-01-26 09:04:27 -03:00
func (r *Relay) QuerySync(ctx context.Context, filter Filter) []*Event {
2023-01-01 20:22:40 -03:00
sub := r.Subscribe(ctx, Filters{filter})
defer sub.Unsub()
if _, ok := ctx.Deadline(); !ok {
// if no timeout is set, force it to 3 seconds
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
defer cancel()
}
2023-01-26 09:04:27 -03:00
var events []*Event
2022-11-26 19:32:03 -03:00
for {
select {
case evt := <-sub.Events:
events = append(events, evt)
case <-sub.EndOfStoredEvents:
return events
2023-01-01 20:22:40 -03:00
case <-ctx.Done():
2022-11-26 19:32:03 -03:00
return events
}
}
}
// PrepareSubscription registers a new subscription on r under a fresh id made
// of 7 random bytes, hex-encoded, and returns it.
func (r *Relay) PrepareSubscription() *Subscription {
	var seed [7]byte
	rand.Read(seed[:])
	return r.prepareSubscription(hex.EncodeToString(seed[:]))
}
func (r *Relay) prepareSubscription(id string) *Subscription {
sub := &Subscription{
Relay: r,
2022-11-14 19:48:02 -03:00
conn: r.Connection,
id: id,
2023-01-26 09:04:27 -03:00
Events: make(chan *Event),
2022-11-16 10:05:28 -03:00
EndOfStoredEvents: make(chan struct{}, 1),
2022-11-14 19:48:02 -03:00
}
r.subscriptions.Store(sub.id, sub)
return sub
}
// Close closes the relay's underlying Connection and returns the result.
// It returns an error, instead of dereferencing a nil Connection, when the
// relay has never been connected — which can happen because RelayConnect
// hands back the *Relay even when connecting failed.
func (r *Relay) Close() error {
	if r.Connection == nil {
		return fmt.Errorf("relay '%s' is not connected", r.URL)
	}
	return r.Connection.Close()
}