mirror of
https://github.com/fiatjaf/khatru.git
synced 2026-04-24 22:07:59 +02:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8637afa38 | ||
|
|
e38036a5e6 | ||
|
|
6a9dcdcbd4 | ||
|
|
5ae4d1194a | ||
|
|
753a1191b9 | ||
|
|
3e6d763a79 | ||
|
|
3b03f6700c | ||
|
|
d7056f1515 | ||
|
|
4e0971bafe | ||
|
|
07ea3a35ec | ||
|
|
ef57f54a28 | ||
|
|
a103353254 | ||
|
|
5f0f9eec99 |
@@ -39,3 +39,21 @@ features:
|
|||||||
link: https://pkg.go.dev/github.com/fiatjaf/khatru
|
link: https://pkg.go.dev/github.com/fiatjaf/khatru
|
||||||
details: That means it is fast and lightweight, you can learn the language in 5 minutes and it builds your relay into a single binary that's easy to ship and deploy.
|
details: That means it is fast and lightweight, you can learn the language in 5 minutes and it builds your relay into a single binary that's easy to ship and deploy.
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## A glimpse of `khatru`'s power
|
||||||
|
|
||||||
|
It allows you to create a fully-functional relay in 7 lines of code:
|
||||||
|
|
||||||
|
```go
|
||||||
|
func main() {
|
||||||
|
relay := khatru.NewRelay()
|
||||||
|
db := badger.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"}
|
||||||
|
db.Init()
|
||||||
|
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
|
||||||
|
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
||||||
|
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||||
|
http.ListenAndServe(":3334", relay)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
After that you can customize it in infinite ways. See the links above.
|
||||||
|
|||||||
8
go.mod
8
go.mod
@@ -5,9 +5,10 @@ go 1.21.4
|
|||||||
require (
|
require (
|
||||||
github.com/fasthttp/websocket v1.5.7
|
github.com/fasthttp/websocket v1.5.7
|
||||||
github.com/fiatjaf/eventstore v0.5.1
|
github.com/fiatjaf/eventstore v0.5.1
|
||||||
github.com/nbd-wtf/go-nostr v0.34.3
|
github.com/nbd-wtf/go-nostr v0.34.5
|
||||||
github.com/puzpuzpuz/xsync/v3 v3.0.2
|
github.com/puzpuzpuz/xsync/v3 v3.0.2
|
||||||
github.com/rs/cors v1.7.0
|
github.com/rs/cors v1.7.0
|
||||||
|
github.com/stretchr/testify v1.9.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@@ -17,6 +18,7 @@ require (
|
|||||||
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
|
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
|
||||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
|
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
|
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
|
||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||||
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
|
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
|
||||||
@@ -42,6 +44,7 @@ require (
|
|||||||
github.com/mailru/easyjson v0.7.7 // indirect
|
github.com/mailru/easyjson v0.7.7 // indirect
|
||||||
github.com/mattn/go-sqlite3 v1.14.18 // indirect
|
github.com/mattn/go-sqlite3 v1.14.18 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
|
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
|
||||||
github.com/tidwall/gjson v1.17.0 // indirect
|
github.com/tidwall/gjson v1.17.0 // indirect
|
||||||
github.com/tidwall/match v1.1.1 // indirect
|
github.com/tidwall/match v1.1.1 // indirect
|
||||||
@@ -53,6 +56,5 @@ require (
|
|||||||
golang.org/x/net v0.18.0 // indirect
|
golang.org/x/net v0.18.0 // indirect
|
||||||
golang.org/x/sys v0.20.0 // indirect
|
golang.org/x/sys v0.20.0 // indirect
|
||||||
google.golang.org/protobuf v1.31.0 // indirect
|
google.golang.org/protobuf v1.31.0 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/nbd-wtf/go-nostr => ../go-nostr
|
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -105,6 +105,10 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
|
|||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
|
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
|
||||||
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||||
|
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||||
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
@@ -113,6 +117,8 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
|
|||||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||||
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
|
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
|
||||||
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||||
|
github.com/nbd-wtf/go-nostr v0.34.5 h1:vti8WqvGWbVoWAPniaz7li2TpCyC+7ZS62Gmy7ib/z0=
|
||||||
|
github.com/nbd-wtf/go-nostr v0.34.5/go.mod h1:NZQkxl96ggbO8rvDpVjcsojJqKTPwqhP4i82O7K5DJs=
|
||||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
@@ -222,6 +228,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
|
|||||||
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||||
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
|||||||
15
handlers.go
15
handlers.go
@@ -80,20 +80,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
cancel()
|
cancel()
|
||||||
conn.Close()
|
conn.Close()
|
||||||
|
|
||||||
rl.clientsMutex.Lock()
|
rl.removeClientAndListeners(ws)
|
||||||
defer rl.clientsMutex.Unlock()
|
|
||||||
if specs, ok := rl.clients[ws]; ok {
|
|
||||||
// swap delete listeners and delete client
|
|
||||||
for s, spec := range specs {
|
|
||||||
// no need to cancel contexts since they inherit from the main connection context
|
|
||||||
// just delete the listeners
|
|
||||||
srl := spec.subrelay
|
|
||||||
srl.listeners[spec.index] = srl.listeners[len(srl.listeners)-1]
|
|
||||||
specs[s] = specs[len(specs)-1]
|
|
||||||
srl.listeners = srl.listeners[0:len(srl.listeners)]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delete(rl.clients, ws)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
100
listener.go
100
listener.go
@@ -3,6 +3,7 @@ package khatru
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"slices"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
)
|
)
|
||||||
@@ -10,16 +11,16 @@ import (
|
|||||||
var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
|
var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
|
||||||
|
|
||||||
type listenerSpec struct {
|
type listenerSpec struct {
|
||||||
subscriptionId string // kept here so we can easily match against it removeListenerId
|
id string // kept here so we can easily match against it removeListenerId
|
||||||
cancel context.CancelCauseFunc
|
cancel context.CancelCauseFunc
|
||||||
index int
|
index int
|
||||||
subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same
|
subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same
|
||||||
}
|
}
|
||||||
|
|
||||||
type listener struct {
|
type listener struct {
|
||||||
subscriptionId string // duplicated here so we can easily send it on notifyListeners
|
id string // duplicated here so we can easily send it on notifyListeners
|
||||||
filter nostr.Filter
|
filter nostr.Filter
|
||||||
ws *WebSocket
|
ws *WebSocket
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rl *Relay) GetListeningFilters() []nostr.Filter {
|
func (rl *Relay) GetListeningFilters() []nostr.Filter {
|
||||||
@@ -45,15 +46,15 @@ func (rl *Relay) addListener(
|
|||||||
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
|
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
|
||||||
idx := len(subrelay.listeners)
|
idx := len(subrelay.listeners)
|
||||||
rl.clients[ws] = append(specs, listenerSpec{
|
rl.clients[ws] = append(specs, listenerSpec{
|
||||||
subscriptionId: id,
|
id: id,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
subrelay: subrelay,
|
subrelay: subrelay,
|
||||||
index: idx,
|
index: idx,
|
||||||
})
|
})
|
||||||
subrelay.listeners = append(subrelay.listeners, listener{
|
subrelay.listeners = append(subrelay.listeners, listener{
|
||||||
ws: ws,
|
ws: ws,
|
||||||
subscriptionId: id,
|
id: id,
|
||||||
filter: filter,
|
filter: filter,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -66,27 +67,80 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
|
|||||||
|
|
||||||
if specs, ok := rl.clients[ws]; ok {
|
if specs, ok := rl.clients[ws]; ok {
|
||||||
// swap delete specs that match this id
|
// swap delete specs that match this id
|
||||||
nswaps := 0
|
for s := len(specs) - 1; s >= 0; s-- {
|
||||||
for s, spec := range specs {
|
spec := specs[s]
|
||||||
if spec.subscriptionId == id {
|
if spec.id == id {
|
||||||
spec.cancel(ErrSubscriptionClosedByClient)
|
spec.cancel(ErrSubscriptionClosedByClient)
|
||||||
specs[s] = specs[len(specs)-1-nswaps]
|
specs[s] = specs[len(specs)-1]
|
||||||
nswaps++
|
specs = specs[0 : len(specs)-1]
|
||||||
|
rl.clients[ws] = specs
|
||||||
|
|
||||||
// swap delete listeners one at a time, as they may be each in a different subrelay
|
// swap delete listeners one at a time, as they may be each in a different subrelay
|
||||||
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
|
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
|
||||||
srl.listeners[spec.index] = srl.listeners[len(srl.listeners)-1]
|
|
||||||
srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
|
if spec.index != len(srl.listeners)-1 {
|
||||||
|
movedFromIndex := len(srl.listeners) - 1
|
||||||
|
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
||||||
|
srl.listeners[spec.index] = moved
|
||||||
|
|
||||||
|
// now we must update the the listener we just moved
|
||||||
|
// so its .index reflects its new position on srl.listeners
|
||||||
|
movedSpecs := rl.clients[moved.ws]
|
||||||
|
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
||||||
|
return ls.index == movedFromIndex && ls.subrelay == srl
|
||||||
|
})
|
||||||
|
movedSpecs[idx].index = spec.index
|
||||||
|
rl.clients[moved.ws] = movedSpecs
|
||||||
|
}
|
||||||
|
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rl.clients[ws] = specs[0 : len(specs)-nswaps]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
|
||||||
|
rl.clientsMutex.Lock()
|
||||||
|
defer rl.clientsMutex.Unlock()
|
||||||
|
if specs, ok := rl.clients[ws]; ok {
|
||||||
|
// swap delete listeners and delete client (all specs will be deleted)
|
||||||
|
for s, spec := range specs {
|
||||||
|
// no need to cancel contexts since they inherit from the main connection context
|
||||||
|
// just delete the listeners (swap-delete)
|
||||||
|
srl := spec.subrelay
|
||||||
|
|
||||||
|
if spec.index != len(srl.listeners)-1 {
|
||||||
|
movedFromIndex := len(srl.listeners) - 1
|
||||||
|
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
|
||||||
|
srl.listeners[spec.index] = moved
|
||||||
|
|
||||||
|
// temporarily update the spec of the listener being removed to have index == -1
|
||||||
|
// (since it was removed) so it doesn't match in the search below
|
||||||
|
rl.clients[ws][s].index = -1
|
||||||
|
|
||||||
|
// now we must update the the listener we just moved
|
||||||
|
// so its .index reflects its new position on srl.listeners
|
||||||
|
movedSpecs := rl.clients[moved.ws]
|
||||||
|
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
|
||||||
|
return ls.index == movedFromIndex && ls.subrelay == srl
|
||||||
|
})
|
||||||
|
movedSpecs[idx].index = spec.index
|
||||||
|
rl.clients[moved.ws] = movedSpecs
|
||||||
|
}
|
||||||
|
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(rl.clients, ws)
|
||||||
|
}
|
||||||
|
|
||||||
func (rl *Relay) notifyListeners(event *nostr.Event) {
|
func (rl *Relay) notifyListeners(event *nostr.Event) {
|
||||||
for _, listener := range rl.listeners {
|
for _, listener := range rl.listeners {
|
||||||
if listener.filter.Matches(event) {
|
if listener.filter.Matches(event) {
|
||||||
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.subscriptionId, Event: *event})
|
for _, pb := range rl.PreventBroadcast {
|
||||||
|
if pb(listener.ws, event) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: *event})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
188
listener_fuzz_test.go
Normal file
188
listener_fuzz_test.go
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
package khatru
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func FuzzRandomListenerClientRemoving(f *testing.F) {
|
||||||
|
f.Add(uint(20), uint(20), uint(1))
|
||||||
|
f.Fuzz(func(t *testing.T, utw uint, ubs uint, ualf uint) {
|
||||||
|
totalWebsockets := int(utw)
|
||||||
|
baseSubs := int(ubs)
|
||||||
|
addListenerFreq := int(ualf) + 1
|
||||||
|
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
websockets := make([]*WebSocket, 0, totalWebsockets*baseSubs)
|
||||||
|
|
||||||
|
l := 0
|
||||||
|
|
||||||
|
for i := 0; i < totalWebsockets; i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
websockets = append(websockets, ws)
|
||||||
|
rl.clients[ws] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s := 0
|
||||||
|
for j := 0; j < baseSubs; j++ {
|
||||||
|
for i := 0; i < totalWebsockets; i++ {
|
||||||
|
ws := websockets[i]
|
||||||
|
w := idFromSeqUpper(i)
|
||||||
|
|
||||||
|
if s%addListenerFreq == 0 {
|
||||||
|
l++
|
||||||
|
rl.addListener(ws, w+":"+idFromSeqLower(j), rl, f, cancel)
|
||||||
|
}
|
||||||
|
|
||||||
|
s++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, totalWebsockets)
|
||||||
|
require.Len(t, rl.listeners, l)
|
||||||
|
|
||||||
|
for ws := range rl.clients {
|
||||||
|
rl.removeClientAndListeners(ws)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 0)
|
||||||
|
require.Len(t, rl.listeners, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func FuzzRandomListenerIdRemoving(f *testing.F) {
|
||||||
|
f.Add(uint(20), uint(20), uint(1), uint(4))
|
||||||
|
f.Fuzz(func(t *testing.T, utw uint, ubs uint, ualf uint, ualef uint) {
|
||||||
|
totalWebsockets := int(utw)
|
||||||
|
baseSubs := int(ubs)
|
||||||
|
addListenerFreq := int(ualf) + 1
|
||||||
|
addExtraListenerFreq := int(ualef) + 1
|
||||||
|
|
||||||
|
if totalWebsockets > 1024 || baseSubs > 1024 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
websockets := make([]*WebSocket, 0, totalWebsockets)
|
||||||
|
|
||||||
|
type wsid struct {
|
||||||
|
ws *WebSocket
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
subs := make([]wsid, 0, totalWebsockets*baseSubs)
|
||||||
|
extra := 0
|
||||||
|
|
||||||
|
for i := 0; i < totalWebsockets; i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
websockets = append(websockets, ws)
|
||||||
|
rl.clients[ws] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s := 0
|
||||||
|
for j := 0; j < baseSubs; j++ {
|
||||||
|
for i := 0; i < totalWebsockets; i++ {
|
||||||
|
ws := websockets[i]
|
||||||
|
w := idFromSeqUpper(i)
|
||||||
|
|
||||||
|
if s%addListenerFreq == 0 {
|
||||||
|
id := w + ":" + idFromSeqLower(j)
|
||||||
|
rl.addListener(ws, id, rl, f, cancel)
|
||||||
|
subs = append(subs, wsid{ws, id})
|
||||||
|
|
||||||
|
if s%addExtraListenerFreq == 0 {
|
||||||
|
rl.addListener(ws, id, rl, f, cancel)
|
||||||
|
extra++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, totalWebsockets)
|
||||||
|
require.Len(t, rl.listeners, len(subs)+extra)
|
||||||
|
|
||||||
|
rand.Shuffle(len(subs), func(i, j int) {
|
||||||
|
subs[i], subs[j] = subs[j], subs[i]
|
||||||
|
})
|
||||||
|
for _, wsidToRemove := range subs {
|
||||||
|
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.listeners, 0)
|
||||||
|
require.Len(t, rl.clients, totalWebsockets)
|
||||||
|
for _, specs := range rl.clients {
|
||||||
|
require.Len(t, specs, 0)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func FuzzRouterListenersPabloCrash(f *testing.F) {
|
||||||
|
f.Add(uint(3), uint(6), uint(2), uint(20))
|
||||||
|
f.Fuzz(func(t *testing.T, totalRelays uint, totalConns uint, subFreq uint, subIterations uint) {
|
||||||
|
totalRelays++
|
||||||
|
totalConns++
|
||||||
|
subFreq++
|
||||||
|
subIterations++
|
||||||
|
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
relays := make([]*Relay, int(totalRelays))
|
||||||
|
for i := 0; i < int(totalRelays); i++ {
|
||||||
|
relays[i] = NewRelay()
|
||||||
|
}
|
||||||
|
|
||||||
|
conns := make([]*WebSocket, int(totalConns))
|
||||||
|
for i := 0; i < int(totalConns); i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
conns[i] = ws
|
||||||
|
rl.clients[ws] = make([]listenerSpec, 0, subIterations)
|
||||||
|
}
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
type wsid struct {
|
||||||
|
ws *WebSocket
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
s := 0
|
||||||
|
subs := make([]wsid, 0, subIterations*totalConns*totalRelays)
|
||||||
|
for i, conn := range conns {
|
||||||
|
w := idFromSeqUpper(i)
|
||||||
|
for j := 0; j < int(subIterations); j++ {
|
||||||
|
id := w + ":" + idFromSeqLower(j)
|
||||||
|
for _, rlt := range relays {
|
||||||
|
if s%int(subFreq) == 0 {
|
||||||
|
rl.addListener(conn, id, rlt, f, cancel)
|
||||||
|
subs = append(subs, wsid{conn, id})
|
||||||
|
}
|
||||||
|
s++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, wsid := range subs {
|
||||||
|
rl.removeListenerId(wsid.ws, wsid.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, wsid := range subs {
|
||||||
|
require.Len(t, rl.clients[wsid.ws], 0)
|
||||||
|
}
|
||||||
|
for _, rlt := range relays {
|
||||||
|
require.Len(t, rlt.listeners, 0)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
545
listener_test.go
Normal file
545
listener_test.go
Normal file
@@ -0,0 +1,545 @@
|
|||||||
|
package khatru
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func idFromSeqUpper(seq int) string { return idFromSeq(seq, 65, 90) }
|
||||||
|
func idFromSeqLower(seq int) string { return idFromSeq(seq, 97, 122) }
|
||||||
|
func idFromSeq(seq int, min, max int) string {
|
||||||
|
maxSeq := max - min + 1
|
||||||
|
nLetters := seq/maxSeq + 1
|
||||||
|
result := strings.Builder{}
|
||||||
|
result.Grow(nLetters)
|
||||||
|
for l := 0; l < nLetters; l++ {
|
||||||
|
letter := rune(seq%maxSeq + min)
|
||||||
|
result.WriteRune(letter)
|
||||||
|
}
|
||||||
|
return result.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListenerSetupAndRemoveOnce(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
ws1 := &WebSocket{}
|
||||||
|
ws2 := &WebSocket{}
|
||||||
|
|
||||||
|
f1 := nostr.Filter{Kinds: []int{1}}
|
||||||
|
f2 := nostr.Filter{Kinds: []int{2}}
|
||||||
|
f3 := nostr.Filter{Kinds: []int{3}}
|
||||||
|
|
||||||
|
rl.clients[ws1] = nil
|
||||||
|
rl.clients[ws2] = nil
|
||||||
|
|
||||||
|
var cancel func(cause error) = nil
|
||||||
|
|
||||||
|
t.Run("adding listeners", func(t *testing.T) {
|
||||||
|
rl.addListener(ws1, "1a", rl, f1, cancel)
|
||||||
|
rl.addListener(ws1, "1b", rl, f2, cancel)
|
||||||
|
rl.addListener(ws2, "2a", rl, f3, cancel)
|
||||||
|
rl.addListener(ws1, "1c", rl, f3, cancel)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"1a", cancel, 0, rl},
|
||||||
|
{"1b", cancel, 1, rl},
|
||||||
|
{"1c", cancel, 3, rl},
|
||||||
|
},
|
||||||
|
ws2: {
|
||||||
|
{"2a", cancel, 2, rl},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"1a", f1, ws1},
|
||||||
|
{"1b", f2, ws1},
|
||||||
|
{"2a", f3, ws2},
|
||||||
|
{"1c", f3, ws1},
|
||||||
|
}, rl.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("removing a client", func(t *testing.T) {
|
||||||
|
rl.removeClientAndListeners(ws1)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws2: {
|
||||||
|
{"2a", cancel, 0, rl},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"2a", f3, ws2},
|
||||||
|
}, rl.listeners)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListenerMoreConvolutedCase(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
ws1 := &WebSocket{}
|
||||||
|
ws2 := &WebSocket{}
|
||||||
|
ws3 := &WebSocket{}
|
||||||
|
ws4 := &WebSocket{}
|
||||||
|
|
||||||
|
f1 := nostr.Filter{Kinds: []int{1}}
|
||||||
|
f2 := nostr.Filter{Kinds: []int{2}}
|
||||||
|
f3 := nostr.Filter{Kinds: []int{3}}
|
||||||
|
|
||||||
|
rl.clients[ws1] = nil
|
||||||
|
rl.clients[ws2] = nil
|
||||||
|
rl.clients[ws3] = nil
|
||||||
|
rl.clients[ws4] = nil
|
||||||
|
|
||||||
|
var cancel func(cause error) = nil
|
||||||
|
|
||||||
|
t.Run("adding listeners", func(t *testing.T) {
|
||||||
|
rl.addListener(ws1, "c", rl, f1, cancel)
|
||||||
|
rl.addListener(ws2, "b", rl, f2, cancel)
|
||||||
|
rl.addListener(ws3, "a", rl, f3, cancel)
|
||||||
|
rl.addListener(ws4, "d", rl, f3, cancel)
|
||||||
|
rl.addListener(ws2, "b", rl, f1, cancel)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rl},
|
||||||
|
},
|
||||||
|
ws2: {
|
||||||
|
{"b", cancel, 1, rl},
|
||||||
|
{"b", cancel, 4, rl},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"a", cancel, 2, rl},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"d", cancel, 3, rl},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"b", f2, ws2},
|
||||||
|
{"a", f3, ws3},
|
||||||
|
{"d", f3, ws4},
|
||||||
|
{"b", f1, ws2},
|
||||||
|
}, rl.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("removing a client", func(t *testing.T) {
|
||||||
|
rl.removeClientAndListeners(ws2)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rl},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"a", cancel, 2, rl},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"d", cancel, 1, rl},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"d", f3, ws4},
|
||||||
|
{"a", f3, ws3},
|
||||||
|
}, rl.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("reorganize the first case differently and then remove again", func(t *testing.T) {
|
||||||
|
rl.clients = map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 1, rl},
|
||||||
|
},
|
||||||
|
ws2: {
|
||||||
|
{"b", cancel, 2, rl},
|
||||||
|
{"b", cancel, 4, rl},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"a", cancel, 0, rl},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"d", cancel, 3, rl},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
rl.listeners = []listener{
|
||||||
|
{"a", f3, ws3},
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"b", f2, ws2},
|
||||||
|
{"d", f3, ws4},
|
||||||
|
{"b", f1, ws2},
|
||||||
|
}
|
||||||
|
|
||||||
|
rl.removeClientAndListeners(ws2)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 1, rl},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"a", cancel, 0, rl},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"d", cancel, 2, rl},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"a", f3, ws3},
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"d", f3, ws4},
|
||||||
|
}, rl.listeners)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
ws1 := &WebSocket{}
|
||||||
|
ws2 := &WebSocket{}
|
||||||
|
ws3 := &WebSocket{}
|
||||||
|
ws4 := &WebSocket{}
|
||||||
|
|
||||||
|
f1 := nostr.Filter{Kinds: []int{1}}
|
||||||
|
f2 := nostr.Filter{Kinds: []int{2}}
|
||||||
|
f3 := nostr.Filter{Kinds: []int{3}}
|
||||||
|
|
||||||
|
rlx := NewRelay()
|
||||||
|
rly := NewRelay()
|
||||||
|
rlz := NewRelay()
|
||||||
|
|
||||||
|
rl.clients[ws1] = nil
|
||||||
|
rl.clients[ws2] = nil
|
||||||
|
rl.clients[ws3] = nil
|
||||||
|
rl.clients[ws4] = nil
|
||||||
|
|
||||||
|
var cancel func(cause error) = nil
|
||||||
|
|
||||||
|
t.Run("adding listeners", func(t *testing.T) {
|
||||||
|
rl.addListener(ws1, "c", rlx, f1, cancel)
|
||||||
|
rl.addListener(ws2, "b", rly, f2, cancel)
|
||||||
|
rl.addListener(ws3, "a", rlz, f3, cancel)
|
||||||
|
rl.addListener(ws4, "d", rlx, f3, cancel)
|
||||||
|
rl.addListener(ws4, "e", rlx, f3, cancel)
|
||||||
|
rl.addListener(ws3, "a", rlx, f3, cancel)
|
||||||
|
rl.addListener(ws4, "e", rly, f3, cancel)
|
||||||
|
rl.addListener(ws3, "f", rly, f3, cancel)
|
||||||
|
rl.addListener(ws1, "g", rlz, f1, cancel)
|
||||||
|
rl.addListener(ws2, "g", rlz, f2, cancel)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rlx},
|
||||||
|
{"g", cancel, 1, rlz},
|
||||||
|
},
|
||||||
|
ws2: {
|
||||||
|
{"b", cancel, 0, rly},
|
||||||
|
{"g", cancel, 2, rlz},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"a", cancel, 0, rlz},
|
||||||
|
{"a", cancel, 3, rlx},
|
||||||
|
{"f", cancel, 2, rly},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"d", cancel, 1, rlx},
|
||||||
|
{"e", cancel, 2, rlx},
|
||||||
|
{"e", cancel, 1, rly},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"d", f3, ws4},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
{"a", f3, ws3},
|
||||||
|
}, rlx.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"b", f2, ws2},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
{"f", f3, ws3},
|
||||||
|
}, rly.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"a", f3, ws3},
|
||||||
|
{"g", f1, ws1},
|
||||||
|
{"g", f2, ws2},
|
||||||
|
}, rlz.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("removing a subscription id", func(t *testing.T) {
|
||||||
|
// removing 'd' from ws4
|
||||||
|
rl.clients[ws4][0].cancel = func(cause error) {} // set since removing will call it
|
||||||
|
rl.removeListenerId(ws4, "d")
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rlx},
|
||||||
|
{"g", cancel, 1, rlz},
|
||||||
|
},
|
||||||
|
ws2: {
|
||||||
|
{"b", cancel, 0, rly},
|
||||||
|
{"g", cancel, 2, rlz},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"a", cancel, 0, rlz},
|
||||||
|
{"a", cancel, 1, rlx},
|
||||||
|
{"f", cancel, 2, rly},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"e", cancel, 1, rly},
|
||||||
|
{"e", cancel, 2, rlx},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"a", f3, ws3},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
}, rlx.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"b", f2, ws2},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
{"f", f3, ws3},
|
||||||
|
}, rly.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"a", f3, ws3},
|
||||||
|
{"g", f1, ws1},
|
||||||
|
{"g", f2, ws2},
|
||||||
|
}, rlz.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("removing another subscription id", func(t *testing.T) {
|
||||||
|
// removing 'a' from ws3
|
||||||
|
rl.clients[ws3][0].cancel = func(cause error) {} // set since removing will call it
|
||||||
|
rl.clients[ws3][1].cancel = func(cause error) {} // set since removing will call it
|
||||||
|
rl.removeListenerId(ws3, "a")
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rlx},
|
||||||
|
{"g", cancel, 1, rlz},
|
||||||
|
},
|
||||||
|
ws2: {
|
||||||
|
{"b", cancel, 0, rly},
|
||||||
|
{"g", cancel, 0, rlz},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"f", cancel, 2, rly},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"e", cancel, 1, rly},
|
||||||
|
{"e", cancel, 1, rlx},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
}, rlx.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"b", f2, ws2},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
{"f", f3, ws3},
|
||||||
|
}, rly.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"g", f2, ws2},
|
||||||
|
{"g", f1, ws1},
|
||||||
|
}, rlz.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("removing a connection", func(t *testing.T) {
|
||||||
|
rl.removeClientAndListeners(ws2)
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rlx},
|
||||||
|
{"g", cancel, 0, rlz},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"f", cancel, 0, rly},
|
||||||
|
},
|
||||||
|
ws4: {
|
||||||
|
{"e", cancel, 1, rly},
|
||||||
|
{"e", cancel, 1, rlx},
|
||||||
|
},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
}, rlx.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"f", f3, ws3},
|
||||||
|
{"e", f3, ws4},
|
||||||
|
}, rly.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"g", f1, ws1},
|
||||||
|
}, rlz.listeners)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("removing another subscription id", func(t *testing.T) {
|
||||||
|
// removing 'e' from ws4
|
||||||
|
rl.clients[ws4][0].cancel = func(cause error) {} // set since removing will call it
|
||||||
|
rl.clients[ws4][1].cancel = func(cause error) {} // set since removing will call it
|
||||||
|
rl.removeListenerId(ws4, "e")
|
||||||
|
|
||||||
|
require.Equal(t, map[*WebSocket][]listenerSpec{
|
||||||
|
ws1: {
|
||||||
|
{"c", cancel, 0, rlx},
|
||||||
|
{"g", cancel, 0, rlz},
|
||||||
|
},
|
||||||
|
ws3: {
|
||||||
|
{"f", cancel, 0, rly},
|
||||||
|
},
|
||||||
|
ws4: {},
|
||||||
|
}, rl.clients)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"c", f1, ws1},
|
||||||
|
}, rlx.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"f", f3, ws3},
|
||||||
|
}, rly.listeners)
|
||||||
|
|
||||||
|
require.Equal(t, []listener{
|
||||||
|
{"g", f1, ws1},
|
||||||
|
}, rlz.listeners)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRandomListenerClientRemoving(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
websockets := make([]*WebSocket, 0, 20)
|
||||||
|
|
||||||
|
l := 0
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
websockets = append(websockets, ws)
|
||||||
|
rl.clients[ws] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < 20; j++ {
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := websockets[i]
|
||||||
|
w := idFromSeqUpper(i)
|
||||||
|
|
||||||
|
if rand.Intn(2) < 1 {
|
||||||
|
l++
|
||||||
|
rl.addListener(ws, w+":"+idFromSeqLower(j), rl, f, cancel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 20)
|
||||||
|
require.Len(t, rl.listeners, l)
|
||||||
|
|
||||||
|
for ws := range rl.clients {
|
||||||
|
rl.removeClientAndListeners(ws)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 0)
|
||||||
|
require.Len(t, rl.listeners, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRandomListenerIdRemoving(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
websockets := make([]*WebSocket, 0, 20)
|
||||||
|
|
||||||
|
type wsid struct {
|
||||||
|
ws *WebSocket
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
subs := make([]wsid, 0, 20*20)
|
||||||
|
extra := 0
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := &WebSocket{}
|
||||||
|
websockets = append(websockets, ws)
|
||||||
|
rl.clients[ws] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < 20; j++ {
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
ws := websockets[i]
|
||||||
|
w := idFromSeqUpper(i)
|
||||||
|
|
||||||
|
if rand.Intn(2) < 1 {
|
||||||
|
id := w + ":" + idFromSeqLower(j)
|
||||||
|
rl.addListener(ws, id, rl, f, cancel)
|
||||||
|
subs = append(subs, wsid{ws, id})
|
||||||
|
|
||||||
|
if rand.Intn(5) < 1 {
|
||||||
|
rl.addListener(ws, id, rl, f, cancel)
|
||||||
|
extra++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.clients, 20)
|
||||||
|
require.Len(t, rl.listeners, len(subs)+extra)
|
||||||
|
|
||||||
|
rand.Shuffle(len(subs), func(i, j int) {
|
||||||
|
subs[i], subs[j] = subs[j], subs[i]
|
||||||
|
})
|
||||||
|
for _, wsidToRemove := range subs {
|
||||||
|
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, rl.listeners, 0)
|
||||||
|
require.Len(t, rl.clients, 20)
|
||||||
|
for _, specs := range rl.clients {
|
||||||
|
require.Len(t, specs, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRouterListenersPabloCrash(t *testing.T) {
|
||||||
|
rl := NewRelay()
|
||||||
|
|
||||||
|
rla := NewRelay()
|
||||||
|
rlb := NewRelay()
|
||||||
|
|
||||||
|
ws1 := &WebSocket{}
|
||||||
|
ws2 := &WebSocket{}
|
||||||
|
ws3 := &WebSocket{}
|
||||||
|
|
||||||
|
rl.clients[ws1] = nil
|
||||||
|
rl.clients[ws2] = nil
|
||||||
|
rl.clients[ws3] = nil
|
||||||
|
|
||||||
|
f := nostr.Filter{Kinds: []int{1}}
|
||||||
|
cancel := func(cause error) {}
|
||||||
|
|
||||||
|
rl.addListener(ws1, ":1", rla, f, cancel)
|
||||||
|
rl.addListener(ws2, ":1", rlb, f, cancel)
|
||||||
|
rl.addListener(ws3, "a", rlb, f, cancel)
|
||||||
|
rl.addListener(ws3, "b", rla, f, cancel)
|
||||||
|
rl.addListener(ws3, "c", rlb, f, cancel)
|
||||||
|
|
||||||
|
rl.removeClientAndListeners(ws1)
|
||||||
|
rl.removeClientAndListeners(ws3)
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
)
|
)
|
||||||
@@ -79,7 +80,8 @@ func RestrictToSpecifiedKinds(kinds ...uint16) func(context.Context, *nostr.Even
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func PreventTimestampsInThePast(thresholdSeconds nostr.Timestamp) func(context.Context, *nostr.Event) (bool, string) {
|
func PreventTimestampsInThePast(threshold time.Duration) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
thresholdSeconds := nostr.Timestamp(threshold.Seconds())
|
||||||
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
if nostr.Now()-event.CreatedAt > thresholdSeconds {
|
if nostr.Now()-event.CreatedAt > thresholdSeconds {
|
||||||
return true, "event too old"
|
return true, "event too old"
|
||||||
@@ -88,7 +90,8 @@ func PreventTimestampsInThePast(thresholdSeconds nostr.Timestamp) func(context.C
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func PreventTimestampsInTheFuture(thresholdSeconds nostr.Timestamp) func(context.Context, *nostr.Event) (bool, string) {
|
func PreventTimestampsInTheFuture(threshold time.Duration) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
thresholdSeconds := nostr.Timestamp(threshold.Seconds())
|
||||||
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
if event.CreatedAt-nostr.Now() > thresholdSeconds {
|
if event.CreatedAt-nostr.Now() > thresholdSeconds {
|
||||||
return true, "event too much in the future"
|
return true, "event too much in the future"
|
||||||
|
|||||||
1
relay.go
1
relay.go
@@ -64,6 +64,7 @@ type Relay struct {
|
|||||||
OnDisconnect []func(ctx context.Context)
|
OnDisconnect []func(ctx context.Context)
|
||||||
OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument
|
OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument
|
||||||
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
|
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
|
||||||
|
PreventBroadcast []func(ws *WebSocket, event *nostr.Event) bool
|
||||||
|
|
||||||
// these are used when this relays acts as a router
|
// these are used when this relays acts as a router
|
||||||
routes []Route
|
routes []Route
|
||||||
|
|||||||
Reference in New Issue
Block a user