Compare commits

..

1 Commits

Author SHA1 Message Date
fiatjaf
aa9e53b61d wip 2024-08-01 15:36:14 -03:00
2 changed files with 32 additions and 266 deletions

View File

@@ -67,13 +67,12 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
if specs, ok := rl.clients[ws]; ok {
// swap delete specs that match this id
for s := len(specs) - 1; s >= 0; s-- {
spec := specs[s]
nswaps := 0
for s, spec := range specs {
if spec.id == id {
spec.cancel(ErrSubscriptionClosedByClient)
specs[s] = specs[len(specs)-1]
specs = specs[0 : len(specs)-1]
rl.clients[ws] = specs
specs[s] = specs[len(specs)-1-nswaps]
nswaps++
// swap delete listeners one at a time, as they may be each in a different subrelay
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
@@ -83,8 +82,7 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
// update the index of the listener we just moved
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex
@@ -95,6 +93,7 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
}
}
rl.clients[ws] = specs[0 : len(specs)-nswaps]
}
}
@@ -103,7 +102,7 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// swap delete listeners and delete client (all specs will be deleted)
for s, spec := range specs {
for _, spec := range specs {
// no need to cancel contexts since they inherit from the main connection context
// just delete the listeners (swap-delete)
srl := spec.subrelay
@@ -113,12 +112,7 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// temporarily update the spec of the listener being removed to have index == -1
// (since it was removed) so it doesn't match in the search below
rl.clients[ws][s].index = -1
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
// update the index of the listener we just moved
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex

View File

@@ -1,7 +1,6 @@
package khatru
import (
"math/rand"
"testing"
"github.com/nbd-wtf/go-nostr"
@@ -229,275 +228,48 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
ws3: {
{"a", cancel, 0, rlz},
{"a", cancel, 3, rlx},
{"f", cancel, 2, rly},
{"f", cancel, 3, rly},
},
ws4: {
{"d", cancel, 1, rlx},
{"e", cancel, 2, rlx},
{"e", cancel, 1, rly},
{"e", cancel, 2, rly},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"b", f2, ws2},
{"a", f3, ws3},
{"d", f3, ws4},
{"e", f3, ws4},
{"a", f3, ws3},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"a", f3, ws3},
{"g", f1, ws1},
{"g", f2, ws2},
}, rlz.listeners)
}, rl.listeners)
})
t.Run("removing a subscription id", func(t *testing.T) {
// removing 'd' from ws4
rl.clients[ws4][0].cancel = func(cause error) {} // set since removing will call it
rl.removeListenerId(ws4, "d")
// t.Run("removing a client", func(t *testing.T) {
// rl.removeClientAndListeners(ws2)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 2, rlz},
},
ws3: {
{"a", cancel, 0, rlz},
{"a", cancel, 1, rlx},
{"f", cancel, 2, rly},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 2, rlx},
},
}, rl.clients)
// require.Equal(t, map[*WebSocket][]listenerSpec{
// ws1: {
// {"c", cancel, 0, rl},
// },
// ws3: {
// {"a", cancel, 2, rl},
// },
// ws4: {
// {"d", cancel, 1, rl},
// },
// }, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"a", f3, ws3},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"a", f3, ws3},
{"g", f1, ws1},
{"g", f2, ws2},
}, rlz.listeners)
})
t.Run("removing another subscription id", func(t *testing.T) {
// removing 'a' from ws3
rl.clients[ws3][0].cancel = func(cause error) {} // set since removing will call it
rl.clients[ws3][1].cancel = func(cause error) {} // set since removing will call it
rl.removeListenerId(ws3, "a")
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 0, rlz},
},
ws3: {
{"f", cancel, 2, rly},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 1, rlx},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f2, ws2},
{"g", f1, ws1},
}, rlz.listeners)
})
t.Run("removing a connection", func(t *testing.T) {
rl.removeClientAndListeners(ws2)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 0, rlz},
},
ws3: {
{"f", cancel, 0, rly},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 1, rlx},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"f", f3, ws3},
{"e", f3, ws4},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f1, ws1},
}, rlz.listeners)
})
t.Run("removing another subscription id", func(t *testing.T) {
// removing 'e' from ws4
rl.clients[ws4][0].cancel = func(cause error) {} // set since removing will call it
rl.clients[ws4][1].cancel = func(cause error) {} // set since removing will call it
rl.removeListenerId(ws4, "e")
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 0, rlz},
},
ws3: {
{"f", cancel, 0, rly},
},
ws4: {},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
}, rlx.listeners)
require.Equal(t, []listener{
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f1, ws1},
}, rlz.listeners)
})
}
func TestRandomListenerClientRemoving(t *testing.T) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, 20)
l := 0
for i := 0; i < 20; i++ {
ws := &WebSocket{}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
for j := 0; j < 20; j++ {
for i := 0; i < 20; i++ {
ws := websockets[i]
w := string(rune(i + 65))
if rand.Intn(2) < 1 {
l++
rl.addListener(ws, w+":"+string(rune(j+97)), rl, f, cancel)
}
}
}
require.Len(t, rl.clients, 20)
require.Len(t, rl.listeners, l)
for ws := range rl.clients {
rl.removeClientAndListeners(ws)
}
require.Len(t, rl.clients, 0)
require.Len(t, rl.listeners, 0)
}
func TestRandomListenerIdRemoving(t *testing.T) {
rl := NewRelay()
f := nostr.Filter{Kinds: []int{1}}
cancel := func(cause error) {}
websockets := make([]*WebSocket, 0, 20)
type wsid struct {
ws *WebSocket
id string
}
subs := make([]wsid, 0, 20*20)
extra := 0
for i := 0; i < 20; i++ {
ws := &WebSocket{}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
for j := 0; j < 20; j++ {
for i := 0; i < 20; i++ {
ws := websockets[i]
w := string(rune(i + 65))
if rand.Intn(2) < 1 {
id := w + ":" + string(rune(j+97))
rl.addListener(ws, id, rl, f, cancel)
subs = append(subs, wsid{ws, id})
if rand.Intn(5) < 1 {
rl.addListener(ws, id, rl, f, cancel)
extra++
}
}
}
}
require.Len(t, rl.clients, 20)
require.Len(t, rl.listeners, len(subs)+extra)
rand.Shuffle(len(subs), func(i, j int) {
subs[i], subs[j] = subs[j], subs[i]
})
for _, wsidToRemove := range subs {
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
}
require.Len(t, rl.listeners, 0)
require.Len(t, rl.clients, 20)
for _, specs := range rl.clients {
require.Len(t, specs, 0)
}
// require.Equal(t, []listener{
// {"c", f1, ws1},
// {"d", f3, ws4},
// {"a", f3, ws3},
// }, rl.listeners)
// })
}