mirror of
https://github.com/fiatjaf/khatru.git
synced 2026-04-07 22:16:46 +02:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7056f1515 | ||
|
|
4e0971bafe | ||
|
|
07ea3a35ec |
22
listener.go
22
listener.go
@@ -67,12 +67,13 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
||||
|
||||
if specs, ok := rl.clients[ws]; ok {
|
||||
// swap delete specs that match this id
|
||||
nswaps := 0
|
||||
for s, spec := range specs {
|
||||
for s := len(specs) - 1; s >= 0; s-- {
|
||||
spec := specs[s]
|
||||
if spec.id == id {
|
||||
spec.cancel(ErrSubscriptionClosedByClient)
|
||||
specs[s] = specs[len(specs)-1-nswaps]
|
||||
nswaps++
|
||||
specs[s] = specs[len(specs)-1]
|
||||
specs = specs[0 : len(specs)-1]
|
||||
rl.clients[ws] = specs
|
||||
|
||||
// swap delete listeners one at a time, as they may be each in a different subrelay
|
||||
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
|
||||
@@ -82,7 +83,8 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
||||
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
||||
srl.listeners[spec.index] = moved
|
||||
|
||||
// update the index of the listener we just moved
|
||||
// now we must update the the listener we just moved
|
||||
// so its .index reflects its new position on srl.listeners
|
||||
movedSpecs := rl.clients[moved.ws]
|
||||
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
||||
return ls.index == movedFromIndex
|
||||
@@ -93,7 +95,6 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
||||
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
|
||||
}
|
||||
}
|
||||
rl.clients[ws] = specs[0 : len(specs)-nswaps]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +103,7 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
|
||||
defer rl.clientsMutex.Unlock()
|
||||
if specs, ok := rl.clients[ws]; ok {
|
||||
// swap delete listeners and delete client (all specs will be deleted)
|
||||
for _, spec := range specs {
|
||||
for s, spec := range specs {
|
||||
// no need to cancel contexts since they inherit from the main connection context
|
||||
// just delete the listeners (swap-delete)
|
||||
srl := spec.subrelay
|
||||
@@ -112,7 +113,12 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
|
||||
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
||||
srl.listeners[spec.index] = moved
|
||||
|
||||
// update the index of the listener we just moved
|
||||
// temporarily update the spec of the listener being removed to have index == -1
|
||||
// (since it was removed) so it doesn't match in the search below
|
||||
rl.clients[ws][s].index = -1
|
||||
|
||||
// now we must update the the listener we just moved
|
||||
// so its .index reflects its new position on srl.listeners
|
||||
movedSpecs := rl.clients[moved.ws]
|
||||
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
||||
return ls.index == movedFromIndex
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package khatru
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
@@ -404,3 +405,99 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
|
||||
}, rlz.listeners)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRandomListenerClientRemoving(t *testing.T) {
|
||||
rl := NewRelay()
|
||||
|
||||
f := nostr.Filter{Kinds: []int{1}}
|
||||
cancel := func(cause error) {}
|
||||
|
||||
websockets := make([]*WebSocket, 0, 20)
|
||||
|
||||
l := 0
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
ws := &WebSocket{}
|
||||
websockets = append(websockets, ws)
|
||||
rl.clients[ws] = nil
|
||||
}
|
||||
|
||||
for j := 0; j < 20; j++ {
|
||||
for i := 0; i < 20; i++ {
|
||||
ws := websockets[i]
|
||||
w := string(rune(i + 65))
|
||||
|
||||
if rand.Intn(2) < 1 {
|
||||
l++
|
||||
rl.addListener(ws, w+":"+string(rune(j+97)), rl, f, cancel)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, rl.clients, 20)
|
||||
require.Len(t, rl.listeners, l)
|
||||
|
||||
for ws := range rl.clients {
|
||||
rl.removeClientAndListeners(ws)
|
||||
}
|
||||
|
||||
require.Len(t, rl.clients, 0)
|
||||
require.Len(t, rl.listeners, 0)
|
||||
}
|
||||
|
||||
func TestRandomListenerIdRemoving(t *testing.T) {
|
||||
rl := NewRelay()
|
||||
|
||||
f := nostr.Filter{Kinds: []int{1}}
|
||||
cancel := func(cause error) {}
|
||||
|
||||
websockets := make([]*WebSocket, 0, 20)
|
||||
|
||||
type wsid struct {
|
||||
ws *WebSocket
|
||||
id string
|
||||
}
|
||||
|
||||
subs := make([]wsid, 0, 20*20)
|
||||
extra := 0
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
ws := &WebSocket{}
|
||||
websockets = append(websockets, ws)
|
||||
rl.clients[ws] = nil
|
||||
}
|
||||
|
||||
for j := 0; j < 20; j++ {
|
||||
for i := 0; i < 20; i++ {
|
||||
ws := websockets[i]
|
||||
w := string(rune(i + 65))
|
||||
|
||||
if rand.Intn(2) < 1 {
|
||||
id := w + ":" + string(rune(j+97))
|
||||
rl.addListener(ws, id, rl, f, cancel)
|
||||
subs = append(subs, wsid{ws, id})
|
||||
|
||||
if rand.Intn(5) < 1 {
|
||||
rl.addListener(ws, id, rl, f, cancel)
|
||||
extra++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, rl.clients, 20)
|
||||
require.Len(t, rl.listeners, len(subs)+extra)
|
||||
|
||||
rand.Shuffle(len(subs), func(i, j int) {
|
||||
subs[i], subs[j] = subs[j], subs[i]
|
||||
})
|
||||
for _, wsidToRemove := range subs {
|
||||
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
|
||||
}
|
||||
|
||||
require.Len(t, rl.listeners, 0)
|
||||
require.Len(t, rl.clients, 20)
|
||||
for _, specs := range rl.clients {
|
||||
require.Len(t, specs, 0)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user