splitting the code into multiple packages (wip)

This commit is contained in:
fiatjaf
2024-05-29 16:33:57 -03:00
parent 31e0645afe
commit a4852ef6bc
39 changed files with 255 additions and 153 deletions

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"bytes" "bytes"

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"crypto/sha256" "crypto/sha256"

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
json "encoding/json" json "encoding/json"

View File

@ -1,4 +1,4 @@
package nostr package core
// SetExtra sets an out-of-the-spec value under the given key into the event object. // SetExtra sets an out-of-the-spec value under the given key into the event object.
func (evt *Event) SetExtra(key string, value any) { func (evt *Event) SetExtra(key string, value any) {

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
json "encoding/json" json "encoding/json"

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,27 +1,9 @@
package nostr package core
import ( import (
"sync"
"unsafe"
"golang.org/x/exp/constraints" "golang.org/x/exp/constraints"
) )
const MAX_LOCKS = 50
var namedMutexPool = make([]sync.Mutex, MAX_LOCKS)
//go:noescape
//go:linkname memhash runtime.memhash
func memhash(p unsafe.Pointer, h, s uintptr) uintptr
func namedLock(name string) (unlock func()) {
sptr := unsafe.StringData(name)
idx := uint64(memhash(unsafe.Pointer(sptr), 0, uintptr(len(name)))) % MAX_LOCKS
namedMutexPool[idx].Lock()
return namedMutexPool[idx].Unlock
}
func similar[E constraints.Ordered](as, bs []E) bool { func similar[E constraints.Ordered](as, bs []E) bool {
if len(as) != len(bs) { if len(as) != len(bs) {
return false return false

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"crypto/rand" "crypto/rand"

View File

@ -1,4 +1,4 @@
package nostr package core
type ProfilePointer struct { type ProfilePointer struct {
PublicKey string `json:"pubkey"` PublicKey string `json:"pubkey"`

View File

@ -1,4 +1,4 @@
package nostr package core
import ( import (
"testing" "testing"

View File

@ -1,11 +1,12 @@
package nostr package core
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"slices"
"strings" "strings"
"slices" "github.com/nbd-wtf/go-nostr/utils"
) )
type Tag []string type Tag []string
@ -54,7 +55,7 @@ func (tag Tag) Value() string {
func (tag Tag) Relay() string { func (tag Tag) Relay() string {
if (tag[0] == "e" || tag[0] == "p") && len(tag) > 2 { if (tag[0] == "e" || tag[0] == "p") && len(tag) > 2 {
return NormalizeURL(tag[2]) return utils.NormalizeURL(tag[2])
} }
return "" return ""
} }

View File

@ -1,4 +1,4 @@
package nostr package core
import "time" import "time"

61
lib.go Normal file
View File

@ -0,0 +1,61 @@
package nostr
import (
"github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/relays"
"github.com/nbd-wtf/go-nostr/utils"
)
type (
Event core.Event
Filter core.Filter
Filters core.Filters
Timestamp core.Timestamp
Tag core.Tag
TagMap core.TagMap
ProfilePointer core.ProfilePointer
EventPointer core.EventPointer
EntityPointer core.EntityPointer
AuthEnvelope core.AuthEnvelope
OKEnvelope core.OKEnvelope
NoticeEnvelope core.NoticeEnvelope
EventEnvelope core.EventEnvelope
CloseEnvelope core.CloseEnvelope
ClosedEnvelope core.ClosedEnvelope
CountEnvelope core.CountEnvelope
EOSEEnvelope core.EOSEEnvelope
ReqEnvelope core.ReqEnvelope
Envelope core.Envelope
Relay relays.Relay
RelayOption relays.RelayOption
SimplePool relays.SimplePool
PoolOption relays.PoolOption
Subscription relays.Subscription
SubscriptionOption relays.SubscriptionOption
IncomingEvent relays.IncomingEvent
WithAuthHandler relays.WithAuthHandler
WithLabel relays.WithLabel
WithNoticeHandler relays.WithNoticeHandler
RelayStore relays.RelayStore
MultiStore relays.MultiStore
)
var (
Now = core.Now
FilterEqual = core.FilterEqual
GeneratePrivateKey = core.GeneratePrivateKey
GetPublicKey = core.GetPublicKey
IsValidPublicKey = core.IsValidPublicKey
NewSimplePool = relays.NewSimplePool
NewRelay = relays.NewRelay
IsValid32ByteHex = utils.IsValid32ByteHex
IsValidRelayURL = utils.IsValidRelayURL
NormalizeOKMessage = utils.NormalizeOKMessage
NormalizeURL = utils.NormalizeURL
)

View File

@ -8,3 +8,7 @@ cpu: AMD Ryzen 3 3200G with Radeon Vega Graphics
BenchmarkSignatureVerification/btcec-4 145 7873130 ns/op 127069 B/op 579 allocs/op BenchmarkSignatureVerification/btcec-4 145 7873130 ns/op 127069 B/op 579 allocs/op
BenchmarkSignatureVerification/libsecp256k1-4 502 2314573 ns/op 112241 B/op 392 allocs/op BenchmarkSignatureVerification/libsecp256k1-4 502 2314573 ns/op 112241 B/op 392 allocs/op
``` ```
To use this manually, just import it.
To use it for the `Relay` subscriptions automatic signature verification, compile your application with the Go build tag `libsecp256k1`.

View File

@ -4,14 +4,14 @@ import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/test_common" "github.com/nbd-wtf/go-nostr/test_common"
) )
func BenchmarkSignatureVerification(b *testing.B) { func BenchmarkSignatureVerification(b *testing.B) {
events := make([]*nostr.Event, len(test_common.NormalEvents)) events := make([]*core.Event, len(test_common.NormalEvents))
for i, jevt := range test_common.NormalEvents { for i, jevt := range test_common.NormalEvents {
evt := &nostr.Event{} evt := &core.Event{}
json.Unmarshal([]byte(jevt), evt) json.Unmarshal([]byte(jevt), evt)
events[i] = evt events[i] = evt
} }
@ -27,7 +27,7 @@ func BenchmarkSignatureVerification(b *testing.B) {
b.Run("libsecp256k1", func(b *testing.B) { b.Run("libsecp256k1", func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for _, evt := range events { for _, evt := range events {
CheckSignature(evt) CheckSignature(*evt)
} }
} }
}) })

View File

@ -4,15 +4,15 @@ import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/test_common" "github.com/nbd-wtf/go-nostr/test_common"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestEventVerification(t *testing.T) { func TestEventVerification(t *testing.T) {
for _, jevt := range test_common.NormalEvents { for _, jevt := range test_common.NormalEvents {
evt := &nostr.Event{} evt := core.Event{}
json.Unmarshal([]byte(jevt), evt) json.Unmarshal([]byte(jevt), &evt)
ok, _ := CheckSignature(evt) ok, _ := CheckSignature(evt)
shouldBe, _ := evt.CheckSignature() shouldBe, _ := evt.CheckSignature()
assert.Equal(t, ok, shouldBe, "%s signature must be %s", jevt, shouldBe) assert.Equal(t, ok, shouldBe, "%s signature must be %s", jevt, shouldBe)

View File

@ -5,10 +5,10 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/core"
) )
func CheckSignature(evt *nostr.Event) (bool, error) { func CheckSignature(evt core.Event) (bool, error) {
var pk [32]byte var pk [32]byte
_, err := hex.Decode(pk[:], []byte(evt.PubKey)) _, err := hex.Decode(pk[:], []byte(evt.PubKey))
if err != nil { if err != nil {

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"bytes" "bytes"

View File

@ -1,8 +1,10 @@
package nostr package relays
import ( import (
"context" "context"
"testing" "testing"
"github.com/nbd-wtf/go-nostr/core"
) )
func TestCount(t *testing.T) { func TestCount(t *testing.T) {
@ -11,8 +13,8 @@ func TestCount(t *testing.T) {
rl := mustRelayConnect(RELAY) rl := mustRelayConnect(RELAY)
defer rl.Close() defer rl.Close()
count, err := rl.Count(context.Background(), Filters{ count, err := rl.Count(context.Background(), core.Filters{
{Kinds: []int{KindContactList}, Tags: TagMap{"p": []string{"3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"}}}, {Kinds: []int{core.KindContactList}, Tags: core.TagMap{"p": []string{"3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"}}},
}) })
if err != nil { if err != nil {
t.Errorf("count request failed: %v", err) t.Errorf("count request failed: %v", err)

View File

@ -1,17 +1,19 @@
package nostr package relays
import ( import (
"context" "context"
"testing" "testing"
"time" "time"
"github.com/nbd-wtf/go-nostr/core"
) )
func TestEOSEMadness(t *testing.T) { func TestEOSEMadness(t *testing.T) {
rl := mustRelayConnect(RELAY) rl := mustRelayConnect(RELAY)
defer rl.Close() defer rl.Close()
sub, err := rl.Subscribe(context.Background(), Filters{ sub, err := rl.Subscribe(context.Background(), core.Filters{
{Kinds: []int{KindTextNote}, Limit: 2}, {Kinds: []int{core.KindTextNote}, Limit: 2},
}) })
if err != nil { if err != nil {
t.Errorf("subscription failed: %v", err) t.Errorf("subscription failed: %v", err)

22
relays/helpers.go Normal file
View File

@ -0,0 +1,22 @@
package relays
import (
"sync"
"unsafe"
)
const MAX_LOCKS = 50
var namedMutexPool = make([]sync.Mutex, MAX_LOCKS)
//go:noescape
//go:linkname memhash runtime.memhash
func memhash(p unsafe.Pointer, h, s uintptr) uintptr
func namedLock(name string) (unlock func()) {
sptr := unsafe.StringData(name)
idx := uint64(memhash(unsafe.Pointer(sptr), 0, uintptr(len(name)))) % MAX_LOCKS
l := &namedMutexPool[idx]
l.Lock()
return l.Unlock
}

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"testing" "testing"

View File

@ -1,14 +1,16 @@
package nostr package relays
import ( import (
"context" "context"
"errors" "errors"
"slices" "slices"
"github.com/nbd-wtf/go-nostr/core"
) )
type RelayStore interface { type RelayStore interface {
Publish(ctx context.Context, event Event) error Publish(ctx context.Context, event core.Event) error
QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) QuerySync(ctx context.Context, filter core.Filter, opts ...SubscriptionOption) ([]*core.Event, error)
} }
var ( var (
@ -18,7 +20,7 @@ var (
type MultiStore []RelayStore type MultiStore []RelayStore
func (multi MultiStore) Publish(ctx context.Context, event Event) error { func (multi MultiStore) Publish(ctx context.Context, event core.Event) error {
errs := make([]error, len(multi)) errs := make([]error, len(multi))
for i, s := range multi { for i, s := range multi {
errs[i] = s.Publish(ctx, event) errs[i] = s.Publish(ctx, event)
@ -26,15 +28,15 @@ func (multi MultiStore) Publish(ctx context.Context, event Event) error {
return errors.Join(errs...) return errors.Join(errs...)
} }
func (multi MultiStore) QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) { func (multi MultiStore) QuerySync(ctx context.Context, filter core.Filter, opts ...SubscriptionOption) ([]*core.Event, error) {
errs := make([]error, len(multi)) errs := make([]error, len(multi))
events := make([]*Event, 0, max(filter.Limit, 10)) events := make([]*core.Event, 0, max(filter.Limit, 10))
for i, s := range multi { for i, s := range multi {
res, err := s.QuerySync(ctx, filter, opts...) res, err := s.QuerySync(ctx, filter, opts...)
errs[i] = err errs[i] = err
events = append(events, res...) events = append(events, res...)
} }
slices.SortFunc(events, func(a, b *Event) int { slices.SortFunc(events, func(a, b *core.Event) int {
if b.CreatedAt > a.CreatedAt { if b.CreatedAt > a.CreatedAt {
return 1 return 1
} else if b.CreatedAt < a.CreatedAt { } else if b.CreatedAt < a.CreatedAt {

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"log" "log"

View File

@ -1,6 +1,6 @@
//go:build debug //go:build debug
package nostr package relays
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,6 +1,6 @@
//go:build !debug //go:build !debug
package nostr package relays
func debugLogf(str string, args ...any) { func debugLogf(str string, args ...any) {
} }

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"context" "context"
@ -9,6 +9,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/utils"
"github.com/puzpuzpuz/xsync/v3" "github.com/puzpuzpuz/xsync/v3"
) )
@ -20,17 +22,17 @@ type SimplePool struct {
Relays *xsync.MapOf[string, *Relay] Relays *xsync.MapOf[string, *Relay]
Context context.Context Context context.Context
authHandler func(*Event) error authHandler func(*core.Event) error
cancel context.CancelFunc cancel context.CancelFunc
} }
type DirectedFilters struct { type DirectedFilters struct {
Filters core.Filters
Relay string Relay string
} }
type IncomingEvent struct { type IncomingEvent struct {
*Event *core.Event
Relay *Relay Relay *Relay
} }
@ -63,7 +65,7 @@ func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
// WithAuthHandler must be a function that signs the auth event when called. // WithAuthHandler must be a function that signs the auth event when called.
// it will be called whenever any relay in the pool returns a `CLOSED` message // it will be called whenever any relay in the pool returns a `CLOSED` message
// with the "auth-required:" prefix, only once for each relay // with the "auth-required:" prefix, only once for each relay
type WithAuthHandler func(authEvent *Event) error type WithAuthHandler func(authEvent *core.Event) error
func (_ WithAuthHandler) IsPoolOption() {} func (_ WithAuthHandler) IsPoolOption() {}
func (h WithAuthHandler) Apply(pool *SimplePool) { func (h WithAuthHandler) Apply(pool *SimplePool) {
@ -73,7 +75,7 @@ func (h WithAuthHandler) Apply(pool *SimplePool) {
var _ PoolOption = (WithAuthHandler)(nil) var _ PoolOption = (WithAuthHandler)(nil)
func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) { func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
nm := NormalizeURL(url) nm := utils.NormalizeURL(url)
defer namedLock(nm)() defer namedLock(nm)()
relay, ok := pool.Relays.Load(nm) relay, ok := pool.Relays.Load(nm)
@ -96,20 +98,20 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
// SubMany opens a subscription with the given filters to multiple relays // SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled // the subscriptions only end when the context is canceled
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters core.Filters) chan IncomingEvent {
return pool.subMany(ctx, urls, filters, true) return pool.subMany(ctx, urls, filters, true)
} }
// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters core.Filters) chan IncomingEvent {
return pool.subMany(ctx, urls, filters, false) return pool.subMany(ctx, urls, filters, false)
} }
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters core.Filters, unique bool) chan IncomingEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining _ = cancel // do this so `go vet` will stop complaining
events := make(chan IncomingEvent) events := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[string, Timestamp]() seenAlready := xsync.NewMapOf[string, core.Timestamp]()
ticker := time.NewTicker(seenAlreadyDropTick) ticker := time.NewTicker(seenAlreadyDropTick)
eose := false eose := false
@ -117,7 +119,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
pending := xsync.NewCounter() pending := xsync.NewCounter()
pending.Add(int64(len(urls))) pending.Add(int64(len(urls)))
for i, url := range urls { for i, url := range urls {
url = NormalizeURL(url) url = utils.NormalizeURL(url)
urls[i] = url urls[i] = url
if idx := slices.Index(urls, url); idx != i { if idx := slices.Index(urls, url); idx != i {
// skip duplicate relays in the list // skip duplicate relays in the list
@ -171,7 +173,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
// this means the connection was closed for weird reasons, like the server shut down // this means the connection was closed for weird reasons, like the server shut down
// so we will update the filters here to include only events seem from now on // so we will update the filters here to include only events seem from now on
// and try to reconnect until we succeed // and try to reconnect until we succeed
now := Now() now := core.Now()
for i := range filters { for i := range filters {
filters[i].Since = &now filters[i].Since = &now
} }
@ -188,8 +190,8 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
} }
case <-ticker.C: case <-ticker.C:
if eose { if eose {
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) old := core.Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(id string, value Timestamp) bool { seenAlready.Range(func(id string, value core.Timestamp) bool {
if value < old { if value < old {
seenAlready.Delete(id) seenAlready.Delete(id)
} }
@ -225,16 +227,16 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
} }
// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters core.Filters) chan IncomingEvent {
return pool.subManyEose(ctx, urls, filters, true) return pool.subManyEose(ctx, urls, filters, true)
} }
// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters core.Filters) chan IncomingEvent {
return pool.subManyEose(ctx, urls, filters, false) return pool.subManyEose(ctx, urls, filters, false)
} }
func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters core.Filters, unique bool) chan IncomingEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
events := make(chan IncomingEvent) events := make(chan IncomingEvent)
@ -302,17 +304,17 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
} }
} }
} }
}(NormalizeURL(url)) }(utils.NormalizeURL(url))
} }
return events return events
} }
// QuerySingle returns the first event returned by the first relay, cancels everything else. // QuerySingle returns the first event returned by the first relay, cancels everything else.
func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *IncomingEvent { func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter core.Filter) *IncomingEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) { for ievt := range pool.SubManyEose(ctx, urls, core.Filters{filter}) {
return &ievt return &ievt
} }
return nil return nil
@ -321,7 +323,7 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
func (pool *SimplePool) batchedSubMany( func (pool *SimplePool) batchedSubMany(
ctx context.Context, ctx context.Context,
dfs []DirectedFilters, dfs []DirectedFilters,
subFn func(context.Context, []string, Filters, bool) chan IncomingEvent, subFn func(context.Context, []string, core.Filters, bool) chan IncomingEvent,
) chan IncomingEvent { ) chan IncomingEvent {
res := make(chan IncomingEvent) res := make(chan IncomingEvent)

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"bytes" "bytes"
@ -13,11 +13,11 @@ import (
"github.com/gobwas/ws" "github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil" "github.com/gobwas/ws/wsutil"
"github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/utils"
"github.com/puzpuzpuz/xsync/v3" "github.com/puzpuzpuz/xsync/v3"
) )
type Status int
var subscriptionIDCounter atomic.Int32 var subscriptionIDCounter atomic.Int32
type Relay struct { type Relay struct {
@ -53,7 +53,7 @@ type writeRequest struct {
func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay { func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
r := &Relay{ r := &Relay{
URL: NormalizeURL(url), URL: utils.NormalizeURL(url),
connectionContext: ctx, connectionContext: ctx,
connectionContextCancel: cancel, connectionContextCancel: cancel,
Subscriptions: xsync.NewMapOf[string, *Subscription](), Subscriptions: xsync.NewMapOf[string, *Subscription](),
@ -203,25 +203,25 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
message := buf.Bytes() message := buf.Bytes()
debugLogf("{%s} %v\n", r.URL, message) debugLogf("{%s} %v\n", r.URL, message)
envelope := ParseMessage(message) envelope := core.ParseMessage(message)
if envelope == nil { if envelope == nil {
continue continue
} }
switch env := envelope.(type) { switch env := envelope.(type) {
case *NoticeEnvelope: case *core.NoticeEnvelope:
// see WithNoticeHandler // see WithNoticeHandler
if r.notices != nil { if r.notices != nil {
r.notices <- string(*env) r.notices <- string(*env)
} else { } else {
log.Printf("NOTICE from %s: '%s'\n", r.URL, string(*env)) log.Printf("NOTICE from %s: '%s'\n", r.URL, string(*env))
} }
case *AuthEnvelope: case *core.AuthEnvelope:
if env.Challenge == nil { if env.Challenge == nil {
continue continue
} }
r.challenge = *env.Challenge r.challenge = *env.Challenge
case *EventEnvelope: case *core.EventEnvelope:
if env.SubscriptionID == nil { if env.SubscriptionID == nil {
continue continue
} }
@ -237,12 +237,8 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
// check signature, ignore invalid, except from trusted (AssumeValid) relays // check signature, ignore invalid, except from trusted (AssumeValid) relays
if !r.AssumeValid { if !r.AssumeValid {
if ok, err := env.Event.CheckSignature(); !ok { if ok := checkSigOnRelay(env.Event); !ok {
errmsg := "" InfoLogger.Printf("{%s} bad signature on %s\n", r.URL, env.Event.ID)
if err != nil {
errmsg = err.Error()
}
InfoLogger.Printf("{%s} bad signature on %s; %s\n", r.URL, env.Event.ID, errmsg)
continue continue
} }
} }
@ -250,19 +246,19 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
// dispatch this to the internal .events channel of the subscription // dispatch this to the internal .events channel of the subscription
subscription.dispatchEvent(&env.Event) subscription.dispatchEvent(&env.Event)
} }
case *EOSEEnvelope: case *core.EOSEEnvelope:
if subscription, ok := r.Subscriptions.Load(string(*env)); ok { if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
subscription.dispatchEose() subscription.dispatchEose()
} }
case *ClosedEnvelope: case *core.ClosedEnvelope:
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok { if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok {
subscription.dispatchClosed(env.Reason) subscription.dispatchClosed(env.Reason)
} }
case *CountEnvelope: case *core.CountEnvelope:
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil {
subscription.countResult <- *env.Count subscription.countResult <- *env.Count
} }
case *OKEnvelope: case *core.OKEnvelope:
if okCallback, exist := r.okCallbacks.Load(env.EventID); exist { if okCallback, exist := r.okCallbacks.Load(env.EventID); exist {
okCallback(env.OK, env.Reason) okCallback(env.OK, env.Reason)
} else { } else {
@ -287,18 +283,18 @@ func (r *Relay) Write(msg []byte) <-chan error {
} }
// Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response. // Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response.
func (r *Relay) Publish(ctx context.Context, event Event) error { func (r *Relay) Publish(ctx context.Context, event core.Event) error {
return r.publish(ctx, event.ID, &EventEnvelope{Event: event}) return r.publish(ctx, event.ID, &core.EventEnvelope{Event: event})
} }
// Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response. // Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response.
func (r *Relay) Auth(ctx context.Context, sign func(event *Event) error) error { func (r *Relay) Auth(ctx context.Context, sign func(event *core.Event) error) error {
authEvent := Event{ authEvent := core.Event{
CreatedAt: Now(), CreatedAt: core.Now(),
Kind: KindClientAuthentication, Kind: core.KindClientAuthentication,
Tags: Tags{ Tags: core.Tags{
Tag{"relay", r.URL}, {"relay", r.URL},
Tag{"challenge", r.challenge}, {"challenge", r.challenge},
}, },
Content: "", Content: "",
} }
@ -306,11 +302,11 @@ func (r *Relay) Auth(ctx context.Context, sign func(event *Event) error) error {
return fmt.Errorf("error signing auth event: %w", err) return fmt.Errorf("error signing auth event: %w", err)
} }
return r.publish(ctx, authEvent.ID, &AuthEnvelope{Event: authEvent}) return r.publish(ctx, authEvent.ID, &core.AuthEnvelope{Event: authEvent})
} }
// publish can be used both for EVENT and for AUTH // publish can be used both for EVENT and for AUTH
func (r *Relay) publish(ctx context.Context, id string, env Envelope) error { func (r *Relay) publish(ctx context.Context, id string, env core.Envelope) error {
var err error var err error
var cancel context.CancelFunc var cancel context.CancelFunc
@ -363,7 +359,7 @@ func (r *Relay) publish(ctx context.Context, id string, env Envelope) error {
// //
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point. // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
// Failure to do that will result in a huge number of halted goroutines being created. // Failure to do that will result in a huge number of halted goroutines being created.
func (r *Relay) Subscribe(ctx context.Context, filters Filters, opts ...SubscriptionOption) (*Subscription, error) { func (r *Relay) Subscribe(ctx context.Context, filters core.Filters, opts ...SubscriptionOption) (*Subscription, error) {
sub := r.PrepareSubscription(ctx, filters, opts...) sub := r.PrepareSubscription(ctx, filters, opts...)
if err := sub.Fire(); err != nil { if err := sub.Fire(); err != nil {
@ -377,7 +373,7 @@ func (r *Relay) Subscribe(ctx context.Context, filters Filters, opts ...Subscrip
// //
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point. // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
// Failure to do that will result in a huge number of halted goroutines being created. // Failure to do that will result in a huge number of halted goroutines being created.
func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts ...SubscriptionOption) *Subscription { func (r *Relay) PrepareSubscription(ctx context.Context, filters core.Filters, opts ...SubscriptionOption) *Subscription {
if r.Connection == nil { if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
} }
@ -390,7 +386,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
Context: ctx, Context: ctx,
cancel: cancel, cancel: cancel,
counter: int(current), counter: int(current),
Events: make(chan *Event), Events: make(chan *core.Event),
EndOfStoredEvents: make(chan struct{}, 1), EndOfStoredEvents: make(chan struct{}, 1),
ClosedReason: make(chan string, 1), ClosedReason: make(chan string, 1),
Filters: filters, Filters: filters,
@ -412,8 +408,8 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
return sub return sub
} }
func (r *Relay) QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) { func (r *Relay) QuerySync(ctx context.Context, filter core.Filter, opts ...SubscriptionOption) ([]*core.Event, error) {
sub, err := r.Subscribe(ctx, Filters{filter}, opts...) sub, err := r.Subscribe(ctx, core.Filters{filter}, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -427,7 +423,7 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter, opts ...Subscripti
defer cancel() defer cancel()
} }
var events []*Event var events []*core.Event
for { for {
select { select {
case evt := <-sub.Events: case evt := <-sub.Events:
@ -444,7 +440,7 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter, opts ...Subscripti
} }
} }
func (r *Relay) Count(ctx context.Context, filters Filters, opts ...SubscriptionOption) (int64, error) { func (r *Relay) Count(ctx context.Context, filters core.Filters, opts ...SubscriptionOption) (int64, error) {
sub := r.PrepareSubscription(ctx, filters, opts...) sub := r.PrepareSubscription(ctx, filters, opts...)
sub.countResult = make(chan int64) sub.countResult = make(chan int64)

10
relays/relay_checksig.go Normal file
View File

@ -0,0 +1,10 @@
//go:build !libsecp256k1
package relays
import "github.com/nbd-wtf/go-nostr/core"
func checkSigOnRelay(evt core.Event) bool {
ok, _ := evt.CheckSignature()
return ok
}

View File

@ -0,0 +1,13 @@
//go:build libsecp256k1
package relays
import (
"github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/libsecp256k1"
)
func checkSigOnRelay(evt core.Event) bool {
ok, _ := libsecp256k1.CheckSignature(evt)
return ok
}

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"bytes" "bytes"
@ -12,17 +12,19 @@ import (
"testing" "testing"
"time" "time"
"github.com/nbd-wtf/go-nostr/core"
"github.com/nbd-wtf/go-nostr/utils"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
func TestPublish(t *testing.T) { func TestPublish(t *testing.T) {
// test note to be sent over websocket // test note to be sent over websocket
priv, pub := makeKeyPair(t) priv, pub := makeKeyPair(t)
textNote := Event{ textNote := core.Event{
Kind: KindTextNote, Kind: core.KindTextNote,
Content: "hello", Content: "hello",
CreatedAt: Timestamp(1672068534), // random fixed timestamp CreatedAt: core.Timestamp(1672068534), // random fixed timestamp
Tags: Tags{[]string{"foo", "bar"}}, Tags: core.Tags{[]string{"foo", "bar"}},
PubKey: pub, PubKey: pub,
} }
if err := textNote.Sign(priv); err != nil { if err := textNote.Sign(priv); err != nil {
@ -66,7 +68,7 @@ func TestPublish(t *testing.T) {
func TestPublishBlocked(t *testing.T) { func TestPublishBlocked(t *testing.T) {
// test note to be sent over websocket // test note to be sent over websocket
textNote := Event{Kind: KindTextNote, Content: "hello"} textNote := core.Event{Kind: core.KindTextNote, Content: "hello"}
textNote.ID = textNote.GetID() textNote.ID = textNote.GetID()
// fake relay server // fake relay server
@ -92,7 +94,7 @@ func TestPublishBlocked(t *testing.T) {
func TestPublishWriteFailed(t *testing.T) { func TestPublishWriteFailed(t *testing.T) {
// test note to be sent over websocket // test note to be sent over websocket
textNote := Event{Kind: KindTextNote, Content: "hello"} textNote := core.Event{Kind: core.KindTextNote, Content: "hello"}
textNote.ID = textNote.GetID() textNote.ID = textNote.GetID()
// fake relay server // fake relay server
@ -161,7 +163,7 @@ func TestConnectWithOrigin(t *testing.T) {
defer ws.Close() defer ws.Close()
// relay client // relay client
r := NewRelay(context.Background(), NormalizeURL(ws.URL)) r := NewRelay(context.Background(), utils.NormalizeURL(ws.URL))
r.RequestHeader = http.Header{"origin": {"https://example.com"}} r.RequestHeader = http.Header{"origin": {"https://example.com"}}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
@ -191,8 +193,8 @@ var anyOriginHandshake = func(conf *websocket.Config, r *http.Request) error {
func makeKeyPair(t *testing.T) (priv, pub string) { func makeKeyPair(t *testing.T) (priv, pub string) {
t.Helper() t.Helper()
privkey := GeneratePrivateKey() privkey := core.GeneratePrivateKey()
pubkey, err := GetPublicKey(privkey) pubkey, err := core.GetPublicKey(privkey)
if err != nil { if err != nil {
t.Fatalf("GetPublicKey(%q): %v", privkey, err) t.Fatalf("GetPublicKey(%q): %v", privkey, err)
} }
@ -207,7 +209,7 @@ func mustRelayConnect(url string) *Relay {
return rl return rl
} }
func parseEventMessage(t *testing.T, raw []json.RawMessage) Event { func parseEventMessage(t *testing.T, raw []json.RawMessage) core.Event {
t.Helper() t.Helper()
if len(raw) < 2 { if len(raw) < 2 {
t.Fatalf("len(raw) = %d; want at least 2", len(raw)) t.Fatalf("len(raw) = %d; want at least 2", len(raw))
@ -217,14 +219,14 @@ func parseEventMessage(t *testing.T, raw []json.RawMessage) Event {
if typ != "EVENT" { if typ != "EVENT" {
t.Errorf("typ = %q; want EVENT", typ) t.Errorf("typ = %q; want EVENT", typ)
} }
var event Event var event core.Event
if err := json.Unmarshal(raw[1], &event); err != nil { if err := json.Unmarshal(raw[1], &event); err != nil {
t.Errorf("json.Unmarshal(`%s`): %v", string(raw[1]), err) t.Errorf("json.Unmarshal(`%s`): %v", string(raw[1]), err)
} }
return event return event
} }
func parseSubscriptionMessage(t *testing.T, raw []json.RawMessage) (subid string, filters []Filter) { func parseSubscriptionMessage(t *testing.T, raw []json.RawMessage) (subid string, filters []core.Filter) {
t.Helper() t.Helper()
if len(raw) < 3 { if len(raw) < 3 {
t.Fatalf("len(raw) = %d; want at least 3", len(raw)) t.Fatalf("len(raw) = %d; want at least 3", len(raw))
@ -238,9 +240,9 @@ func parseSubscriptionMessage(t *testing.T, raw []json.RawMessage) (subid string
if err := json.Unmarshal(raw[1], &id); err != nil { if err := json.Unmarshal(raw[1], &id); err != nil {
t.Errorf("json.Unmarshal sub id: %v", err) t.Errorf("json.Unmarshal sub id: %v", err)
} }
var ff []Filter var ff []core.Filter
for i, b := range raw[2:] { for i, b := range raw[2:] {
var f Filter var f core.Filter
if err := json.Unmarshal(b, &f); err != nil { if err := json.Unmarshal(b, &f); err != nil {
t.Errorf("json.Unmarshal filter %d: %v", i, err) t.Errorf("json.Unmarshal filter %d: %v", i, err)
} }

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"context" "context"
@ -6,6 +6,8 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/nbd-wtf/go-nostr/core"
) )
type Subscription struct { type Subscription struct {
@ -13,14 +15,14 @@ type Subscription struct {
counter int counter int
Relay *Relay Relay *Relay
Filters Filters Filters core.Filters
// for this to be treated as a COUNT and not a REQ this must be set // for this to be treated as a COUNT and not a REQ this must be set
countResult chan int64 countResult chan int64
// the Events channel emits all EVENTs that come in a Subscription // the Events channel emits all EVENTs that come in a Subscription
// will be closed when the subscription ends // will be closed when the subscription ends
Events chan *Event Events chan *core.Event
mu sync.Mutex mu sync.Mutex
// the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription
@ -43,7 +45,7 @@ type Subscription struct {
} }
type EventMessage struct { type EventMessage struct {
Event Event Event core.Event
Relay string Relay string
} }
@ -78,7 +80,7 @@ func (sub *Subscription) start() {
sub.mu.Unlock() sub.mu.Unlock()
} }
func (sub *Subscription) dispatchEvent(evt *Event) { func (sub *Subscription) dispatchEvent(evt *core.Event) {
added := false added := false
if !sub.eosed.Load() { if !sub.eosed.Load() {
sub.storedwg.Add(1) sub.storedwg.Add(1)
@ -138,7 +140,7 @@ func (sub *Subscription) Unsub() {
func (sub *Subscription) Close() { func (sub *Subscription) Close() {
if sub.Relay.IsConnected() { if sub.Relay.IsConnected() {
id := sub.GetID() id := sub.GetID()
closeMsg := CloseEnvelope(id) closeMsg := core.CloseEnvelope(id)
closeb, _ := (&closeMsg).MarshalJSON() closeb, _ := (&closeMsg).MarshalJSON()
debugLogf("{%s} sending %v", sub.Relay.URL, closeb) debugLogf("{%s} sending %v", sub.Relay.URL, closeb)
<-sub.Relay.Write(closeb) <-sub.Relay.Write(closeb)
@ -147,7 +149,7 @@ func (sub *Subscription) Close() {
// Sub sets sub.Filters and then calls sub.Fire(ctx). // Sub sets sub.Filters and then calls sub.Fire(ctx).
// The subscription will be closed if the context expires. // The subscription will be closed if the context expires.
func (sub *Subscription) Sub(_ context.Context, filters Filters) { func (sub *Subscription) Sub(_ context.Context, filters core.Filters) {
sub.Filters = filters sub.Filters = filters
sub.Fire() sub.Fire()
} }
@ -158,9 +160,9 @@ func (sub *Subscription) Fire() error {
var reqb []byte var reqb []byte
if sub.countResult == nil { if sub.countResult == nil {
reqb, _ = ReqEnvelope{id, sub.Filters}.MarshalJSON() reqb, _ = core.ReqEnvelope{SubscriptionID: id, Filters: sub.Filters}.MarshalJSON()
} else { } else {
reqb, _ = CountEnvelope{id, sub.Filters, nil}.MarshalJSON() reqb, _ = core.CountEnvelope{SubscriptionID: id, Filters: sub.Filters, Count: nil}.MarshalJSON()
} }
debugLogf("{%s} sending %v", sub.Relay.URL, reqb) debugLogf("{%s} sending %v", sub.Relay.URL, reqb)

View File

@ -1,4 +1,4 @@
package nostr package relays
import ( import (
"context" "context"
@ -6,6 +6,8 @@ import (
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
"github.com/nbd-wtf/go-nostr/core"
) )
const RELAY = "wss://nos.lol" const RELAY = "wss://nos.lol"
@ -14,8 +16,7 @@ const RELAY = "wss://nos.lol"
func TestSubscribeBasic(t *testing.T) { func TestSubscribeBasic(t *testing.T) {
rl := mustRelayConnect(RELAY) rl := mustRelayConnect(RELAY)
defer rl.Close() defer rl.Close()
sub, err := rl.Subscribe(context.Background(), core.Filters{{Kinds: []int{core.KindTextNote}, Limit: 2}})
sub, err := rl.Subscribe(context.Background(), Filters{{Kinds: []int{KindTextNote}, Limit: 2}})
if err != nil { if err != nil {
t.Fatalf("subscription failed: %v", err) t.Fatalf("subscription failed: %v", err)
return return
@ -56,7 +57,7 @@ func TestNestedSubscriptions(t *testing.T) {
n := atomic.Uint32{} n := atomic.Uint32{}
// fetch 2 replies to a note // fetch 2 replies to a note
sub, err := rl.Subscribe(context.Background(), Filters{{Kinds: []int{KindTextNote}, Tags: TagMap{"e": []string{"0e34a74f8547e3b95d52a2543719b109fd0312aba144e2ef95cba043f42fe8c5"}}, Limit: 3}}) sub, err := rl.Subscribe(context.Background(), core.Filters{{Kinds: []int{core.KindTextNote}, Tags: core.TagMap{"e": []string{"0e34a74f8547e3b95d52a2543719b109fd0312aba144e2ef95cba043f42fe8c5"}}, Limit: 3}})
if err != nil { if err != nil {
t.Fatalf("subscription 1 failed: %v", err) t.Fatalf("subscription 1 failed: %v", err)
return return
@ -66,7 +67,7 @@ func TestNestedSubscriptions(t *testing.T) {
select { select {
case event := <-sub.Events: case event := <-sub.Events:
// now fetch author of this // now fetch author of this
sub, err := rl.Subscribe(context.Background(), Filters{{Kinds: []int{KindProfileMetadata}, Authors: []string{event.PubKey}, Limit: 1}}) sub, err := rl.Subscribe(context.Background(), core.Filters{{Kinds: []int{core.KindProfileMetadata}, Authors: []string{event.PubKey}, Limit: 1}})
if err != nil { if err != nil {
t.Fatalf("subscription 2 failed: %v", err) t.Fatalf("subscription 2 failed: %v", err)
return return
@ -76,7 +77,7 @@ func TestNestedSubscriptions(t *testing.T) {
select { select {
case <-sub.Events: case <-sub.Events:
// do another subscription here in "sync" mode, just so we're sure things are not blocking // do another subscription here in "sync" mode, just so we're sure things are not blocking
rl.QuerySync(context.Background(), Filter{Limit: 1}) rl.QuerySync(context.Background(), core.Filter{Limit: 1})
n.Add(1) n.Add(1)
if n.Load() == 3 { if n.Load() == 3 {
@ -90,7 +91,7 @@ func TestNestedSubscriptions(t *testing.T) {
} }
} }
end: end:
fmt.Println("") fmt.Print("")
case <-sub.EndOfStoredEvents: case <-sub.EndOfStoredEvents:
sub.Unsub() sub.Unsub()
return return

View File

@ -1,4 +1,4 @@
package nostr package utils
import ( import (
"net/url" "net/url"

View File

@ -1,4 +1,4 @@
package nostr package utils
import "fmt" import "fmt"

View File

@ -1,4 +1,4 @@
package nostr package utils
import ( import (
"encoding/hex" "encoding/hex"