Compare commits

...

9 Commits

Author SHA1 Message Date
fiatjaf
f450c26d84 update go-nostr and xsync. 2024-01-10 16:27:50 -03:00
fiatjaf
8842ec2edd OnEphemeralEvent() 2024-01-10 16:24:35 -03:00
fiatjaf
89ac8f1f1a add clause for websocket close code 4537 because why? 2024-01-05 20:55:24 -03:00
fiatjaf
8d0afc1f12 invalidated filters just return nothing instead of erroring. 2024-01-05 20:48:44 -03:00
fiatjaf
40c3dbdc76 add relay.BroadcastEvent() and rename files. 2024-01-01 17:12:10 -03:00
fiatjaf
e876415677 remove unused .OnAuth() and update README example. 2023-12-28 09:17:06 -03:00
fiatjaf
b00e5b2b3f only reset ws.Authed if it's nil.
i.e. if there has been an auth and for some reason the client tried to auth again
after RequestAuth() has been called again.
2023-12-27 13:05:31 -03:00
fiatjaf
0f7d26f26e missed from last commit: setting ws.Authed to nil. 2023-12-27 12:55:05 -03:00
fiatjaf
21b08cb044 fix closing of closed ws.Authed channel when client AUTHs twice. 2023-12-27 12:30:23 -03:00
14 changed files with 109 additions and 81 deletions

View File

@@ -76,16 +76,17 @@ func main() {
return false, "" // anyone else can
},
)
relay.OnConnect = append(relay.OnConnect,
func(ctx context.Context) {
// request NIP-42 AUTH from everybody
khatru.RequestAuth(ctx)
},
)
relay.OnAuth = append(relay.OnAuth,
func(ctx context.Context, pubkey string) {
// and when they auth we just log that for nothing
log.Println(pubkey + " is authed!")
// you can request auth by rejecting an event or a request with the prefix "auth-required: "
relay.RejectFilter = append(relay.RejectFilter,
func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) {
if pubkey := khatru.GetAuthed(ctx); pubkey != "" {
log.Printf("request from %s\n", pubkey)
return false, ""
}
return true, "auth-required: only authenticated users can read from this relay"
// (this will cause an AUTH message to be sent and then a CLOSED message such that clients can
// authenticate and then request again)
},
)
// check the docs for more goodies!

View File

@@ -9,6 +9,7 @@ import (
"github.com/nbd-wtf/go-nostr"
)
// AddEvent sends an event through then normal add pipeline, as if it was received from a websocket.
func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
if evt == nil {
return errors.New("error: event is nil")
@@ -26,6 +27,9 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
if 20000 <= evt.Kind && evt.Kind < 30000 {
// do not store ephemeral events
for _, oee := range rl.OnEphemeralEvent {
oee(ctx, evt)
}
} else {
if evt.Kind == 0 || evt.Kind == 3 || (10000 <= evt.Kind && evt.Kind < 20000) {
// replaceable event, delete before storing
@@ -82,46 +86,3 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) error {
return nil
}
func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) error {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
// first we fetch the event
for _, query := range rl.QueryEvents {
ch, err := query(ctx, nostr.Filter{IDs: []string{tag[1]}})
if err != nil {
continue
}
target := <-ch
if target == nil {
continue
}
// got the event, now check if the user can delete it
acceptDeletion := target.PubKey == evt.PubKey
var msg string
if acceptDeletion == false {
msg = "you are not the author of this event"
}
// but if we have a function to overwrite this outcome, use that instead
for _, odo := range rl.OverwriteDeletionOutcome {
acceptDeletion, msg = odo(ctx, target, evt)
}
if acceptDeletion {
// delete it
for _, del := range rl.DeleteEvent {
del(ctx, target)
}
} else {
// fail and stop here
return fmt.Errorf("blocked: %s", msg)
}
// don't try to query this same event again
break
}
}
}
return nil
}

11
broadcasting.go Normal file
View File

@@ -0,0 +1,11 @@
package khatru
import (
"github.com/nbd-wtf/go-nostr"
)
// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
// it also doesn't attempt to store the event or trigger any reactions or callbacks
func (rl *Relay) BroadcastEvent(evt *nostr.Event) {
notifyListeners(evt)
}

51
deleting.go Normal file
View File

@@ -0,0 +1,51 @@
package khatru
import (
"context"
"fmt"
"github.com/nbd-wtf/go-nostr"
)
func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) error {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
// first we fetch the event
for _, query := range rl.QueryEvents {
ch, err := query(ctx, nostr.Filter{IDs: []string{tag[1]}})
if err != nil {
continue
}
target := <-ch
if target == nil {
continue
}
// got the event, now check if the user can delete it
acceptDeletion := target.PubKey == evt.PubKey
var msg string
if acceptDeletion == false {
msg = "you are not the author of this event"
}
// but if we have a function to overwrite this outcome, use that instead
for _, odo := range rl.OverwriteDeletionOutcome {
acceptDeletion, msg = odo(ctx, target, evt)
}
if acceptDeletion {
// delete it
for _, del := range rl.DeleteEvent {
del(ctx, target)
}
} else {
// fail and stop here
return fmt.Errorf("blocked: %s", msg)
}
// don't try to query this same event again
break
}
}
}
return nil
}

View File

@@ -69,12 +69,8 @@ func main() {
return false, ""
}
return true, "auth-required: only authenticated users can read from this relay"
},
)
relay.OnAuth = append(relay.OnAuth,
func(ctx context.Context, pubkey string) {
// and when they auth we can just log that for nothing
log.Println(pubkey + " is authed!")
// (this will cause an AUTH message to be sent and then a CLOSED message such that clients can
// authenticate and then request again)
},
)
// check the docs for more goodies!

6
go.mod
View File

@@ -1,12 +1,12 @@
module github.com/fiatjaf/khatru
go 1.21.0
go 1.21.4
require (
github.com/fasthttp/websocket v1.5.3
github.com/fiatjaf/eventstore v0.3.1
github.com/nbd-wtf/go-nostr v0.27.1
github.com/puzpuzpuz/xsync/v2 v2.5.1
github.com/nbd-wtf/go-nostr v0.28.0
github.com/puzpuzpuz/xsync/v3 v3.0.2
github.com/rs/cors v1.7.0
github.com/sebest/xff v0.0.0-20210106013422-671bd2870b3a
golang.org/x/exp v0.0.0-20231006140011-7918f672742d

8
go.sum
View File

@@ -113,15 +113,15 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nbd-wtf/go-nostr v0.27.1 h1:DAwXpAUGxq3/B8KZIWlZmJIoDNkMvlKqQwB/OM/49xk=
github.com/nbd-wtf/go-nostr v0.27.1/go.mod h1:bkffJI+x914sPQWum9ZRUn66D7NpDnAoWo1yICvj3/0=
github.com/nbd-wtf/go-nostr v0.28.0 h1:SLYyoFeCNYb7HyWtmPUzD6rifBOMR66Spj5fzCk+5GE=
github.com/nbd-wtf/go-nostr v0.28.0/go.mod h1:OQ8sNLFJnsj17BdqZiLSmjJBIFTfDqckEYC3utS4qoY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU=
github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/puzpuzpuz/xsync/v3 v3.0.2 h1:3yESHrRFYr6xzkz61LLkvNiPFXxJEAABanTQpKbAaew=
github.com/puzpuzpuz/xsync/v3 v3.0.2/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=

View File

@@ -50,7 +50,6 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
conn: conn,
Request: r,
Challenge: hex.EncodeToString(challenge),
Authed: make(chan struct{}),
}
ctx, cancel := context.WithCancel(
@@ -97,6 +96,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
4537, // some client seems to send many of these
) {
rl.Log.Printf("unexpected close error from %s: %v\n", r.Header.Get("X-Forwarded-For"), err)
}
@@ -204,7 +204,12 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1)
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
ws.AuthedPublicKey = pubkey
close(ws.Authed)
ws.authLock.Lock()
if ws.Authed != nil {
close(ws.Authed)
ws.Authed = nil
}
ws.authLock.Unlock()
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: true})
} else {
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate"})

View File

@@ -1,19 +1,13 @@
package khatru
import (
"hash/maphash"
"net/http"
"strconv"
"strings"
"unsafe"
"github.com/nbd-wtf/go-nostr"
)
func pointerHasher[V any](_ maphash.Seed, k *V) uint64 {
return uint64(uintptr(unsafe.Pointer(k)))
}
func isOlder(previous, next *nostr.Event) bool {
return previous.CreatedAt < next.CreatedAt ||
(previous.CreatedAt == next.CreatedAt && previous.ID > next.ID)

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"github.com/nbd-wtf/go-nostr"
"github.com/puzpuzpuz/xsync/v2"
"github.com/puzpuzpuz/xsync/v3"
)
type Listener struct {
@@ -13,7 +13,7 @@ type Listener struct {
cancel context.CancelCauseFunc
}
var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket])
var listeners = xsync.NewMapOf[*WebSocket, *xsync.MapOf[string, *Listener]]()
func GetListeningFilters() nostr.Filters {
respfilters := make(nostr.Filters, 0, listeners.Size()*2)
@@ -49,7 +49,7 @@ func GetListeningFilters() nostr.Filters {
func setListener(id string, ws *WebSocket, filters nostr.Filters, cancel context.CancelCauseFunc) {
subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] {
return xsync.NewMapOf[*Listener]()
return xsync.NewMapOf[string, *Listener]()
})
subs.Store(id, &Listener{filters: filters, cancel: cancel})
}

View File

@@ -10,7 +10,7 @@ import (
"github.com/fasthttp/websocket"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip11"
"github.com/puzpuzpuz/xsync/v2"
"github.com/puzpuzpuz/xsync/v3"
)
func NewRelay() *Relay {
@@ -29,7 +29,7 @@ func NewRelay() *Relay {
CheckOrigin: func(r *http.Request) bool { return true },
},
clients: xsync.NewTypedMapOf[*websocket.Conn, struct{}](pointerHasher[websocket.Conn]),
clients: xsync.NewMapOf[*websocket.Conn, struct{}](),
serveMux: &http.ServeMux{},
WriteWait: 10 * time.Second,
@@ -54,10 +54,10 @@ type Relay struct {
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
OnAuth []func(ctx context.Context, pubkey string)
OnConnect []func(ctx context.Context)
OnDisconnect []func(ctx context.Context)
OnEventSaved []func(ctx context.Context, event *nostr.Event)
OnEphemeralEvent []func(ctx context.Context, event *nostr.Event)
// editing info will affect
Info *nip11.RelayInformationDocument

View File

@@ -18,7 +18,9 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr
}
if filter.Limit < 0 {
return errors.New("blocked: filter invalidated")
// this is a special situation through which the implementor signals to us that it doesn't want
// to event perform any queries whatsoever
return nil
}
// then check if we'll reject this filter (we apply this after overwriting

View File

@@ -14,6 +14,11 @@ const (
func RequestAuth(ctx context.Context) {
ws := GetConnection(ctx)
ws.authLock.Lock()
if ws.Authed == nil {
ws.Authed = make(chan struct{})
}
ws.authLock.Unlock()
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
}

View File

@@ -18,6 +18,8 @@ type WebSocket struct {
Challenge string
AuthedPublicKey string
Authed chan struct{}
authLock sync.Mutex
}
func (ws *WebSocket) WriteJSON(any any) error {