khatru/listener.go

147 lines
4.5 KiB
Go
Raw Permalink Normal View History

package khatru
2021-01-13 23:46:06 -03:00
import (
"context"
2024-07-30 09:43:04 -03:00
"errors"
"slices"
"github.com/nbd-wtf/go-nostr"
2021-01-13 23:46:06 -03:00
)
2024-07-30 09:43:04 -03:00
// ErrSubscriptionClosedByClient is the cause handed to a subscription's cancel
// function when its listener is removed through removeListenerId.
var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
2024-07-30 09:43:04 -03:00
type listenerSpec struct {
id string // kept here so we can easily match against it removeListenerId
cancel context.CancelCauseFunc
index int
subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same
2024-07-30 09:43:04 -03:00
}
2024-07-30 09:43:04 -03:00
type listener struct {
id string // duplicated here so we can easily send it on notifyListeners
filter nostr.Filter
ws *WebSocket
2024-07-30 09:43:04 -03:00
}
2024-07-30 09:43:04 -03:00
// GetListeningFilters returns a freshly allocated slice containing the filter of
// every listener in rl.listeners, in the same order.
func (rl *Relay) GetListeningFilters() []nostr.Filter {
	filters := make([]nostr.Filter, 0, len(rl.listeners))
	for _, registered := range rl.listeners {
		filters = append(filters, registered.filter)
	}
	return filters
}
2024-07-30 09:43:04 -03:00
// addListener may be called multiple times for each id and ws -- in which case each filter will
// be added as an independent listener
func (rl *Relay) addListener(
ws *WebSocket,
id string,
subrelay *Relay,
filter nostr.Filter,
cancel context.CancelCauseFunc,
) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
idx := len(subrelay.listeners)
rl.clients[ws] = append(specs, listenerSpec{
id: id,
cancel: cancel,
subrelay: subrelay,
index: idx,
2024-07-30 09:43:04 -03:00
})
subrelay.listeners = append(subrelay.listeners, listener{
ws: ws,
id: id,
filter: filter,
2024-07-30 09:43:04 -03:00
})
}
2021-01-13 23:46:06 -03:00
}
// remove a specific subscription id from listeners for a given ws client
// and cancel its specific context
2024-07-30 09:43:04 -03:00
func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// swap delete specs that match this id
for s := len(specs) - 1; s >= 0; s-- {
spec := specs[s]
if spec.id == id {
2024-07-30 09:43:04 -03:00
spec.cancel(ErrSubscriptionClosedByClient)
specs[s] = specs[len(specs)-1]
specs = specs[0 : len(specs)-1]
rl.clients[ws] = specs
2024-07-30 09:43:04 -03:00
// swap delete listeners one at a time, as they may be each in a different subrelay
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
2024-07-30 09:43:04 -03:00
}
}
}
2021-02-14 21:08:02 -03:00
}
2021-01-13 23:46:06 -03:00
// removeClientAndListeners removes every listener registered by ws from its
// subrelay and then deletes ws from rl.clients.
//
// Contexts are not canceled here: per the original author, they inherit from
// the main connection context.
func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()

	specs, known := rl.clients[ws]
	if !known {
		// nothing is tracked for this client, so there is nothing to delete
		return
	}

	for i := range specs {
		// read the spec on each iteration: an earlier iteration may have
		// rewritten its index when it relocated one of this client's listeners
		removed := specs[i]
		target := removed.subrelay
		tail := len(target.listeners) - 1

		if removed.index != tail {
			// the tail listener is not being removed, it is being relocated
			relocated := target.listeners[tail]
			target.listeners[removed.index] = relocated

			// mark the spec being removed with index -1 so that it cannot be
			// matched by the search below
			specs[i].index = -1

			// the spec that tracks the relocated listener must now point at
			// its new position in target.listeners
			owner := rl.clients[relocated.ws]
			pos := slices.IndexFunc(owner, func(candidate listenerSpec) bool {
				return candidate.subrelay == target && candidate.index == tail
			})
			owner[pos].index = removed.index
			rl.clients[relocated.ws] = owner
		}
		target.listeners = target.listeners[:tail] // finally shrink the slice
	}

	// all specs are discarded together with the client entry
	delete(rl.clients, ws)
}
2024-07-30 09:43:04 -03:00
func (rl *Relay) notifyListeners(event *nostr.Event) {
for _, listener := range rl.listeners {
if listener.filter.Matches(event) {
for _, pb := range rl.PreventBroadcast {
if pb(listener.ws, event) {
return
}
}
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: *event})
2024-07-30 09:43:04 -03:00
}
}
2021-01-13 23:46:06 -03:00
}