mirror of
https://github.com/fiatjaf/khatru.git
synced 2026-04-20 03:38:00 +02:00
Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5823515d27 | ||
|
|
9273a4b809 | ||
|
|
ddfc9ab64a | ||
|
|
375236cfe2 | ||
|
|
35e801379a | ||
|
|
22da06b629 | ||
|
|
7bfde76ab1 | ||
|
|
ad92d0b051 | ||
|
|
728417852e | ||
|
|
3c1b062eb8 | ||
|
|
84d01dc1d3 | ||
|
|
888ac8c1c0 | ||
|
|
e1fd6aaa56 | ||
|
|
386a89676a | ||
|
|
90697ad3d3 | ||
|
|
8c8a435a0b | ||
|
|
d608c67791 | ||
|
|
c0069f1e1b | ||
|
|
7a221cf9f0 | ||
|
|
194ec994d7 | ||
|
|
d592bd95a9 | ||
|
|
2edf754907 | ||
|
|
18e4904a00 | ||
|
|
591b49fe73 | ||
|
|
5db3b5fb8b | ||
|
|
dcdf86c4e4 | ||
|
|
0a62169e14 | ||
|
|
8fd6436ac8 | ||
|
|
d2544d0f4d | ||
|
|
7a3eb6fb08 | ||
|
|
1abeab4851 | ||
|
|
ecb4fc66f8 | ||
|
|
af24bd2132 | ||
|
|
4905a46ccd | ||
|
|
21e0c559f7 | ||
|
|
78dd138ca8 | ||
|
|
6c1a030ad2 | ||
|
|
270096debb | ||
|
|
487b84cf2d | ||
|
|
b277dae743 | ||
|
|
1e51cdbc07 | ||
|
|
a15cd4e545 | ||
|
|
e6078b1a68 | ||
|
|
0ad33f78f1 |
14
.github/workflows/test.yml
vendored
Normal file
14
.github/workflows/test.yml
vendored
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
name: test every commit
|
||||||
|
on:
|
||||||
|
- push
|
||||||
|
- pull_request
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v2
|
||||||
|
- uses: actions/setup-go@v3
|
||||||
|
with:
|
||||||
|
go-version-file: ./go.mod
|
||||||
|
- run: go test ./...
|
||||||
121
README.md
121
README.md
@@ -1 +1,120 @@
|
|||||||
khatru
|
# khatru, a relay framework [](https://pkg.go.dev/github.com/fiatjaf/khatru#Relay)
|
||||||
|
|
||||||
|
[](https://github.com/fiatjaf/khatru/actions/workflows/test.yml)
|
||||||
|
[](https://pkg.go.dev/github.com/fiatjaf/khatru)
|
||||||
|
[](https://goreportcard.com/report/github.com/fiatjaf/khatru)
|
||||||
|
|
||||||
|
Khatru makes it easy to write very very custom relays:
|
||||||
|
|
||||||
|
- custom event or filter acceptance policies
|
||||||
|
- custom `AUTH` handlers
|
||||||
|
- custom storage and pluggable databases
|
||||||
|
- custom webpages and other HTTP handlers
|
||||||
|
|
||||||
|
Here's a sample:
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/fiatjaf/khatru"
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// create the relay instance
|
||||||
|
relay := khatru.NewRelay()
|
||||||
|
|
||||||
|
// set up some basic properties (will be returned on the NIP-11 endpoint)
|
||||||
|
relay.Info.Name = "my relay"
|
||||||
|
relay.Info.PubKey = "79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798"
|
||||||
|
relay.Info.Description = "this is my custom relay"
|
||||||
|
relay.Info.Icon = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fliquipedia.net%2Fcommons%2Fimages%2F3%2F35%2FSCProbe.jpg&f=1&nofb=1&ipt=0cbbfef25bce41da63d910e86c3c343e6c3b9d63194ca9755351bb7c2efa3359&ipo=images"
|
||||||
|
|
||||||
|
// you must bring your own storage scheme -- if you want to have any
|
||||||
|
store := make(map[string]*nostr.Event, 120)
|
||||||
|
|
||||||
|
// set up the basic relay functions
|
||||||
|
relay.StoreEvent = append(relay.StoreEvent,
|
||||||
|
func(ctx context.Context, event *nostr.Event) error {
|
||||||
|
store[event.ID] = event
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.QueryEvents = append(relay.QueryEvents,
|
||||||
|
func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
|
||||||
|
ch := make(chan *nostr.Event)
|
||||||
|
go func() {
|
||||||
|
for _, evt := range store {
|
||||||
|
if filter.Matches(evt) {
|
||||||
|
ch <- evt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
return ch, nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.DeleteEvent = append(relay.DeleteEvent,
|
||||||
|
func(ctx context.Context, event *nostr.Event) error {
|
||||||
|
delete(store, event.ID)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// there are many other configurable things you can set
|
||||||
|
relay.RejectEvent = append(relay.RejectEvent,
|
||||||
|
func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
if event.PubKey == "fa984bd7dbb282f07e16e7ae87b26a2a7b9b90b7246a44771f0cf5ae58018f52" {
|
||||||
|
return true, "we don't allow this person to write here"
|
||||||
|
}
|
||||||
|
return false, "" // anyone else can
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.OnConnect = append(relay.OnConnect,
|
||||||
|
func(ctx context.Context) {
|
||||||
|
// request NIP-42 AUTH from everybody
|
||||||
|
relay.RequestAuth(ctx)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.OnAuth = append(relay.OnAuth,
|
||||||
|
func(ctx context.Context, pubkey string) {
|
||||||
|
// and when they auth we just log that for nothing
|
||||||
|
log.Println(pubkey + " is authed!")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
// check the docs for more goodies!
|
||||||
|
|
||||||
|
mux := relay.Router()
|
||||||
|
// set up other http handlers
|
||||||
|
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("content-type", "text/html")
|
||||||
|
fmt.Fprintf(w, `<b>welcome</b> to my relay!`)
|
||||||
|
})
|
||||||
|
|
||||||
|
// start the server
|
||||||
|
fmt.Println("running on :3334")
|
||||||
|
http.ListenAndServe(":3334", relay)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### But I don't want to write my own database!
|
||||||
|
|
||||||
|
Fear no more. Using the https://github.com/fiatjaf/eventstore module you get a bunch of compatible databases out of the box and you can just plug them into your relay. For example, [sqlite](https://pkg.go.dev/github.com/fiatjaf/eventstore/sqlite3):
|
||||||
|
|
||||||
|
```go
|
||||||
|
db := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/khatru-sqlite-tmp"}
|
||||||
|
if err := db.Init(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
|
||||||
|
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
||||||
|
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
|
||||||
|
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||||
|
```
|
||||||
|
|||||||
26
add-event.go
26
add-event.go
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/fiatjaf/eventstore"
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -12,22 +13,15 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
|
|||||||
return fmt.Errorf("event is nil")
|
return fmt.Errorf("event is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := ""
|
|
||||||
rejecting := false
|
|
||||||
for _, reject := range rl.RejectEvent {
|
for _, reject := range rl.RejectEvent {
|
||||||
rejecting, msg = reject(ctx, evt)
|
if reject, msg := reject(ctx, evt); reject {
|
||||||
if rejecting {
|
if msg == "" {
|
||||||
break
|
msg = "no reason"
|
||||||
|
}
|
||||||
|
return fmt.Errorf(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if rejecting {
|
|
||||||
if msg == "" {
|
|
||||||
msg = "no reason"
|
|
||||||
}
|
|
||||||
return fmt.Errorf(msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
if 20000 <= evt.Kind && evt.Kind < 30000 {
|
if 20000 <= evt.Kind && evt.Kind < 30000 {
|
||||||
// do not store ephemeral events
|
// do not store ephemeral events
|
||||||
} else {
|
} else {
|
||||||
@@ -38,8 +32,7 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
previous := <-ch
|
if previous := <-ch; previous != nil && isOlder(previous, evt) {
|
||||||
if previous != nil {
|
|
||||||
for _, del := range rl.DeleteEvent {
|
for _, del := range rl.DeleteEvent {
|
||||||
del(ctx, previous)
|
del(ctx, previous)
|
||||||
}
|
}
|
||||||
@@ -54,8 +47,7 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
previous := <-ch
|
if previous := <-ch; previous != nil && isOlder(previous, evt) {
|
||||||
if previous != nil {
|
|
||||||
for _, del := range rl.DeleteEvent {
|
for _, del := range rl.DeleteEvent {
|
||||||
del(ctx, previous)
|
del(ctx, previous)
|
||||||
}
|
}
|
||||||
@@ -68,7 +60,7 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
|
|||||||
for _, store := range rl.StoreEvent {
|
for _, store := range rl.StoreEvent {
|
||||||
if saveErr := store(ctx, evt); saveErr != nil {
|
if saveErr := store(ctx, evt); saveErr != nil {
|
||||||
switch saveErr {
|
switch saveErr {
|
||||||
case ErrDupEvent:
|
case eventstore.ErrDupEvent:
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
errmsg := saveErr.Error()
|
errmsg := saveErr.Error()
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
package khatru
|
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
|
|
||||||
var ErrDupEvent = fmt.Errorf("duplicate: event already exists")
|
|
||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/fiatjaf/eventstore/lmdb"
|
"github.com/fiatjaf/eventstore/lmdb"
|
||||||
"github.com/fiatjaf/khatru"
|
"github.com/fiatjaf/khatru"
|
||||||
"github.com/fiatjaf/khatru/plugins"
|
"github.com/fiatjaf/khatru/policies"
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -26,8 +26,8 @@ func main() {
|
|||||||
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
|
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
|
||||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||||
|
|
||||||
relay.RejectEvent = append(relay.RejectEvent, plugins.PreventTooManyIndexableTags(10))
|
relay.RejectEvent = append(relay.RejectEvent, policies.PreventTooManyIndexableTags(10, nil, nil))
|
||||||
relay.RejectFilter = append(relay.RejectFilter, plugins.NoPrefixFilters, plugins.NoComplexFilters)
|
relay.RejectFilter = append(relay.RejectFilter, policies.NoComplexFilters)
|
||||||
|
|
||||||
relay.OnEventSaved = append(relay.OnEventSaved, func(ctx context.Context, event *nostr.Event) {
|
relay.OnEventSaved = append(relay.OnEventSaved, func(ctx context.Context, event *nostr.Event) {
|
||||||
})
|
})
|
||||||
|
|||||||
BIN
examples/readme-demo/demo-memory
Executable file
BIN
examples/readme-demo/demo-memory
Executable file
Binary file not shown.
92
examples/readme-demo/main.go
Normal file
92
examples/readme-demo/main.go
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/fiatjaf/khatru"
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// create the relay instance
|
||||||
|
relay := khatru.NewRelay()
|
||||||
|
|
||||||
|
// set up some basic properties (will be returned on the NIP-11 endpoint)
|
||||||
|
relay.Info.Name = "my relay"
|
||||||
|
relay.Info.PubKey = "79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798"
|
||||||
|
relay.Info.Description = "this is my custom relay"
|
||||||
|
relay.Info.Icon = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fliquipedia.net%2Fcommons%2Fimages%2F3%2F35%2FSCProbe.jpg&f=1&nofb=1&ipt=0cbbfef25bce41da63d910e86c3c343e6c3b9d63194ca9755351bb7c2efa3359&ipo=images"
|
||||||
|
|
||||||
|
// you must bring your own storage scheme -- if you want to have any
|
||||||
|
store := make(map[string]*nostr.Event, 120)
|
||||||
|
|
||||||
|
// set up the basic relay functions
|
||||||
|
relay.StoreEvent = append(relay.StoreEvent,
|
||||||
|
func(ctx context.Context, event *nostr.Event) error {
|
||||||
|
store[event.ID] = event
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.QueryEvents = append(relay.QueryEvents,
|
||||||
|
func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
|
||||||
|
ch := make(chan *nostr.Event)
|
||||||
|
go func() {
|
||||||
|
for _, evt := range store {
|
||||||
|
if filter.Matches(evt) {
|
||||||
|
ch <- evt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
return ch, nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.DeleteEvent = append(relay.DeleteEvent,
|
||||||
|
func(ctx context.Context, event *nostr.Event) error {
|
||||||
|
delete(store, event.ID)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// there are many other configurable things you can set
|
||||||
|
relay.RejectEvent = append(relay.RejectEvent,
|
||||||
|
func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
if event.PubKey == "fa984bd7dbb282f07e16e7ae87b26a2a7b9b90b7246a44771f0cf5ae58018f52" {
|
||||||
|
return true, "we don't allow this person to write here"
|
||||||
|
}
|
||||||
|
return false, "" // anyone else can
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// you can request auth by rejecting an event or a request with the prefix "auth-required: "
|
||||||
|
relay.RejectFilter = append(relay.RejectFilter,
|
||||||
|
func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||||
|
if pubkey := khatru.GetAuthed(ctx); pubkey != "" {
|
||||||
|
log.Printf("request from %s\n", pubkey)
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
return true, "auth-required: only authenticated users can read from this relay"
|
||||||
|
},
|
||||||
|
)
|
||||||
|
relay.OnAuth = append(relay.OnAuth,
|
||||||
|
func(ctx context.Context, pubkey string) {
|
||||||
|
// and when they auth we can just log that for nothing
|
||||||
|
log.Println(pubkey + " is authed!")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
// check the docs for more goodies!
|
||||||
|
|
||||||
|
mux := relay.Router()
|
||||||
|
// set up other http handlers
|
||||||
|
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("content-type", "text/html")
|
||||||
|
fmt.Fprintf(w, `<b>welcome</b> to my relay!`)
|
||||||
|
})
|
||||||
|
|
||||||
|
// start the server
|
||||||
|
fmt.Println("running on :3334")
|
||||||
|
http.ListenAndServe(":3334", relay)
|
||||||
|
}
|
||||||
4
go.mod
4
go.mod
@@ -5,8 +5,7 @@ go 1.21.0
|
|||||||
require (
|
require (
|
||||||
github.com/fasthttp/websocket v1.5.3
|
github.com/fasthttp/websocket v1.5.3
|
||||||
github.com/fiatjaf/eventstore v0.1.0
|
github.com/fiatjaf/eventstore v0.1.0
|
||||||
github.com/gobwas/ws v1.2.0
|
github.com/nbd-wtf/go-nostr v0.26.0
|
||||||
github.com/nbd-wtf/go-nostr v0.25.1
|
|
||||||
github.com/puzpuzpuz/xsync/v2 v2.5.1
|
github.com/puzpuzpuz/xsync/v2 v2.5.1
|
||||||
github.com/rs/cors v1.7.0
|
github.com/rs/cors v1.7.0
|
||||||
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
|
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
|
||||||
@@ -30,6 +29,7 @@ require (
|
|||||||
github.com/fatih/structs v1.1.0 // indirect
|
github.com/fatih/structs v1.1.0 // indirect
|
||||||
github.com/gobwas/httphead v0.1.0 // indirect
|
github.com/gobwas/httphead v0.1.0 // indirect
|
||||||
github.com/gobwas/pool v0.2.1 // indirect
|
github.com/gobwas/pool v0.2.1 // indirect
|
||||||
|
github.com/gobwas/ws v1.2.0 // indirect
|
||||||
github.com/gogo/protobuf v1.3.2 // indirect
|
github.com/gogo/protobuf v1.3.2 // indirect
|
||||||
github.com/golang/glog v1.0.0 // indirect
|
github.com/golang/glog v1.0.0 // indirect
|
||||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
|
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -90,8 +90,8 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
|
|||||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||||
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
|
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
|
||||||
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||||
github.com/nbd-wtf/go-nostr v0.25.1 h1:YTLTDUgngfzd3qQ0fWmQmq20flwnGtHH0g0Q8S3HlW4=
|
github.com/nbd-wtf/go-nostr v0.26.0 h1:Tofbs9i8DD5iEKIhLlWFO7kfWpvmUG16fEyW30MzHVQ=
|
||||||
github.com/nbd-wtf/go-nostr v0.25.1/go.mod h1:bkffJI+x914sPQWum9ZRUn66D7NpDnAoWo1yICvj3/0=
|
github.com/nbd-wtf/go-nostr v0.26.0/go.mod h1:bkffJI+x914sPQWum9ZRUn66D7NpDnAoWo1yICvj3/0=
|
||||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
|||||||
291
handlers.go
291
handlers.go
@@ -6,30 +6,34 @@ import (
|
|||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fasthttp/websocket"
|
"github.com/fasthttp/websocket"
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
"github.com/nbd-wtf/go-nostr/nip11"
|
|
||||||
"github.com/nbd-wtf/go-nostr/nip42"
|
"github.com/nbd-wtf/go-nostr/nip42"
|
||||||
|
"github.com/rs/cors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ServeHTTP implements http.Handler interface.
|
// ServeHTTP implements http.Handler interface.
|
||||||
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if rl.ServiceURL == "" {
|
||||||
|
rl.ServiceURL = getServiceBaseURL(r)
|
||||||
|
}
|
||||||
|
|
||||||
if r.Header.Get("Upgrade") == "websocket" {
|
if r.Header.Get("Upgrade") == "websocket" {
|
||||||
rl.HandleWebsocket(w, r)
|
rl.HandleWebsocket(w, r)
|
||||||
} else if r.Header.Get("Accept") == "application/nostr+json" {
|
} else if r.Header.Get("Accept") == "application/nostr+json" {
|
||||||
rl.HandleNIP11(w, r)
|
cors.AllowAll().Handler(http.HandlerFunc(rl.HandleNIP11)).ServeHTTP(w, r)
|
||||||
} else {
|
} else {
|
||||||
rl.serveMux.ServeHTTP(w, r)
|
rl.serveMux.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
|
||||||
|
|
||||||
conn, err := rl.upgrader.Upgrade(w, r, nil)
|
conn, err := rl.upgrader.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rl.Log.Printf("failed to upgrade websocket: %v\n", err)
|
rl.Log.Printf("failed to upgrade websocket: %v\n", err)
|
||||||
@@ -43,21 +47,31 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
rand.Read(challenge)
|
rand.Read(challenge)
|
||||||
|
|
||||||
ws := &WebSocket{
|
ws := &WebSocket{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
Challenge: hex.EncodeToString(challenge),
|
Request: r,
|
||||||
WaitingForAuth: make(chan struct{}),
|
Challenge: hex.EncodeToString(challenge),
|
||||||
|
Authed: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(
|
||||||
|
context.WithValue(
|
||||||
|
context.Background(),
|
||||||
|
WS_KEY, ws,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
kill := func() {
|
||||||
|
ticker.Stop()
|
||||||
|
cancel()
|
||||||
|
if _, ok := rl.clients.Load(conn); ok {
|
||||||
|
conn.Close()
|
||||||
|
rl.clients.Delete(conn)
|
||||||
|
removeListener(ws)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reader
|
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer kill()
|
||||||
ticker.Stop()
|
|
||||||
if _, ok := rl.clients.Load(conn); ok {
|
|
||||||
conn.Close()
|
|
||||||
rl.clients.Delete(conn)
|
|
||||||
removeListener(ws)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
conn.SetReadLimit(rl.MaxMessageSize)
|
conn.SetReadLimit(rl.MaxMessageSize)
|
||||||
conn.SetReadDeadline(time.Now().Add(rl.PongWait))
|
conn.SetReadDeadline(time.Now().Add(rl.PongWait))
|
||||||
@@ -75,6 +89,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if websocket.IsUnexpectedCloseError(
|
if websocket.IsUnexpectedCloseError(
|
||||||
err,
|
err,
|
||||||
|
websocket.CloseNormalClosure, // 1000
|
||||||
websocket.CloseGoingAway, // 1001
|
websocket.CloseGoingAway, // 1001
|
||||||
websocket.CloseNoStatusReceived, // 1005
|
websocket.CloseNoStatusReceived, // 1005
|
||||||
websocket.CloseAbnormalClosure, // 1006
|
websocket.CloseAbnormalClosure, // 1006
|
||||||
@@ -90,211 +105,119 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func(message []byte) {
|
go func(message []byte) {
|
||||||
ctx = context.Background()
|
envelope := nostr.ParseMessage(message)
|
||||||
|
if envelope == nil {
|
||||||
var request []json.RawMessage
|
|
||||||
if err := json.Unmarshal(message, &request); err != nil {
|
|
||||||
// stop silently
|
// stop silently
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(request) < 2 {
|
switch env := envelope.(type) {
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("request has less than 2 parameters"))
|
case *nostr.EventEnvelope:
|
||||||
return
|
// check id
|
||||||
}
|
hash := sha256.Sum256(env.Event.Serialize())
|
||||||
|
id := hex.EncodeToString(hash[:])
|
||||||
var typ string
|
if id != env.Event.ID {
|
||||||
json.Unmarshal(request[0], &typ)
|
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "invalid: id is computed incorrectly"})
|
||||||
|
|
||||||
switch typ {
|
|
||||||
case "EVENT":
|
|
||||||
// it's a new event
|
|
||||||
var evt nostr.Event
|
|
||||||
if err := json.Unmarshal(request[1], &evt); err != nil {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode event: " + err.Error()))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// check serialization
|
// check signature
|
||||||
serialized := evt.Serialize()
|
if ok, err := env.Event.CheckSignature(); err != nil {
|
||||||
|
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to verify signature"})
|
||||||
// assign ID
|
|
||||||
hash := sha256.Sum256(serialized)
|
|
||||||
evt.ID = hex.EncodeToString(hash[:])
|
|
||||||
|
|
||||||
// check signature (requires the ID to be set)
|
|
||||||
if ok, err := evt.CheckSignature(); err != nil {
|
|
||||||
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: "error: failed to verify signature"})
|
|
||||||
return
|
return
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: "invalid: signature is invalid"})
|
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "invalid: signature is invalid"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var ok bool
|
var ok bool
|
||||||
if evt.Kind == 5 {
|
if env.Event.Kind == 5 {
|
||||||
err = rl.handleDeleteRequest(ctx, &evt)
|
err = rl.handleDeleteRequest(ctx, &env.Event)
|
||||||
} else {
|
} else {
|
||||||
err = rl.AddEvent(ctx, &evt)
|
err = rl.AddEvent(ctx, &env.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
var reason string
|
var reason string
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ok = true
|
ok = true
|
||||||
} else {
|
} else {
|
||||||
reason = err.Error()
|
reason = nostr.NormalizeOKMessage(err.Error(), "blocked")
|
||||||
|
if isAuthRequired(reason) {
|
||||||
|
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: ok, Reason: reason})
|
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: ok, Reason: reason})
|
||||||
case "COUNT":
|
case *nostr.CountEnvelope:
|
||||||
if rl.CountEvents == nil {
|
if rl.CountEvents == nil {
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("this relay does not support NIP-45"))
|
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var id string
|
|
||||||
json.Unmarshal(request[1], &id)
|
|
||||||
if id == "" {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("COUNT has no <id>"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var total int64
|
var total int64
|
||||||
filters := make(nostr.Filters, len(request)-2)
|
for _, filter := range env.Filters {
|
||||||
for i, filterReq := range request[2:] {
|
total += rl.handleCountRequest(ctx, ws, filter)
|
||||||
if err := json.Unmarshal(filterReq, &filters[i]); err != nil {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter"))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
filter := filters[i]
|
|
||||||
|
|
||||||
for _, reject := range rl.RejectFilter {
|
|
||||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, reject := range rl.RejectCountFilter {
|
|
||||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, count := range rl.CountEvents {
|
|
||||||
res, err := count(ctx, filter)
|
|
||||||
if err != nil {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
|
||||||
}
|
|
||||||
total += res
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
ws.WriteJSON(nostr.CountEnvelope{SubscriptionID: env.SubscriptionID, Count: &total})
|
||||||
ws.WriteJSON([]interface{}{"COUNT", id, map[string]int64{"count": total}})
|
case *nostr.ReqEnvelope:
|
||||||
case "REQ":
|
|
||||||
var id string
|
|
||||||
json.Unmarshal(request[1], &id)
|
|
||||||
if id == "" {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("REQ has no <id>"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
filters := make(nostr.Filters, len(request)-2)
|
|
||||||
eose := sync.WaitGroup{}
|
eose := sync.WaitGroup{}
|
||||||
eose.Add(len(request[2:]))
|
eose.Add(len(env.Filters))
|
||||||
|
|
||||||
for i, filterReq := range request[2:] {
|
// a context just for the "stored events" request handler
|
||||||
if err := json.Unmarshal(
|
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
|
||||||
filterReq,
|
|
||||||
&filters[i],
|
|
||||||
); err != nil {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter"))
|
|
||||||
eose.Done()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
filter := filters[i]
|
// handle each filter separately -- dispatching events as they're loaded from databases
|
||||||
|
for _, filter := range env.Filters {
|
||||||
for _, reject := range rl.RejectCountFilter {
|
err := rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
|
||||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
if err != nil {
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
// fail everything if any filter is rejected
|
||||||
eose.Done()
|
reason := nostr.NormalizeOKMessage(err.Error(), "blocked")
|
||||||
continue
|
if isAuthRequired(reason) {
|
||||||
|
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
|
||||||
}
|
}
|
||||||
|
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
|
||||||
|
cancelReqCtx(fmt.Errorf("filter rejected"))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
eose.Add(len(rl.QueryEvents))
|
|
||||||
for _, query := range rl.QueryEvents {
|
|
||||||
ch, err := query(ctx, filter)
|
|
||||||
if err != nil {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
|
||||||
eose.Done()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(ch chan *nostr.Event) {
|
|
||||||
for event := range ch {
|
|
||||||
for _, ovw := range rl.OverwriteResponseEvent {
|
|
||||||
ovw(ctx, event)
|
|
||||||
}
|
|
||||||
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
|
|
||||||
}
|
|
||||||
eose.Done()
|
|
||||||
}(ch)
|
|
||||||
}
|
|
||||||
|
|
||||||
eose.Done()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
// when all events have been loaded from databases and dispatched
|
||||||
|
// we can cancel the context and fire the EOSE message
|
||||||
eose.Wait()
|
eose.Wait()
|
||||||
ws.WriteJSON(nostr.EOSEEnvelope(id))
|
cancelReqCtx(nil)
|
||||||
|
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
setListener(id, ws, filters)
|
setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx)
|
||||||
case "CLOSE":
|
case *nostr.CloseEnvelope:
|
||||||
var id string
|
removeListenerId(ws, string(*env))
|
||||||
json.Unmarshal(request[1], &id)
|
case *nostr.AuthEnvelope:
|
||||||
if id == "" {
|
wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1)
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("CLOSE has no <id>"))
|
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
|
||||||
return
|
ws.AuthedPublicKey = pubkey
|
||||||
}
|
close(ws.Authed)
|
||||||
|
ctx = context.WithValue(ctx, AUTH_CONTEXT_KEY, pubkey)
|
||||||
removeListenerId(ws, id)
|
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: true})
|
||||||
case "AUTH":
|
} else {
|
||||||
if rl.ServiceURL != "" {
|
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate"})
|
||||||
var evt nostr.Event
|
|
||||||
if err := json.Unmarshal(request[1], &evt); err != nil {
|
|
||||||
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode auth event: " + err.Error()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if pubkey, ok := nip42.ValidateAuthEvent(&evt, ws.Challenge, rl.ServiceURL); ok {
|
|
||||||
ws.Authed = pubkey
|
|
||||||
close(ws.WaitingForAuth)
|
|
||||||
ctx = context.WithValue(ctx, AUTH_CONTEXT_KEY, pubkey)
|
|
||||||
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: true})
|
|
||||||
} else {
|
|
||||||
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: false, Reason: "error: failed to authenticate"})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(message)
|
}(message)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// writer
|
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer kill()
|
||||||
ticker.Stop()
|
|
||||||
conn.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
err := ws.WriteMessage(websocket.PingMessage, nil)
|
err := ws.WriteMessage(websocket.PingMessage, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
|
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||||
|
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -303,29 +226,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
|
func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/nostr+json")
|
||||||
|
|
||||||
supportedNIPs := []int{9, 11, 12, 15, 16, 20, 33}
|
info := *rl.Info
|
||||||
if rl.ServiceURL != "" {
|
for _, ovw := range rl.OverwriteRelayInformation {
|
||||||
supportedNIPs = append(supportedNIPs, 42)
|
info = ovw(r.Context(), r, info)
|
||||||
}
|
|
||||||
if rl.CountEvents != nil {
|
|
||||||
supportedNIPs = append(supportedNIPs, 45)
|
|
||||||
}
|
|
||||||
|
|
||||||
info := nip11.RelayInformationDocument{
|
|
||||||
Name: rl.Name,
|
|
||||||
Description: rl.Description,
|
|
||||||
PubKey: rl.PubKey,
|
|
||||||
Contact: rl.Contact,
|
|
||||||
Icon: rl.IconURL,
|
|
||||||
SupportedNIPs: supportedNIPs,
|
|
||||||
Software: "https://github.com/trailriver/khatru",
|
|
||||||
Version: "n/a",
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, edit := range rl.EditInformation {
|
|
||||||
edit(r.Context(), &info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(info)
|
json.NewEncoder(w).Encode(info)
|
||||||
|
|||||||
55
helpers.go
Normal file
55
helpers.go
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
package khatru
|
||||||
|
|
||||||
|
import (
|
||||||
|
"hash/maphash"
|
||||||
|
"net/http"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
AUTH_CONTEXT_KEY = iota
|
||||||
|
WS_KEY
|
||||||
|
)
|
||||||
|
|
||||||
|
var nip20prefixmatcher = regexp.MustCompile(`^\w+: `)
|
||||||
|
|
||||||
|
func pointerHasher[V any](_ maphash.Seed, k *V) uint64 {
|
||||||
|
return uint64(uintptr(unsafe.Pointer(k)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func isOlder(previous, next *nostr.Event) bool {
|
||||||
|
return previous.CreatedAt < next.CreatedAt ||
|
||||||
|
(previous.CreatedAt == next.CreatedAt && previous.ID > next.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isAuthRequired(msg string) bool {
|
||||||
|
idx := strings.IndexByte(msg, ':')
|
||||||
|
return msg[0:idx] == "auth-required"
|
||||||
|
}
|
||||||
|
|
||||||
|
func getServiceBaseURL(r *http.Request) string {
|
||||||
|
host := r.Header.Get("X-Forwarded-Host")
|
||||||
|
if host == "" {
|
||||||
|
host = r.Host
|
||||||
|
}
|
||||||
|
proto := r.Header.Get("X-Forwarded-Proto")
|
||||||
|
if proto == "" {
|
||||||
|
if host == "localhost" {
|
||||||
|
proto = "http"
|
||||||
|
} else if strings.Index(host, ":") != -1 {
|
||||||
|
// has a port number
|
||||||
|
proto = "http"
|
||||||
|
} else if _, err := strconv.Atoi(strings.ReplaceAll(host, ".", "")); err == nil {
|
||||||
|
// it's a naked IP
|
||||||
|
proto = "http"
|
||||||
|
} else {
|
||||||
|
proto = "https"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return proto + "://" + host
|
||||||
|
}
|
||||||
18
listener.go
18
listener.go
@@ -1,12 +1,16 @@
|
|||||||
package khatru
|
package khatru
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
"github.com/puzpuzpuz/xsync/v2"
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Listener struct {
|
type Listener struct {
|
||||||
filters nostr.Filters
|
filters nostr.Filters
|
||||||
|
cancel context.CancelCauseFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket])
|
var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket])
|
||||||
@@ -43,24 +47,28 @@ func GetListeningFilters() nostr.Filters {
|
|||||||
return respfilters
|
return respfilters
|
||||||
}
|
}
|
||||||
|
|
||||||
func setListener(id string, ws *WebSocket, filters nostr.Filters) {
|
func setListener(id string, ws *WebSocket, filters nostr.Filters, cancel context.CancelCauseFunc) {
|
||||||
subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] {
|
subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] {
|
||||||
return xsync.NewMapOf[*Listener]()
|
return xsync.NewMapOf[*Listener]()
|
||||||
})
|
})
|
||||||
subs.Store(id, &Listener{filters: filters})
|
subs.Store(id, &Listener{filters: filters, cancel: cancel})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove a specific subscription id from listeners for a given ws client
|
// remove a specific subscription id from listeners for a given ws client
|
||||||
|
// and cancel its specific context
|
||||||
func removeListenerId(ws *WebSocket, id string) {
|
func removeListenerId(ws *WebSocket, id string) {
|
||||||
if subs, ok := listeners.Load(ws); ok {
|
if subs, ok := listeners.Load(ws); ok {
|
||||||
subs.Delete(id)
|
if listener, ok := subs.LoadAndDelete(id); ok {
|
||||||
|
listener.cancel(fmt.Errorf("subscription closed by client"))
|
||||||
|
}
|
||||||
if subs.Size() == 0 {
|
if subs.Size() == 0 {
|
||||||
listeners.Delete(ws)
|
listeners.Delete(ws)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove WebSocket conn from listeners
|
// remove WebSocket conn from listeners
|
||||||
|
// (no need to cancel contexts as they are all inherited from the main connection context)
|
||||||
func removeListener(ws *WebSocket) {
|
func removeListener(ws *WebSocket) {
|
||||||
listeners.Delete(ws)
|
listeners.Delete(ws)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,58 +0,0 @@
|
|||||||
package plugins
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
|
||||||
)
|
|
||||||
|
|
||||||
// PreventTooManyIndexableTags returns a function that can be used as a RejectFilter that will reject
|
|
||||||
// events with more indexable (single-character) tags than the specified number.
|
|
||||||
func PreventTooManyIndexableTags(max int) func(context.Context, *nostr.Event) (bool, string) {
|
|
||||||
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
|
||||||
ntags := 0
|
|
||||||
for _, tag := range event.Tags {
|
|
||||||
if len(tag) > 0 && len(tag[0]) == 1 {
|
|
||||||
ntags++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ntags > max {
|
|
||||||
return true, "too many indexable tags"
|
|
||||||
}
|
|
||||||
return false, ""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RestrictToSpecifiedKinds returns a function that can be used as a RejectFilter that will reject
|
|
||||||
// any events with kinds different than the specified ones.
|
|
||||||
func RestrictToSpecifiedKinds(kinds ...uint16) func(context.Context, *nostr.Event) (bool, string) {
|
|
||||||
max := 0
|
|
||||||
min := 0
|
|
||||||
allowed := make(map[uint16]struct{}, len(kinds))
|
|
||||||
for _, kind := range kinds {
|
|
||||||
allowed[kind] = struct{}{}
|
|
||||||
if int(kind) > max {
|
|
||||||
max = int(kind)
|
|
||||||
}
|
|
||||||
if int(kind) < min {
|
|
||||||
min = int(kind)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
|
||||||
// these are cheap and very questionable optimizations, but they exist for a reason:
|
|
||||||
// we would have to ensure that the kind number is within the bounds of a uint16 anyway
|
|
||||||
if event.Kind > max {
|
|
||||||
return true, "event kind not allowed"
|
|
||||||
}
|
|
||||||
if event.Kind < min {
|
|
||||||
return true, "event kind not allowed"
|
|
||||||
}
|
|
||||||
|
|
||||||
// hopefully this map of uint16s is very fast
|
|
||||||
if _, allowed := allowed[uint16(event.Kind)]; allowed {
|
|
||||||
return false, ""
|
|
||||||
}
|
|
||||||
return true, "event kind not allowed"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,33 +0,0 @@
|
|||||||
package plugins
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NoPrefixFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
|
||||||
for _, id := range filter.IDs {
|
|
||||||
if len(id) != 64 {
|
|
||||||
return true, fmt.Sprintf("filters can only contain full ids")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, pk := range filter.Authors {
|
|
||||||
if len(pk) != 64 {
|
|
||||||
return true, fmt.Sprintf("filters can only contain full pubkeys")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
|
||||||
items := len(filter.Tags) + len(filter.Kinds)
|
|
||||||
|
|
||||||
if items > 4 && len(filter.Tags) > 2 {
|
|
||||||
return true, "too many things to filter for"
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, ""
|
|
||||||
}
|
|
||||||
110
policies/events.go
Normal file
110
policies/events.go
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
package policies
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PreventTooManyIndexableTags returns a function that can be used as a RejectFilter that will reject
|
||||||
|
// events with more indexable (single-character) tags than the specified number.
|
||||||
|
//
|
||||||
|
// If ignoreKinds is given this restriction will not apply to these kinds (useful for allowing a bigger).
|
||||||
|
// If onlyKinds is given then all other kinds will be ignored.
|
||||||
|
func PreventTooManyIndexableTags(max int, ignoreKinds []int, onlyKinds []int) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
ignore := func(kind int) bool { return false }
|
||||||
|
if len(ignoreKinds) > 0 {
|
||||||
|
ignore = func(kind int) bool {
|
||||||
|
_, isIgnored := slices.BinarySearch(ignoreKinds, kind)
|
||||||
|
return isIgnored
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(onlyKinds) > 0 {
|
||||||
|
ignore = func(kind int) bool {
|
||||||
|
_, isApplicable := slices.BinarySearch(onlyKinds, kind)
|
||||||
|
return !isApplicable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
if ignore(event.Kind) {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
ntags := 0
|
||||||
|
for _, tag := range event.Tags {
|
||||||
|
if len(tag) > 0 && len(tag[0]) == 1 {
|
||||||
|
ntags++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ntags > max {
|
||||||
|
return true, "too many indexable tags"
|
||||||
|
}
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreventLargeTags rejects events that have indexable tag values greater than maxTagValueLen.
|
||||||
|
func PreventLargeTags(maxTagValueLen int) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
for _, tag := range event.Tags {
|
||||||
|
if len(tag) > 1 && len(tag[0]) == 1 {
|
||||||
|
if len(tag[1]) > maxTagValueLen {
|
||||||
|
return true, "event contains too large tags"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RestrictToSpecifiedKinds returns a function that can be used as a RejectFilter that will reject
|
||||||
|
// any events with kinds different than the specified ones.
|
||||||
|
func RestrictToSpecifiedKinds(kinds ...uint16) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
max := 0
|
||||||
|
min := 0
|
||||||
|
for _, kind := range kinds {
|
||||||
|
if int(kind) > max {
|
||||||
|
max = int(kind)
|
||||||
|
}
|
||||||
|
if int(kind) < min {
|
||||||
|
min = int(kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
// these are cheap and very questionable optimizations, but they exist for a reason:
|
||||||
|
// we would have to ensure that the kind number is within the bounds of a uint16 anyway
|
||||||
|
if event.Kind > max {
|
||||||
|
return true, "event kind not allowed"
|
||||||
|
}
|
||||||
|
if event.Kind < min {
|
||||||
|
return true, "event kind not allowed"
|
||||||
|
}
|
||||||
|
|
||||||
|
// hopefully this map of uint16s is very fast
|
||||||
|
if _, allowed := slices.BinarySearch(kinds, uint16(event.Kind)); allowed {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
return true, "event kind not allowed"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func PreventTimestampsInThePast(thresholdSeconds nostr.Timestamp) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
if nostr.Now()-event.CreatedAt > thresholdSeconds {
|
||||||
|
return true, "event too old"
|
||||||
|
}
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func PreventTimestampsInTheFuture(thresholdSeconds nostr.Timestamp) func(context.Context, *nostr.Event) (bool, string) {
|
||||||
|
return func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
|
||||||
|
if event.CreatedAt-nostr.Now() > thresholdSeconds {
|
||||||
|
return true, "event too much in the future"
|
||||||
|
}
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
73
policies/filters.go
Normal file
73
policies/filters.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package policies
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NoComplexFilters disallows filters with more than 2 tags.
|
||||||
|
func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||||
|
items := len(filter.Tags) + len(filter.Kinds)
|
||||||
|
|
||||||
|
if items > 4 && len(filter.Tags) > 2 {
|
||||||
|
return true, "too many things to filter for"
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// NoEmptyFilters disallows filters that don't have at least a tag, a kind, an author or an id.
|
||||||
|
func NoEmptyFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||||
|
c := len(filter.Kinds) + len(filter.IDs) + len(filter.Authors)
|
||||||
|
for _, tagItems := range filter.Tags {
|
||||||
|
c += len(tagItems)
|
||||||
|
}
|
||||||
|
if c == 0 {
|
||||||
|
return true, "can't handle empty filters"
|
||||||
|
}
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// AntiSyncBots tries to prevent people from syncing kind:1s from this relay to else by always
|
||||||
|
// requiring an author parameter at least.
|
||||||
|
func AntiSyncBots(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||||
|
return (len(filter.Kinds) == 0 || slices.Contains(filter.Kinds, 1)) &&
|
||||||
|
len(filter.Authors) == 0, "an author must be specified to get their kind:1 notes"
|
||||||
|
}
|
||||||
|
|
||||||
|
func NoSearchQueries(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||||
|
if filter.Search != "" {
|
||||||
|
return true, "search is not supported"
|
||||||
|
}
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func RemoveSearchQueries(ctx context.Context, filter *nostr.Filter) {
|
||||||
|
filter.Search = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func RemoveAllButKinds(kinds ...uint16) func(context.Context, *nostr.Filter) {
|
||||||
|
return func(ctx context.Context, filter *nostr.Filter) {
|
||||||
|
if n := len(filter.Kinds); n > 0 {
|
||||||
|
newKinds := make([]int, 0, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
if k := filter.Kinds[i]; slices.Contains(kinds, uint16(k)) {
|
||||||
|
newKinds = append(newKinds, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
filter.Kinds = newKinds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RemoveAllButTags(tagNames ...string) func(context.Context, *nostr.Filter) {
|
||||||
|
return func(ctx context.Context, filter *nostr.Filter) {
|
||||||
|
for tagName := range filter.Tags {
|
||||||
|
if !slices.Contains(tagNames, tagName) {
|
||||||
|
delete(filter.Tags, tagName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package plugins
|
package policies
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -8,7 +8,8 @@ import (
|
|||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
func rejectKind04Snoopers(ctx context.Context, filter nostr.Filter) (bool, string) {
|
// RejectKind04Snoopers prevents reading NIP-04 messages from people not involved in the conversation.
|
||||||
|
func RejectKind04Snoopers(ctx context.Context, filter nostr.Filter) (bool, string) {
|
||||||
// prevent kind-4 events from being returned to unauthed users,
|
// prevent kind-4 events from being returned to unauthed users,
|
||||||
// only when authentication is a thing
|
// only when authentication is a thing
|
||||||
if !slices.Contains(filter.Kinds, 4) {
|
if !slices.Contains(filter.Kinds, 4) {
|
||||||
@@ -19,13 +20,13 @@ func rejectKind04Snoopers(ctx context.Context, filter nostr.Filter) (bool, strin
|
|||||||
senders := filter.Authors
|
senders := filter.Authors
|
||||||
receivers, _ := filter.Tags["p"]
|
receivers, _ := filter.Tags["p"]
|
||||||
switch {
|
switch {
|
||||||
case ws.Authed == "":
|
case ws.AuthedPublicKey == "":
|
||||||
// not authenticated
|
// not authenticated
|
||||||
return true, "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?"
|
return true, "restricted: this relay does not serve kind-4 to unauthenticated users, does your client implement NIP-42?"
|
||||||
case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.Authed):
|
case len(senders) == 1 && len(receivers) < 2 && (senders[0] == ws.AuthedPublicKey):
|
||||||
// allowed filter: ws.authed is sole sender (filter specifies one or all receivers)
|
// allowed filter: ws.authed is sole sender (filter specifies one or all receivers)
|
||||||
return false, ""
|
return false, ""
|
||||||
case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.Authed):
|
case len(receivers) == 1 && len(senders) < 2 && (receivers[0] == ws.AuthedPublicKey):
|
||||||
// allowed filter: ws.authed is sole receiver (filter specifies one or all senders)
|
// allowed filter: ws.authed is sole receiver (filter specifies one or all senders)
|
||||||
return false, ""
|
return false, ""
|
||||||
default:
|
default:
|
||||||
49
relay.go
49
relay.go
@@ -17,6 +17,12 @@ func NewRelay() *Relay {
|
|||||||
return &Relay{
|
return &Relay{
|
||||||
Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags),
|
Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags),
|
||||||
|
|
||||||
|
Info: &nip11.RelayInformationDocument{
|
||||||
|
Software: "https://github.com/fiatjaf/khatru",
|
||||||
|
Version: "n/a",
|
||||||
|
SupportedNIPs: make([]int, 0),
|
||||||
|
},
|
||||||
|
|
||||||
upgrader: websocket.Upgrader{
|
upgrader: websocket.Upgrader{
|
||||||
ReadBufferSize: 1024,
|
ReadBufferSize: 1024,
|
||||||
WriteBufferSize: 1024,
|
WriteBufferSize: 1024,
|
||||||
@@ -34,26 +40,26 @@ func NewRelay() *Relay {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Relay struct {
|
type Relay struct {
|
||||||
Name string
|
ServiceURL string
|
||||||
Description string
|
|
||||||
PubKey string
|
|
||||||
Contact string
|
|
||||||
ServiceURL string // required for nip-42
|
|
||||||
IconURL string
|
|
||||||
|
|
||||||
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
|
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
|
||||||
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
||||||
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
||||||
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
|
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
|
||||||
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
|
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
|
||||||
StoreEvent []func(ctx context.Context, event *nostr.Event) error
|
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
|
||||||
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
|
OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter)
|
||||||
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
|
OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument
|
||||||
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
|
StoreEvent []func(ctx context.Context, event *nostr.Event) error
|
||||||
EditInformation []func(ctx context.Context, info *nip11.RelayInformationDocument)
|
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
|
||||||
OnAuth []func(ctx context.Context, pubkey string)
|
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
|
||||||
OnConnect []func(ctx context.Context)
|
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
|
||||||
OnEventSaved []func(ctx context.Context, event *nostr.Event)
|
OnAuth []func(ctx context.Context, pubkey string)
|
||||||
|
OnConnect []func(ctx context.Context)
|
||||||
|
OnEventSaved []func(ctx context.Context, event *nostr.Event)
|
||||||
|
|
||||||
|
// editing info will affect
|
||||||
|
Info *nip11.RelayInformationDocument
|
||||||
|
|
||||||
// Default logger, as set by NewServer, is a stdlib logger prefixed with "[khatru-relay] ",
|
// Default logger, as set by NewServer, is a stdlib logger prefixed with "[khatru-relay] ",
|
||||||
// outputting to stderr.
|
// outputting to stderr.
|
||||||
@@ -76,8 +82,3 @@ type Relay struct {
|
|||||||
PingPeriod time.Duration // Send pings to peer with this period. Must be less than pongWait.
|
PingPeriod time.Duration // Send pings to peer with this period. Must be less than pongWait.
|
||||||
MaxMessageSize int64 // Maximum message size allowed from peer.
|
MaxMessageSize int64 // Maximum message size allowed from peer.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rl *Relay) RequestAuth(ctx context.Context) {
|
|
||||||
ws := GetConnection(ctx)
|
|
||||||
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
|
|
||||||
}
|
|
||||||
|
|||||||
85
serve-req.go
Normal file
85
serve-req.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package khatru
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGroup, ws *WebSocket, filter nostr.Filter) error {
|
||||||
|
defer eose.Done()
|
||||||
|
|
||||||
|
// overwrite the filter (for example, to eliminate some kinds or
|
||||||
|
// that we know we don't support)
|
||||||
|
for _, ovw := range rl.OverwriteFilter {
|
||||||
|
ovw(ctx, &filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter.Limit < 0 {
|
||||||
|
return fmt.Errorf("filter invalidated")
|
||||||
|
}
|
||||||
|
|
||||||
|
// then check if we'll reject this filter (we apply this after overwriting
|
||||||
|
// because we may, for example, remove some things from the incoming filters
|
||||||
|
// that we know we don't support, and then if the end result is an empty
|
||||||
|
// filter we can just reject it)
|
||||||
|
for _, reject := range rl.RejectFilter {
|
||||||
|
if reject, msg := reject(ctx, filter); reject {
|
||||||
|
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||||
|
return fmt.Errorf(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the functions to query events (generally just one,
|
||||||
|
// but we might be fetching stuff from multiple places)
|
||||||
|
eose.Add(len(rl.QueryEvents))
|
||||||
|
for _, query := range rl.QueryEvents {
|
||||||
|
ch, err := query(ctx, filter)
|
||||||
|
if err != nil {
|
||||||
|
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||||
|
eose.Done()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(ch chan *nostr.Event) {
|
||||||
|
for event := range ch {
|
||||||
|
for _, ovw := range rl.OverwriteResponseEvent {
|
||||||
|
ovw(ctx, event)
|
||||||
|
}
|
||||||
|
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
|
||||||
|
}
|
||||||
|
eose.Done()
|
||||||
|
}(ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) int64 {
|
||||||
|
// overwrite the filter (for example, to eliminate some kinds or tags that we know we don't support)
|
||||||
|
for _, ovw := range rl.OverwriteCountFilter {
|
||||||
|
ovw(ctx, &filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
// then check if we'll reject this filter
|
||||||
|
for _, reject := range rl.RejectCountFilter {
|
||||||
|
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||||
|
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the functions to count (generally it will be just one)
|
||||||
|
var subtotal int64 = 0
|
||||||
|
for _, count := range rl.CountEvents {
|
||||||
|
res, err := count(ctx, filter)
|
||||||
|
if err != nil {
|
||||||
|
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||||
|
}
|
||||||
|
subtotal += res
|
||||||
|
}
|
||||||
|
|
||||||
|
return subtotal
|
||||||
|
}
|
||||||
@@ -1,93 +0,0 @@
|
|||||||
package khatru
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"net/http"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gobwas/ws/wsutil"
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestServerStartShutdown(t *testing.T) {
|
|
||||||
var (
|
|
||||||
inited bool
|
|
||||||
storeInited bool
|
|
||||||
shutdown bool
|
|
||||||
)
|
|
||||||
rl := &testRelay{
|
|
||||||
name: "test server start",
|
|
||||||
init: func() error {
|
|
||||||
inited = true
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
onShutdown: func(context.Context) { shutdown = true },
|
|
||||||
storage: &testStorage{
|
|
||||||
init: func() error { storeInited = true; return nil },
|
|
||||||
},
|
|
||||||
}
|
|
||||||
srv, _ := NewServer(rl)
|
|
||||||
ready := make(chan bool)
|
|
||||||
done := make(chan error)
|
|
||||||
go func() { done <- srv.Start("127.0.0.1", 0, ready); close(done) }()
|
|
||||||
<-ready
|
|
||||||
|
|
||||||
// verify everything's initialized
|
|
||||||
if !inited {
|
|
||||||
t.Error("didn't call testRelay.init")
|
|
||||||
}
|
|
||||||
if !storeInited {
|
|
||||||
t.Error("didn't call testStorage.init")
|
|
||||||
}
|
|
||||||
|
|
||||||
// check that http requests are served
|
|
||||||
if _, err := http.Get("http://" + srv.Addr); err != nil {
|
|
||||||
t.Errorf("GET %s: %v", srv.Addr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify server shuts down
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
srv.Shutdown(ctx)
|
|
||||||
if !shutdown {
|
|
||||||
t.Error("didn't call testRelay.onShutdown")
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case err := <-done:
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("srv.Start: %v", err)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Error("srv.Start too long to return")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestServerShutdownWebsocket(t *testing.T) {
|
|
||||||
// set up a new relay server
|
|
||||||
srv := startTestRelay(t, &testRelay{storage: &testStorage{}})
|
|
||||||
|
|
||||||
// connect a client to it
|
|
||||||
ctx1, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
client, err := nostr.RelayConnect(ctx1, "ws://"+srv.Addr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("nostr.RelayConnectContext: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now, shut down the server
|
|
||||||
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
srv.Shutdown(ctx2)
|
|
||||||
|
|
||||||
// wait for the client to receive a "connection close"
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
err = client.ConnectionError
|
|
||||||
if e := errors.Unwrap(err); e != nil {
|
|
||||||
err = e
|
|
||||||
}
|
|
||||||
if _, ok := err.(wsutil.ClosedError); !ok {
|
|
||||||
t.Errorf("client.ConnextionError: %v (%T); want wsutil.ClosedError", err, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
91
util_test.go
91
util_test.go
@@ -1,91 +0,0 @@
|
|||||||
package khatru
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
|
||||||
)
|
|
||||||
|
|
||||||
func startTestRelay(t *testing.T, tr *testRelay) *Server {
|
|
||||||
t.Helper()
|
|
||||||
srv, _ := NewServer(tr)
|
|
||||||
started := make(chan bool)
|
|
||||||
go srv.Start("127.0.0.1", 0, started)
|
|
||||||
<-started
|
|
||||||
return srv
|
|
||||||
}
|
|
||||||
|
|
||||||
type testRelay struct {
|
|
||||||
name string
|
|
||||||
storage Storage
|
|
||||||
init func() error
|
|
||||||
onShutdown func(context.Context)
|
|
||||||
acceptEvent func(*nostr.Event) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr *testRelay) Name() string { return tr.name }
|
|
||||||
func (tr *testRelay) Storage(context.Context) Storage { return tr.storage }
|
|
||||||
|
|
||||||
func (tr *testRelay) Init() error {
|
|
||||||
if fn := tr.init; fn != nil {
|
|
||||||
return fn()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr *testRelay) OnShutdown(ctx context.Context) {
|
|
||||||
if fn := tr.onShutdown; fn != nil {
|
|
||||||
fn(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr *testRelay) AcceptEvent(ctx context.Context, e *nostr.Event) bool {
|
|
||||||
if fn := tr.acceptEvent; fn != nil {
|
|
||||||
return fn(e)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
type testStorage struct {
|
|
||||||
init func() error
|
|
||||||
queryEvents func(context.Context, *nostr.Filter) (chan *nostr.Event, error)
|
|
||||||
deleteEvent func(ctx context.Context, id string, pubkey string) error
|
|
||||||
saveEvent func(context.Context, *nostr.Event) error
|
|
||||||
countEvents func(context.Context, *nostr.Filter) (int64, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *testStorage) Init() error {
|
|
||||||
if fn := st.init; fn != nil {
|
|
||||||
return fn()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *testStorage) QueryEvents(ctx context.Context, f *nostr.Filter) (chan *nostr.Event, error) {
|
|
||||||
if fn := st.queryEvents; fn != nil {
|
|
||||||
return fn(ctx, f)
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *testStorage) DeleteEvent(ctx context.Context, id string, pubkey string) error {
|
|
||||||
if fn := st.deleteEvent; fn != nil {
|
|
||||||
return fn(ctx, id, pubkey)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *testStorage) SaveEvent(ctx context.Context, e *nostr.Event) error {
|
|
||||||
if fn := st.saveEvent; fn != nil {
|
|
||||||
return fn(ctx, e)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *testStorage) CountEvents(ctx context.Context, f *nostr.Filter) (int64, error) {
|
|
||||||
if fn := st.countEvents; fn != nil {
|
|
||||||
return fn(ctx, f)
|
|
||||||
}
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
12
utils.go
12
utils.go
@@ -2,18 +2,8 @@ package khatru
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"hash/maphash"
|
|
||||||
"regexp"
|
|
||||||
"unsafe"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
AUTH_CONTEXT_KEY = iota
|
|
||||||
WS_KEY = iota
|
|
||||||
)
|
|
||||||
|
|
||||||
var nip20prefixmatcher = regexp.MustCompile(`^\w+: `)
|
|
||||||
|
|
||||||
func GetConnection(ctx context.Context) *WebSocket {
|
func GetConnection(ctx context.Context) *WebSocket {
|
||||||
return ctx.Value(WS_KEY).(*WebSocket)
|
return ctx.Value(WS_KEY).(*WebSocket)
|
||||||
}
|
}
|
||||||
@@ -25,5 +15,3 @@ func GetAuthed(ctx context.Context) string {
|
|||||||
}
|
}
|
||||||
return authedPubkey.(string)
|
return authedPubkey.(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
func pointerHasher[V any](_ maphash.Seed, k *V) uint64 { return uint64(uintptr(unsafe.Pointer(k))) }
|
|
||||||
|
|||||||
10
websocket.go
10
websocket.go
@@ -1,6 +1,7 @@
|
|||||||
package khatru
|
package khatru
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/fasthttp/websocket"
|
"github.com/fasthttp/websocket"
|
||||||
@@ -10,10 +11,13 @@ type WebSocket struct {
|
|||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
|
|
||||||
|
// original request
|
||||||
|
Request *http.Request
|
||||||
|
|
||||||
// nip42
|
// nip42
|
||||||
Challenge string
|
Challenge string
|
||||||
Authed string
|
AuthedPublicKey string
|
||||||
WaitingForAuth chan struct{}
|
Authed chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WebSocket) WriteJSON(any any) error {
|
func (ws *WebSocket) WriteJSON(any any) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user