Compare commits

..

4 Commits

2 changed files with 266 additions and 32 deletions

View File

@@ -67,12 +67,13 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
if specs, ok := rl.clients[ws]; ok {
// swap delete specs that match this id
nswaps := 0
for s, spec := range specs {
for s := len(specs) - 1; s >= 0; s-- {
spec := specs[s]
if spec.id == id {
spec.cancel(ErrSubscriptionClosedByClient)
specs[s] = specs[len(specs)-1-nswaps]
nswaps++
specs[s] = specs[len(specs)-1]
specs = specs[0 : len(specs)-1]
rl.clients[ws] = specs
// swap delete listeners one at a time, as they may be each in a different subrelay
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
@@ -82,7 +83,8 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// update the index of the listener we just moved
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex
@@ -93,7 +95,6 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
}
}
rl.clients[ws] = specs[0 : len(specs)-nswaps]
}
}
@@ -102,7 +103,7 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// swap delete listeners and delete client (all specs will be deleted)
for _, spec := range specs {
for s, spec := range specs {
// no need to cancel contexts since they inherit from the main connection context
// just delete the listeners (swap-delete)
srl := spec.subrelay
@@ -112,7 +113,12 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// update the index of the listener we just moved
// temporarily update the spec of the listener being removed to have index == -1
// (since it was removed) so it doesn't match in the search below
rl.clients[ws][s].index = -1
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex

View File

@@ -1,6 +1,7 @@
package khatru
import (
"math/rand"
"testing"
"github.com/nbd-wtf/go-nostr"
@@ -228,48 +229,275 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
ws3: {
{"a", cancel, 0, rlz},
{"a", cancel, 3, rlx},
{"f", cancel, 3, rly},
{"f", cancel, 2, rly},
},
ws4: {
{"d", cancel, 1, rlx},
{"e", cancel, 2, rlx},
{"e", cancel, 2, rly},
{"e", cancel, 1, rly},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"b", f2, ws2},
{"a", f3, ws3},
{"d", f3, ws4},
{"e", f3, ws4},
{"a", f3, ws3},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"a", f3, ws3},
{"g", f1, ws1},
{"g", f2, ws2},
}, rl.listeners)
}, rlz.listeners)
})
// t.Run("removing a client", func(t *testing.T) {
// rl.removeClientAndListeners(ws2)
t.Run("removing a subscription id", func(t *testing.T) {
// removing 'd' from ws4
rl.clients[ws4][0].cancel = func(cause error) {} // set since removing will call it
rl.removeListenerId(ws4, "d")
// require.Equal(t, map[*WebSocket][]listenerSpec{
// ws1: {
// {"c", cancel, 0, rl},
// },
// ws3: {
// {"a", cancel, 2, rl},
// },
// ws4: {
// {"d", cancel, 1, rl},
// },
// }, rl.clients)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 2, rlz},
},
ws3: {
{"a", cancel, 0, rlz},
{"a", cancel, 1, rlx},
{"f", cancel, 2, rly},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 2, rlx},
},
}, rl.clients)
// require.Equal(t, []listener{
// {"c", f1, ws1},
// {"d", f3, ws4},
// {"a", f3, ws3},
// }, rl.listeners)
// })
require.Equal(t, []listener{
{"c", f1, ws1},
{"a", f3, ws3},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"a", f3, ws3},
{"g", f1, ws1},
{"g", f2, ws2},
}, rlz.listeners)
})
t.Run("removing another subscription id", func(t *testing.T) {
// removing 'a' from ws3
rl.clients[ws3][0].cancel = func(cause error) {} // set since removing will call it
rl.clients[ws3][1].cancel = func(cause error) {} // set since removing will call it
rl.removeListenerId(ws3, "a")
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 0, rlz},
},
ws3: {
{"f", cancel, 2, rly},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 1, rlx},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f2, ws2},
{"g", f1, ws1},
}, rlz.listeners)
})
t.Run("removing a connection", func(t *testing.T) {
rl.removeClientAndListeners(ws2)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 0, rlz},
},
ws3: {
{"f", cancel, 0, rly},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 1, rlx},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"f", f3, ws3},
{"e", f3, ws4},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f1, ws1},
}, rlz.listeners)
})
t.Run("removing another subscription id", func(t *testing.T) {
// removing 'e' from ws4
rl.clients[ws4][0].cancel = func(cause error) {} // set since removing will call it
rl.clients[ws4][1].cancel = func(cause error) {} // set since removing will call it
rl.removeListenerId(ws4, "e")
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 0, rlz},
},
ws3: {
{"f", cancel, 0, rly},
},
ws4: {},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
}, rlx.listeners)
require.Equal(t, []listener{
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f1, ws1},
}, rlz.listeners)
})
}
func TestRandomListenerClientRemoving(t *testing.T) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, 20)
l := 0
for i := 0; i < 20; i++ {
ws := &WebSocket{}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
for j := 0; j < 20; j++ {
for i := 0; i < 20; i++ {
ws := websockets[i]
w := string(rune(i + 65))
if rand.Intn(2) < 1 {
l++
rl.addListener(ws, w+":"+string(rune(j+97)), rl, f, cancel)
}
}
}
require.Len(t, rl.clients, 20)
require.Len(t, rl.listeners, l)
for ws := range rl.clients {
rl.removeClientAndListeners(ws)
}
require.Len(t, rl.clients, 0)
require.Len(t, rl.listeners, 0)
}
func TestRandomListenerIdRemoving(t *testing.T) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, 20)
type wsid struct {
ws *WebSocket
id string
}
subs := make([]wsid, 0, 20*20)
extra := 0
for i := 0; i < 20; i++ {
ws := &WebSocket{}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
for j := 0; j < 20; j++ {
for i := 0; i < 20; i++ {
ws := websockets[i]
w := string(rune(i + 65))
if rand.Intn(2) < 1 {
id := w + ":" + string(rune(j+97))
rl.addListener(ws, id, rl, f, cancel)
subs = append(subs, wsid{ws, id})
if rand.Intn(5) < 1 {
rl.addListener(ws, id, rl, f, cancel)
extra++
}
}
}
}
require.Len(t, rl.clients, 20)
require.Len(t, rl.listeners, len(subs)+extra)
rand.Shuffle(len(subs), func(i, j int) {
subs[i], subs[j] = subs[j], subs[i]
})
for _, wsidToRemove := range subs {
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
}
require.Len(t, rl.listeners, 0)
require.Len(t, rl.clients, 20)
for _, specs := range rl.clients {
require.Len(t, specs, 0)
}
}