mirror of
https://github.com/fiatjaf/khatru.git
synced 2026-04-07 22:16:46 +02:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ecb4fc66f8 | ||
|
|
af24bd2132 | ||
|
|
4905a46ccd | ||
|
|
21e0c559f7 | ||
|
|
78dd138ca8 | ||
|
|
6c1a030ad2 | ||
|
|
270096debb | ||
|
|
487b84cf2d | ||
|
|
b277dae743 | ||
|
|
1e51cdbc07 |
14
.github/workflows/test.yml
vendored
Normal file
14
.github/workflows/test.yml
vendored
Normal file
@@ -0,0 +1,14 @@
|
||||
name: test every commit
|
||||
on:
|
||||
- push
|
||||
- pull_request
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version-file: ./go.mod
|
||||
- run: go test ./...
|
||||
@@ -1,5 +1,9 @@
|
||||
# khatru, a relay framework [](https://pkg.go.dev/github.com/fiatjaf/khatru#Relay)
|
||||
|
||||
[](https://github.com/fiatjaf/khatru/actions/workflows/test.yml)
|
||||
[](https://pkg.go.dev/github.com/fiatjaf/khatru)
|
||||
[](https://goreportcard.com/report/github.com/fiatjaf/khatru)
|
||||
|
||||
Khatru makes it easy to write very very custom relays:
|
||||
|
||||
- custom event or filter acceptance policies
|
||||
|
||||
20
add-event.go
20
add-event.go
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
@@ -12,22 +13,15 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
|
||||
return fmt.Errorf("event is nil")
|
||||
}
|
||||
|
||||
msg := ""
|
||||
rejecting := false
|
||||
for _, reject := range rl.RejectEvent {
|
||||
rejecting, msg = reject(ctx, evt)
|
||||
if rejecting {
|
||||
break
|
||||
if reject, msg := reject(ctx, evt); reject {
|
||||
if msg == "" {
|
||||
msg = "no reason"
|
||||
}
|
||||
return fmt.Errorf(msg)
|
||||
}
|
||||
}
|
||||
|
||||
if rejecting {
|
||||
if msg == "" {
|
||||
msg = "no reason"
|
||||
}
|
||||
return fmt.Errorf(msg)
|
||||
}
|
||||
|
||||
if 20000 <= evt.Kind && evt.Kind < 30000 {
|
||||
// do not store ephemeral events
|
||||
} else {
|
||||
@@ -66,7 +60,7 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
|
||||
for _, store := range rl.StoreEvent {
|
||||
if saveErr := store(ctx, evt); saveErr != nil {
|
||||
switch saveErr {
|
||||
case ErrDupEvent:
|
||||
case eventstore.ErrDupEvent:
|
||||
return nil
|
||||
default:
|
||||
errmsg := saveErr.Error()
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
package khatru
|
||||
|
||||
import "fmt"
|
||||
|
||||
var ErrDupEvent = fmt.Errorf("duplicate: event already exists")
|
||||
@@ -27,7 +27,7 @@ func main() {
|
||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||
|
||||
relay.RejectEvent = append(relay.RejectEvent, plugins.PreventTooManyIndexableTags(10))
|
||||
relay.RejectFilter = append(relay.RejectFilter, plugins.NoPrefixFilters, plugins.NoComplexFilters)
|
||||
relay.RejectFilter = append(relay.RejectFilter, plugins.NoComplexFilters)
|
||||
|
||||
relay.OnEventSaved = append(relay.OnEventSaved, func(ctx context.Context, event *nostr.Event) {
|
||||
})
|
||||
|
||||
68
handlers.go
68
handlers.go
@@ -7,6 +7,7 @@ import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -165,29 +166,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter"))
|
||||
continue
|
||||
}
|
||||
|
||||
filter := filters[i]
|
||||
|
||||
for _, reject := range rl.RejectFilter {
|
||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
continue
|
||||
}
|
||||
}
|
||||
for _, reject := range rl.RejectCountFilter {
|
||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for _, count := range rl.CountEvents {
|
||||
res, err := count(ctx, filter)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||
}
|
||||
total += res
|
||||
}
|
||||
total += rl.handleCountRequest(ctx, ws, filters[i])
|
||||
}
|
||||
|
||||
ws.WriteJSON([]interface{}{"COUNT", id, map[string]int64{"count": total}})
|
||||
@@ -204,46 +183,13 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
eose.Add(len(request[2:]))
|
||||
|
||||
for i, filterReq := range request[2:] {
|
||||
if err := json.Unmarshal(
|
||||
filterReq,
|
||||
&filters[i],
|
||||
); err != nil {
|
||||
if err := json.Unmarshal(filterReq, &filters[i]); err != nil {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope("failed to decode filter"))
|
||||
eose.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
filter := filters[i]
|
||||
|
||||
for _, reject := range rl.RejectCountFilter {
|
||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
eose.Done()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
eose.Add(len(rl.QueryEvents))
|
||||
for _, query := range rl.QueryEvents {
|
||||
ch, err := query(ctx, filter)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||
eose.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
go func(ch chan *nostr.Event) {
|
||||
for event := range ch {
|
||||
for _, ovw := range rl.OverwriteResponseEvent {
|
||||
ovw(ctx, event)
|
||||
}
|
||||
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
|
||||
}
|
||||
eose.Done()
|
||||
}(ch)
|
||||
}
|
||||
|
||||
eose.Done()
|
||||
go rl.handleRequest(ctx, id, &eose, ws, filters[i])
|
||||
}
|
||||
|
||||
go func() {
|
||||
@@ -294,7 +240,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
case <-ticker.C:
|
||||
err := ws.WriteMessage(websocket.PingMessage, nil)
|
||||
if err != nil {
|
||||
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
|
||||
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -303,7 +251,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Type", "application/nostr+json")
|
||||
|
||||
supportedNIPs := []int{9, 11, 12, 15, 16, 20, 33}
|
||||
if rl.ServiceURL != "" {
|
||||
|
||||
@@ -2,26 +2,11 @@ package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
func NoPrefixFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||
for _, id := range filter.IDs {
|
||||
if len(id) != 64 {
|
||||
return true, fmt.Sprintf("filters can only contain full ids")
|
||||
}
|
||||
}
|
||||
for _, pk := range filter.Authors {
|
||||
if len(pk) != 64 {
|
||||
return true, fmt.Sprintf("filters can only contain full pubkeys")
|
||||
}
|
||||
}
|
||||
|
||||
return false, ""
|
||||
}
|
||||
|
||||
func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||
items := len(filter.Tags) + len(filter.Kinds)
|
||||
|
||||
@@ -31,3 +16,49 @@ func NoComplexFilters(ctx context.Context, filter nostr.Filter) (reject bool, ms
|
||||
|
||||
return false, ""
|
||||
}
|
||||
|
||||
func NoEmptyFilters(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||
c := len(filter.Kinds) + len(filter.IDs) + len(filter.Authors)
|
||||
for _, tagItems := range filter.Tags {
|
||||
c += len(tagItems)
|
||||
}
|
||||
if c == 0 {
|
||||
return true, "can't handle empty filters"
|
||||
}
|
||||
return false, ""
|
||||
}
|
||||
|
||||
func NoSearchQueries(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
|
||||
if filter.Search != "" {
|
||||
return true, "search is not supported"
|
||||
}
|
||||
return false, ""
|
||||
}
|
||||
|
||||
func RemoveSearchQueries(ctx context.Context, filter *nostr.Filter) {
|
||||
filter.Search = ""
|
||||
}
|
||||
|
||||
func RemoveAllButKinds(kinds ...uint16) func(context.Context, *nostr.Filter) {
|
||||
return func(ctx context.Context, filter *nostr.Filter) {
|
||||
if n := len(filter.Kinds); n > 0 {
|
||||
newKinds := make([]int, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
if k := filter.Kinds[i]; slices.Contains(kinds, uint16(k)) {
|
||||
newKinds = append(newKinds, k)
|
||||
}
|
||||
}
|
||||
filter.Kinds = newKinds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RemoveAllButTags(tagNames ...string) func(context.Context, *nostr.Filter) {
|
||||
return func(ctx context.Context, filter *nostr.Filter) {
|
||||
for tagName := range filter.Tags {
|
||||
if !slices.Contains(tagNames, tagName) {
|
||||
delete(filter.Tags, tagName)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
2
relay.go
2
relay.go
@@ -46,6 +46,8 @@ type Relay struct {
|
||||
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
||||
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
|
||||
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
|
||||
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
|
||||
OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter)
|
||||
StoreEvent []func(ctx context.Context, event *nostr.Event) error
|
||||
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
|
||||
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
|
||||
|
||||
82
serve-req.go
Normal file
82
serve-req.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package khatru
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGroup, ws *WebSocket, filter nostr.Filter) {
|
||||
defer eose.Done()
|
||||
|
||||
// overwrite the filter (for example, to eliminate some kinds or
|
||||
// that we know we don't support)
|
||||
for _, ovw := range rl.OverwriteFilter {
|
||||
ovw(ctx, &filter)
|
||||
}
|
||||
|
||||
if filter.Limit == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// then check if we'll reject this filter (we apply this after overwriting
|
||||
// because we may, for example, remove some things from the incoming filters
|
||||
// that we know we don't support, and then if the end result is an empty
|
||||
// filter we can just reject it)
|
||||
for _, reject := range rl.RejectFilter {
|
||||
if reject, msg := reject(ctx, filter); reject {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// run the functions to query events (generally just one,
|
||||
// but we might be fetching stuff from multiple places)
|
||||
eose.Add(len(rl.QueryEvents))
|
||||
for _, query := range rl.QueryEvents {
|
||||
ch, err := query(ctx, filter)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||
eose.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
go func(ch chan *nostr.Event) {
|
||||
for event := range ch {
|
||||
for _, ovw := range rl.OverwriteResponseEvent {
|
||||
ovw(ctx, event)
|
||||
}
|
||||
ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})
|
||||
}
|
||||
eose.Done()
|
||||
}(ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) int64 {
|
||||
// overwrite the filter (for example, to eliminate some kinds or tags that we know we don't support)
|
||||
for _, ovw := range rl.OverwriteCountFilter {
|
||||
ovw(ctx, &filter)
|
||||
}
|
||||
|
||||
// then check if we'll reject this filter
|
||||
for _, reject := range rl.RejectCountFilter {
|
||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// run the functions to count (generally it will be just one)
|
||||
var subtotal int64 = 0
|
||||
for _, count := range rl.CountEvents {
|
||||
res, err := count(ctx, filter)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||
}
|
||||
subtotal += res
|
||||
}
|
||||
|
||||
return subtotal
|
||||
}
|
||||
@@ -1,93 +0,0 @@
|
||||
package khatru
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func TestServerStartShutdown(t *testing.T) {
|
||||
var (
|
||||
inited bool
|
||||
storeInited bool
|
||||
shutdown bool
|
||||
)
|
||||
rl := &testRelay{
|
||||
name: "test server start",
|
||||
init: func() error {
|
||||
inited = true
|
||||
return nil
|
||||
},
|
||||
onShutdown: func(context.Context) { shutdown = true },
|
||||
storage: &testStorage{
|
||||
init: func() error { storeInited = true; return nil },
|
||||
},
|
||||
}
|
||||
srv, _ := NewServer(rl)
|
||||
ready := make(chan bool)
|
||||
done := make(chan error)
|
||||
go func() { done <- srv.Start("127.0.0.1", 0, ready); close(done) }()
|
||||
<-ready
|
||||
|
||||
// verify everything's initialized
|
||||
if !inited {
|
||||
t.Error("didn't call testRelay.init")
|
||||
}
|
||||
if !storeInited {
|
||||
t.Error("didn't call testStorage.init")
|
||||
}
|
||||
|
||||
// check that http requests are served
|
||||
if _, err := http.Get("http://" + srv.Addr); err != nil {
|
||||
t.Errorf("GET %s: %v", srv.Addr, err)
|
||||
}
|
||||
|
||||
// verify server shuts down
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
srv.Shutdown(ctx)
|
||||
if !shutdown {
|
||||
t.Error("didn't call testRelay.onShutdown")
|
||||
}
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
t.Errorf("srv.Start: %v", err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Error("srv.Start too long to return")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerShutdownWebsocket(t *testing.T) {
|
||||
// set up a new relay server
|
||||
srv := startTestRelay(t, &testRelay{storage: &testStorage{}})
|
||||
|
||||
// connect a client to it
|
||||
ctx1, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
client, err := nostr.RelayConnect(ctx1, "ws://"+srv.Addr)
|
||||
if err != nil {
|
||||
t.Fatalf("nostr.RelayConnectContext: %v", err)
|
||||
}
|
||||
|
||||
// now, shut down the server
|
||||
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
srv.Shutdown(ctx2)
|
||||
|
||||
// wait for the client to receive a "connection close"
|
||||
time.Sleep(1 * time.Second)
|
||||
err = client.ConnectionError
|
||||
if e := errors.Unwrap(err); e != nil {
|
||||
err = e
|
||||
}
|
||||
if _, ok := err.(wsutil.ClosedError); !ok {
|
||||
t.Errorf("client.ConnextionError: %v (%T); want wsutil.ClosedError", err, err)
|
||||
}
|
||||
}
|
||||
91
util_test.go
91
util_test.go
@@ -1,91 +0,0 @@
|
||||
package khatru
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func startTestRelay(t *testing.T, tr *testRelay) *Server {
|
||||
t.Helper()
|
||||
srv, _ := NewServer(tr)
|
||||
started := make(chan bool)
|
||||
go srv.Start("127.0.0.1", 0, started)
|
||||
<-started
|
||||
return srv
|
||||
}
|
||||
|
||||
type testRelay struct {
|
||||
name string
|
||||
storage Storage
|
||||
init func() error
|
||||
onShutdown func(context.Context)
|
||||
acceptEvent func(*nostr.Event) bool
|
||||
}
|
||||
|
||||
func (tr *testRelay) Name() string { return tr.name }
|
||||
func (tr *testRelay) Storage(context.Context) Storage { return tr.storage }
|
||||
|
||||
func (tr *testRelay) Init() error {
|
||||
if fn := tr.init; fn != nil {
|
||||
return fn()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tr *testRelay) OnShutdown(ctx context.Context) {
|
||||
if fn := tr.onShutdown; fn != nil {
|
||||
fn(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (tr *testRelay) AcceptEvent(ctx context.Context, e *nostr.Event) bool {
|
||||
if fn := tr.acceptEvent; fn != nil {
|
||||
return fn(e)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type testStorage struct {
|
||||
init func() error
|
||||
queryEvents func(context.Context, *nostr.Filter) (chan *nostr.Event, error)
|
||||
deleteEvent func(ctx context.Context, id string, pubkey string) error
|
||||
saveEvent func(context.Context, *nostr.Event) error
|
||||
countEvents func(context.Context, *nostr.Filter) (int64, error)
|
||||
}
|
||||
|
||||
func (st *testStorage) Init() error {
|
||||
if fn := st.init; fn != nil {
|
||||
return fn()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *testStorage) QueryEvents(ctx context.Context, f *nostr.Filter) (chan *nostr.Event, error) {
|
||||
if fn := st.queryEvents; fn != nil {
|
||||
return fn(ctx, f)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (st *testStorage) DeleteEvent(ctx context.Context, id string, pubkey string) error {
|
||||
if fn := st.deleteEvent; fn != nil {
|
||||
return fn(ctx, id, pubkey)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *testStorage) SaveEvent(ctx context.Context, e *nostr.Event) error {
|
||||
if fn := st.saveEvent; fn != nil {
|
||||
return fn(ctx, e)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *testStorage) CountEvents(ctx context.Context, f *nostr.Filter) (int64, error) {
|
||||
if fn := st.countEvents; fn != nil {
|
||||
return fn(ctx, f)
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
Reference in New Issue
Block a user