mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-04-11 13:19:54 +02:00
remove relaypool because it is considered harmful.
This commit is contained in:
parent
4dbbcec80a
commit
c18de89dd3
228
relaypool.go
228
relaypool.go
@ -1,228 +0,0 @@
|
||||
package nostr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
s "github.com/SaveTheRbtz/generic-sync-map-go"
|
||||
)
|
||||
|
||||
type PublishStatus struct {
|
||||
Relay string
|
||||
Status Status
|
||||
}
|
||||
|
||||
type RelayPool struct {
|
||||
SecretKey *string
|
||||
|
||||
Policies s.MapOf[string, RelayPoolPolicy]
|
||||
Relays s.MapOf[string, *Relay]
|
||||
subscriptions s.MapOf[string, Filters]
|
||||
eventStreams s.MapOf[string, chan EventMessage]
|
||||
|
||||
Notices chan *NoticeMessage
|
||||
}
|
||||
|
||||
type RelayPoolPolicy interface {
|
||||
ShouldRead(Filters) bool
|
||||
ShouldWrite(*Event) bool
|
||||
}
|
||||
|
||||
type SimplePolicy struct {
|
||||
Read bool
|
||||
Write bool
|
||||
}
|
||||
|
||||
func (s SimplePolicy) ShouldRead(_ Filters) bool {
|
||||
return s.Read
|
||||
}
|
||||
|
||||
func (s SimplePolicy) ShouldWrite(_ *Event) bool {
|
||||
return s.Write
|
||||
}
|
||||
|
||||
type NoticeMessage struct {
|
||||
Message string
|
||||
Relay string
|
||||
}
|
||||
|
||||
// New creates a new RelayPool with no relays in it
|
||||
func NewRelayPool() *RelayPool {
|
||||
return &RelayPool{
|
||||
Policies: s.MapOf[string, RelayPoolPolicy]{},
|
||||
Relays: s.MapOf[string, *Relay]{},
|
||||
|
||||
Notices: make(chan *NoticeMessage),
|
||||
}
|
||||
}
|
||||
|
||||
// Add calls AddContext with background context in a separate goroutine, sending
|
||||
// any connection error over the returned channel.
|
||||
//
|
||||
// The returned channel is closed once the connection is successfully
|
||||
// established or RelayConnectContext returned an error.
|
||||
func (r *RelayPool) Add(url string, policy RelayPoolPolicy) <-chan error {
|
||||
cherr := make(chan error)
|
||||
go func() {
|
||||
defer close(cherr)
|
||||
if err := r.AddContext(context.Background(), url, policy); err != nil {
|
||||
cherr <- err
|
||||
}
|
||||
}()
|
||||
return cherr
|
||||
}
|
||||
|
||||
// AddContext connects to a relay at a canonical version specified by the url
|
||||
// and adds it to the pool. The returned error is non-nil only on connection
|
||||
// errors, including an expired context before the connection is complete.
|
||||
//
|
||||
// Once successfully connected, AddContext returns and the context expiration
|
||||
// has no effect: call r.Remove to close the connection and delete a relay from the pool.
|
||||
func (r *RelayPool) AddContext(ctx context.Context, url string, policy RelayPoolPolicy) error {
|
||||
relay, err := RelayConnectContext(ctx, url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to %s: %w", url, err)
|
||||
}
|
||||
if policy == nil {
|
||||
policy = SimplePolicy{Read: true, Write: true}
|
||||
}
|
||||
r.addConnected(relay, policy)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RelayPool) addConnected(relay *Relay, policy RelayPoolPolicy) {
|
||||
r.Policies.Store(relay.URL, policy)
|
||||
r.Relays.Store(relay.URL, relay)
|
||||
|
||||
r.subscriptions.Range(func(id string, filters Filters) bool {
|
||||
sub := relay.prepareSubscription(id)
|
||||
sub.Sub(filters)
|
||||
eventStream, _ := r.eventStreams.Load(id)
|
||||
|
||||
go func(sub *Subscription) {
|
||||
for evt := range sub.Events {
|
||||
eventStream <- EventMessage{Relay: relay.URL, Event: evt}
|
||||
}
|
||||
}(sub)
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// Remove removes a relay from the pool.
|
||||
func (r *RelayPool) Remove(url string) {
|
||||
nm := NormalizeURL(url)
|
||||
|
||||
r.Relays.Delete(nm)
|
||||
r.Policies.Delete(nm)
|
||||
|
||||
if relay, ok := r.Relays.Load(nm); ok {
|
||||
relay.Close()
|
||||
}
|
||||
}
|
||||
|
||||
//Sub subscribes to events matching the passed filters and returns the subscription ID,
|
||||
//a channel which you should pass into Unique to get unique events, and a function which
|
||||
//you should call to clean up and close your subscription so that the relay doesn't block you.
|
||||
func (r *RelayPool) Sub(filters Filters) (subID string, events chan EventMessage, unsubscribe func()) {
|
||||
random := make([]byte, 7)
|
||||
rand.Read(random)
|
||||
id := hex.EncodeToString(random)
|
||||
|
||||
r.subscriptions.Store(id, filters)
|
||||
eventStream := make(chan EventMessage)
|
||||
r.eventStreams.Store(id, eventStream)
|
||||
unsub := make(chan struct{})
|
||||
|
||||
r.Relays.Range(func(_ string, relay *Relay) bool {
|
||||
sub := relay.prepareSubscription(id)
|
||||
sub.Sub(filters)
|
||||
|
||||
go func(sub *Subscription) {
|
||||
for evt := range sub.Events {
|
||||
eventStream <- EventMessage{Relay: relay.URL, Event: evt}
|
||||
}
|
||||
}(sub)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-unsub:
|
||||
sub.Unsub()
|
||||
}
|
||||
}()
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return id, eventStream, func() { gracefulClose(unsub) }
|
||||
}
|
||||
|
||||
func gracefulClose(c chan struct{}) {
|
||||
select {
|
||||
case <-c:
|
||||
default:
|
||||
close(c)
|
||||
}
|
||||
}
|
||||
|
||||
func Unique(all chan EventMessage) chan Event {
|
||||
uniqueEvents := make(chan Event)
|
||||
emittedAlready := s.MapOf[string, struct{}]{}
|
||||
|
||||
go func() {
|
||||
for eventMessage := range all {
|
||||
if _, ok := emittedAlready.LoadOrStore(eventMessage.Event.ID, struct{}{}); !ok {
|
||||
uniqueEvents <- eventMessage.Event
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return uniqueEvents
|
||||
}
|
||||
|
||||
func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error) {
|
||||
size := 0
|
||||
r.Relays.Range(func(_ string, _ *Relay) bool {
|
||||
size++
|
||||
return true
|
||||
})
|
||||
status := make(chan PublishStatus, size)
|
||||
|
||||
if r.SecretKey == nil && (evt.PubKey == "" || evt.Sig == "") {
|
||||
return nil, status, errors.New("PublishEvent needs either a signed event to publish or to have been configured with a .SecretKey.")
|
||||
}
|
||||
|
||||
if evt.PubKey == "" {
|
||||
sk, err := GetPublicKey(*r.SecretKey)
|
||||
if err != nil {
|
||||
return nil, status, fmt.Errorf("The pool's global SecretKey is invalid: %w", err)
|
||||
}
|
||||
evt.PubKey = sk
|
||||
}
|
||||
|
||||
if evt.Sig == "" {
|
||||
err := evt.Sign(*r.SecretKey)
|
||||
if err != nil {
|
||||
return nil, status, fmt.Errorf("Error signing event: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
r.Relays.Range(func(url string, relay *Relay) bool {
|
||||
if r, ok := r.Policies.Load(url); !ok || !r.ShouldWrite(evt) {
|
||||
return true
|
||||
}
|
||||
|
||||
go func(relay *Relay) {
|
||||
for resultStatus := range relay.Publish(*evt) {
|
||||
status <- PublishStatus{relay.URL, resultStatus}
|
||||
}
|
||||
}(relay)
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return evt, status, nil
|
||||
}
|
@ -1,140 +0,0 @@
|
||||
package nostr
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
func TestRelayPoolSubUnique(t *testing.T) {
|
||||
// prepare test notes to send to a client subs
|
||||
priv, pub := makeKeyPair(t)
|
||||
notesMap := make(map[string]Event)
|
||||
notesFilter := Filter{}
|
||||
for i := 0; i < 10; i++ {
|
||||
note := Event{
|
||||
Kind: 1,
|
||||
Content: fmt.Sprintf("hello %d", i),
|
||||
CreatedAt: time.Unix(1672068534+int64(i), 0),
|
||||
PubKey: pub,
|
||||
}
|
||||
mustSignEvent(t, priv, ¬e)
|
||||
notesMap[note.ID] = note
|
||||
notesFilter.IDs = append(notesFilter.IDs, note.ID)
|
||||
}
|
||||
|
||||
var mu sync.Mutex // guards subscribed and seenSubID to satisfy go test -race
|
||||
var (
|
||||
subscribed1, subscribed2 bool
|
||||
seenSubID1, seenSubID2 string
|
||||
)
|
||||
|
||||
// fake relay server 1
|
||||
ws1 := newWebsocketServer(func(conn *websocket.Conn) {
|
||||
mu.Lock()
|
||||
subscribed1 = true
|
||||
mu.Unlock()
|
||||
// verify the client sent a good sub request
|
||||
var raw []json.RawMessage
|
||||
if err := websocket.JSON.Receive(conn, &raw); err != nil {
|
||||
t.Errorf("ws1: websocket.JSON.Receive: %v", err)
|
||||
}
|
||||
subid, filters := parseSubscriptionMessage(t, raw)
|
||||
seenSubID1 = subid
|
||||
if len(filters) != 1 || !FilterEqual(filters[0], notesFilter) {
|
||||
t.Errorf("ws1: client sent filters:\n%+v\nwant:\n%+v", filters, Filters{notesFilter})
|
||||
}
|
||||
// send back all the notes
|
||||
for id, note := range notesMap {
|
||||
if err := websocket.JSON.Send(conn, []any{"EVENT", subid, note}); err != nil {
|
||||
t.Errorf("ws1: %s: websocket.JSON.Send: %v", id, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
defer ws1.Close()
|
||||
|
||||
// fake relay server 2
|
||||
ws2 := newWebsocketServer(func(conn *websocket.Conn) {
|
||||
mu.Lock()
|
||||
subscribed2 = true
|
||||
mu.Unlock()
|
||||
// verify the client sent a good sub request
|
||||
var raw []json.RawMessage
|
||||
if err := websocket.JSON.Receive(conn, &raw); err != nil {
|
||||
t.Errorf("ws2: websocket.JSON.Receive: %v", err)
|
||||
}
|
||||
subid, filters := parseSubscriptionMessage(t, raw)
|
||||
seenSubID2 = subid
|
||||
if len(filters) != 1 || !FilterEqual(filters[0], notesFilter) {
|
||||
t.Errorf("ws2: client sent filters:\n%+v\nwant:\n%+v", filters, Filters{notesFilter})
|
||||
}
|
||||
// send back all the notes
|
||||
for id, note := range notesMap {
|
||||
if err := websocket.JSON.Send(conn, []any{"EVENT", subid, note}); err != nil {
|
||||
t.Errorf("ws2: %s: websocket.JSON.Send: %v", id, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
defer ws2.Close()
|
||||
|
||||
// connect a client, sub and verify it receives all events without duplicates
|
||||
pool := mustRelayPoolConnect(ws1.URL, ws2.URL)
|
||||
subid, ch, _ := pool.Sub(Filters{notesFilter})
|
||||
uniq := Unique(ch)
|
||||
|
||||
seen := make(map[string]bool)
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case event := <-uniq:
|
||||
wantNote, ok := notesMap[event.ID]
|
||||
if !ok {
|
||||
t.Errorf("received unknown event: %+v", event)
|
||||
continue
|
||||
}
|
||||
if seen[event.ID] {
|
||||
t.Errorf("client already seen event %s", event.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
if !bytes.Equal(event.Serialize(), wantNote.Serialize()) {
|
||||
t.Errorf("received event:\n%+v\nwant:\n%+v", event, wantNote)
|
||||
}
|
||||
seen[event.ID] = true
|
||||
if len(seen) == len(notesMap) {
|
||||
break loop
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Errorf("took too long to receive from sub; seen %d out of %d events", len(seen), len(notesMap))
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if !subscribed1 || !subscribed2 {
|
||||
t.Errorf("subscribed1=%v subscribed2=%v; want both true", subscribed1, subscribed2)
|
||||
}
|
||||
if seenSubID1 != subid || seenSubID2 != subid {
|
||||
t.Errorf("relay saw seenSubID1=%q seenSubID2=%q; want %q", seenSubID1, seenSubID2, subid)
|
||||
}
|
||||
}
|
||||
|
||||
func mustRelayPoolConnect(url ...string) *RelayPool {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
pool := NewRelayPool()
|
||||
readwrite := SimplePolicy{Read: true, Write: true}
|
||||
for _, u := range url {
|
||||
if err := pool.AddContext(ctx, u, readwrite); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
}
|
||||
return pool
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user