From d27f582a0bf1fa52a93ce275c0bffdfe2eb1070c Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 13 Jan 2025 00:05:41 -0300 Subject: [PATCH] nip-40 expiration manager. --- adding.go | 3 ++ expiration.go | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 6 +-- go.sum | 13 ++--- relay.go | 10 +++- 5 files changed, 151 insertions(+), 14 deletions(-) create mode 100644 expiration.go diff --git a/adding.go b/adding.go index 86b3612..caa021a 100644 --- a/adding.go +++ b/adding.go @@ -106,6 +106,9 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast for _, ons := range rl.OnEventSaved { ons(ctx, evt) } + + // track event expiration if applicable + rl.expirationManager.trackEvent(evt) } return false, nil diff --git a/expiration.go b/expiration.go new file mode 100644 index 0000000..7c2173b --- /dev/null +++ b/expiration.go @@ -0,0 +1,133 @@ +package khatru + +import ( + "container/heap" + "context" + "sync" + "time" + + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip40" +) + +type expiringEvent struct { + id string + expiresAt nostr.Timestamp +} + +type expiringEventHeap []expiringEvent + +func (h expiringEventHeap) Len() int { return len(h) } +func (h expiringEventHeap) Less(i, j int) bool { return h[i].expiresAt < h[j].expiresAt } +func (h expiringEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *expiringEventHeap) Push(x interface{}) { + *h = append(*h, x.(expiringEvent)) +} + +func (h *expiringEventHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type ExpirationManager struct { + events expiringEventHeap + mu sync.Mutex + relay *Relay + initialScanDone bool +} + +func newExpirationManager(relay *Relay) *ExpirationManager { + return &ExpirationManager{ + events: make(expiringEventHeap, 0), + relay: relay, + } +} + +func (em *ExpirationManager) start(ctx context.Context) { + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !em.initialScanDone { + em.initialScan(ctx) + em.initialScanDone = true + } + + em.checkExpiredEvents(ctx) + } + } +} + +func (em *ExpirationManager) initialScan(ctx context.Context) { + em.mu.Lock() + defer em.mu.Unlock() + + // query all events + for _, query := range em.relay.QueryEvents { + ch, err := query(ctx, nostr.Filter{}) + if err != nil { + continue + } + + for evt := range ch { + if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 { + heap.Push(&em.events, expiringEvent{ + id: evt.ID, + expiresAt: expiresAt, + }) + } + } + } + + heap.Init(&em.events) +} + +func (em *ExpirationManager) checkExpiredEvents(ctx context.Context) { + em.mu.Lock() + defer em.mu.Unlock() + + now := nostr.Now() + + // keep deleting events from the heap as long as they're expired + for em.events.Len() > 0 { + next := em.events[0] + if now < next.expiresAt { + break + } + + heap.Pop(&em.events) + + for _, query := range em.relay.QueryEvents { + ch, err := query(ctx, nostr.Filter{IDs: []string{next.id}}) + if err != nil { + continue + } + + if evt := <-ch; evt != nil { + for _, del := range em.relay.DeleteEvent { + del(ctx, evt) + } + } + break + } + } +} + +func (em *ExpirationManager) trackEvent(evt *nostr.Event) { + if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 { + em.mu.Lock() + heap.Push(&em.events, expiringEvent{ + id: evt.ID, + expiresAt: expiresAt, + }) + em.mu.Unlock() + } +} diff --git a/go.mod b/go.mod index fc86117..fed1fc4 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/fasthttp/websocket v1.5.7 github.com/fiatjaf/eventstore v0.14.2 github.com/liamg/magic v0.0.1 - github.com/nbd-wtf/go-nostr v0.43.0 + github.com/nbd-wtf/go-nostr v0.46.0 github.com/puzpuzpuz/xsync/v3 v3.4.0 github.com/rs/cors v1.11.1 github.com/stretchr/testify v1.10.0 @@ -21,6 +21,7 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/coder/websocket v1.8.12 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect @@ -33,9 +34,6 @@ require ( github.com/fatih/structs v1.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/gobwas/httphead v0.1.0 // indirect - github.com/gobwas/pool v0.2.1 // indirect - github.com/gobwas/ws v1.4.0 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/jmoiron/sqlx v1.4.0 // indirect diff --git a/go.sum b/go.sum index e14f2f4..357c7d2 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -60,12 +62,6 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= -github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= -github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= -github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= -github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs= -github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ= @@ -117,8 +113,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/nbd-wtf/go-nostr v0.43.0 h1:KJh/HXjKkhNCejRswHWVg8IgoAyLjw4iWxu/JDUZzqM= -github.com/nbd-wtf/go-nostr v0.43.0/go.mod h1:8YfmT9tBuRT+4nWHuMBDh+xSIZqAdZC6QIOgQfBgWxU= +github.com/nbd-wtf/go-nostr v0.46.0 h1:aR+xXEC6MPutNMIRhNdi+2iBPEHW7SO10sFaOAVSz3Y= +github.com/nbd-wtf/go-nostr v0.46.0/go.mod h1:xVNOqkn0GImeTmaF6VDwgYsuSkfG3yrIbd0dT6NZDIQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -184,7 +180,6 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/relay.go b/relay.go index 2deba4b..48b7a07 100644 --- a/relay.go +++ b/relay.go @@ -17,13 +17,15 @@ import ( ) func NewRelay() *Relay { + ctx := context.Background() + rl := &Relay{ Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags), Info: &nip11.RelayInformationDocument{ Software: "https://github.com/fiatjaf/khatru", Version: "n/a", - SupportedNIPs: []any{1, 11, 42, 70, 86}, + SupportedNIPs: []any{1, 11, 40, 42, 70, 86}, }, upgrader: websocket.Upgrader{ @@ -43,6 +45,9 @@ func NewRelay() *Relay { MaxMessageSize: 512000, } + rl.expirationManager = newExpirationManager(rl) + go rl.expirationManager.start(ctx) + return rl } @@ -108,6 +113,9 @@ type Relay struct { PongWait time.Duration // Time allowed to read the next pong message from the peer. PingPeriod time.Duration // Send pings to peer with this period. Must be less than pongWait. MaxMessageSize int64 // Maximum message size allowed from peer. + + // NIP-40 expiration manager + expirationManager *ExpirationManager } func (rl *Relay) getBaseURL(r *http.Request) string {