Compare commits

...

6 Commits

Author SHA1 Message Date
fiatjaf
583f712fe4 admin: normalize urls for nip86 checking. 2025-04-17 08:02:25 -03:00
Jon Staab
28b1061166 Reject deleted events 2025-04-16 18:55:36 -03:00
Jon Staab
25f19ce46e Store and serve delete events 2025-04-16 18:55:28 -03:00
fiatjaf
33545587b6 make it so ephemeral events respond with ok:false if no one is listening. 2025-04-14 09:24:34 -03:00
Kay
214371f8bd refactor(adding): check kind range with proper function. 2025-04-13 09:05:23 -03:00
fiatjaf
fbb40f3b74 use .Find() instead of .GetFirst() everywhere. 2025-04-04 23:07:18 -03:00
13 changed files with 238 additions and 115 deletions

176
adding.go
View File

@@ -11,34 +11,90 @@ import (
// AddEvent sends an event through then normal add pipeline, as if it was received from a websocket.
func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if evt == nil {
return false, errors.New("error: event is nil")
}
if nostr.IsEphemeralKind(evt.Kind) {
return false, rl.handleEphemeral(ctx, evt)
} else {
return rl.handleNormal(ctx, evt)
}
}
func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) {
for _, reject := range rl.RejectEvent {
if reject, msg := reject(ctx, evt); reject {
if msg == "" {
return false, errors.New("blocked: no reason")
return true, errors.New("blocked: no reason")
} else {
return false, errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
return true, errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
}
}
}
if 20000 <= evt.Kind && evt.Kind < 30000 {
// do not store ephemeral events
for _, oee := range rl.OnEphemeralEvent {
oee(ctx, evt)
// Check to see if the event has been deleted by ID
for _, query := range rl.QueryEvents {
ch, err := query(ctx, nostr.Filter{
Kinds: []int{5},
Tags: nostr.TagMap{"#e": []string{evt.ID}},
})
if err != nil {
continue
}
target := <-ch
if target == nil {
continue
}
return true, errors.New("blocked: this event has been deleted")
}
// will store
// regular kinds are just saved directly
if nostr.IsRegularKind(evt.Kind) {
for _, store := range rl.StoreEvent {
if err := store(ctx, evt); err != nil {
switch err {
case eventstore.ErrDupEvent:
return true, nil
default:
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error"))
}
}
}
} else {
// will store
// regular kinds are just saved directly
if nostr.IsRegularKind(evt.Kind) {
for _, store := range rl.StoreEvent {
if err := store(ctx, evt); err != nil {
// Check to see if the event has been deleted by address
for _, query := range rl.QueryEvents {
dTagValue := ""
for _, tag := range evt.Tags {
if len(tag) > 0 && tag[0] == "d" {
dTagValue = tag[1]
break
}
}
address := fmt.Sprintf("%d:%s:%s", evt.Kind, evt.PubKey, dTagValue)
ch, err := query(ctx, nostr.Filter{
Kinds: []int{5},
Since: &evt.CreatedAt,
Tags: nostr.TagMap{"#a": []string{address}},
})
if err != nil {
continue
}
target := <-ch
if target == nil {
continue
}
return true, errors.New("blocked: this event has been deleted")
}
// otherwise it's a replaceable -- so we'll use the replacer functions if we have any
if len(rl.ReplaceEvent) > 0 {
for _, repl := range rl.ReplaceEvent {
if err := repl(ctx, evt); err != nil {
switch err {
case eventstore.ErrDupEvent:
return true, nil
@@ -48,68 +104,54 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast
}
}
} else {
// otherwise it's a replaceable -- so we'll use the replacer functions if we have any
if len(rl.ReplaceEvent) > 0 {
for _, repl := range rl.ReplaceEvent {
if err := repl(ctx, evt); err != nil {
switch err {
// otherwise do it the manual way
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
if nostr.IsAddressableKind(evt.Kind) {
// when addressable, add the "d" tag to the filter
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
}
// now we fetch old events and delete them
shouldStore := true
for _, query := range rl.QueryEvents {
ch, err := query(ctx, filter)
if err != nil {
continue
}
for previous := range ch {
if isOlder(previous, evt) {
for _, del := range rl.DeleteEvent {
del(ctx, previous)
}
} else {
// we found a more recent event, so we won't delete it and also will not store this new one
shouldStore = false
}
}
}
// store
if shouldStore {
for _, store := range rl.StoreEvent {
if saveErr := store(ctx, evt); saveErr != nil {
switch saveErr {
case eventstore.ErrDupEvent:
return true, nil
default:
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error"))
}
}
}
} else {
// otherwise do it the manual way
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
if nostr.IsAddressableKind(evt.Kind) {
// when addressable, add the "d" tag to the filter
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
}
// now we fetch old events and delete them
shouldStore := true
for _, query := range rl.QueryEvents {
ch, err := query(ctx, filter)
if err != nil {
continue
}
for previous := range ch {
if isOlder(previous, evt) {
for _, del := range rl.DeleteEvent {
del(ctx, previous)
}
} else {
// we found a more recent event, so we won't delete it and also will not store this new one
shouldStore = false
}
}
}
// store
if shouldStore {
for _, store := range rl.StoreEvent {
if saveErr := store(ctx, evt); saveErr != nil {
switch saveErr {
case eventstore.ErrDupEvent:
return true, nil
default:
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(saveErr.Error(), "error"))
}
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(saveErr.Error(), "error"))
}
}
}
}
}
for _, ons := range rl.OnEventSaved {
ons(ctx, evt)
}
// track event expiration if applicable
rl.expirationManager.trackEvent(evt)
}
for _, ons := range rl.OnEventSaved {
ons(ctx, evt)
}
// track event expiration if applicable
rl.expirationManager.trackEvent(evt)
return false, nil
}

View File

@@ -25,7 +25,7 @@ func (bs BlossomServer) handleUploadCheck(w http.ResponseWriter, r *http.Request
blossomError(w, "missing \"Authorization\" header", 401)
return
}
if auth.Tags.GetFirst([]string{"t", "upload"}) == nil {
if auth.Tags.FindWithValue("t", "upload") == nil {
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
return
}
@@ -59,7 +59,7 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) {
blossomError(w, "missing \"Authorization\" header", 401)
return
}
if auth.Tags.GetFirst([]string{"t", "upload"}) == nil {
if auth.Tags.FindWithValue("t", "upload") == nil {
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
return
}
@@ -163,13 +163,13 @@ func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) {
// if there is one, we check if it has the extra requirements
if auth != nil {
if auth.Tags.GetFirst([]string{"t", "get"}) == nil {
if auth.Tags.FindWithValue("t", "get") == nil {
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
return
}
if auth.Tags.GetFirst([]string{"x", hhash}) == nil &&
auth.Tags.GetFirst([]string{"server", bs.ServiceURL}) == nil {
if auth.Tags.FindWithValue("x", hhash) == nil &&
auth.Tags.FindWithValue("server", bs.ServiceURL) == nil {
blossomError(w, "invalid \"Authorization\" event \"x\" or \"server\" tag", 403)
return
}
@@ -239,7 +239,7 @@ func (bs BlossomServer) handleList(w http.ResponseWriter, r *http.Request) {
// if there is one, we check if it has the extra requirements
if auth != nil {
if auth.Tags.GetFirst([]string{"t", "list"}) == nil {
if auth.Tags.FindWithValue("t", "list") == nil {
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
return
}
@@ -283,7 +283,7 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
}
if auth != nil {
if auth.Tags.GetFirst([]string{"t", "delete"}) == nil {
if auth.Tags.FindWithValue("t", "delete") == nil {
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
return
}
@@ -296,8 +296,8 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
return
}
hhash = hhash[1:]
if auth.Tags.GetFirst([]string{"x", hhash}) == nil &&
auth.Tags.GetFirst([]string{"server", bs.ServiceURL}) == nil {
if auth.Tags.FindWithValue("x", hhash) == nil &&
auth.Tags.FindWithValue("server", bs.ServiceURL) == nil {
blossomError(w, "invalid \"Authorization\" event \"x\" or \"server\" tag", 403)
return
}

View File

@@ -6,6 +6,6 @@ import (
// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
// it also doesn't attempt to store the event or trigger any reactions or callbacks
func (rl *Relay) BroadcastEvent(evt *nostr.Event) {
rl.notifyListeners(evt)
func (rl *Relay) BroadcastEvent(evt *nostr.Event) int {
return rl.notifyListeners(evt)
}

View File

@@ -47,7 +47,7 @@ router.Route().
return true
case event.Kind <= 12 && event.Kind >= 9:
return true
case event.Tags.GetFirst([]string{"h", ""}) != nil:
case event.Tags.Find("h") != nil:
return true
default:
return false

26
ephemeral.go Normal file
View File

@@ -0,0 +1,26 @@
package khatru
import (
"context"
"errors"
"github.com/nbd-wtf/go-nostr"
)
func (rl *Relay) handleEphemeral(ctx context.Context, evt *nostr.Event) error {
for _, reject := range rl.RejectEvent {
if reject, msg := reject(ctx, evt); reject {
if msg == "" {
return errors.New("blocked: no reason")
} else {
return errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
}
}
}
for _, oee := range rl.OnEphemeralEvent {
oee(ctx, evt)
}
return nil
}

View File

@@ -13,7 +13,7 @@ func main() {
relay := khatru.NewRelay()
db := lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"}
os.MkdirAll(db.Path, 0755)
os.MkdirAll(db.Path, 0o755)
if err := db.Init(); err != nil {
panic(err)
}

View File

@@ -16,7 +16,7 @@ func main() {
relay := khatru.NewRelay()
db := lmdb.LMDBBackend{Path: "/tmp/exclusive"}
os.MkdirAll(db.Path, 0755)
os.MkdirAll(db.Path, 0o755)
if err := db.Init(); err != nil {
panic(err)
}

View File

@@ -52,7 +52,7 @@ func main() {
return slices.Contains(filter.Kinds, 1) && slices.Contains(filter.Tags["t"], "spam")
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 1 && event.Tags.GetFirst([]string{"t", "spam"}) != nil
return event.Kind == 1 && event.Tags.FindWithValue("t", "spam") != nil
}).
Relay(r2)

View File

@@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -141,6 +142,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
continue
}
// this is safe because ReadMessage() will always create a new slice
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
// parse messages sequentially otherwise sonic breaks
@@ -215,9 +217,16 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
if env.Event.Kind == 5 {
// this always returns "blocked: " whenever it returns an error
writeErr = srl.handleDeleteRequest(ctx, &env.Event)
} else {
// this will also always return a prefixed reason
skipBroadcast, writeErr = srl.AddEvent(ctx, &env.Event)
}
if writeErr == nil {
if nostr.IsEphemeralKind(env.Event.Kind) {
// this will also always return a prefixed reason
writeErr = srl.handleEphemeral(ctx, &env.Event)
} else {
// this will also always return a prefixed reason
skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event)
}
}
var reason string
@@ -227,9 +236,20 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ovw(ctx, &env.Event)
}
if !skipBroadcast {
srl.notifyListeners(&env.Event)
n := srl.notifyListeners(&env.Event)
// the number of notified listeners matters in ephemeral events
if nostr.IsEphemeralKind(env.Event.Kind) {
if n == 0 {
ok = false
reason = "mute: no one was listening for this"
} else {
reason = "broadcasted to " + strconv.Itoa(n) + " listeners"
}
}
}
} else {
ok = false
reason = writeErr.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
@@ -244,14 +264,13 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
var total int64
var hll *hyperloglog.HyperLogLog
uneligibleForHLL := false
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(env.Filter)
}
if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(env.Filter); offset != -1 && !uneligibleForHLL {
if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(env.Filter); offset != -1 {
total, hll = srl.handleCountRequestWithHLL(ctx, ws, env.Filter, offset)
} else {
total = srl.handleCountRequest(ctx, ws, env.Filter)

View File

@@ -132,15 +132,20 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
delete(rl.clients, ws)
}
func (rl *Relay) notifyListeners(event *nostr.Event) {
// returns how many listeners were notified
func (rl *Relay) notifyListeners(event *nostr.Event) int {
count := 0
listenersloop:
for _, listener := range rl.listeners {
if listener.filter.Matches(event) {
for _, pb := range rl.PreventBroadcast {
if pb(listener.ws, event) {
return
continue listenersloop
}
}
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: *event})
count++
}
}
return count
}

View File

@@ -86,10 +86,11 @@ func (rl *Relay) HandleNIP86(w http.ResponseWriter, r *http.Request) {
goto respond
}
if uTag := evt.Tags.GetFirst([]string{"u", ""}); uTag == nil || rl.getBaseURL(r) != (*uTag)[1] {
resp.Error = "invalid 'u' tag"
if uTag := evt.Tags.Find("u"); uTag == nil || nostr.NormalizeURL(rl.getBaseURL(r)) != nostr.NormalizeURL(uTag[1]) {
resp.Error = fmt.Sprintf("invalid 'u' tag, got '%s', expected '%s'",
nostr.NormalizeURL(rl.getBaseURL(r)), nostr.NormalizeURL(uTag[1]))
goto respond
} else if pht := evt.Tags.GetFirst([]string{"payload", hex.EncodeToString(payloadHash[:])}); pht == nil {
} else if pht := evt.Tags.FindWithValue("payload", hex.EncodeToString(payloadHash[:])); pht == nil {
resp.Error = "invalid auth event payload hash"
goto respond
} else if evt.CreatedAt < nostr.Now()-30 {

View File

@@ -2,7 +2,6 @@ package policies
import (
"context"
"slices"
"github.com/fiatjaf/khatru"

View File

@@ -151,33 +151,64 @@ func TestBasicRelayFunctionality(t *testing.T) {
t.Fatalf("failed to publish deletion event: %v", err)
}
// Try to query the deleted event
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
IDs: []string{evt3.ID},
}})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
defer sub.Unsub()
{
// Try to query the deleted event
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
IDs: []string{evt3.ID},
}})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
defer sub.Unsub()
// Should get EOSE without receiving the deleted event
gotEvent := false
for {
select {
case <-sub.Events:
gotEvent = true
case <-sub.EndOfStoredEvents:
if gotEvent {
t.Error("should not have received deleted event")
// Should get EOSE without receiving the deleted event
gotEvent := false
DeletedLoop:
for {
select {
case <-sub.Events:
gotEvent = true
case <-sub.EndOfStoredEvents:
if gotEvent {
t.Error("should not have received deleted event")
}
break DeletedLoop
case <-ctx.Done():
t.Fatal("timeout waiting for EOSE")
}
}
}
{
// Try to query the deletion itself
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
Kinds: []int{5},
}})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
defer sub.Unsub()
// Should get EOSE without receiving the deleted event
gotEvent := false
DeletionLoop:
for {
select {
case <-sub.Events:
gotEvent = true
case <-sub.EndOfStoredEvents:
if !gotEvent {
t.Error("should have received deletion event")
}
break DeletionLoop
case <-ctx.Done():
t.Fatal("timeout waiting for EOSE")
}
return
case <-ctx.Done():
t.Fatal("timeout waiting for EOSE")
}
}
})
// test 4: teplaceable events
// test 4: replaceable events
t.Run("replaceable events", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()