This commit is contained in:
fiatjaf 2024-07-30 09:43:04 -03:00
parent 096890804f
commit a63dc829df
12 changed files with 353 additions and 115 deletions

View File

@ -7,5 +7,5 @@ import (
// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions // BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
// it also doesn't attempt to store the event or trigger any reactions or callbacks // it also doesn't attempt to store the event or trigger any reactions or callbacks
func (rl *Relay) BroadcastEvent(evt *nostr.Event) { func (rl *Relay) BroadcastEvent(evt *nostr.Event) {
notifyListeners(evt) rl.notifyListeners(evt)
} }

View File

@ -8,3 +8,4 @@
- [Live event generation](custom-live-events) - [Live event generation](custom-live-events)
- [Embedding `khatru` inside other Go HTTP servers](embed) - [Embedding `khatru` inside other Go HTTP servers](embed)
- [Generating relays dynamically and serving them from the same path](dynamic) - [Generating relays dynamically and serving them from the same path](dynamic)
- [Routing between multiple relays](routing)

63
docs/cookbook/routing.md Normal file
View File

@ -0,0 +1,63 @@
---
outline: deep
---
# Routing
If you have one (or more) set of policies that have to be executed in sequence (for example, first you check for the presence of a tag, then later in the next policies you use that tag without checking) and they only apply to some class of events, but you still want your relay to deal with other classes of events that can lead to cumbersome sets of rules, always having to check if an event meets the requirements and so on. There is where routing can help you.
It also can be handy if you get a [`khatru.Relay`](https://pkg.go.dev/github.com/fiatjaf/khatru#Relay) from somewhere else, like a library such as [`relay29`](https://github.com/fiatjaf/relay29), and you want to combine it with other policies without some interfering with the others. As in the example below:
```go
sk := os.Getenv("RELAY_SECRET_KEY")
// a relay for NIP-29 groups
groupsStore := badger.BadgerBackend{}
groupsStore.Init()
groupsRelay, _ := khatru29.Init(relay29.Options{Domain: "example.com", DB: groupsStore, SecretKey: sk})
// ...
// a relay for everything else
publicStore := slicestore.SliceStore{}
publicStore.Init()
publicRelay := khatru.NewRelay()
publicRelay.StoreEvent = append(publicRelay.StoreEvent, publicStore.SaveEvent)
publicRelay.QueryEvents = append(publicRelay.QueryEvents, publicStore.QueryEvents)
publicRelay.CountEvents = append(publicRelay.CountEvents, publicStore.CountEvents)
publicRelay.DeleteEvent = append(publicRelay.DeleteEvent, publicStore.DeleteEvent)
// ...
// a higher-level relay that just routes between the two above
router := khatru.NewRouter()
// route requests and events to the groups relay
router.Route().
Req(func (filter nostr.Filter) bool {
_, hasHTag := filter.Tags["h"]
if hasHTag {
return true
}
return slices.Contains(filter.Kinds, func (k int) bool { return k == 39000 || k == 39001 || k == 39002 })
}).
Event(func (event *nostr.Event) bool {
switch {
case event.Kind <= 9021 && event.Kind >= 9000:
return true
case event.Kind <= 39010 && event.Kind >= 39000:
return true
case event.Kind <= 12 && event.Kind >= 9:
return true
case event.Tags.GetFirst([]string{"h", ""}) != nil:
return true
default:
return false
}
}).
Relay(groupsRelay)
// route requests and events to the other
router.Route().
Req(func (filter nostr.Filter) bool { return true }).
Event(func (event *nostr.Event) bool { return true }).
Relay(publicRelay)
```

70
examples/routing/main.go Normal file
View File

@ -0,0 +1,70 @@
package main
import (
"fmt"
"net/http"
"slices"
"github.com/fiatjaf/eventstore/slicestore"
"github.com/fiatjaf/eventstore/sqlite3"
"github.com/fiatjaf/khatru"
"github.com/nbd-wtf/go-nostr"
)
func main() {
db1 := slicestore.SliceStore{}
db1.Init()
r1 := khatru.NewRelay()
r1.StoreEvent = append(r1.StoreEvent, db1.SaveEvent)
r1.QueryEvents = append(r1.QueryEvents, db1.QueryEvents)
r1.CountEvents = append(r1.CountEvents, db1.CountEvents)
r1.DeleteEvent = append(r1.DeleteEvent, db1.DeleteEvent)
db2 := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/t"}
db2.Init()
r2 := khatru.NewRelay()
r2.StoreEvent = append(r2.StoreEvent, db2.SaveEvent)
r2.QueryEvents = append(r2.QueryEvents, db2.QueryEvents)
r2.CountEvents = append(r2.CountEvents, db2.CountEvents)
r2.DeleteEvent = append(r2.DeleteEvent, db2.DeleteEvent)
db3 := slicestore.SliceStore{}
db3.Init()
r3 := khatru.NewRelay()
r3.StoreEvent = append(r3.StoreEvent, db3.SaveEvent)
r3.QueryEvents = append(r3.QueryEvents, db3.QueryEvents)
r3.CountEvents = append(r3.CountEvents, db3.CountEvents)
r3.DeleteEvent = append(r3.DeleteEvent, db3.DeleteEvent)
router := khatru.NewRouter()
router.Route().
Req(func(filter nostr.Filter) bool {
return slices.Contains(filter.Kinds, 30023)
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 30023
}).
Relay(r1)
router.Route().
Req(func(filter nostr.Filter) bool {
return slices.Contains(filter.Kinds, 1) && slices.Contains(filter.Tags["t"], "spam")
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 1 && event.Tags.GetFirst([]string{"t", "spam"}) != nil
}).
Relay(r2)
router.Route().
Req(func(filter nostr.Filter) bool {
return slices.Contains(filter.Kinds, 1)
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 1
}).
Relay(r3)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", router)
}

View File

@ -49,11 +49,12 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) error {
// Shutdown sends a websocket close control message to all connected clients. // Shutdown sends a websocket close control message to all connected clients.
func (rl *Relay) Shutdown(ctx context.Context) { func (rl *Relay) Shutdown(ctx context.Context) {
rl.httpServer.Shutdown(ctx) rl.httpServer.Shutdown(ctx)
rl.clientsMutex.Lock()
rl.clients.Range(func(conn *websocket.Conn, _ struct{}) bool { defer rl.clientsMutex.Unlock()
conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second)) for ws := range rl.clients {
conn.Close() ws.conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))
rl.clients.Delete(conn) ws.conn.Close()
return true }
}) clear(rl.clients)
rl.listeners = rl.listeners[:0]
} }

2
go.mod
View File

@ -54,3 +54,5 @@ require (
golang.org/x/sys v0.20.0 // indirect golang.org/x/sys v0.20.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect google.golang.org/protobuf v1.31.0 // indirect
) )
replace github.com/nbd-wtf/go-nostr => ../go-nostr

2
go.sum
View File

@ -113,8 +113,6 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nbd-wtf/go-nostr v0.34.3 h1:JfDOHOje7gzUhisbZD0v2Y9b9vh2PmP6eHsU/GfU8QE=
github.com/nbd-wtf/go-nostr v0.34.3/go.mod h1:NZQkxl96ggbO8rvDpVjcsojJqKTPwqhP4i82O7K5DJs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

View File

@ -47,7 +47,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
rl.Log.Printf("failed to upgrade websocket: %v\n", err) rl.Log.Printf("failed to upgrade websocket: %v\n", err)
return return
} }
rl.clients.Store(conn, struct{}{})
ticker := time.NewTicker(rl.PingPeriod) ticker := time.NewTicker(rl.PingPeriod)
// NIP-42 challenge // NIP-42 challenge
@ -60,6 +60,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
Challenge: hex.EncodeToString(challenge), Challenge: hex.EncodeToString(challenge),
} }
rl.clientsMutex.Lock()
rl.clients[ws] = make([]listenerSpec, 0, 2)
rl.clientsMutex.Unlock()
ctx, cancel := context.WithCancel( ctx, cancel := context.WithCancel(
context.WithValue( context.WithValue(
context.Background(), context.Background(),
@ -74,11 +78,22 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ticker.Stop() ticker.Stop()
cancel() cancel()
if _, ok := rl.clients.Load(conn); ok { conn.Close()
conn.Close()
rl.clients.Delete(conn) rl.clientsMutex.Lock()
removeListener(ws) defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// swap delete listeners and delete client
for s, spec := range specs {
// no need to cancel contexts since they inherit from the main connection context
// just delete the listeners
srl := spec.subrelay
srl.listeners[spec.index] = srl.listeners[len(srl.listeners)-1]
specs[s] = specs[len(specs)-1]
srl.listeners = srl.listeners[0:len(srl.listeners)]
}
} }
delete(rl.clients, ws)
} }
go func() { go func() {
@ -167,25 +182,30 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
} }
} }
srl := rl
if rl.getSubRelayFromEvent != nil {
srl = rl.getSubRelayFromEvent(&env.Event)
}
var ok bool var ok bool
var writeErr error var writeErr error
var skipBroadcast bool var skipBroadcast bool
if env.Event.Kind == 5 { if env.Event.Kind == 5 {
// this always returns "blocked: " whenever it returns an error // this always returns "blocked: " whenever it returns an error
writeErr = rl.handleDeleteRequest(ctx, &env.Event) writeErr = srl.handleDeleteRequest(ctx, &env.Event)
} else { } else {
// this will also always return a prefixed reason // this will also always return a prefixed reason
skipBroadcast, writeErr = rl.AddEvent(ctx, &env.Event) skipBroadcast, writeErr = srl.AddEvent(ctx, &env.Event)
} }
var reason string var reason string
if writeErr == nil { if writeErr == nil {
ok = true ok = true
for _, ovw := range rl.OverwriteResponseEvent { for _, ovw := range srl.OverwriteResponseEvent {
ovw(ctx, &env.Event) ovw(ctx, &env.Event)
} }
if !skipBroadcast { if !skipBroadcast {
notifyListeners(&env.Event) srl.notifyListeners(&env.Event)
} }
} else { } else {
reason = writeErr.Error() reason = writeErr.Error()
@ -199,9 +219,14 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"}) ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"})
return return
} }
var total int64 var total int64
for _, filter := range env.Filters { for _, filter := range env.Filters {
total += rl.handleCountRequest(ctx, ws, filter) srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
total += srl.handleCountRequest(ctx, ws, filter)
} }
ws.WriteJSON(nostr.CountEnvelope{SubscriptionID: env.SubscriptionID, Count: &total}) ws.WriteJSON(nostr.CountEnvelope{SubscriptionID: env.SubscriptionID, Count: &total})
case *nostr.ReqEnvelope: case *nostr.ReqEnvelope:
@ -216,7 +241,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
// handle each filter separately -- dispatching events as they're loaded from databases // handle each filter separately -- dispatching events as they're loaded from databases
for _, filter := range env.Filters { for _, filter := range env.Filters {
err := rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
if err != nil { if err != nil {
// fail everything if any filter is rejected // fail everything if any filter is rejected
reason := err.Error() reason := err.Error()
@ -226,6 +255,8 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
cancelReqCtx(errors.New("filter rejected")) cancelReqCtx(errors.New("filter rejected"))
return return
} else {
rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
} }
} }
@ -236,10 +267,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
cancelReqCtx(nil) cancelReqCtx(nil)
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID)) ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}() }()
setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx)
case *nostr.CloseEnvelope: case *nostr.CloseEnvelope:
removeListenerId(ws, string(*env)) id := string(*env)
rl.removeListenerId(ws, id)
case *nostr.AuthEnvelope: case *nostr.AuthEnvelope:
wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1) wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1)
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok { if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {

View File

@ -2,86 +2,91 @@ package khatru
import ( import (
"context" "context"
"fmt" "errors"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
"github.com/puzpuzpuz/xsync/v3"
) )
type Listener struct { var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
filters nostr.Filters
cancel context.CancelCauseFunc type listenerSpec struct {
subscriptionId string // kept here so we can easily match against it removeListenerId
cancel context.CancelCauseFunc
index int
subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same
} }
var listeners = xsync.NewMapOf[*WebSocket, *xsync.MapOf[string, *Listener]]() type listener struct {
subscriptionId string // duplicated here so we can easily send it on notifyListeners
filter nostr.Filter
ws *WebSocket
}
func GetListeningFilters() nostr.Filters { func (rl *Relay) GetListeningFilters() []nostr.Filter {
respfilters := make(nostr.Filters, 0, listeners.Size()*2) respfilters := make([]nostr.Filter, len(rl.listeners))
for i, l := range rl.listeners {
// here we go through all the existing listeners respfilters[i] = l.filter
listeners.Range(func(_ *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { }
subs.Range(func(_ string, listener *Listener) bool {
for _, listenerfilter := range listener.filters {
for _, respfilter := range respfilters {
// check if this filter specifically is already added to respfilters
if nostr.FilterEqual(listenerfilter, respfilter) {
goto nextconn
}
}
// field not yet present on respfilters, add it
respfilters = append(respfilters, listenerfilter)
// continue to the next filter
nextconn:
continue
}
return true
})
return true
})
// respfilters will be a slice with all the distinct filter we currently have active
return respfilters return respfilters
} }
func setListener(id string, ws *WebSocket, filters nostr.Filters, cancel context.CancelCauseFunc) { // addListener may be called multiple times for each id and ws -- in which case each filter will
subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] { // be added as an independent listener
return xsync.NewMapOf[string, *Listener]() func (rl *Relay) addListener(
}) ws *WebSocket,
subs.Store(id, &Listener{filters: filters, cancel: cancel}) id string,
subrelay *Relay,
filter nostr.Filter,
cancel context.CancelCauseFunc,
) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
idx := len(subrelay.listeners)
rl.clients[ws] = append(specs, listenerSpec{
subscriptionId: id,
cancel: cancel,
subrelay: subrelay,
index: idx,
})
subrelay.listeners = append(subrelay.listeners, listener{
ws: ws,
subscriptionId: id,
filter: filter,
})
}
} }
// remove a specific subscription id from listeners for a given ws client // remove a specific subscription id from listeners for a given ws client
// and cancel its specific context // and cancel its specific context
func removeListenerId(ws *WebSocket, id string) { func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
if subs, ok := listeners.Load(ws); ok { rl.clientsMutex.Lock()
if listener, ok := subs.LoadAndDelete(id); ok { defer rl.clientsMutex.Unlock()
listener.cancel(fmt.Errorf("subscription closed by client"))
} if specs, ok := rl.clients[ws]; ok {
if subs.Size() == 0 { // swap delete specs that match this id
listeners.Delete(ws) nswaps := 0
for s, spec := range specs {
if spec.subscriptionId == id {
spec.cancel(ErrSubscriptionClosedByClient)
specs[s] = specs[len(specs)-1-nswaps]
nswaps++
// swap delete listeners one at a time, as they may be each in a different subrelay
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
srl.listeners[spec.index] = srl.listeners[len(srl.listeners)-1]
srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
}
} }
rl.clients[ws] = specs[0 : len(specs)-nswaps]
} }
} }
// remove WebSocket conn from listeners func (rl *Relay) notifyListeners(event *nostr.Event) {
// (no need to cancel contexts as they are all inherited from the main connection context) for _, listener := range rl.listeners {
func removeListener(ws *WebSocket) { if listener.filter.Matches(event) {
listeners.Delete(ws) listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.subscriptionId, Event: *event})
} }
}
func notifyListeners(event *nostr.Event) {
listeners.Range(func(ws *WebSocket, subs *xsync.MapOf[string, *Listener]) bool {
subs.Range(func(id string, listener *Listener) bool {
if !listener.filters.Match(event) {
return true
}
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
return true
})
return true
})
} }

View File

@ -5,16 +5,16 @@ import (
"log" "log"
"net/http" "net/http"
"os" "os"
"sync"
"time" "time"
"github.com/fasthttp/websocket" "github.com/fasthttp/websocket"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip11" "github.com/nbd-wtf/go-nostr/nip11"
"github.com/puzpuzpuz/xsync/v3"
) )
func NewRelay() *Relay { func NewRelay() *Relay {
return &Relay{ rl := &Relay{
Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags), Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags),
Info: &nip11.RelayInformationDocument{ Info: &nip11.RelayInformationDocument{
@ -29,7 +29,9 @@ func NewRelay() *Relay {
CheckOrigin: func(r *http.Request) bool { return true }, CheckOrigin: func(r *http.Request) bool { return true },
}, },
clients: xsync.NewMapOf[*websocket.Conn, struct{}](), clients: make(map[*WebSocket][]listenerSpec, 100),
listeners: make([]listener, 0, 100),
serveMux: &http.ServeMux{}, serveMux: &http.ServeMux{},
WriteWait: 10 * time.Second, WriteWait: 10 * time.Second,
@ -37,28 +39,36 @@ func NewRelay() *Relay {
PingPeriod: 30 * time.Second, PingPeriod: 30 * time.Second,
MaxMessageSize: 512000, MaxMessageSize: 512000,
} }
return rl
} }
type Relay struct { type Relay struct {
ServiceURL string ServiceURL string
// these structs keeps track of all the things that can be customized when handling events or requests
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string) RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
RejectConnection []func(r *http.Request) bool
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string) OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter)
OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument
StoreEvent []func(ctx context.Context, event *nostr.Event) error StoreEvent []func(ctx context.Context, event *nostr.Event) error
DeleteEvent []func(ctx context.Context, event *nostr.Event) error DeleteEvent []func(ctx context.Context, event *nostr.Event) error
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
OnConnect []func(ctx context.Context)
OnDisconnect []func(ctx context.Context)
OnEventSaved []func(ctx context.Context, event *nostr.Event) OnEventSaved []func(ctx context.Context, event *nostr.Event)
OnEphemeralEvent []func(ctx context.Context, event *nostr.Event) OnEphemeralEvent []func(ctx context.Context, event *nostr.Event)
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter)
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
RejectConnection []func(r *http.Request) bool
OnConnect []func(ctx context.Context)
OnDisconnect []func(ctx context.Context)
OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
// these are used when this relays acts as a router
routes []Route
getSubRelayFromEvent func(*nostr.Event) *Relay // used for handling EVENTs
getSubRelayFromFilter func(nostr.Filter) *Relay // used for handling REQs
// setting up handlers here will enable these methods // setting up handlers here will enable these methods
ManagementAPI RelayManagementAPI ManagementAPI RelayManagementAPI
@ -74,7 +84,10 @@ type Relay struct {
upgrader websocket.Upgrader upgrader websocket.Upgrader
// keep a connection reference to all connected clients for Server.Shutdown // keep a connection reference to all connected clients for Server.Shutdown
clients *xsync.MapOf[*websocket.Conn, struct{}] // also used for keeping track of who is listening to what
clients map[*WebSocket][]listenerSpec
listeners []listener
clientsMutex sync.Mutex
// in case you call Server.Start // in case you call Server.Start
Addr string Addr string

67
router.go Normal file
View File

@ -0,0 +1,67 @@
package khatru
import (
"github.com/nbd-wtf/go-nostr"
)
type Router struct{ *Relay }
type Route struct {
eventMatcher func(*nostr.Event) bool
filterMatcher func(nostr.Filter) bool
relay *Relay
}
type routeBuilder struct {
router *Router
eventMatcher func(*nostr.Event) bool
filterMatcher func(nostr.Filter) bool
}
func NewRouter() *Router {
rr := &Router{Relay: NewRelay()}
rr.routes = make([]Route, 0, 3)
rr.getSubRelayFromFilter = func(f nostr.Filter) *Relay {
for _, route := range rr.routes {
if route.filterMatcher(f) {
return route.relay
}
}
return rr.Relay
}
rr.getSubRelayFromEvent = func(e *nostr.Event) *Relay {
for _, route := range rr.routes {
if route.eventMatcher(e) {
return route.relay
}
}
return rr.Relay
}
return rr
}
func (rr *Router) Route() routeBuilder {
return routeBuilder{
router: rr,
filterMatcher: func(f nostr.Filter) bool { return false },
eventMatcher: func(e *nostr.Event) bool { return false },
}
}
func (rb routeBuilder) Req(fn func(nostr.Filter) bool) routeBuilder {
rb.filterMatcher = fn
return rb
}
func (rb routeBuilder) Event(fn func(*nostr.Event) bool) routeBuilder {
rb.eventMatcher = fn
return rb
}
func (rb routeBuilder) Relay(relay *Relay) {
rb.router.routes = append(rb.router.routes, Route{
filterMatcher: rb.filterMatcher,
eventMatcher: rb.eventMatcher,
relay: relay,
})
}

View File

@ -47,15 +47,3 @@ func GetIP(ctx context.Context) string {
func GetSubscriptionID(ctx context.Context) string { func GetSubscriptionID(ctx context.Context) string {
return ctx.Value(subscriptionIdKey).(string) return ctx.Value(subscriptionIdKey).(string)
} }
func GetOpenSubscriptions(ctx context.Context) []nostr.Filter {
if subs, ok := listeners.Load(GetConnection(ctx)); ok {
res := make([]nostr.Filter, 0, listeners.Size()*2)
subs.Range(func(_ string, sub *Listener) bool {
res = append(res, sub.filters...)
return true
})
return res
}
return nil
}