nip-40 expiration manager.

This commit is contained in:
fiatjaf
2025-01-13 00:05:41 -03:00
parent e1de0432fe
commit d27f582a0b
5 changed files with 151 additions and 14 deletions

View File

@@ -106,6 +106,9 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast
for _, ons := range rl.OnEventSaved {
ons(ctx, evt)
}
// track event expiration if applicable
rl.expirationManager.trackEvent(evt)
}
return false, nil

133
expiration.go Normal file
View File

@@ -0,0 +1,133 @@
package khatru
import (
"container/heap"
"context"
"sync"
"time"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip40"
)
type expiringEvent struct {
id string
expiresAt nostr.Timestamp
}
type expiringEventHeap []expiringEvent
func (h expiringEventHeap) Len() int { return len(h) }
func (h expiringEventHeap) Less(i, j int) bool { return h[i].expiresAt < h[j].expiresAt }
func (h expiringEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *expiringEventHeap) Push(x interface{}) {
*h = append(*h, x.(expiringEvent))
}
func (h *expiringEventHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type ExpirationManager struct {
events expiringEventHeap
mu sync.Mutex
relay *Relay
initialScanDone bool
}
func newExpirationManager(relay *Relay) *ExpirationManager {
return &ExpirationManager{
events: make(expiringEventHeap, 0),
relay: relay,
}
}
func (em *ExpirationManager) start(ctx context.Context) {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !em.initialScanDone {
em.initialScan(ctx)
em.initialScanDone = true
}
em.checkExpiredEvents(ctx)
}
}
}
func (em *ExpirationManager) initialScan(ctx context.Context) {
em.mu.Lock()
defer em.mu.Unlock()
// query all events
for _, query := range em.relay.QueryEvents {
ch, err := query(ctx, nostr.Filter{})
if err != nil {
continue
}
for evt := range ch {
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
heap.Push(&em.events, expiringEvent{
id: evt.ID,
expiresAt: expiresAt,
})
}
}
}
heap.Init(&em.events)
}
func (em *ExpirationManager) checkExpiredEvents(ctx context.Context) {
em.mu.Lock()
defer em.mu.Unlock()
now := nostr.Now()
// keep deleting events from the heap as long as they're expired
for em.events.Len() > 0 {
next := em.events[0]
if now < next.expiresAt {
break
}
heap.Pop(&em.events)
for _, query := range em.relay.QueryEvents {
ch, err := query(ctx, nostr.Filter{IDs: []string{next.id}})
if err != nil {
continue
}
if evt := <-ch; evt != nil {
for _, del := range em.relay.DeleteEvent {
del(ctx, evt)
}
}
break
}
}
}
func (em *ExpirationManager) trackEvent(evt *nostr.Event) {
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
em.mu.Lock()
heap.Push(&em.events, expiringEvent{
id: evt.ID,
expiresAt: expiresAt,
})
em.mu.Unlock()
}
}

6
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/fasthttp/websocket v1.5.7
github.com/fiatjaf/eventstore v0.14.2
github.com/liamg/magic v0.0.1
github.com/nbd-wtf/go-nostr v0.43.0
github.com/nbd-wtf/go-nostr v0.46.0
github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/rs/cors v1.11.1
github.com/stretchr/testify v1.10.0
@@ -21,6 +21,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
@@ -33,9 +34,6 @@ require (
github.com/fatih/structs v1.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.4.0 // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/jmoiron/sqlx v1.4.0 // indirect

13
go.sum
View File

@@ -21,6 +21,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -60,12 +62,6 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs=
github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
@@ -117,8 +113,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nbd-wtf/go-nostr v0.43.0 h1:KJh/HXjKkhNCejRswHWVg8IgoAyLjw4iWxu/JDUZzqM=
github.com/nbd-wtf/go-nostr v0.43.0/go.mod h1:8YfmT9tBuRT+4nWHuMBDh+xSIZqAdZC6QIOgQfBgWxU=
github.com/nbd-wtf/go-nostr v0.46.0 h1:aR+xXEC6MPutNMIRhNdi+2iBPEHW7SO10sFaOAVSz3Y=
github.com/nbd-wtf/go-nostr v0.46.0/go.mod h1:xVNOqkn0GImeTmaF6VDwgYsuSkfG3yrIbd0dT6NZDIQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -184,7 +180,6 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -17,13 +17,15 @@ import (
)
func NewRelay() *Relay {
ctx := context.Background()
rl := &Relay{
Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags),
Info: &nip11.RelayInformationDocument{
Software: "https://github.com/fiatjaf/khatru",
Version: "n/a",
SupportedNIPs: []any{1, 11, 42, 70, 86},
SupportedNIPs: []any{1, 11, 40, 42, 70, 86},
},
upgrader: websocket.Upgrader{
@@ -43,6 +45,9 @@ func NewRelay() *Relay {
MaxMessageSize: 512000,
}
rl.expirationManager = newExpirationManager(rl)
go rl.expirationManager.start(ctx)
return rl
}
@@ -108,6 +113,9 @@ type Relay struct {
PongWait time.Duration // Time allowed to read the next pong message from the peer.
PingPeriod time.Duration // Send pings to peer with this period. Must be less than pongWait.
MaxMessageSize int64 // Maximum message size allowed from peer.
// NIP-40 expiration manager
expirationManager *ExpirationManager
}
func (rl *Relay) getBaseURL(r *http.Request) string {