mirror of
https://github.com/fiatjaf/khatru.git
synced 2025-03-17 05:13:03 +01:00
support negentropy.
This commit is contained in:
parent
f3b18619c7
commit
1dc12e5d2e
@ -20,6 +20,7 @@ func main() {
|
||||
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
||||
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
|
||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||
relay.Negentropy = true
|
||||
|
||||
fmt.Println("running on :3334")
|
||||
http.ListenAndServe(":3334", relay)
|
||||
|
3
go.mod
3
go.mod
@ -3,6 +3,7 @@ module github.com/fiatjaf/khatru
|
||||
go 1.23.1
|
||||
|
||||
require (
|
||||
github.com/bep/debounce v1.2.1
|
||||
github.com/fasthttp/websocket v1.5.7
|
||||
github.com/fiatjaf/eventstore v0.12.0
|
||||
github.com/nbd-wtf/go-nostr v0.40.0
|
||||
@ -18,6 +19,7 @@ require (
|
||||
github.com/aquasecurity/esquery v0.2.0 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
|
||||
github.com/cespare/xxhash v1.1.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect
|
||||
@ -36,6 +38,7 @@ require (
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/google/flatbuffers v24.3.25+incompatible // indirect
|
||||
github.com/greatroar/blobloom v0.8.0 // indirect
|
||||
github.com/jmoiron/sqlx v1.3.5 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.17.10 // indirect
|
||||
|
10
go.sum
10
go.sum
@ -2,17 +2,23 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
|
||||
fiatjaf.com/lib v0.2.0 h1:TgIJESbbND6GjOgGHxF5jsO6EMjuAxIzZHPo5DXYexs=
|
||||
fiatjaf.com/lib v0.2.0/go.mod h1:Ycqq3+mJ9jAWu7XjbQI1cVr+OFgnHn79dQR5oTII47g=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJvU=
|
||||
github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU=
|
||||
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
|
||||
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA=
|
||||
github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao=
|
||||
github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY=
|
||||
github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ=
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ=
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
@ -87,6 +93,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/greatroar/blobloom v0.8.0 h1:I9RlEkfqK9/6f1v9mFmDYegDQ/x0mISCpiNpAm23Pt4=
|
||||
github.com/greatroar/blobloom v0.8.0/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
|
||||
github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI=
|
||||
github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs=
|
||||
github.com/jgroeneveld/trial v2.0.0+incompatible h1:d59ctdgor+VqdZCAiUfVN8K13s0ALDioG5DWwZNtRuQ=
|
||||
@ -126,6 +134,8 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
|
||||
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
|
90
handlers.go
90
handlers.go
@ -10,9 +10,13 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bep/debounce"
|
||||
"github.com/fasthttp/websocket"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/nbd-wtf/go-nostr/nip42"
|
||||
"github.com/nbd-wtf/go-nostr/nip77"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
@ -54,9 +58,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
rand.Read(challenge)
|
||||
|
||||
ws := &WebSocket{
|
||||
conn: conn,
|
||||
Request: r,
|
||||
Challenge: hex.EncodeToString(challenge),
|
||||
conn: conn,
|
||||
Request: r,
|
||||
Challenge: hex.EncodeToString(challenge),
|
||||
negentropySessions: xsync.NewMapOf[string, *NegentropySession](),
|
||||
}
|
||||
ws.Context, ws.cancel = context.WithCancel(context.Background())
|
||||
|
||||
@ -123,8 +128,14 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
go func(message []byte) {
|
||||
envelope := nostr.ParseMessage(message)
|
||||
if envelope == nil {
|
||||
// stop silently
|
||||
return
|
||||
if !rl.Negentropy {
|
||||
// stop silently
|
||||
return
|
||||
}
|
||||
envelope = nip77.ParseNegMessage(message)
|
||||
if envelope == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
switch env := envelope.(type) {
|
||||
@ -272,6 +283,75 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
} else {
|
||||
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate"})
|
||||
}
|
||||
case *nip77.OpenEnvelope:
|
||||
srl := rl
|
||||
if rl.getSubRelayFromFilter != nil {
|
||||
srl = rl.getSubRelayFromFilter(env.Filter)
|
||||
if !srl.Negentropy {
|
||||
// ignore
|
||||
return
|
||||
}
|
||||
}
|
||||
vec, err := srl.startNegentropySession(ctx, env.Filter)
|
||||
if err != nil {
|
||||
// fail everything if any filter is rejected
|
||||
reason := err.Error()
|
||||
if strings.HasPrefix(reason, "auth-required:") {
|
||||
RequestAuth(ctx)
|
||||
}
|
||||
ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
|
||||
return
|
||||
}
|
||||
|
||||
// reconcile to get the next message and return it
|
||||
neg := negentropy.New(vec, 1024*1024)
|
||||
out, err := neg.Reconcile(env.Message)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: err.Error()})
|
||||
return
|
||||
}
|
||||
ws.WriteJSON(nip77.MessageEnvelope{SubscriptionID: env.SubscriptionID, Message: out})
|
||||
|
||||
// if the message is not empty that means we'll probably have more reconciliation sessions, so store this
|
||||
if out != "" {
|
||||
deb := debounce.New(time.Second * 7)
|
||||
negSession := &NegentropySession{
|
||||
neg: neg,
|
||||
postponeClose: func() {
|
||||
deb(func() {
|
||||
ws.negentropySessions.Delete(env.SubscriptionID)
|
||||
})
|
||||
},
|
||||
}
|
||||
negSession.postponeClose()
|
||||
|
||||
ws.negentropySessions.Store(env.SubscriptionID, negSession)
|
||||
}
|
||||
case *nip77.MessageEnvelope:
|
||||
negSession, ok := ws.negentropySessions.Load(env.SubscriptionID)
|
||||
if !ok {
|
||||
// bad luck, your request was destroyed
|
||||
ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: "CLOSED"})
|
||||
return
|
||||
}
|
||||
// reconcile to get the next message and return it
|
||||
out, err := negSession.neg.Reconcile(env.Message)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: err.Error()})
|
||||
ws.negentropySessions.Delete(env.SubscriptionID)
|
||||
return
|
||||
}
|
||||
ws.WriteJSON(nip77.MessageEnvelope{SubscriptionID: env.SubscriptionID, Message: out})
|
||||
|
||||
// if there is more reconciliation to do, postpone this
|
||||
if out != "" {
|
||||
negSession.postponeClose()
|
||||
} else {
|
||||
// otherwise we can just close it
|
||||
ws.negentropySessions.Delete(env.SubscriptionID)
|
||||
}
|
||||
case *nip77.CloseEnvelope:
|
||||
ws.negentropySessions.Delete(env.SubscriptionID)
|
||||
}
|
||||
}(message)
|
||||
}
|
||||
|
50
negentropy.go
Normal file
50
negentropy.go
Normal file
@ -0,0 +1,50 @@
|
||||
package khatru
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector"
|
||||
)
|
||||
|
||||
type NegentropySession struct {
|
||||
neg *negentropy.Negentropy
|
||||
postponeClose func()
|
||||
}
|
||||
|
||||
func (rl *Relay) startNegentropySession(ctx context.Context, filter nostr.Filter) (*vector.Vector, error) {
|
||||
// do the same overwrite/reject flow we do in normal REQs
|
||||
for _, ovw := range rl.OverwriteFilter {
|
||||
ovw(ctx, &filter)
|
||||
}
|
||||
if filter.LimitZero {
|
||||
return nil, fmt.Errorf("invalid limit 0")
|
||||
}
|
||||
for _, reject := range rl.RejectFilter {
|
||||
if reject, msg := reject(ctx, filter); reject {
|
||||
return nil, errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
|
||||
}
|
||||
}
|
||||
|
||||
// fetch events and add them to a negentropy Vector store
|
||||
vec := vector.New()
|
||||
for _, query := range rl.QueryEvents {
|
||||
ch, err := query(ctx, filter)
|
||||
if err != nil {
|
||||
continue
|
||||
} else if ch == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for event := range ch {
|
||||
// since the goal here is to sync databases we won't do fancy stuff like overwrite events
|
||||
vec.Insert(event.CreatedAt, event.ID)
|
||||
}
|
||||
}
|
||||
vec.Seal()
|
||||
|
||||
return vec, nil
|
||||
}
|
7
nip11.go
7
nip11.go
@ -11,10 +11,13 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
|
||||
info := *rl.Info
|
||||
|
||||
if len(rl.DeleteEvent) > 0 {
|
||||
info.SupportedNIPs = append(info.SupportedNIPs, 9)
|
||||
info.AddSupportedNIP(9)
|
||||
}
|
||||
if len(rl.CountEvents) > 0 {
|
||||
info.SupportedNIPs = append(info.SupportedNIPs, 45)
|
||||
info.AddSupportedNIP(45)
|
||||
}
|
||||
if rl.Negentropy {
|
||||
info.AddSupportedNIP(77)
|
||||
}
|
||||
|
||||
for _, ovw := range rl.OverwriteRelayInformation {
|
||||
|
5
relay.go
5
relay.go
@ -46,7 +46,7 @@ func NewRelay() *Relay {
|
||||
type Relay struct {
|
||||
ServiceURL string
|
||||
|
||||
// these structs keeps track of all the things that can be customized when handling events or requests
|
||||
// hooks that will be called at various times
|
||||
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
|
||||
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
|
||||
StoreEvent []func(ctx context.Context, event *nostr.Event) error
|
||||
@ -90,6 +90,9 @@ type Relay struct {
|
||||
listeners []listener
|
||||
clientsMutex sync.Mutex
|
||||
|
||||
// set this to true to support negentropy
|
||||
Negentropy bool
|
||||
|
||||
// in case you call Server.Start
|
||||
Addr string
|
||||
serveMux *http.ServeMux
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/fasthttp/websocket"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
)
|
||||
|
||||
type WebSocket struct {
|
||||
@ -24,6 +25,9 @@ type WebSocket struct {
|
||||
AuthedPublicKey string
|
||||
Authed chan struct{}
|
||||
|
||||
// nip77
|
||||
negentropySessions *xsync.MapOf[string, *NegentropySession]
|
||||
|
||||
authLock sync.Mutex
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user