go-nostr/relaypool.go

289 lines
6.1 KiB
Go
Raw Permalink Normal View History

2022-01-02 08:44:18 -03:00
package nostr
2021-01-31 11:05:09 -03:00
import (
2021-02-20 17:44:05 -03:00
"crypto/rand"
"encoding/hex"
2021-01-31 11:05:09 -03:00
"encoding/json"
"errors"
2021-02-07 07:56:55 -03:00
"fmt"
2021-01-31 11:05:09 -03:00
"log"
2021-02-20 17:44:05 -03:00
"time"
2021-01-31 11:05:09 -03:00
s "github.com/SaveTheRbtz/generic-sync-map-go"
2021-01-31 11:05:09 -03:00
"github.com/gorilla/websocket"
)
type Status int
2022-01-02 08:44:18 -03:00
const (
PublishStatusSent Status = 0
PublishStatusFailed Status = -1
PublishStatusSucceeded Status = 1
2022-01-02 08:44:18 -03:00
)
func (s Status) String() string {
switch s {
case PublishStatusSent:
return "sent"
case PublishStatusFailed:
return "failed"
case PublishStatusSucceeded:
return "success"
}
return "unknown"
}
2022-01-02 08:44:18 -03:00
type PublishStatus struct {
Relay string
Status Status
2022-01-02 08:44:18 -03:00
}
2021-01-31 11:05:09 -03:00
type RelayPool struct {
SecretKey *string
Relays s.MapOf[string, RelayPoolPolicy]
websockets s.MapOf[string, *Connection]
subscriptions s.MapOf[string, *Subscription]
2021-01-31 11:05:09 -03:00
Notices chan *NoticeMessage
}
type RelayPoolPolicy interface {
ShouldRead(Filters) bool
ShouldWrite(*Event) bool
2021-01-31 11:05:09 -03:00
}
type SimplePolicy struct {
Read bool
Write bool
}
func (s SimplePolicy) ShouldRead(_ Filters) bool {
return s.Read
}
func (s SimplePolicy) ShouldWrite(_ *Event) bool {
return s.Write
}
2021-01-31 11:05:09 -03:00
type NoticeMessage struct {
Message string
Relay string
}
// New creates a new RelayPool with no relays in it
2022-01-02 08:44:18 -03:00
func NewRelayPool() *RelayPool {
2021-01-31 11:05:09 -03:00
return &RelayPool{
Relays: s.MapOf[string, RelayPoolPolicy]{},
websockets: s.MapOf[string, *Connection]{},
subscriptions: s.MapOf[string, *Subscription]{},
2021-01-31 11:05:09 -03:00
Notices: make(chan *NoticeMessage),
}
}
// Add adds a new relay to the pool, if policy is nil, it will be a simple
// read+write policy.
func (r *RelayPool) Add(url string, policy RelayPoolPolicy) error {
2021-01-31 11:05:09 -03:00
if policy == nil {
policy = SimplePolicy{Read: true, Write: true}
2021-01-31 11:05:09 -03:00
}
2022-01-02 08:44:18 -03:00
nm := NormalizeURL(url)
2021-01-31 11:05:09 -03:00
if nm == "" {
2021-02-07 07:56:55 -03:00
return fmt.Errorf("invalid relay URL '%s'", url)
2021-01-31 11:05:09 -03:00
}
socket, _, err := websocket.DefaultDialer.Dial(NormalizeURL(url), nil)
2021-01-31 11:05:09 -03:00
if err != nil {
2021-02-07 07:56:55 -03:00
return fmt.Errorf("error opening websocket to '%s': %w", nm, err)
2021-01-31 11:05:09 -03:00
}
conn := NewConnection(socket)
r.Relays.Store(nm, policy)
r.websockets.Store(nm, conn)
2021-01-31 11:05:09 -03:00
r.subscriptions.Range(func(_ string, sub *Subscription) bool {
2021-02-20 17:44:05 -03:00
sub.addRelay(nm, conn)
return true
})
2021-02-20 17:44:05 -03:00
2021-01-31 11:05:09 -03:00
go func() {
for {
typ, message, err := conn.socket.ReadMessage()
2021-01-31 11:05:09 -03:00
if err != nil {
log.Println("read error: ", err)
return
}
if typ == websocket.PingMessage {
conn.WriteMessage(websocket.PongMessage, nil)
2022-02-19 09:56:15 +01:00
continue
2021-01-31 11:05:09 -03:00
}
if typ != websocket.TextMessage || len(message) == 0 || message[0] != '[' {
continue
}
2021-02-20 17:44:05 -03:00
var jsonMessage []json.RawMessage
err = json.Unmarshal(message, &jsonMessage)
if err != nil {
continue
}
if len(jsonMessage) < 2 {
continue
}
var label string
json.Unmarshal(jsonMessage[0], &label)
switch label {
case "NOTICE":
var content string
json.Unmarshal(jsonMessage[1], &content)
r.Notices <- &NoticeMessage{
Relay: nm,
Message: content,
}
case "EVENT":
if len(jsonMessage) < 3 {
2021-01-31 11:05:09 -03:00
continue
}
2021-02-20 17:44:05 -03:00
var channel string
json.Unmarshal(jsonMessage[1], &channel)
if subscription, ok := r.subscriptions.Load(channel); ok {
2022-01-02 08:44:18 -03:00
var event Event
2021-02-20 17:44:05 -03:00
json.Unmarshal(jsonMessage[2], &event)
// check signature of all received events, ignore invalid
ok, err := event.CheckSignature()
2021-02-20 17:44:05 -03:00
if !ok {
errmsg := ""
if err != nil {
errmsg = err.Error()
}
log.Printf("bad signature: %s", errmsg)
2021-02-20 17:44:05 -03:00
continue
}
// check if the event matches the desired filter, ignore otherwise
if !subscription.filters.Match(&event) {
continue
}
if !subscription.stopped {
subscription.Events <- EventMessage{
Relay: nm,
Event: event,
}
2021-02-20 17:44:05 -03:00
}
}
2021-01-31 11:05:09 -03:00
}
}
}()
2021-02-07 07:56:55 -03:00
return nil
2021-01-31 11:05:09 -03:00
}
// Remove removes a relay from the pool.
func (r *RelayPool) Remove(url string) {
2022-01-02 08:44:18 -03:00
nm := NormalizeURL(url)
2021-02-20 17:44:05 -03:00
r.subscriptions.Range(func(_ string, sub *Subscription) bool {
2021-02-20 17:44:05 -03:00
sub.removeRelay(nm)
return true
})
if conn, ok := r.websockets.Load(nm); ok {
2021-01-31 11:05:09 -03:00
conn.Close()
}
2021-02-20 17:44:05 -03:00
r.Relays.Delete(nm)
r.websockets.Delete(nm)
2021-01-31 11:05:09 -03:00
}
func (r *RelayPool) Sub(filters Filters) *Subscription {
2021-02-20 17:44:05 -03:00
random := make([]byte, 7)
rand.Read(random)
2021-01-31 11:05:09 -03:00
subscription := Subscription{}
2021-02-20 17:44:05 -03:00
subscription.channel = hex.EncodeToString(random)
subscription.relays = s.MapOf[string, *Connection]{}
r.Relays.Range(func(relay string, policy RelayPoolPolicy) bool {
if policy.ShouldRead(filters) {
if ws, ok := r.websockets.Load(relay); ok {
subscription.relays.Store(relay, ws)
}
2021-02-07 07:56:55 -03:00
}
return true
})
2021-02-20 17:44:05 -03:00
subscription.Events = make(chan EventMessage)
2022-01-02 08:44:18 -03:00
subscription.UniqueEvents = make(chan Event)
r.subscriptions.Store(subscription.channel, &subscription)
2021-01-31 11:05:09 -03:00
subscription.Sub(filters)
2021-02-20 17:44:05 -03:00
return &subscription
2021-01-31 11:05:09 -03:00
}
2022-01-02 08:44:18 -03:00
func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) {
status := make(chan PublishStatus, 1)
2021-01-31 11:05:09 -03:00
if r.SecretKey == nil && (evt.PubKey == "" || evt.Sig == "") {
2021-02-20 17:44:05 -03:00
return nil, status, errors.New("PublishEvent needs either a signed event to publish or to have been configured with a .SecretKey.")
2021-01-31 11:05:09 -03:00
}
if evt.PubKey == "" {
sk, err := GetPublicKey(*r.SecretKey)
if err != nil {
return nil, status, fmt.Errorf("The pool's global SecretKey is invalid: %w", err)
}
evt.PubKey = sk
}
2021-01-31 11:05:09 -03:00
if evt.Sig == "" {
2021-02-07 07:56:55 -03:00
err := evt.Sign(*r.SecretKey)
if err != nil {
2021-02-20 17:44:05 -03:00
return nil, status, fmt.Errorf("Error signing event: %w", err)
2021-02-07 07:56:55 -03:00
}
2021-01-31 11:05:09 -03:00
}
r.websockets.Range(func(relay string, conn *Connection) bool {
if r, ok := r.Relays.Load(relay); !ok || !r.ShouldWrite(evt) {
return true
}
go func(relay string, conn *Connection) {
err := conn.WriteJSON([]interface{}{"EVENT", evt})
2021-02-20 17:44:05 -03:00
if err != nil {
log.Printf("error sending event to '%s': %s", relay, err.Error())
status <- PublishStatus{relay, PublishStatusFailed}
}
status <- PublishStatus{relay, PublishStatusSent}
subscription := r.Sub(Filters{Filter{IDs: []string{evt.ID}}})
for {
select {
case event := <-subscription.UniqueEvents:
if event.ID == evt.ID {
status <- PublishStatus{relay, PublishStatusSucceeded}
break
} else {
continue
}
case <-time.After(5 * time.Second):
break
}
break
}
2021-02-20 17:44:05 -03:00
}(relay, conn)
return true
})
2021-01-31 11:05:09 -03:00
2021-02-20 17:44:05 -03:00
return evt, status, nil
2021-01-31 11:05:09 -03:00
}