rethink relayer.

This commit is contained in:
fiatjaf
2023-08-07 21:22:57 -03:00
commit 58ee39df8a
49 changed files with 4454 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
*.env
rss-bridge

24
LICENSE Normal file
View File

@@ -0,0 +1,24 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <https://unlicense.org>

1
README.md Normal file
View File

@@ -0,0 +1 @@
khatru

134
add-event.go Normal file
View File

@@ -0,0 +1,134 @@
package khatru
import (
"context"
"fmt"
"github.com/nbd-wtf/go-nostr"
)
func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
if evt == nil {
return fmt.Errorf("event is nil")
}
msg := ""
rejecting := false
for _, reject := range rl.RejectEvent {
rejecting, msg = reject(ctx, evt)
if rejecting {
break
}
}
if rejecting {
if msg == "" {
msg = "no reason"
}
return fmt.Errorf(msg)
}
if 20000 <= evt.Kind && evt.Kind < 30000 {
// do not store ephemeral events
} else {
if evt.Kind == 0 || evt.Kind == 3 || (10000 <= evt.Kind && evt.Kind < 20000) {
// replaceable event, delete before storing
for _, query := range rl.QueryEvents {
ch, err := query(ctx, nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}})
if err != nil {
continue
}
previous := <-ch
if previous != nil {
for _, del := range rl.DeleteEvent {
del(ctx, previous)
}
}
}
} else if 30000 <= evt.Kind && evt.Kind < 40000 {
// parameterized replaceable event, delete before storing
d := evt.Tags.GetFirst([]string{"d", ""})
if d != nil {
for _, query := range rl.QueryEvents {
ch, err := query(ctx, nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}, Tags: nostr.TagMap{"d": []string{d.Value()}}})
if err != nil {
continue
}
previous := <-ch
if previous != nil {
for _, del := range rl.DeleteEvent {
del(ctx, previous)
}
}
}
}
}
// store
for _, store := range rl.StoreEvent {
if saveErr := store(ctx, evt); saveErr != nil {
switch saveErr {
case ErrDupEvent:
return nil
default:
errmsg := saveErr.Error()
if nip20prefixmatcher.MatchString(errmsg) {
return saveErr
} else {
return fmt.Errorf("error: failed to save (%s)", errmsg)
}
}
}
}
for _, ons := range rl.OnEventSaved {
ons(ctx, evt)
}
}
notifyListeners(evt)
return nil
}
func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) error {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
// first we fetch the event
for _, query := range rl.QueryEvents {
ch, err := query(ctx, nostr.Filter{IDs: []string{tag[1]}})
if err != nil {
continue
}
target := <-ch
if target == nil {
continue
}
// got the event, now check if the user can delete it
acceptDeletion := target.PubKey == evt.PubKey
var msg string
if acceptDeletion == false {
msg = "you are not the author of this event"
}
// but if we have a function to overwrite this outcome, use that instead
for _, odo := range rl.OverwriteDeletionOutcome {
acceptDeletion, msg = odo(ctx, target, evt)
}
if acceptDeletion {
// delete it
for _, del := range rl.DeleteEvent {
del(ctx, target)
}
} else {
// fail and stop here
return fmt.Errorf("blocked: %s", msg)
}
// don't try to query this same event again
break
}
}
}
return nil
}

5
errors.go Normal file
View File

@@ -0,0 +1,5 @@
package khatru
import "fmt"
var ErrDupEvent = fmt.Errorf("duplicate: event already exists")

View File

@@ -0,0 +1,26 @@
package main
import (
"fmt"
"net/http"
"github.com/fiatjaf/khatru"
"github.com/fiatjaf/khatru/plugins/storage/badgern"
)
func main() {
relay := khatru.NewRelay()
db := badgern.BadgerBackend{Path: "/tmp/khatru-badgern-tmp"}
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)
}

View File

@@ -0,0 +1,26 @@
package main
import (
"fmt"
"net/http"
"github.com/fiatjaf/khatru"
"github.com/fiatjaf/khatru/plugins/storage/elasticsearch"
)
func main() {
relay := khatru.NewRelay()
db := elasticsearch.ElasticsearchStorage{URL: ""}
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)
}

View File

@@ -0,0 +1,28 @@
package main
import (
"fmt"
"net/http"
"os"
"github.com/fiatjaf/khatru"
"github.com/fiatjaf/khatru/plugins/storage/lmdbn"
)
func main() {
relay := khatru.NewRelay()
db := lmdbn.LMDBBackend{Path: "/tmp/khatru-lmdbn-tmp"}
os.MkdirAll(db.Path, 0755)
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)
}

View File

@@ -0,0 +1,26 @@
package main
import (
"fmt"
"net/http"
"github.com/fiatjaf/khatru"
"github.com/fiatjaf/khatru/plugins/storage/postgresql"
)
func main() {
relay := khatru.NewRelay()
db := postgresql.PostgresBackend{DatabaseURL: "postgresql://localhost:5432/tmp-khatru-relay"}
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)
}

View File

@@ -0,0 +1,26 @@
package main
import (
"fmt"
"net/http"
"github.com/fiatjaf/khatru"
"github.com/fiatjaf/khatru/plugins/storage/sqlite3"
)
func main() {
relay := khatru.NewRelay()
db := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/khatru-sqlite-tmp"}
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)
}

View File

@@ -0,0 +1,40 @@
package main
import (
"context"
"fmt"
"net/http"
"os"
"github.com/fiatjaf/khatru"
"github.com/fiatjaf/khatru/plugins"
"github.com/fiatjaf/khatru/plugins/storage/lmdbn"
"github.com/nbd-wtf/go-nostr"
)
func main() {
relay := khatru.NewRelay()
db := lmdbn.LMDBBackend{Path: "/tmp/exclusive"}
os.MkdirAll(db.Path, 0755)
if err := db.Init(); err != nil {
panic(err)
}
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.RejectEvent = append(relay.RejectEvent, plugins.PreventTooManyIndexableTags(10))
relay.RejectFilter = append(relay.RejectFilter, plugins.NoPrefixFilters, plugins.NoComplexFilters)
relay.OnEventSaved = append(relay.OnEventSaved, func(ctx context.Context, event *nostr.Event) {
})
fmt.Println("running on :3334")
http.ListenAndServe(":3334", relay)
}
func deleteStuffThatCanBeFoundElsewhere() {
}

60
go.mod Normal file
View File

@@ -0,0 +1,60 @@
module github.com/fiatjaf/khatru
go 1.20
require (
github.com/aquasecurity/esquery v0.2.0
github.com/bmatsuo/lmdb-go v1.8.0
github.com/dgraph-io/badger/v4 v4.1.0
github.com/elastic/go-elasticsearch/v8 v8.6.0
github.com/fasthttp/websocket v1.5.3
github.com/gobwas/ws v1.2.0
github.com/jmoiron/sqlx v1.3.1
github.com/lib/pq v1.10.3
github.com/mattn/go-sqlite3 v1.14.6
github.com/nbd-wtf/go-nostr v0.19.5
github.com/rs/cors v1.7.0
github.com/stretchr/testify v1.8.2
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect
github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/puzpuzpuz/xsync v1.5.2 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.47.0 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

193
go.sum Normal file
View File

@@ -0,0 +1,193 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA=
github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao=
github.com/bmatsuo/lmdb-go v1.8.0 h1:ohf3Q4xjXZBKh4AayUY4bb2CXuhRAI8BYGlJq08EfNA=
github.com/bmatsuo/lmdb-go v1.8.0/go.mod h1:wWPZmKdOAZsl4qOqkowQ1aCrFie1HU8gWloHMCeAUdM=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y=
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ=
github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8=
github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4=
github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k=
github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek=
github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.0 h1:u0p9s3xLYpZCA1z5JgCkMeB34CKCMMQbM+G8Ii7YD0I=
github.com/gobwas/ws v1.2.0/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI=
github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs=
github.com/jgroeneveld/trial v2.0.0+incompatible h1:d59ctdgor+VqdZCAiUfVN8K13s0ALDioG5DWwZNtRuQ=
github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3M4RIC8ePg9ntAc/Wy+U/+M=
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/nbd-wtf/go-nostr v0.19.5 h1:BdNTIKVJOyxRJwxzlblJqV7sja4pmCYFBXQdCDVowhI=
github.com/nbd-wtf/go-nostr v0.19.5/go.mod h1:F9y6+M8askJCjilLgMC3rD0moA6UtG1MCnyClNYXeys=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/puzpuzpuz/xsync v1.5.2 h1:yRAP4wqSOZG+/4pxJ08fPTwrfL0IzE/LKQ/cw509qGY=
github.com/puzpuzpuz/xsync v1.5.2/go.mod h1:K98BYhX3k1dQ2M63t1YNVDanbwUPmBCAhNmVrrxfiGg=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM=
github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.47.0 h1:y7moDoxYzMooFpT5aHgNgVOQDrS3qlkfiP9mDtGGK9c=
github.com/valyala/fasthttp v1.47.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

335
handlers.go Normal file
View File

@@ -0,0 +1,335 @@
package khatru
import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/fasthttp/websocket"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip11"
"github.com/nbd-wtf/go-nostr/nip42"
)
// ServeHTTP implements http.Handler interface.
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
rl.HandleWebsocket(w, r)
} else if r.Header.Get("Accept") == "application/nostr+json" {
rl.HandleNIP11(w, r)
} else {
rl.serveMux.ServeHTTP(w, r)
}
}
func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
conn, err := rl.upgrader.Upgrade(w, r, nil)
if err != nil {
rl.Log.Printf("failed to upgrade websocket: %v\n", err)
return
}
rl.clientsMu.Lock()
defer rl.clientsMu.Unlock()
rl.clients[conn] = struct{}{}
ticker := time.NewTicker(rl.PingPeriod)
// NIP-42 challenge
challenge := make([]byte, 8)
rand.Read(challenge)
ws := &WebSocket{
conn: conn,
Challenge: hex.EncodeToString(challenge),
WaitingForAuth: make(chan struct{}),
}
// reader
go func() {
defer func() {
ticker.Stop()
rl.clientsMu.Lock()
if _, ok := rl.clients[conn]; ok {
conn.Close()
delete(rl.clients, conn)
removeListener(ws)
}
rl.clientsMu.Unlock()
}()
conn.SetReadLimit(rl.MaxMessageSize)
conn.SetReadDeadline(time.Now().Add(rl.PongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(rl.PongWait))
return nil
})
for _, onconnect := range rl.OnConnect {
onconnect(ctx)
}
for {
typ, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
) {
rl.Log.Printf("unexpected close error from %s: %v\n", r.Header.Get("X-Forwarded-For"), err)
}
break
}
if typ == websocket.PingMessage {
ws.WriteMessage(websocket.PongMessage, nil)
continue
}
go func(message []byte) {
ctx = context.Background()
var request []json.RawMessage
if err := json.Unmarshal(message, &request); err != nil {
// stop silently
return
}
if len(request) < 2 {
ws.WriteJSON(nostr.NoticeEnvelope("request has less than 2 parameters"))
return
}
var typ string
json.Unmarshal(request[0], &typ)
switch typ {
case "EVENT":
// it's a new event
var evt nostr.Event
if err := json.Unmarshal(request[1], &evt); err != nil {
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode event: " + err.Error()))
return
}
// check serialization
serialized := evt.Serialize()
// assign ID
hash := sha256.Sum256(serialized)
evt.ID = hex.EncodeToString(hash[:])
// check signature (requires the ID to be set)
if ok, err := evt.CheckSignature(); err != nil {
reason := "error: failed to verify signature"
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason})
return
} else if !ok {
reason := "invalid: signature is invalid"
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason})
return
}
var ok bool
if evt.Kind == 5 {
err = rl.handleDeleteRequest(ctx, &evt)
} else {
err = rl.AddEvent(ctx, &evt)
}
var reason *string
if err == nil {
ok = true
} else {
msg := err.Error()
reason = &msg
}
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: ok, Reason: reason})
case "COUNT":
if rl.CountEvents == nil {
ws.WriteJSON(nostr.NoticeEnvelope("this relay does not support NIP-45"))
return
}
var id string
json.Unmarshal(request[1], &id)
if id == "" {
ws.WriteJSON(nostr.NoticeEnvelope("COUNT has no <id>"))
return
}
var total int64
filters := make(nostr.Filters, len(request)-2)
for i, filterReq := range request[2:] {
if err := json.Unmarshal(filterReq, &filters[i]); err != nil {
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter"))
continue
}
filter := filters[i]
for _, reject := range rl.RejectFilter {
if rejecting, msg := reject(ctx, filter); rejecting {
ws.WriteJSON(nostr.NoticeEnvelope(msg))
continue
}
}
for _, reject := range rl.RejectCountFilter {
if rejecting, msg := reject(ctx, filter); rejecting {
ws.WriteJSON(nostr.NoticeEnvelope(msg))
continue
}
}
for _, count := range rl.CountEvents {
res, err := count(ctx, filter)
if err != nil {
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
}
total += res
}
}
ws.WriteJSON([]interface{}{"COUNT", id, map[string]int64{"count": total}})
case "REQ":
var id string
json.Unmarshal(request[1], &id)
if id == "" {
ws.WriteJSON(nostr.NoticeEnvelope("REQ has no <id>"))
return
}
filters := make(nostr.Filters, len(request)-2)
eose := sync.WaitGroup{}
eose.Add(len(request[2:]))
for i, filterReq := range request[2:] {
if err := json.Unmarshal(
filterReq,
&filters[i],
); err != nil {
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter"))
eose.Done()
continue
}
filter := filters[i]
for _, reject := range rl.RejectCountFilter {
if rejecting, msg := reject(ctx, filter); rejecting {
ws.WriteJSON(nostr.NoticeEnvelope(msg))
eose.Done()
continue
}
}
eose.Add(len(rl.QueryEvents))
for _, query := range rl.QueryEvents {
ch, err := query(ctx, filter)
if err != nil {
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
eose.Done()
continue
}
go func(ch chan *nostr.Event) {
for event := range ch {
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
}
eose.Done()
}(ch)
}
eose.Done()
}
go func() {
eose.Wait()
ws.WriteJSON(nostr.EOSEEnvelope(id))
}()
setListener(id, ws, filters)
case "CLOSE":
var id string
json.Unmarshal(request[1], &id)
if id == "" {
ws.WriteJSON(nostr.NoticeEnvelope("CLOSE has no <id>"))
return
}
removeListenerId(ws, id)
case "AUTH":
if rl.ServiceURL != "" {
var evt nostr.Event
if err := json.Unmarshal(request[1], &evt); err != nil {
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode auth event: " + err.Error()))
return
}
if pubkey, ok := nip42.ValidateAuthEvent(&evt, ws.Challenge, rl.ServiceURL); ok {
ws.Authed = pubkey
close(ws.WaitingForAuth)
ctx = context.WithValue(ctx, AUTH_CONTEXT_KEY, pubkey)
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: true})
} else {
reason := "error: failed to authenticate"
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: &reason})
}
}
}
}(message)
}
}()
// writer
go func() {
defer func() {
ticker.Stop()
conn.Close()
}()
for {
select {
case <-ticker.C:
err := ws.WriteMessage(websocket.PingMessage, nil)
if err != nil {
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
return
}
}
}
}()
}
func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
supportedNIPs := []int{9, 11, 12, 15, 16, 20, 33}
if rl.ServiceURL != "" {
supportedNIPs = append(supportedNIPs, 42)
}
if rl.CountEvents != nil {
supportedNIPs = append(supportedNIPs, 45)
}
info := nip11.RelayInformationDocument{
Name: rl.Name,
Description: rl.Description,
PubKey: rl.PubKey,
Contact: rl.Contact,
SupportedNIPs: supportedNIPs,
Software: "https://github.com/trailriver/khatru",
Version: "n/a",
}
for _, edit := range rl.EditInformation {
edit(r.Context(), &info)
}
json.NewEncoder(w).Encode(info)
}

94
listener.go Normal file
View File

@@ -0,0 +1,94 @@
package khatru
import (
"sync"
"github.com/nbd-wtf/go-nostr"
)
type Listener struct {
filters nostr.Filters
}
var (
listeners = make(map[*WebSocket]map[string]*Listener)
listenersMutex = sync.Mutex{}
)
func GetListeningFilters() nostr.Filters {
respfilters := make(nostr.Filters, 0, len(listeners)*2)
listenersMutex.Lock()
defer listenersMutex.Unlock()
// here we go through all the existing listeners
for _, connlisteners := range listeners {
for _, listener := range connlisteners {
for _, listenerfilter := range listener.filters {
for _, respfilter := range respfilters {
// check if this filter specifically is already added to respfilters
if nostr.FilterEqual(listenerfilter, respfilter) {
goto nextconn
}
}
// field not yet present on respfilters, add it
respfilters = append(respfilters, listenerfilter)
// continue to the next filter
nextconn:
continue
}
}
}
// respfilters will be a slice with all the distinct filter we currently have active
return respfilters
}
func setListener(id string, ws *WebSocket, filters nostr.Filters) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
subs, ok := listeners[ws]
if !ok {
subs = make(map[string]*Listener)
listeners[ws] = subs
}
subs[id] = &Listener{filters: filters}
}
// Remove a specific subscription id from listeners for a given ws client
func removeListenerId(ws *WebSocket, id string) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
if subs, ok := listeners[ws]; ok {
delete(listeners[ws], id)
if len(subs) == 0 {
delete(listeners, ws)
}
}
}
// Remove WebSocket conn from listeners
func removeListener(ws *WebSocket) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
delete(listeners, ws)
}
func notifyListeners(event *nostr.Event) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
for ws, subs := range listeners {
for id, listener := range subs {
if !listener.filters.Match(event) {
continue
}
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
}
}
}

22
plugins/events.go Normal file
View File

@@ -0,0 +1,22 @@
package plugins
import (
"context"
"github.com/nbd-wtf/go-nostr"
)
func PreventTooManyIndexableTags(max int) func(context.Context, *nostr.Event) (bool, string) {
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
ntags := 0
for _, tag := range event.Tags {
if len(tag) > 0 && len(tag[0]) == 1 {
ntags++
}
}
if ntags > max {
return true, "too many indexable tags"
}
return false, ""
}
}

33
plugins/filters.go Normal file
View File

@@ -0,0 +1,33 @@
package plugins
import (
"context"
"fmt"
"github.com/nbd-wtf/go-nostr"
)
func NoPrefixFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
for _, id := range filter.IDs {
if len(id) != 64 {
return true, fmt.Sprintf("filters can only contain full ids")
}
}
for _, pk := range filter.Authors {
if len(pk) != 64 {
return true, fmt.Sprintf("filters can only contain full pubkeys")
}
}
return false, ""
}
func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
items := len(filter.Tags) + len(filter.Kinds)
if items > 4 && len(filter.Tags) > 2 {
return true, "too many things to filter for"
}
return false, ""
}

37
plugins/nip04.go Normal file
View File

@@ -0,0 +1,37 @@
package plugins
import (
"context"
"github.com/fiatjaf/khatru"
"github.com/nbd-wtf/go-nostr"
"golang.org/x/exp/slices"
)
func rejectKind04Snoopers(ctx context.Context, filter nostr.Filter) (bool, string) {
// prevent kind-4 events from being returned to unauthed users,
// only when authentication is a thing
if !slices.Contains(filter.Kinds, 4) {
return false, ""
}
ws := khatru.GetConnection(ctx)
senders := filter.Authors
receivers, _ := filter.Tags["p"]
switch {
case ws.Authed == "":
// not authenticated
return true, "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?"
case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.Authed):
// allowed filter: ws.authed is sole sender (filter specifies one or all receivers)
return false, ""
case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.Authed):
// allowed filter: ws.authed is sole receiver (filter specifies one or all senders)
return false, ""
default:
// restricted filter: do not return any events,
// even if other elements in filters array were not restricted).
// client should know better.
return true, "restricted: authenticated user does not have authorization for requested filters."
}
}

View File

@@ -0,0 +1,83 @@
package badgern
import (
"context"
"encoding/binary"
"github.com/dgraph-io/badger/v4"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
var count int64 = 0
queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter)
if err != nil {
return 0, err
}
err = b.View(func(txn *badger.Txn) error {
// iterate only through keys and in reverse order
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
opts.Reverse = true
// actually iterate
for _, q := range queries {
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() {
item := it.Item()
key := item.Key()
if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset])
if createdAt < since {
break
}
}
idx := make([]byte, 5)
idx[0] = rawEventStorePrefix
copy(idx[1:], key[idxOffset:])
// fetch actual event
item, err := txn.Get(idx)
if err != nil {
if err == badger.ErrDiscardedTxn {
return err
}
panic(err)
}
if extraFilter == nil {
count++
} else {
err = item.Value(func(val []byte) error {
evt := &nostr.Event{}
if err := nson.Unmarshal(string(val), evt); err != nil {
return err
}
// check if this matches the other filters that were not part of the index
if extraFilter == nil || extraFilter.Matches(evt) {
count++
}
return nil
})
if err != nil {
panic(err)
}
}
}
}
return nil
})
return count, err
}

View File

@@ -0,0 +1,78 @@
package badgern
import (
"context"
"encoding/hex"
"github.com/dgraph-io/badger/v4"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
func (b *BadgerBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
deletionHappened := false
err := b.Update(func(txn *badger.Txn) error {
idx := make([]byte, 1, 5)
idx[0] = rawEventStorePrefix
// query event by id to get its idx
id, _ := hex.DecodeString(evt.ID)
prefix := make([]byte, 1+32)
copy(prefix[1:], id)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
it.Seek(prefix)
if it.ValidForPrefix(prefix) {
// the key is the last 32 bytes
idx = append(idx, it.Item().Key()[1+32:]...)
}
it.Close()
// if no idx was found, end here, this event doesn't exist
if len(idx) == 1 {
return nil
}
// fetch the event
item, err := txn.Get(idx)
if err != nil {
return err
}
item.Value(func(val []byte) error {
evt := &nostr.Event{}
if err := nson.Unmarshal(string(val), evt); err != nil {
return err
}
// set this so we'll run the GC later
deletionHappened = true
// calculate all index keys we have for this event and delete them
for _, k := range getIndexKeysForEvent(evt, idx[1:]) {
if err := txn.Delete(k); err != nil {
return err
}
}
// delete the raw event
return txn.Delete(idx)
})
return nil
})
if err != nil {
return err
}
// after deleting, run garbage collector
if deletionHappened {
if err := b.RunValueLogGC(0.8); err != nil {
panic(err)
}
}
return nil
}

View File

@@ -0,0 +1,159 @@
package badgern
import (
"encoding/binary"
"encoding/hex"
"github.com/dgraph-io/badger/v4"
"github.com/nbd-wtf/go-nostr"
)
const (
rawEventStorePrefix byte = 0
indexCreatedAtPrefix byte = 1
indexIdPrefix byte = 2
indexKindPrefix byte = 3
indexPubkeyPrefix byte = 4
indexPubkeyKindPrefix byte = 5
indexTagPrefix byte = 6
)
type BadgerBackend struct {
Path string
MaxLimit int
*badger.DB
seq *badger.Sequence
}
func (b *BadgerBackend) Init() error {
db, err := badger.Open(badger.DefaultOptions(b.Path))
if err != nil {
return err
}
b.DB = db
b.seq, err = db.GetSequence([]byte("events"), 1000)
if err != nil {
return err
}
if b.MaxLimit == 0 {
b.MaxLimit = 500
}
// DEBUG: inspecting keys on startup
// db.View(func(txn *badger.Txn) error {
// opts := badger.DefaultIteratorOptions
// opts.PrefetchSize = 10
// it := txn.NewIterator(opts)
// defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
// item := it.Item()
// k := item.Key()
// err := item.Value(func(v []byte) error {
// fmt.Println("key:", k)
// return nil
// })
// if err != nil {
// return err
// }
// }
// return nil
// })
return nil
}
func (b BadgerBackend) Close() {
b.DB.Close()
b.seq.Release()
}
func (b BadgerBackend) Serial() []byte {
v, _ := b.seq.Next()
vb := make([]byte, 5)
vb[0] = rawEventStorePrefix
binary.BigEndian.PutUint32(vb[1:], uint32(v))
return vb
}
func getIndexKeysForEvent(evt *nostr.Event, idx []byte) [][]byte {
keys := make([][]byte, 0, 18)
// indexes
{
// ~ by id
id, _ := hex.DecodeString(evt.ID)
k := make([]byte, 1+32+4)
k[0] = indexIdPrefix
copy(k[1:], id)
copy(k[1+32:], idx)
keys = append(keys, k)
}
{
// ~ by pubkey+date
pubkey, _ := hex.DecodeString(evt.PubKey)
k := make([]byte, 1+32+4+4)
k[0] = indexPubkeyPrefix
copy(k[1:], pubkey)
binary.BigEndian.PutUint32(k[1+32:], uint32(evt.CreatedAt))
copy(k[1+32+4:], idx)
keys = append(keys, k)
}
{
// ~ by kind+date
k := make([]byte, 1+2+4+4)
k[0] = indexKindPrefix
binary.BigEndian.PutUint16(k[1:], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[1+2:], uint32(evt.CreatedAt))
copy(k[1+2+4:], idx)
keys = append(keys, k)
}
{
// ~ by pubkey+kind+date
pubkey, _ := hex.DecodeString(evt.PubKey)
k := make([]byte, 1+32+2+4+4)
k[0] = indexPubkeyKindPrefix
copy(k[1:], pubkey)
binary.BigEndian.PutUint16(k[1+32:], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[1+32+2:], uint32(evt.CreatedAt))
copy(k[1+32+2+4:], idx)
keys = append(keys, k)
}
// ~ by tagvalue+date
for _, tag := range evt.Tags {
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
continue
}
var v []byte
if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 {
// store value as bytes
v = vb
} else {
v = []byte(tag[1])
}
k := make([]byte, 1+len(v)+4+4)
k[0] = indexTagPrefix
copy(k[1:], v)
binary.BigEndian.PutUint32(k[1+len(v):], uint32(evt.CreatedAt))
copy(k[1+len(v)+4:], idx)
keys = append(keys, k)
}
{
// ~ by date only
k := make([]byte, 1+4+4)
k[0] = indexCreatedAtPrefix
binary.BigEndian.PutUint32(k[1:], uint32(evt.CreatedAt))
copy(k[1+4:], idx)
keys = append(keys, k)
}
return keys
}

View File

@@ -0,0 +1,318 @@
package badgern
import (
"container/heap"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/dgraph-io/badger/v4"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
type query struct {
i int
prefix []byte
startingPoint []byte
results chan *nostr.Event
skipTimestamp bool
}
type queryEvent struct {
*nostr.Event
query int
}
func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter)
if err != nil {
return nil, err
}
go func() {
err := b.View(func(txn *badger.Txn) error {
// iterate only through keys and in reverse order
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
opts.Reverse = true
// actually iterate
iteratorClosers := make([]func(), len(queries))
for i, q := range queries {
go func(i int, q query) {
it := txn.NewIterator(opts)
iteratorClosers[i] = it.Close
defer close(q.results)
for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() {
item := it.Item()
key := item.Key()
if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset])
if createdAt < since {
break
}
}
idx := make([]byte, 5)
idx[0] = rawEventStorePrefix
copy(idx[1:], key[idxOffset:])
// fetch actual event
item, err := txn.Get(idx)
if err != nil {
if err == badger.ErrDiscardedTxn {
return
}
panic(err)
}
err = item.Value(func(val []byte) error {
evt := &nostr.Event{}
if err := nson.Unmarshal(string(val), evt); err != nil {
return err
}
// check if this matches the other filters that were not part of the index
if extraFilter == nil || extraFilter.Matches(evt) {
q.results <- evt
}
return nil
})
if err != nil {
panic(err)
}
}
}(i, q)
}
// max number of events we'll return
limit := b.MaxLimit
if filter.Limit > 0 && filter.Limit < limit {
limit = filter.Limit
}
// receive results and ensure we only return the most recent ones always
emittedEvents := 0
// first pass
emitQueue := make(priorityQueue, 0, len(queries)+limit)
for _, q := range queries {
evt, ok := <-q.results
if ok {
emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i})
}
}
// now it's a good time to schedule this
defer func() {
close(ch)
for _, itclose := range iteratorClosers {
itclose()
}
}()
// queue may be empty here if we have literally nothing
if len(emitQueue) == 0 {
return nil
}
heap.Init(&emitQueue)
// iterate until we've emitted all events required
for {
// emit latest event in queue
latest := emitQueue[0]
ch <- latest.Event
// stop when reaching limit
emittedEvents++
if emittedEvents == limit {
break
}
// fetch a new one from query results and replace the previous one with it
if evt, ok := <-queries[latest.query].results; ok {
emitQueue[0].Event = evt
heap.Fix(&emitQueue, 0)
} else {
// if this query has no more events we just remove this and proceed normally
heap.Remove(&emitQueue, 0)
// check if the list is empty and end
if len(emitQueue) == 0 {
break
}
}
}
return nil
})
if err != nil {
panic(err)
}
}()
return ch, nil
}
type priorityQueue []*queryEvent
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].CreatedAt > pq[j].CreatedAt
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *priorityQueue) Push(x any) {
item := x.(*queryEvent)
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*pq = old[0 : n-1]
return item
}
func prepareQueries(filter nostr.Filter) (
queries []query,
extraFilter *nostr.Filter,
since uint32,
prefixLen int,
idxOffset int,
err error,
) {
var index byte
if len(filter.IDs) > 0 {
index = indexIdPrefix
queries = make([]query, len(filter.IDs))
for i, idHex := range filter.IDs {
prefix := make([]byte, 1+32)
prefix[0] = index
id, _ := hex.DecodeString(idHex)
if len(id) != 32 {
return nil, nil, 0, 0, 0, fmt.Errorf("invalid id '%s'", idHex)
}
copy(prefix[1:], id)
queries[i] = query{i: i, prefix: prefix, skipTimestamp: true}
}
} else if len(filter.Authors) > 0 {
if len(filter.Kinds) == 0 {
index = indexPubkeyPrefix
queries = make([]query, len(filter.Authors))
for i, pubkeyHex := range filter.Authors {
pubkey, _ := hex.DecodeString(pubkeyHex)
if len(pubkey) != 32 {
continue
}
prefix := make([]byte, 1+32)
prefix[0] = index
copy(prefix[1:], pubkey)
queries[i] = query{i: i, prefix: prefix}
}
} else {
index = indexPubkeyKindPrefix
queries = make([]query, len(filter.Authors)*len(filter.Kinds))
i := 0
for _, pubkeyHex := range filter.Authors {
for _, kind := range filter.Kinds {
pubkey, _ := hex.DecodeString(pubkeyHex)
if len(pubkey) != 32 {
return nil, nil, 0, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
}
prefix := make([]byte, 1+32+2)
prefix[0] = index
copy(prefix[1:], pubkey)
binary.BigEndian.PutUint16(prefix[1+32:], uint16(kind))
queries[i] = query{i: i, prefix: prefix}
i++
}
}
}
extraFilter = &nostr.Filter{Tags: filter.Tags}
} else if len(filter.Tags) > 0 {
index = indexTagPrefix
queries = make([]query, len(filter.Tags))
extraFilter = &nostr.Filter{Kinds: filter.Kinds}
i := 0
for _, values := range filter.Tags {
for _, value := range values {
bv, _ := hex.DecodeString(value)
var size int
if len(bv) == 32 {
// hex tag
size = 32
} else {
// string tag
bv = []byte(value)
size = len(bv)
}
prefix := make([]byte, 1+size)
prefix[0] = index
copy(prefix[1:], bv)
queries[i] = query{i: i, prefix: prefix}
i++
}
}
} else if len(filter.Kinds) > 0 {
index = indexKindPrefix
queries = make([]query, len(filter.Kinds))
for i, kind := range filter.Kinds {
prefix := make([]byte, 1+2)
prefix[0] = index
binary.BigEndian.PutUint16(prefix[1:], uint16(kind))
queries[i] = query{i: i, prefix: prefix}
}
} else {
index = indexCreatedAtPrefix
queries = make([]query, 1)
prefix := make([]byte, 1)
prefix[0] = index
queries[0] = query{i: 0, prefix: prefix}
extraFilter = nil
}
prefixLen = len(queries[0].prefix)
if index == indexIdPrefix {
idxOffset = prefixLen
} else {
idxOffset = prefixLen + 4
}
var until uint32 = 4294967295
if filter.Until != nil {
if fu := uint32(*filter.Until); fu < until {
until = fu + 1
}
}
for i, q := range queries {
queries[i].startingPoint = binary.BigEndian.AppendUint32(q.prefix, uint32(until))
queries[i].results = make(chan *nostr.Event, 12)
}
// this is where we'll end the iteration
if filter.Since != nil {
if fs := uint32(*filter.Since); fs > since {
since = fs
}
}
return queries, extraFilter, since, prefixLen, idxOffset, nil
}

View File

@@ -0,0 +1,32 @@
package badgern
import (
"context"
"github.com/dgraph-io/badger/v4"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
func (b *BadgerBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
return b.Update(func(txn *badger.Txn) error {
nson, err := nson.Marshal(evt)
if err != nil {
return err
}
idx := b.Serial()
// raw event store
if err := txn.Set(idx, []byte(nson)); err != nil {
return err
}
for _, k := range getIndexKeysForEvent(evt, idx[1:]) {
if err := txn.Set(k, nil); err != nil {
return err
}
}
return nil
})
}

View File

@@ -0,0 +1,182 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
type IndexedEvent struct {
Event nostr.Event `json:"event"`
ContentSearch string `json:"content_search"`
}
var indexMapping = `
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"dynamic": false,
"properties": {
"event": {
"dynamic": false,
"properties": {
"id": {"type": "keyword"},
"pubkey": {"type": "keyword"},
"kind": {"type": "integer"},
"tags": {"type": "keyword"},
"created_at": {"type": "date"}
}
},
"content_search": {"type": "text"}
}
}
}
`
type ElasticsearchStorage struct {
URL string
IndexName string
es *elasticsearch.Client
bi esutil.BulkIndexer
}
func (ess *ElasticsearchStorage) Init() error {
if ess.IndexName == "" {
ess.IndexName = "events"
}
cfg := elasticsearch.Config{}
if ess.URL != "" {
cfg.Addresses = strings.Split(ess.URL, ",")
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return err
}
res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
if err != nil {
return err
}
if res.IsError() {
body, _ := io.ReadAll(res.Body)
txt := string(body)
if !strings.Contains(txt, "resource_already_exists_exception") {
return fmt.Errorf("%s", txt)
}
}
// bulk indexer
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: ess.IndexName,
Client: es,
NumWorkers: 2,
FlushInterval: 3 * time.Second,
})
if err != nil {
return fmt.Errorf("error creating the indexer: %s", err)
}
ess.es = es
ess.bi = bi
return nil
}
func (ess *ElasticsearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
done := make(chan error)
err := ess.bi.Add(
ctx,
esutil.BulkIndexerItem{
Action: "delete",
DocumentID: evt.ID,
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
close(done)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
done <- err
} else {
// ok if deleted item not found
if res.Status == 404 {
close(done)
return
}
txt, _ := json.Marshal(res)
err := fmt.Errorf("ERROR: %s", txt)
done <- err
}
},
},
)
if err != nil {
return err
}
err = <-done
return err
}
func (ess *ElasticsearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error {
ie := &IndexedEvent{
Event: *evt,
}
// post processing: index for FTS
// some ideas:
// - index kind=0 fields a set of dedicated mapped fields
// (or use a separate index for profiles with a dedicated mapping)
// - if it's valid JSON just index the "values" and not the keys
// - more content introspection: language detection
// - denormalization... attach profile + ranking signals to events
if evt.Kind != 4 {
ie.ContentSearch = evt.Content
}
data, err := json.Marshal(ie)
if err != nil {
return err
}
done := make(chan error)
// adapted from:
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
err = ess.bi.Add(
ctx,
esutil.BulkIndexerItem{
Action: "index",
DocumentID: evt.ID,
Body: bytes.NewReader(data),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
close(done)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
done <- err
} else {
err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
done <- err
}
},
},
)
if err != nil {
return err
}
err = <-done
return err
}

View File

@@ -0,0 +1,261 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"reflect"
"github.com/aquasecurity/esquery"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
type EsSearchResult struct {
Took int
TimedOut bool `json:"timed_out"`
Hits struct {
Total struct {
Value int
Relation string
}
Hits []struct {
Source IndexedEvent `json:"_source"`
}
}
}
type EsCountResult struct {
Count int64
}
func buildDsl(filter nostr.Filter) ([]byte, error) {
dsl := esquery.Bool()
prefixFilter := func(fieldName string, values []string) {
if len(values) == 0 {
return
}
prefixQ := esquery.Bool()
for _, v := range values {
if len(v) < 64 {
prefixQ.Should(esquery.Prefix(fieldName, v))
} else {
prefixQ.Should(esquery.Term(fieldName, v))
}
}
dsl.Must(prefixQ)
}
// ids
prefixFilter("event.id", filter.IDs)
// authors
prefixFilter("event.pubkey", filter.Authors)
// kinds
if len(filter.Kinds) > 0 {
dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...))
}
// tags
if len(filter.Tags) > 0 {
tagQ := esquery.Bool()
for char, terms := range filter.Tags {
vs := toInterfaceSlice(append(terms, char))
tagQ.Should(esquery.Terms("event.tags", vs...))
}
dsl.Must(tagQ)
}
// since
if filter.Since != nil {
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since))
}
// until
if filter.Until != nil {
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until))
}
// search
if filter.Search != "" {
dsl.Must(esquery.Match("content_search", filter.Search))
}
return json.Marshal(esquery.Query(dsl))
}
func (ess *ElasticsearchStorage) getByID(filter nostr.Filter) ([]*nostr.Event, error) {
got, err := ess.es.Mget(
esutil.NewJSONReader(filter),
ess.es.Mget.WithIndex(ess.IndexName))
if err != nil {
return nil, err
}
var mgetResponse struct {
Docs []struct {
Found bool
Source IndexedEvent `json:"_source"`
}
}
if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil {
return nil, err
}
events := make([]*nostr.Event, 0, len(mgetResponse.Docs))
for _, e := range mgetResponse.Docs {
if e.Found {
events = append(events, &e.Source.Event)
}
}
return events, nil
}
func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
// optimization: get by id
if isGetByID(filter) {
if evts, err := ess.getByID(filter); err == nil {
for _, evt := range evts {
ch <- evt
}
close(ch)
} else {
return nil, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
if err != nil {
return nil, err
}
limit := 1000
if filter.Limit > 0 && filter.Limit < limit {
limit = filter.Limit
}
es := ess.es
res, err := es.Search(
es.Search.WithContext(ctx),
es.Search.WithIndex(ess.IndexName),
es.Search.WithBody(bytes.NewReader(dsl)),
es.Search.WithSize(limit),
es.Search.WithSort("event.created_at:desc"),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
fmt.Println("oh no", string(txt))
return nil, fmt.Errorf("%s", txt)
}
var r EsSearchResult
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, err
}
go func() {
for _, e := range r.Hits.Hits {
ch <- &e.Source.Event
}
close(ch)
}()
return ch, nil
}
func isGetByID(filter nostr.Filter) bool {
isGetById := len(filter.IDs) > 0 &&
len(filter.Authors) == 0 &&
len(filter.Kinds) == 0 &&
len(filter.Tags) == 0 &&
len(filter.Search) == 0 &&
filter.Since == nil &&
filter.Until == nil
if isGetById {
for _, id := range filter.IDs {
if len(id) != 64 {
return false
}
}
}
return isGetById
}
// from: https://stackoverflow.com/a/12754757
func toInterfaceSlice(slice interface{}) []interface{} {
s := reflect.ValueOf(slice)
if s.Kind() != reflect.Slice {
panic("InterfaceSlice() given a non-slice type")
}
// Keep the distinction between nil and empty slice input
if s.IsNil() {
return nil
}
ret := make([]interface{}, s.Len())
for i := 0; i < s.Len(); i++ {
ret[i] = s.Index(i).Interface()
}
return ret
}
func (ess *ElasticsearchStorage) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
count := int64(0)
// optimization: get by id
if isGetByID(filter) {
if evts, err := ess.getByID(filter); err == nil {
count += int64(len(evts))
} else {
return 0, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
if err != nil {
return 0, err
}
es := ess.es
res, err := es.Count(
es.Count.WithContext(ctx),
es.Count.WithIndex(ess.IndexName),
es.Count.WithBody(bytes.NewReader(dsl)),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
fmt.Println("oh no", string(txt))
return 0, fmt.Errorf("%s", txt)
}
var r EsCountResult
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return 0, err
}
return r.Count + count, nil
}

View File

@@ -0,0 +1,43 @@
package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"testing"
"github.com/nbd-wtf/go-nostr"
)
func TestQuery(t *testing.T) {
now := nostr.Now()
yesterday := now - 60*60*24
filter := &nostr.Filter{
IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
Kinds: []int{0, 1},
Tags: nostr.TagMap{
"e": []string{"abc"},
"p": []string{"aaa", "bbb"},
},
Since: &yesterday,
Until: &now,
Limit: 100,
Search: "other stuff",
}
dsl, err := buildDsl(filter)
if err != nil {
t.Fatal(err)
}
pprint(dsl)
}
func pprint(j []byte) {
var dst bytes.Buffer
err := json.Indent(&dst, j, "", " ")
if err != nil {
fmt.Println("invalid JSON", err, string(j))
} else {
fmt.Println(dst.String())
}
}

View File

@@ -0,0 +1,91 @@
package lmdbn
import (
"bytes"
"context"
"encoding/binary"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
var count int64 = 0
dbi, queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter)
if err != nil {
return 0, err
}
err = b.lmdbEnv.View(func(txn *lmdb.Txn) error {
// actually iterate
for _, q := range queries {
cursor, err := txn.OpenCursor(dbi)
if err != nil {
continue
}
var k []byte
var idx []byte
var iterr error
if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
panic(err)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) {
break
}
if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(k[prefixLen:])
if createdAt < since {
break
}
}
// fetch actual event
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
panic(err)
}
if extraFilter == nil {
count++
} else {
evt := &nostr.Event{}
if err := nson.Unmarshal(string(val), evt); err != nil {
return err
}
// check if this matches the other filters that were not part of the index
if extraFilter == nil || extraFilter.Matches(evt) {
count++
}
return nil
}
// move one back (we'll look into k and v and err in the next iteration)
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
}
return nil
})
return count, err
}

View File

@@ -0,0 +1,50 @@
package lmdbn
import (
"context"
"encoding/hex"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
func (b *LMDBBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
id, _ := hex.DecodeString(evt.ID)
idx, err := txn.Get(b.indexId, id)
if operr, ok := err.(*lmdb.OpError); ok && operr.Errno == lmdb.NotFound {
// we already do not have this
return nil
}
if err != nil {
return err
}
// fetch the event
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
return err
}
evt := &nostr.Event{}
if err := nson.Unmarshal(string(val), evt); err != nil {
return err
}
// calculate all index keys we have for this event and delete them
for _, k := range b.getIndexKeysForEvent(evt) {
if err := txn.Del(k.dbi, k.key, nil); err != nil {
return err
}
}
// delete the raw event
return txn.Del(b.rawEventStore, idx, nil)
})
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,208 @@
package lmdbn
import (
"encoding/binary"
"encoding/hex"
"sync/atomic"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/nbd-wtf/go-nostr"
)
const (
maxuint16 = 65535
maxuint32 = 4294967295
)
type LMDBBackend struct {
Path string
MaxLimit int
lmdbEnv *lmdb.Env
rawEventStore lmdb.DBI
indexCreatedAt lmdb.DBI
indexId lmdb.DBI
indexKind lmdb.DBI
indexPubkey lmdb.DBI
indexPubkeyKind lmdb.DBI
indexTag lmdb.DBI
lastId atomic.Uint32
}
func (b *LMDBBackend) Init() error {
if b.MaxLimit == 0 {
b.MaxLimit = 500
}
// open lmdb
env, err := lmdb.NewEnv()
if err != nil {
return err
}
env.SetMaxDBs(7)
env.SetMaxReaders(500)
env.SetMapSize(1 << 38) // ~273GB
err = env.Open(b.Path, lmdb.NoTLS, 0644)
if err != nil {
return err
}
b.lmdbEnv = env
// open each db
if err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
if dbi, err := txn.OpenDBI("raw", lmdb.Create); err != nil {
return err
} else {
b.rawEventStore = dbi
return nil
}
}); err != nil {
return err
}
if err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
if dbi, err := txn.OpenDBI("created_at", lmdb.Create); err != nil {
return err
} else {
b.indexCreatedAt = dbi
}
if dbi, err := txn.OpenDBI("id", lmdb.Create); err != nil {
return err
} else {
b.indexId = dbi
}
if dbi, err := txn.OpenDBI("kind", lmdb.Create); err != nil {
return err
} else {
b.indexKind = dbi
}
if dbi, err := txn.OpenDBI("pubkey", lmdb.Create); err != nil {
return err
} else {
b.indexPubkey = dbi
}
if dbi, err := txn.OpenDBI("pubkeyKind", lmdb.Create); err != nil {
return err
} else {
b.indexPubkeyKind = dbi
}
if dbi, err := txn.OpenDBI("tag", lmdb.Create); err != nil {
return err
} else {
b.indexTag = dbi
}
return nil
}); err != nil {
return err
}
// get lastId
if err := b.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
cursor, err := txn.OpenCursor(b.rawEventStore)
if err != nil {
return err
}
defer cursor.Close()
k, _, err := cursor.Get(nil, nil, lmdb.Last)
if operr, ok := err.(*lmdb.OpError); ok && operr.Errno == lmdb.NotFound {
// nothing found, so we're at zero
return nil
}
if err != nil {
}
b.lastId.Store(binary.BigEndian.Uint32(k))
return nil
}); err != nil {
return err
}
return nil
}
func (b *LMDBBackend) Close() {
b.lmdbEnv.Close()
}
func (b *LMDBBackend) Serial() []byte {
v := b.lastId.Add(1)
vb := make([]byte, 4)
binary.BigEndian.PutUint32(vb[:], uint32(v))
return vb
}
type key struct {
dbi lmdb.DBI
key []byte
}
func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key {
keys := make([]key, 0, 18)
// indexes
{
// ~ by id
k, _ := hex.DecodeString(evt.ID)
keys = append(keys, key{dbi: b.indexId, key: k})
}
{
// ~ by pubkey+date
pubkey, _ := hex.DecodeString(evt.PubKey)
k := make([]byte, 32+4)
copy(k[:], pubkey)
binary.BigEndian.PutUint32(k[32:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexPubkey, key: k})
}
{
// ~ by kind+date
k := make([]byte, 2+4)
binary.BigEndian.PutUint16(k[:], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexKind, key: k})
}
{
// ~ by pubkey+kind+date
pubkey, _ := hex.DecodeString(evt.PubKey)
k := make([]byte, 32+2+4)
copy(k[:], pubkey)
binary.BigEndian.PutUint16(k[32:], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[32+2:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexPubkeyKind, key: k})
}
// ~ by tagvalue+date
for _, tag := range evt.Tags {
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
continue
}
var v []byte
if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 {
// store value as bytes
v = vb
} else {
v = []byte(tag[1])
}
k := make([]byte, len(v)+4)
copy(k[:], v)
binary.BigEndian.PutUint32(k[len(v):], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexTag, key: k})
}
{
// ~ by date only
k := make([]byte, 4)
binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexCreatedAt, key: k})
}
return keys
}

View File

@@ -0,0 +1,314 @@
package lmdbn
import (
"bytes"
"container/heap"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
type query struct {
i int
prefix []byte
startingPoint []byte
results chan *nostr.Event
skipTimestamp bool
}
type queryEvent struct {
*nostr.Event
query int
}
func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
dbi, queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter)
if err != nil {
return nil, err
}
go func() {
err := b.lmdbEnv.View(func(txn *lmdb.Txn) error {
// actually iterate
cursorClosers := make([]func(), len(queries))
for i, q := range queries {
go func(i int, q query) {
defer close(q.results)
cursor, err := txn.OpenCursor(dbi)
if err != nil {
return
}
cursorClosers[i] = cursor.Close
var k []byte
var idx []byte
var iterr error
if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
panic(err)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
for {
select {
case <-ctx.Done():
break
default:
}
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) {
return
}
if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(k[prefixLen:])
if createdAt < since {
break
}
}
// fetch actual event
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
panic(err)
}
evt := &nostr.Event{}
if err := nson.Unmarshal(string(val), evt); err != nil {
panic(err)
}
// check if this matches the other filters that were not part of the index
if extraFilter == nil || extraFilter.Matches(evt) {
q.results <- evt
}
// move one back (we'll look into k and v and err in the next iteration)
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
}(i, q)
}
// max number of events we'll return
limit := b.MaxLimit
if filter.Limit > 0 && filter.Limit < limit {
limit = filter.Limit
}
// receive results and ensure we only return the most recent ones always
emittedEvents := 0
// first pass
emitQueue := make(priorityQueue, 0, len(queries)+limit)
for _, q := range queries {
evt, ok := <-q.results
if ok {
emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i})
}
}
// now it's a good time to schedule this
defer func() {
close(ch)
for _, cclose := range cursorClosers {
cclose()
}
}()
// queue may be empty here if we have literally nothing
if len(emitQueue) == 0 {
return nil
}
heap.Init(&emitQueue)
// iterate until we've emitted all events required
for {
// emit latest event in queue
latest := emitQueue[0]
ch <- latest.Event
// stop when reaching limit
emittedEvents++
if emittedEvents >= limit {
break
}
// fetch a new one from query results and replace the previous one with it
if evt, ok := <-queries[latest.query].results; ok {
emitQueue[0].Event = evt
heap.Fix(&emitQueue, 0)
} else {
// if this query has no more events we just remove this and proceed normally
heap.Remove(&emitQueue, 0)
// check if the list is empty and end
if len(emitQueue) == 0 {
break
}
}
}
return nil
})
if err != nil {
panic(err)
}
}()
return ch, nil
}
type priorityQueue []*queryEvent
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].CreatedAt > pq[j].CreatedAt
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *priorityQueue) Push(x any) {
item := x.(*queryEvent)
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*pq = old[0 : n-1]
return item
}
func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
dbi lmdb.DBI,
queries []query,
extraFilter *nostr.Filter,
since uint32,
prefixLen int,
err error,
) {
if len(filter.IDs) > 0 {
dbi = b.indexId
queries = make([]query, len(filter.IDs))
for i, idHex := range filter.IDs {
prefix, _ := hex.DecodeString(idHex)
if len(prefix) != 32 {
return dbi, nil, nil, 0, 0, fmt.Errorf("invalid id '%s'", idHex)
}
queries[i] = query{i: i, prefix: prefix, skipTimestamp: true}
}
} else if len(filter.Authors) > 0 {
if len(filter.Kinds) == 0 {
dbi = b.indexPubkey
queries = make([]query, len(filter.Authors))
for i, pubkeyHex := range filter.Authors {
prefix, _ := hex.DecodeString(pubkeyHex)
if len(prefix) != 32 {
return dbi, nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
}
queries[i] = query{i: i, prefix: prefix}
}
} else {
dbi = b.indexPubkeyKind
queries = make([]query, len(filter.Authors)*len(filter.Kinds))
i := 0
for _, pubkeyHex := range filter.Authors {
for _, kind := range filter.Kinds {
pubkey, _ := hex.DecodeString(pubkeyHex)
if len(pubkey) != 32 {
return dbi, nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
}
prefix := make([]byte, 32+2)
copy(prefix[:], pubkey)
binary.BigEndian.PutUint16(prefix[+32:], uint16(kind))
queries[i] = query{i: i, prefix: prefix}
i++
}
}
}
extraFilter = &nostr.Filter{Tags: filter.Tags}
} else if len(filter.Tags) > 0 {
dbi = b.indexTag
queries = make([]query, len(filter.Tags))
extraFilter = &nostr.Filter{Kinds: filter.Kinds}
i := 0
for _, values := range filter.Tags {
for _, value := range values {
bv, _ := hex.DecodeString(value)
var size int
if len(bv) == 32 {
// hex tag
size = 32
} else {
// string tag
bv = []byte(value)
size = len(bv)
}
prefix := make([]byte, size)
copy(prefix[:], bv)
queries[i] = query{i: i, prefix: prefix}
i++
}
}
} else if len(filter.Kinds) > 0 {
dbi = b.indexKind
queries = make([]query, len(filter.Kinds))
for i, kind := range filter.Kinds {
prefix := make([]byte, 2)
binary.BigEndian.PutUint16(prefix[:], uint16(kind))
queries[i] = query{i: i, prefix: prefix}
}
} else {
dbi = b.indexCreatedAt
queries = make([]query, 1)
prefix := make([]byte, 0)
queries[0] = query{i: 0, prefix: prefix}
extraFilter = nil
}
prefixLen = len(queries[0].prefix)
var until uint32 = 4294967295
if filter.Until != nil {
if fu := uint32(*filter.Until); fu < until {
until = fu + 1
}
}
for i, q := range queries {
queries[i].startingPoint = binary.BigEndian.AppendUint32(q.prefix, uint32(until))
queries[i].results = make(chan *nostr.Event, 12)
}
// this is where we'll end the iteration
if filter.Since != nil {
if fs := uint32(*filter.Since); fs > since {
since = fs
}
}
return dbi, queries, extraFilter, since, prefixLen, nil
}

View File

@@ -0,0 +1,38 @@
package lmdbn
import (
"context"
"fmt"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nson"
)
func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
// sanity checking
if evt.CreatedAt > maxuint32 || evt.Kind > maxuint16 {
return fmt.Errorf("event with values out of expected boundaries")
}
return b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
nson, err := nson.Marshal(evt)
if err != nil {
return err
}
idx := b.Serial()
// raw event store
if err := txn.Put(b.rawEventStore, idx, []byte(nson), 0); err != nil {
return err
}
for _, k := range b.getIndexKeysForEvent(evt) {
if err := txn.Put(k.dbi, k.key, idx, 0); err != nil {
return err
}
}
return nil
})
}

View File

@@ -0,0 +1,12 @@
package postgresql
import (
"context"
"github.com/nbd-wtf/go-nostr"
)
func (b PostgresBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
_, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1", evt.ID)
return err
}

View File

@@ -0,0 +1,71 @@
package postgresql
import (
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/lib/pq"
)
const (
queryLimit = 100
queryIDsLimit = 500
queryAuthorsLimit = 500
queryKindsLimit = 10
queryTagsLimit = 10
)
func (b *PostgresBackend) Init() error {
db, err := sqlx.Connect("postgres", b.DatabaseURL)
if err != nil {
return err
}
// sqlx default is 0 (unlimited), while postgresql by default accepts up to 100 connections
db.SetMaxOpenConns(80)
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
b.DB = db
_, err = b.DB.Exec(`
CREATE OR REPLACE FUNCTION tags_to_tagvalues(jsonb) RETURNS text[]
AS 'SELECT array_agg(t->>1) FROM (SELECT jsonb_array_elements($1) AS t)s WHERE length(t->>0) = 1;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE TABLE IF NOT EXISTS event (
id text NOT NULL,
pubkey text NOT NULL,
created_at integer NOT NULL,
kind integer NOT NULL,
tags jsonb NOT NULL,
content text NOT NULL,
sig text NOT NULL,
tagvalues text[] GENERATED ALWAYS AS (tags_to_tagvalues(tags)) STORED
);
CREATE UNIQUE INDEX IF NOT EXISTS ididx ON event USING btree (id text_pattern_ops);
CREATE INDEX IF NOT EXISTS pubkeyprefix ON event USING btree (pubkey text_pattern_ops);
CREATE INDEX IF NOT EXISTS timeidx ON event (created_at DESC);
CREATE INDEX IF NOT EXISTS kindidx ON event (kind);
CREATE INDEX IF NOT EXISTS arbitrarytagvalues ON event USING gin (tagvalues);
`)
if b.QueryLimit == 0 {
b.QueryLimit = queryLimit
}
if b.QueryIDsLimit == 0 {
b.QueryIDsLimit = queryIDsLimit
}
if b.QueryAuthorsLimit == 0 {
b.QueryAuthorsLimit = queryAuthorsLimit
}
if b.QueryKindsLimit == 0 {
b.QueryKindsLimit = queryKindsLimit
}
if b.QueryTagsLimit == 0 {
b.QueryTagsLimit = queryTagsLimit
}
return err
}

View File

@@ -0,0 +1,15 @@
package postgresql
import (
"github.com/jmoiron/sqlx"
)
type PostgresBackend struct {
*sqlx.DB
DatabaseURL string
QueryLimit int
QueryIDsLimit int
QueryAuthorsLimit int
QueryKindsLimit int
QueryTagsLimit int
}

View File

@@ -0,0 +1,193 @@
package postgresql
import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"strconv"
"strings"
"github.com/jmoiron/sqlx"
"github.com/nbd-wtf/go-nostr"
)
func (b PostgresBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch chan *nostr.Event, err error) {
ch = make(chan *nostr.Event)
query, params, err := b.queryEventsSql(filter, false)
if err != nil {
return nil, err
}
rows, err := b.DB.Query(query, params...)
if err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
go func() {
defer rows.Close()
defer close(ch)
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return
}
evt.CreatedAt = nostr.Timestamp(timestamp)
ch <- &evt
}
}()
return ch, nil
}
func (b PostgresBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
query, params, err := b.queryEventsSql(filter, true)
if err != nil {
return 0, err
}
var count int64
if err = b.DB.QueryRow(query, params...).Scan(&count); err != nil && err != sql.ErrNoRows {
return 0, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
return count, nil
}
func (b PostgresBackend) queryEventsSql(filter nostr.Filter, doCount bool) (string, []any, error) {
var conditions []string
var params []any
if filter.IDs != nil {
if len(filter.IDs) > b.QueryIDsLimit {
// too many ids, fail everything
return "", nil, nil
}
likeids := make([]string, 0, len(filter.IDs))
for _, id := range filter.IDs {
// to prevent sql attack here we will check if
// these ids are valid 32byte hex
parsed, err := hex.DecodeString(id)
if err != nil || len(parsed) != 32 {
continue
}
likeids = append(likeids, fmt.Sprintf("id LIKE '%x%%'", parsed))
}
if len(likeids) == 0 {
// ids being [] mean you won't get anything
return "", nil, nil
}
conditions = append(conditions, "("+strings.Join(likeids, " OR ")+")")
}
if filter.Authors != nil {
if len(filter.Authors) > b.QueryAuthorsLimit {
// too many authors, fail everything
return "", nil, nil
}
likekeys := make([]string, 0, len(filter.Authors))
for _, key := range filter.Authors {
// to prevent sql attack here we will check if
// these keys are valid 32byte hex
parsed, err := hex.DecodeString(key)
if err != nil || len(parsed) != 32 {
continue
}
likekeys = append(likekeys, fmt.Sprintf("pubkey LIKE '%x%%'", parsed))
}
if len(likekeys) == 0 {
// authors being [] mean you won't get anything
return "", nil, nil
}
conditions = append(conditions, "("+strings.Join(likekeys, " OR ")+")")
}
if filter.Kinds != nil {
if len(filter.Kinds) > b.QueryKindsLimit {
// too many kinds, fail everything
return "", nil, nil
}
if len(filter.Kinds) == 0 {
// kinds being [] mean you won't get anything
return "", nil, nil
}
// no sql injection issues since these are ints
inkinds := make([]string, len(filter.Kinds))
for i, kind := range filter.Kinds {
inkinds[i] = strconv.Itoa(kind)
}
conditions = append(conditions, `kind IN (`+strings.Join(inkinds, ",")+`)`)
}
tagQuery := make([]string, 0, 1)
for _, values := range filter.Tags {
if len(values) == 0 {
// any tag set to [] is wrong
return "", nil, nil
}
// add these tags to the query
tagQuery = append(tagQuery, values...)
if len(tagQuery) > b.QueryTagsLimit {
// too many tags, fail everything
return "", nil, nil
}
}
if len(tagQuery) > 0 {
arrayBuild := make([]string, len(tagQuery))
for i, tagValue := range tagQuery {
arrayBuild[i] = "?"
params = append(params, tagValue)
}
// we use a very bad implementation in which we only check the tag values and
// ignore the tag names
conditions = append(conditions,
"tagvalues && ARRAY["+strings.Join(arrayBuild, ",")+"]")
}
if filter.Since != nil {
conditions = append(conditions, "created_at > ?")
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at < ?")
params = append(params, filter.Until)
}
if len(conditions) == 0 {
// fallback
conditions = append(conditions, "true")
}
if filter.Limit < 1 || filter.Limit > b.QueryLimit {
params = append(params, b.QueryLimit)
} else {
params = append(params, filter.Limit)
}
var query string
if doCount {
query = sqlx.Rebind(sqlx.BindType("postgres"), `SELECT
COUNT(*)
FROM event WHERE `+
strings.Join(conditions, " AND ")+
" ORDER BY created_at DESC LIMIT ?")
} else {
query = sqlx.Rebind(sqlx.BindType("postgres"), `SELECT
id, pubkey, created_at, kind, tags, content, sig
FROM event WHERE `+
strings.Join(conditions, " AND ")+
" ORDER BY created_at DESC LIMIT ?")
}
return query, params, nil
}

View File

@@ -0,0 +1,405 @@
package postgresql
import (
"fmt"
"strconv"
"strings"
"testing"
"github.com/nbd-wtf/go-nostr"
"github.com/stretchr/testify/assert"
)
var defaultBackend = PostgresBackend{
QueryLimit: queryLimit,
QueryIDsLimit: queryIDsLimit,
QueryAuthorsLimit: queryAuthorsLimit,
QueryKindsLimit: queryKindsLimit,
QueryTagsLimit: queryTagsLimit,
}
func TestQueryEventsSql(t *testing.T) {
var tests = []struct {
name string
backend PostgresBackend
filter *nostr.Filter
query string
params []any
err error
}{
{
name: "empty filter",
backend: defaultBackend,
filter: &nostr.Filter{},
query: "SELECT id, pubkey, created_at, kind, tags, content, sig FROM event WHERE true ORDER BY created_at DESC LIMIT $1",
params: []any{100},
err: nil,
},
{
name: "valid filter limit",
backend: defaultBackend,
filter: &nostr.Filter{
Limit: 50,
},
query: "SELECT id, pubkey, created_at, kind, tags, content, sig FROM event WHERE true ORDER BY created_at DESC LIMIT $1",
params: []any{50},
err: nil,
},
{
name: "too large filter limit",
backend: defaultBackend,
filter: &nostr.Filter{
Limit: 2000,
},
query: "SELECT id, pubkey, created_at, kind, tags, content, sig FROM event WHERE true ORDER BY created_at DESC LIMIT $1",
params: []any{100},
err: nil,
},
{
name: "ids filter",
backend: defaultBackend,
filter: &nostr.Filter{
IDs: []string{"083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294"},
},
query: `SELECT id, pubkey, created_at, kind, tags, content, sig
FROM event
WHERE (id LIKE '083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294%')
ORDER BY created_at DESC LIMIT $1`,
params: []any{100},
err: nil,
},
{
name: "kind filter",
backend: defaultBackend,
filter: &nostr.Filter{
Kinds: []int{1, 2, 3},
},
query: `SELECT id, pubkey, created_at, kind, tags, content, sig
FROM event
WHERE kind IN(1,2,3)
ORDER BY created_at DESC LIMIT $1`,
params: []any{100},
err: nil,
},
{
name: "authors filter",
backend: defaultBackend,
filter: &nostr.Filter{
Authors: []string{"7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229"},
},
query: `SELECT id, pubkey, created_at, kind, tags, content, sig
FROM event
WHERE (pubkey LIKE '7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229%')
ORDER BY created_at DESC LIMIT $1`,
params: []any{100},
err: nil,
},
// errors
{
name: "nil filter",
backend: defaultBackend,
filter: nil,
query: "",
params: nil,
err: fmt.Errorf("filter cannot be null"),
},
{
name: "too many ids",
backend: defaultBackend,
filter: &nostr.Filter{
IDs: strSlice(501),
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "invalid ids",
backend: defaultBackend,
filter: &nostr.Filter{
IDs: []string{"stuff"},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "too many authors",
backend: defaultBackend,
filter: &nostr.Filter{
Authors: strSlice(501),
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "invalid authors",
backend: defaultBackend,
filter: &nostr.Filter{
Authors: []string{"stuff"},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "too many kinds",
backend: defaultBackend,
filter: &nostr.Filter{
Kinds: intSlice(11),
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "no kinds",
backend: defaultBackend,
filter: &nostr.Filter{
Kinds: []int{},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "tags of empty array",
backend: defaultBackend,
filter: &nostr.Filter{
Tags: nostr.TagMap{
"#e": []string{},
},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "too many tag values",
backend: defaultBackend,
filter: &nostr.Filter{
Tags: nostr.TagMap{
"#e": strSlice(11),
},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query, params, err := tt.backend.queryEventsSql(tt.filter, false)
assert.Equal(t, tt.err, err)
if err != nil {
return
}
assert.Equal(t, clean(tt.query), clean(query))
assert.Equal(t, tt.params, params)
})
}
}
func clean(s string) string {
return strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(s, "\t", ""), "\n", ""), " ", "")
}
func intSlice(n int) []int {
slice := make([]int, 0, n)
for i := 0; i < n; i++ {
slice = append(slice, i)
}
return slice
}
func strSlice(n int) []string {
slice := make([]string, 0, n)
for i := 0; i < n; i++ {
slice = append(slice, strconv.Itoa(i))
}
return slice
}
func TestCountEventsSql(t *testing.T) {
var tests = []struct {
name string
backend PostgresBackend
filter *nostr.Filter
query string
params []any
err error
}{
{
name: "empty filter",
backend: defaultBackend,
filter: &nostr.Filter{},
query: "SELECT COUNT(*) FROM event WHERE true ORDER BY created_at DESC LIMIT $1",
params: []any{100},
err: nil,
},
{
name: "ids filter",
backend: defaultBackend,
filter: &nostr.Filter{
IDs: []string{"083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294"},
},
query: `SELECT COUNT(*)
FROM event
WHERE (id LIKE '083ec57f36a7b39ab98a57bedab4f85355b2ee89e4b205bed58d7c3ef9edd294%')
ORDER BY created_at DESC LIMIT $1`,
params: []any{100},
err: nil,
},
{
name: "kind filter",
backend: defaultBackend,
filter: &nostr.Filter{
Kinds: []int{1, 2, 3},
},
query: `SELECT COUNT(*)
FROM event
WHERE kind IN(1,2,3)
ORDER BY created_at DESC LIMIT $1`,
params: []any{100},
err: nil,
},
{
name: "authors filter",
backend: defaultBackend,
filter: &nostr.Filter{
Authors: []string{"7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229"},
},
query: `SELECT COUNT(*)
FROM event
WHERE (pubkey LIKE '7bdef7bdebb8721f77927d0e77c66059360fa62371fdf15f3add93923a613229%')
ORDER BY created_at DESC LIMIT $1`,
params: []any{100},
err: nil,
},
// errors
{
name: "nil filter",
backend: defaultBackend,
filter: nil,
query: "",
params: nil,
err: fmt.Errorf("filter cannot be null"),
},
{
name: "too many ids",
backend: defaultBackend,
filter: &nostr.Filter{
IDs: strSlice(501),
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "invalid ids",
backend: defaultBackend,
filter: &nostr.Filter{
IDs: []string{"stuff"},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "too many authors",
backend: defaultBackend,
filter: &nostr.Filter{
Authors: strSlice(501),
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "invalid authors",
backend: defaultBackend,
filter: &nostr.Filter{
Authors: []string{"stuff"},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "too many kinds",
backend: defaultBackend,
filter: &nostr.Filter{
Kinds: intSlice(11),
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "no kinds",
backend: defaultBackend,
filter: &nostr.Filter{
Kinds: []int{},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "tags of empty array",
backend: defaultBackend,
filter: &nostr.Filter{
Tags: nostr.TagMap{
"#e": []string{},
},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
{
name: "too many tag values",
backend: defaultBackend,
filter: &nostr.Filter{
Tags: nostr.TagMap{
"#e": strSlice(11),
},
},
query: "",
params: nil,
// REVIEW: should return error
err: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query, params, err := tt.backend.queryEventsSql(tt.filter, true)
assert.Equal(t, tt.err, err)
if err != nil {
return
}
assert.Equal(t, clean(tt.query), clean(query))
assert.Equal(t, tt.params, params)
})
}
}

View File

@@ -0,0 +1,54 @@
package postgresql
import (
"context"
"encoding/json"
"github.com/fiatjaf/khatru"
"github.com/nbd-wtf/go-nostr"
)
func (b *PostgresBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
sql, params, _ := saveEventSql(evt)
res, err := b.DB.ExecContext(ctx, sql, params...)
if err != nil {
return err
}
nr, err := res.RowsAffected()
if err != nil {
return err
}
if nr == 0 {
return khatru.ErrDupEvent
}
return nil
}
func (b *PostgresBackend) BeforeSave(ctx context.Context, evt *nostr.Event) {
// do nothing
}
func (b *PostgresBackend) AfterSave(evt *nostr.Event) {
// delete all but the 100 most recent ones for each key
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
SELECT created_at FROM event WHERE pubkey = $1
ORDER BY created_at DESC OFFSET 100 LIMIT 1
)`, evt.PubKey, evt.Kind)
}
func saveEventSql(evt *nostr.Event) (string, []any, error) {
const query = `INSERT INTO event (
id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING`
var (
tagsj, _ = json.Marshal(evt.Tags)
params = []any{evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig}
)
return query, params, nil
}

View File

@@ -0,0 +1,64 @@
package postgresql
import (
"testing"
"github.com/nbd-wtf/go-nostr"
"github.com/stretchr/testify/assert"
)
func TestSaveEventSql(t *testing.T) {
now := nostr.Now()
tests := []struct {
name string
event *nostr.Event
query string
params []any
err error
}{
{
name: "basic",
event: &nostr.Event{
ID: "id",
PubKey: "pk",
CreatedAt: now,
Kind: nostr.KindTextNote,
Content: "test",
Sig: "sig",
},
query: `INSERT INTO event (
id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING`,
params: []any{"id", "pk", now, nostr.KindTextNote, []byte("null"), "test", "sig"},
err: nil,
},
{
name: "tags",
event: &nostr.Event{
ID: "id",
PubKey: "pk",
CreatedAt: now,
Kind: nostr.KindTextNote,
Tags: nostr.Tags{nostr.Tag{"foo", "bar"}},
Content: "test",
Sig: "sig",
},
query: `INSERT INTO event (
id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING`,
params: []any{"id", "pk", now, nostr.KindTextNote, []byte("[[\"foo\",\"bar\"]]"), "test", "sig"},
err: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query, params, err := saveEventSql(tt.event)
assert.Equal(t, clean(tt.query), clean(query))
assert.Equal(t, tt.params, params)
assert.Equal(t, tt.err, err)
})
}
}

View File

@@ -0,0 +1,12 @@
package sqlite3
import (
"context"
"github.com/nbd-wtf/go-nostr"
)
func (b SQLite3Backend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
_, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1", evt.ID)
return err
}

View File

@@ -0,0 +1,33 @@
package sqlite3
import (
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/mattn/go-sqlite3"
)
func (b *SQLite3Backend) Init() error {
db, err := sqlx.Connect("sqlite3", b.DatabaseURL)
if err != nil {
return err
}
// sqlx default is 0 (unlimited), while sqlite3 by default accepts up to 100 connections
db.SetMaxOpenConns(80)
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
b.DB = db
_, err = b.DB.Exec(`
CREATE TABLE IF NOT EXISTS event (
id text NOT NULL,
pubkey text NOT NULL,
created_at integer NOT NULL,
kind integer NOT NULL,
tags jsonb NOT NULL,
content text NOT NULL,
sig text NOT NULL
);
`)
return err
}

View File

@@ -0,0 +1,192 @@
package sqlite3
import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"strconv"
"strings"
"github.com/jmoiron/sqlx"
"github.com/nbd-wtf/go-nostr"
)
func (b SQLite3Backend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch chan *nostr.Event, err error) {
ch = make(chan *nostr.Event)
query, params, err := queryEventsSql(filter, false)
if err != nil {
return nil, err
}
rows, err := b.DB.Query(query, params...)
if err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
go func() {
defer rows.Close()
defer close(ch)
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return
}
evt.CreatedAt = nostr.Timestamp(timestamp)
ch <- &evt
}
}()
return ch, nil
}
func (b SQLite3Backend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
query, params, err := queryEventsSql(filter, true)
if err != nil {
return 0, err
}
var count int64
err = b.DB.QueryRow(query, params...).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
return count, nil
}
func queryEventsSql(filter nostr.Filter, doCount bool) (string, []any, error) {
var conditions []string
var params []any
if filter.IDs != nil {
if len(filter.IDs) > 500 {
// too many ids, fail everything
return "", nil, nil
}
likeids := make([]string, 0, len(filter.IDs))
for _, id := range filter.IDs {
// to prevent sql attack here we will check if
// these ids are valid 32byte hex
parsed, err := hex.DecodeString(id)
if err != nil || len(parsed) != 32 {
continue
}
likeids = append(likeids, fmt.Sprintf("id LIKE '%x%%'", parsed))
}
if len(likeids) == 0 {
// ids being [] mean you won't get anything
return "", nil, nil
}
conditions = append(conditions, "("+strings.Join(likeids, " OR ")+")")
}
if filter.Authors != nil {
if len(filter.Authors) > 500 {
// too many authors, fail everything
return "", nil, nil
}
likekeys := make([]string, 0, len(filter.Authors))
for _, key := range filter.Authors {
// to prevent sql attack here we will check if
// these keys are valid 32byte hex
parsed, err := hex.DecodeString(key)
if err != nil || len(parsed) != 32 {
continue
}
likekeys = append(likekeys, fmt.Sprintf("pubkey LIKE '%x%%'", parsed))
}
if len(likekeys) == 0 {
// authors being [] mean you won't get anything
return "", nil, nil
}
conditions = append(conditions, "("+strings.Join(likekeys, " OR ")+")")
}
if filter.Kinds != nil {
if len(filter.Kinds) > 10 {
// too many kinds, fail everything
return "", nil, nil
}
if len(filter.Kinds) == 0 {
// kinds being [] mean you won't get anything
return "", nil, nil
}
// no sql injection issues since these are ints
inkinds := make([]string, len(filter.Kinds))
for i, kind := range filter.Kinds {
inkinds[i] = strconv.Itoa(kind)
}
conditions = append(conditions, `kind IN (`+strings.Join(inkinds, ",")+`)`)
}
tagQuery := make([]string, 0, 1)
for _, values := range filter.Tags {
if len(values) == 0 {
// any tag set to [] is wrong
return "", nil, nil
}
// add these tags to the query
tagQuery = append(tagQuery, values...)
if len(tagQuery) > 10 {
// too many tags, fail everything
return "", nil, nil
}
}
// we use a very bad implementation in which we only check the tag values and
// ignore the tag names
for _, tagValue := range tagQuery {
params = append(params, "%"+tagValue+"%")
conditions = append(conditions, "tags LIKE ?")
}
if filter.Since != nil {
conditions = append(conditions, "created_at > ?")
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at < ?")
params = append(params, filter.Until)
}
if filter.Search != "" {
conditions = append(conditions, "content LIKE ?")
params = append(params, "%"+filter.Search+"%")
}
if len(conditions) == 0 {
// fallback
conditions = append(conditions, "true")
}
if filter.Limit < 1 || filter.Limit > 100 {
params = append(params, 100)
} else {
params = append(params, filter.Limit)
}
var query string
if doCount {
query = sqlx.Rebind(sqlx.BindType("sqlite3"), `SELECT
COUNT(*)
FROM event WHERE `+
strings.Join(conditions, " AND ")+
" ORDER BY created_at DESC LIMIT ?")
} else {
query = sqlx.Rebind(sqlx.BindType("sqlite3"), `SELECT
id, pubkey, created_at, kind, tags, content, sig
FROM event WHERE `+
strings.Join(conditions, " AND ")+
" ORDER BY created_at DESC LIMIT ?")
}
return query, params, nil
}

View File

@@ -0,0 +1,44 @@
package sqlite3
import (
"context"
"encoding/json"
"github.com/fiatjaf/khatru"
"github.com/nbd-wtf/go-nostr"
)
func (b *SQLite3Backend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
// insert
tagsj, _ := json.Marshal(evt.Tags)
res, err := b.DB.ExecContext(ctx, `
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig)
if err != nil {
return err
}
nr, err := res.RowsAffected()
if err != nil {
return err
}
if nr == 0 {
return khatru.ErrDupEvent
}
return nil
}
func (b *SQLite3Backend) BeforeSave(ctx context.Context, evt *nostr.Event) {
// do nothing
}
func (b *SQLite3Backend) AfterSave(evt *nostr.Event) {
// delete all but the 100 most recent ones for each key
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
SELECT created_at FROM event WHERE pubkey = $1
ORDER BY created_at DESC OFFSET 100 LIMIT 1
)`, evt.PubKey, evt.Kind)
}

View File

@@ -0,0 +1,10 @@
package sqlite3
import (
"github.com/jmoiron/sqlx"
)
type SQLite3Backend struct {
*sqlx.DB
DatabaseURL string
}

82
relay.go Normal file
View File

@@ -0,0 +1,82 @@
package khatru
import (
"context"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/fasthttp/websocket"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip11"
)
func NewRelay() *Relay {
return &Relay{
Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags),
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
},
clients: make(map[*websocket.Conn]struct{}),
serveMux: &http.ServeMux{},
WriteWait: 10 * time.Second,
PongWait: 60 * time.Second,
PingPeriod: 30 * time.Second,
MaxMessageSize: 512000,
}
}
type Relay struct {
Name string
Description string
PubKey string
Contact string
ServiceURL string // required for nip-42
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
StoreEvent []func(ctx context.Context, event *nostr.Event) error
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
EditInformation []func(ctx context.Context, info *nip11.RelayInformationDocument)
OnAuth []func(ctx context.Context, pubkey string)
OnConnect []func(ctx context.Context)
OnEventSaved []func(ctx context.Context, event *nostr.Event)
// Default logger, as set by NewServer, is a stdlib logger prefixed with "[khatru-relay] ",
// outputting to stderr.
Log *log.Logger
// for establishing websockets
upgrader websocket.Upgrader
// keep a connection reference to all connected clients for Server.Shutdown
clientsMu sync.Mutex
clients map[*websocket.Conn]struct{}
// in case you call Server.Start
Addr string
serveMux *http.ServeMux
httpServer *http.Server
// websocket options
WriteWait time.Duration // Time allowed to write a message to the peer.
PongWait time.Duration // Time allowed to read the next pong message from the peer.
PingPeriod time.Duration // Send pings to peer with this period. Must be less than pongWait.
MaxMessageSize int64 // Maximum message size allowed from peer.
}
func (rl *Relay) RequestAuth(ctx context.Context) {
ws := GetConnection(ctx)
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
}

60
start.go Normal file
View File

@@ -0,0 +1,60 @@
package khatru
import (
"context"
"net"
"net/http"
"strconv"
"time"
"github.com/fasthttp/websocket"
"github.com/rs/cors"
)
func (rl *Relay) Router() *http.ServeMux {
return rl.serveMux
}
// Start creates an http server and starts listening on given host and port.
func (rl *Relay) Start(host string, port int, started ...chan bool) error {
addr := net.JoinHostPort(host, strconv.Itoa(port))
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
rl.Addr = ln.Addr().String()
rl.httpServer = &http.Server{
Handler: cors.Default().Handler(rl),
Addr: addr,
WriteTimeout: 2 * time.Second,
ReadTimeout: 2 * time.Second,
IdleTimeout: 30 * time.Second,
}
// notify caller that we're starting
for _, started := range started {
close(started)
}
if err := rl.httpServer.Serve(ln); err == http.ErrServerClosed {
return nil
} else if err != nil {
return err
} else {
return nil
}
}
// Shutdown sends a websocket close control message to all connected clients.
func (rl *Relay) Shutdown(ctx context.Context) {
rl.httpServer.Shutdown(ctx)
rl.clientsMu.Lock()
defer rl.clientsMu.Unlock()
for conn := range rl.clients {
conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))
conn.Close()
delete(rl.clients, conn)
}
}

93
start_test.go Normal file
View File

@@ -0,0 +1,93 @@
package khatru
import (
"context"
"errors"
"net/http"
"testing"
"time"
"github.com/gobwas/ws/wsutil"
"github.com/nbd-wtf/go-nostr"
)
func TestServerStartShutdown(t *testing.T) {
var (
inited bool
storeInited bool
shutdown bool
)
rl := &testRelay{
name: "test server start",
init: func() error {
inited = true
return nil
},
onShutdown: func(context.Context) { shutdown = true },
storage: &testStorage{
init: func() error { storeInited = true; return nil },
},
}
srv, _ := NewServer(rl)
ready := make(chan bool)
done := make(chan error)
go func() { done <- srv.Start("127.0.0.1", 0, ready); close(done) }()
<-ready
// verify everything's initialized
if !inited {
t.Error("didn't call testRelay.init")
}
if !storeInited {
t.Error("didn't call testStorage.init")
}
// check that http requests are served
if _, err := http.Get("http://" + srv.Addr); err != nil {
t.Errorf("GET %s: %v", srv.Addr, err)
}
// verify server shuts down
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
srv.Shutdown(ctx)
if !shutdown {
t.Error("didn't call testRelay.onShutdown")
}
select {
case err := <-done:
if err != nil {
t.Errorf("srv.Start: %v", err)
}
case <-time.After(time.Second):
t.Error("srv.Start too long to return")
}
}
func TestServerShutdownWebsocket(t *testing.T) {
// set up a new relay server
srv := startTestRelay(t, &testRelay{storage: &testStorage{}})
// connect a client to it
ctx1, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
client, err := nostr.RelayConnect(ctx1, "ws://"+srv.Addr)
if err != nil {
t.Fatalf("nostr.RelayConnectContext: %v", err)
}
// now, shut down the server
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
srv.Shutdown(ctx2)
// wait for the client to receive a "connection close"
time.Sleep(1 * time.Second)
err = client.ConnectionError
if e := errors.Unwrap(err); e != nil {
err = e
}
if _, ok := err.(wsutil.ClosedError); !ok {
t.Errorf("client.ConnextionError: %v (%T); want wsutil.ClosedError", err, err)
}
}

91
util_test.go Normal file
View File

@@ -0,0 +1,91 @@
package khatru
import (
"context"
"testing"
"github.com/nbd-wtf/go-nostr"
)
func startTestRelay(t *testing.T, tr *testRelay) *Server {
t.Helper()
srv, _ := NewServer(tr)
started := make(chan bool)
go srv.Start("127.0.0.1", 0, started)
<-started
return srv
}
type testRelay struct {
name string
storage Storage
init func() error
onShutdown func(context.Context)
acceptEvent func(*nostr.Event) bool
}
func (tr *testRelay) Name() string { return tr.name }
func (tr *testRelay) Storage(context.Context) Storage { return tr.storage }
func (tr *testRelay) Init() error {
if fn := tr.init; fn != nil {
return fn()
}
return nil
}
func (tr *testRelay) OnShutdown(ctx context.Context) {
if fn := tr.onShutdown; fn != nil {
fn(ctx)
}
}
func (tr *testRelay) AcceptEvent(ctx context.Context, e *nostr.Event) bool {
if fn := tr.acceptEvent; fn != nil {
return fn(e)
}
return true
}
type testStorage struct {
init func() error
queryEvents func(context.Context, *nostr.Filter) (chan *nostr.Event, error)
deleteEvent func(ctx context.Context, id string, pubkey string) error
saveEvent func(context.Context, *nostr.Event) error
countEvents func(context.Context, *nostr.Filter) (int64, error)
}
func (st *testStorage) Init() error {
if fn := st.init; fn != nil {
return fn()
}
return nil
}
func (st *testStorage) QueryEvents(ctx context.Context, f *nostr.Filter) (chan *nostr.Event, error) {
if fn := st.queryEvents; fn != nil {
return fn(ctx, f)
}
return nil, nil
}
func (st *testStorage) DeleteEvent(ctx context.Context, id string, pubkey string) error {
if fn := st.deleteEvent; fn != nil {
return fn(ctx, id, pubkey)
}
return nil
}
func (st *testStorage) SaveEvent(ctx context.Context, e *nostr.Event) error {
if fn := st.saveEvent; fn != nil {
return fn(ctx, e)
}
return nil
}
func (st *testStorage) CountEvents(ctx context.Context, f *nostr.Filter) (int64, error) {
if fn := st.countEvents; fn != nil {
return fn(ctx, f)
}
return 0, nil
}

25
utils.go Normal file
View File

@@ -0,0 +1,25 @@
package khatru
import (
"context"
"regexp"
)
const (
AUTH_CONTEXT_KEY = iota
WS_KEY = iota
)
var nip20prefixmatcher = regexp.MustCompile(`^\w+: `)
func GetConnection(ctx context.Context) *WebSocket {
return ctx.Value(WS_KEY).(*WebSocket)
}
func GetAuthed(ctx context.Context) string {
authedPubkey := ctx.Value(AUTH_CONTEXT_KEY)
if authedPubkey == nil {
return ""
}
return authedPubkey.(string)
}

29
websocket.go Normal file
View File

@@ -0,0 +1,29 @@
package khatru
import (
"sync"
"github.com/fasthttp/websocket"
)
type WebSocket struct {
conn *websocket.Conn
mutex sync.Mutex
// nip42
Challenge string
Authed string
WaitingForAuth chan struct{}
}
func (ws *WebSocket) WriteJSON(any any) error {
ws.mutex.Lock()
defer ws.mutex.Unlock()
return ws.conn.WriteJSON(any)
}
func (ws *WebSocket) WriteMessage(t int, b []byte) error {
ws.mutex.Lock()
defer ws.mutex.Unlock()
return ws.conn.WriteMessage(t, b)
}