mirror of
https://github.com/fiatjaf/khatru.git
synced 2026-04-18 19:36:54 +02:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7056f1515 | ||
|
|
4e0971bafe | ||
|
|
07ea3a35ec |
22
listener.go
22
listener.go
@@ -67,12 +67,13 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
|||||||
|
|
||||||
if specs, ok := rl.clients[ws]; ok {
|
if specs, ok := rl.clients[ws]; ok {
|
||||||
// swap delete specs that match this id
|
// swap delete specs that match this id
|
||||||
nswaps := 0
|
for s := len(specs) - 1; s >= 0; s-- {
|
||||||
for s, spec := range specs {
|
spec := specs[s]
|
||||||
if spec.id == id {
|
if spec.id == id {
|
||||||
spec.cancel(ErrSubscriptionClosedByClient)
|
spec.cancel(ErrSubscriptionClosedByClient)
|
||||||
specs[s] = specs[len(specs)-1-nswaps]
|
specs[s] = specs[len(specs)-1]
|
||||||
nswaps++
|
specs = specs[0 : len(specs)-1]
|
||||||
|
rl.clients[ws] = specs
|
||||||
|
|
||||||
// swap delete listeners one at a time, as they may be each in a different subrelay
|
// swap delete listeners one at a time, as they may be each in a different subrelay
|
||||||
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
|
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
|
||||||
@@ -82,7 +83,8 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
|||||||
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
||||||
srl.listeners[spec.index] = moved
|
srl.listeners[spec.index] = moved
|
||||||
|
|
||||||
// update the index of the listener we just moved
|
// now we must update the the listener we just moved
|
||||||
|
// so its .index reflects its new position on srl.listeners
|
||||||
movedSpecs := rl.clients[moved.ws]
|
movedSpecs := rl.clients[moved.ws]
|
||||||
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
||||||
return ls.index == movedFromIndex
|
return ls.index == movedFromIndex
|
||||||
@@ -93,7 +95,6 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
|||||||
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
|
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rl.clients[ws] = specs[0 : len(specs)-nswaps]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,7 +103,7 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
|
|||||||
defer rl.clientsMutex.Unlock()
|
defer rl.clientsMutex.Unlock()
|
||||||
if specs, ok := rl.clients[ws]; ok {
|
if specs, ok := rl.clients[ws]; ok {
|
||||||
// swap delete listeners and delete client (all specs will be deleted)
|
// swap delete listeners and delete client (all specs will be deleted)
|
||||||
for _, spec := range specs {
|
for s, spec := range specs {
|
||||||
// no need to cancel contexts since they inherit from the main connection context
|
// no need to cancel contexts since they inherit from the main connection context
|
||||||
// just delete the listeners (swap-delete)
|
// just delete the listeners (swap-delete)
|
||||||
srl := spec.subrelay
|
srl := spec.subrelay
|
||||||
@@ -112,7 +113,12 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
|
|||||||
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
||||||
srl.listeners[spec.index] = moved
|
srl.listeners[spec.index] = moved
|
||||||
|
|
||||||
// update the index of the listener we just moved
|
// temporarily update the spec of the listener being removed to have index == -1
|
||||||
|
// (since it was removed) so it doesn't match in the search below
|
||||||
|
rl.clients[ws][s].index = -1
|
||||||
|
|
||||||
|
// now we must update the the listener we just moved
|
||||||
|
// so its .index reflects its new position on srl.listeners
|
||||||
movedSpecs := rl.clients[moved.ws]
|
movedSpecs := rl.clients[moved.ws]
|
||||||
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
||||||
return ls.index == movedFromIndex
|
return ls.index == movedFromIndex
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package khatru
|
package khatru
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
@@ -404,3 +405,99 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
|
|||||||
}, rlz.listeners)
|
}, rlz.listeners)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRandomListenerClientRemoving(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
websockets := make([]*WebSocket, 0, 20)
|
||||||
|
|
||||||
|
l := 0
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
websockets = append(websockets, ws)
|
||||||
|
rl.clients[ws] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < 20; j++ {
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := websockets[i]
|
||||||
|
w := string(rune(i + 65))
|
||||||
|
|
||||||
|
if rand.Intn(2) < 1 {
|
||||||
|
l++
|
||||||
|
rl.addListener(ws, w+":"+string(rune(j+97)), rl, f, cancel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 20)
|
||||||
|
require.Len(t, rl.listeners, l)
|
||||||
|
|
||||||
|
for ws := range rl.clients {
|
||||||
|
rl.removeClientAndListeners(ws)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 0)
|
||||||
|
require.Len(t, rl.listeners, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRandomListenerIdRemoving(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
websockets := make([]*WebSocket, 0, 20)
|
||||||
|
|
||||||
|
type wsid struct {
|
||||||
|
ws *WebSocket
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
subs := make([]wsid, 0, 20*20)
|
||||||
|
extra := 0
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
websockets = append(websockets, ws)
|
||||||
|
rl.clients[ws] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < 20; j++ {
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := websockets[i]
|
||||||
|
w := string(rune(i + 65))
|
||||||
|
|
||||||
|
if rand.Intn(2) < 1 {
|
||||||
|
id := w + ":" + string(rune(j+97))
|
||||||
|
rl.addListener(ws, id, rl, f, cancel)
|
||||||
|
subs = append(subs, wsid{ws, id})
|
||||||
|
|
||||||
|
if rand.Intn(5) < 1 {
|
||||||
|
rl.addListener(ws, id, rl, f, cancel)
|
||||||
|
extra++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 20)
|
||||||
|
require.Len(t, rl.listeners, len(subs)+extra)
|
||||||
|
|
||||||
|
rand.Shuffle(len(subs), func(i, j int) {
|
||||||
|
subs[i], subs[j] = subs[j], subs[i]
|
||||||
|
})
|
||||||
|
for _, wsidToRemove := range subs {
|
||||||
|
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.listeners, 0)
|
||||||
|
require.Len(t, rl.clients, 20)
|
||||||
|
for _, specs := range rl.clients {
|
||||||
|
require.Len(t, specs, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user