From 28663f21f0602daf61a9b08f94fd398332ade6b5 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 26 Dec 2022 19:54:37 +0100 Subject: [PATCH] relaypool: add a unique events subscription smoke test was trying to reproduce the issue described in https://github.com/nbd-wtf/go-nostr/issues/23 no success in reproducing that specific problem, but i believe the test can still be useful to help in avoiding regression bugs in the future. --- event_test.go | 7 +++ relay_test.go | 25 +++++++++ relaypool_test.go | 140 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 relaypool_test.go diff --git a/event_test.go b/event_test.go index 1de9366..0ca82bf 100644 --- a/event_test.go +++ b/event_test.go @@ -85,3 +85,10 @@ func TestEventSerialization(t *testing.T) { } } } + +func mustSignEvent(t *testing.T, privkey string, event *Event) { + t.Helper() + if err := event.Sign(privkey); err != nil { + t.Fatalf("event.Sign: %v", err) + } +} diff --git a/relay_test.go b/relay_test.go index 4f377a1..0801733 100644 --- a/relay_test.go +++ b/relay_test.go @@ -187,6 +187,31 @@ func parseEventMessage(t *testing.T, raw []json.RawMessage) Event { return event } +func parseSubscriptionMessage(t *testing.T, raw []json.RawMessage) (subid string, filters []Filter) { + t.Helper() + if len(raw) < 3 { + t.Fatalf("len(raw) = %d; want at least 3", len(raw)) + } + var typ string + json.Unmarshal(raw[0], &typ) + if typ != "REQ" { + t.Errorf("typ = %q; want REQ", typ) + } + var id string + if err := json.Unmarshal(raw[1], &id); err != nil { + t.Errorf("json.Unmarshal sub id: %v", err) + } + var ff []Filter + for i, b := range raw[2:] { + var f Filter + if err := json.Unmarshal(b, &f); err != nil { + t.Errorf("json.Unmarshal filter %d: %v", i, err) + } + ff = append(ff, f) + } + return id, ff +} + func testPublishStatus(t *testing.T, ch <-chan Status, want map[Status]bool) { for stat := range ch { if !want[stat] { diff --git a/relaypool_test.go b/relaypool_test.go new file mode 100644 index 0000000..7920a38 --- /dev/null +++ b/relaypool_test.go @@ -0,0 +1,140 @@ +package nostr + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sync" + "testing" + "time" + + "golang.org/x/net/websocket" +) + +func TestRelayPoolSubUnique(t *testing.T) { + // prepare test notes to send to a client subs + priv, pub := makeKeyPair(t) + notesMap := make(map[string]Event) + notesFilter := Filter{} + for i := 0; i < 10; i++ { + note := Event{ + Kind: 1, + Content: fmt.Sprintf("hello %d", i), + CreatedAt: time.Unix(1672068534+int64(i), 0), + PubKey: pub, + } + mustSignEvent(t, priv, ¬e) + notesMap[note.ID] = note + notesFilter.IDs = append(notesFilter.IDs, note.ID) + } + + var mu sync.Mutex // guards subscribed and seenSubID to satisfy go test -race + var ( + subscribed1, subscribed2 bool + seenSubID1, seenSubID2 string + ) + + // fake relay server 1 + ws1 := newWebsocketServer(func(conn *websocket.Conn) { + mu.Lock() + subscribed1 = true + mu.Unlock() + // verify the client sent a good sub request + var raw []json.RawMessage + if err := websocket.JSON.Receive(conn, &raw); err != nil { + t.Errorf("ws1: websocket.JSON.Receive: %v", err) + } + subid, filters := parseSubscriptionMessage(t, raw) + seenSubID1 = subid + if len(filters) != 1 || !FilterEqual(filters[0], notesFilter) { + t.Errorf("ws1: client sent filters:\n%+v\nwant:\n%+v", filters, Filters{notesFilter}) + } + // send back all the notes + for id, note := range notesMap { + if err := websocket.JSON.Send(conn, []any{"EVENT", subid, note}); err != nil { + t.Errorf("ws1: %s: websocket.JSON.Send: %v", id, err) + } + } + }) + defer ws1.Close() + + // fake relay server 2 + ws2 := newWebsocketServer(func(conn *websocket.Conn) { + mu.Lock() + subscribed2 = true + mu.Unlock() + // verify the client sent a good sub request + var raw []json.RawMessage + if err := websocket.JSON.Receive(conn, &raw); err != nil { + t.Errorf("ws2: websocket.JSON.Receive: %v", err) + } + subid, filters := parseSubscriptionMessage(t, raw) + seenSubID2 = subid + if len(filters) != 1 || !FilterEqual(filters[0], notesFilter) { + t.Errorf("ws2: client sent filters:\n%+v\nwant:\n%+v", filters, Filters{notesFilter}) + } + // send back all the notes + for id, note := range notesMap { + if err := websocket.JSON.Send(conn, []any{"EVENT", subid, note}); err != nil { + t.Errorf("ws2: %s: websocket.JSON.Send: %v", id, err) + } + } + }) + defer ws2.Close() + + // connect a client, sub and verify it receives all events without duplicates + pool := mustRelayPoolConnect(ws1.URL, ws2.URL) + subid, ch, _ := pool.Sub(Filters{notesFilter}) + uniq := Unique(ch) + + seen := make(map[string]bool) +loop: + for { + select { + case event := <-uniq: + wantNote, ok := notesMap[event.ID] + if !ok { + t.Errorf("received unknown event: %+v", event) + continue + } + if seen[event.ID] { + t.Errorf("client already seen event %s", event.ID) + continue + } + + if !bytes.Equal(event.Serialize(), wantNote.Serialize()) { + t.Errorf("received event:\n%+v\nwant:\n%+v", event, wantNote) + } + seen[event.ID] = true + if len(seen) == len(notesMap) { + break loop + } + case <-time.After(2 * time.Second): + t.Errorf("took too long to receive from sub; seen %d out of %d events", len(seen), len(notesMap)) + break loop + } + } + + mu.Lock() + defer mu.Unlock() + if !subscribed1 || !subscribed2 { + t.Errorf("subscribed1=%v subscribed2=%v; want both true", subscribed1, subscribed2) + } + if seenSubID1 != subid || seenSubID2 != subid { + t.Errorf("relay saw seenSubID1=%q seenSubID2=%q; want %q", seenSubID1, seenSubID2, subid) + } +} + +func mustRelayPoolConnect(url ...string) *RelayPool { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + pool := NewRelayPool() + readwrite := SimplePolicy{Read: true, Write: true} + for _, u := range url { + if err := pool.AddContext(ctx, u, readwrite); err != nil { + panic(err.Error()) + } + } + return pool +}