From 4905a46ccdd965ee1094072d7f31d88987b90d9f Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 11 Nov 2023 21:08:39 -0300 Subject: [PATCH] make filter rejection actually work and move logic to a separate file. --- handlers.go | 76 ++++---------------------------------------------- serve-req.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 71 deletions(-) create mode 100644 serve-req.go diff --git a/handlers.go b/handlers.go index e987c95..ab8fb59 100644 --- a/handlers.go +++ b/handlers.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "encoding/json" "net/http" + "strings" "sync" "time" @@ -165,30 +166,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter")) continue } - - filter := filters[i] - - // overwrite the filter (for example, to eliminate some kinds or tags that we know we don't support) - for _, ovw := range rl.OverwriteCountFilter { - ovw(ctx, &filter) - } - - // then check if we'll reject this filter - for _, reject := range rl.RejectCountFilter { - if rejecting, msg := reject(ctx, filter); rejecting { - ws.WriteJSON(nostr.NoticeEnvelope(msg)) - continue - } - } - - // run the functions to count (generally it will be just one) - for _, count := range rl.CountEvents { - res, err := count(ctx, filter) - if err != nil { - ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) - } - total += res - } + total += rl.handleCountRequest(ctx, ws, filters[i]) } ws.WriteJSON([]interface{}{"COUNT", id, map[string]int64{"count": total}}) @@ -205,57 +183,13 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { eose.Add(len(request[2:])) for i, filterReq := range request[2:] { - if err := json.Unmarshal( - filterReq, - &filters[i], - ); err != nil { + if err := json.Unmarshal(filterReq, &filters[i]); err != nil { ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter")) eose.Done() continue } - filter := filters[i] - - // overwrite the filter (for example, to eliminate some kinds or - // that we know we don't support) - for _, ovw := range rl.OverwriteFilter { - ovw(ctx, &filter) - } - - // then check if we'll reject this filter (we apply this after overwriting - // because we may, for example, remove some things from the incoming filters - // that we know we don't support, and then if the end result is an empty - // filter we can just reject it) - for _, reject := range rl.RejectFilter { - if rejecting, msg := reject(ctx, filter); rejecting { - ws.WriteJSON(nostr.NoticeEnvelope(msg)) - continue - } - } - - // run the functions to query events (generally just one, - // but we might be fetching stuff from multiple places) - eose.Add(len(rl.QueryEvents)) - for _, query := range rl.QueryEvents { - ch, err := query(ctx, filter) - if err != nil { - ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) - eose.Done() - continue - } - - go func(ch chan *nostr.Event) { - for event := range ch { - for _, ovw := range rl.OverwriteResponseEvent { - ovw(ctx, event) - } - ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) - } - eose.Done() - }(ch) - } - - eose.Done() + go rl.handleRequest(ctx, id, &eose, ws, filters[i]) } go func() { @@ -306,7 +240,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { case <-ticker.C: err := ws.WriteMessage(websocket.PingMessage, nil) if err != nil { - if err.Error() != "use of closed network connection" { + if !strings.HasSuffix(err.Error(), "use of closed network connection") { rl.Log.Printf("error writing ping: %v; closing websocket\n", err) } return diff --git a/serve-req.go b/serve-req.go new file mode 100644 index 0000000..c1c3bba --- /dev/null +++ b/serve-req.go @@ -0,0 +1,78 @@ +package khatru + +import ( + "context" + "sync" + + "github.com/nbd-wtf/go-nostr" +) + +func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGroup, ws *WebSocket, filter nostr.Filter) { + // overwrite the filter (for example, to eliminate some kinds or + // that we know we don't support) + for _, ovw := range rl.OverwriteFilter { + ovw(ctx, &filter) + } + + // then check if we'll reject this filter (we apply this after overwriting + // because we may, for example, remove some things from the incoming filters + // that we know we don't support, and then if the end result is an empty + // filter we can just reject it) + for _, reject := range rl.RejectFilter { + if reject, msg := reject(ctx, filter); reject { + ws.WriteJSON(nostr.NoticeEnvelope(msg)) + return + } + } + + // run the functions to query events (generally just one, + // but we might be fetching stuff from multiple places) + eose.Add(len(rl.QueryEvents)) + for _, query := range rl.QueryEvents { + ch, err := query(ctx, filter) + if err != nil { + ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) + eose.Done() + continue + } + + go func(ch chan *nostr.Event) { + for event := range ch { + for _, ovw := range rl.OverwriteResponseEvent { + ovw(ctx, event) + } + ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) + } + eose.Done() + }(ch) + } + + eose.Done() +} + +func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) int64 { + // overwrite the filter (for example, to eliminate some kinds or tags that we know we don't support) + for _, ovw := range rl.OverwriteCountFilter { + ovw(ctx, &filter) + } + + // then check if we'll reject this filter + for _, reject := range rl.RejectCountFilter { + if rejecting, msg := reject(ctx, filter); rejecting { + ws.WriteJSON(nostr.NoticeEnvelope(msg)) + return 0 + } + } + + // run the functions to count (generally it will be just one) + var subtotal int64 = 0 + for _, count := range rl.CountEvents { + res, err := count(ctx, filter) + if err != nil { + ws.WriteJSON(nostr.NoticeEnvelope(err.Error())) + } + subtotal += res + } + + return subtotal +}