follow list fetching test and related changes and fixes.

- makes BatchedSubManyEose() use a single duplicate-id index shared across relays and uses it for the replaceable loaders (see the usage sketch after this list);
- fixes parsing of follow entries from kind:3 events (and other lists);
- adds a "cause" to most cancelation errors in relay/pool;
- removes the built-in cache from dataloader (we have our own, hopefully);
- increases the max frame size we can read from any websocket to 2**18 bytes (262144), which is enough for contact lists of over 2000 items.
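
For illustration, a minimal usage sketch of the reworked API (not part of the commit; the relay URLs and pubkeys are placeholders): each DirectedFilter pairs one filter with one relay, BatchedSubManyEose() merges the per-relay subscriptions into a single channel that closes after every relay reaches EOSE, and a single shared index drops duplicate event ids across all of them.

// Hypothetical example; relay URLs and pubkeys below are placeholders.
package main

import (
	"context"
	"fmt"

	"github.com/nbd-wtf/go-nostr"
)

func main() {
	pool := nostr.NewSimplePool(context.Background())

	// one directed filter per relay; an id seen from both relays is delivered only once
	dfs := []nostr.DirectedFilter{
		{Relay: "wss://relay.damus.io", Filter: nostr.Filter{Kinds: []int{3}, Authors: []string{"<pubkey-a>"}}},
		{Relay: "wss://nos.lol", Filter: nostr.Filter{Kinds: []int{3}, Authors: []string{"<pubkey-a>", "<pubkey-b>"}}},
	}

	// the returned channel closes once every relay has sent EOSE (or died)
	for ie := range pool.BatchedSubManyEose(context.Background(), dfs) {
		fmt.Println(ie.Event.ID, "from", ie.Relay.URL)
	}

	pool.Close("done with the example") // cancels the pool context with this reason as its cause
}

After pool.Close(), the reason should be retrievable via context.Cause(pool.Context), given the pool context is now created with context.WithCancelCause as in this commit.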
fiatjaf
2025-01-17 13:44:50 -03:00
parent adb97d46a7
commit 06a15fdaab
9 changed files with 240 additions and 178 deletions


@@ -20,6 +20,8 @@ func NewConnection(ctx context.Context, url string, requestHeader http.Header, t
 		return nil, err
 	}
 
+	c.SetReadLimit(262144) // this should be enough for contact lists of over 2000 people
+
 	return &Connection{
 		conn: c,
 	}, nil


@@ -2,6 +2,7 @@ package keyer
 import (
 	"context"
+	"errors"
 	"time"
 
 	"github.com/nbd-wtf/go-nostr"
@@ -18,7 +19,7 @@ func NewBunkerSignerFromBunkerClient(bc *nip46.BunkerClient) BunkerSigner {
 }
 
 func (bs BunkerSigner) GetPublicKey(ctx context.Context) (string, error) {
-	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+	ctx, cancel := context.WithTimeoutCause(ctx, time.Second*30, errors.New("get_public_key took too long"))
 	defer cancel()
 	pk, err := bs.bunker.GetPublicKey(ctx)
 	if err != nil {
@@ -28,7 +29,7 @@ func (bs BunkerSigner) GetPublicKey(ctx context.Context) (string, error) {
 }
 
 func (bs BunkerSigner) SignEvent(ctx context.Context, evt *nostr.Event) error {
-	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+	ctx, cancel := context.WithTimeoutCause(ctx, time.Second*30, errors.New("sign_event took too long"))
	defer cancel()
 	return bs.bunker.SignEvent(ctx, evt)
 }

pool.go

@@ -2,6 +2,7 @@ package nostr
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log"
 	"math"
@@ -24,7 +25,7 @@ type SimplePool struct {
 	Context context.Context
 
 	authHandler func(context.Context, RelayEvent) error
-	cancel      context.CancelFunc
+	cancel      context.CancelCauseFunc
 
 	eventMiddleware     func(RelayEvent)
 	duplicateMiddleware func(relay string, id string)
@@ -36,8 +37,8 @@ type SimplePool struct {
 	relayOptions []RelayOption
 }
 
-type DirectedFilters struct {
-	Filters
+type DirectedFilter struct {
+	Filter
 	Relay string
 }
@@ -55,7 +56,7 @@ type PoolOption interface {
 }
 
 func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 
 	pool := &SimplePool{
 		Relays: xsync.NewMapOf[string, *Relay](),
@@ -177,7 +178,11 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
 	// try to connect
 	// we use this ctx here so when the pool dies everything dies
-	ctx, cancel := context.WithTimeout(pool.Context, time.Second*15)
+	ctx, cancel := context.WithTimeoutCause(
+		pool.Context,
+		time.Second*15,
+		errors.New("connecting to the relay took too long"),
+	)
 	defer cancel()
 
 	relay = NewRelay(context.Background(), url, pool.relayOptions...)
@@ -379,17 +384,36 @@ func (pool *SimplePool) SubManyEose(
 	filters Filters,
 	opts ...SubscriptionOption,
 ) chan RelayEvent {
-	ctx, cancel := context.WithCancel(ctx)
+	seenAlready := xsync.NewMapOf[string, bool]()
+	return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters, WithCheckDuplicate(func(id, relay string) bool {
+		_, exists := seenAlready.Load(id)
+		if exists && pool.duplicateMiddleware != nil {
+			pool.duplicateMiddleware(relay, id)
+		}
+		return exists
+	}), seenAlready, opts...)
+}
+
+func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
+	ctx context.Context,
+	urls []string,
+	filters Filters,
+	wcd WithCheckDuplicate,
+	seenAlready *xsync.MapOf[string, bool],
+	opts ...SubscriptionOption,
+) chan RelayEvent {
+	ctx, cancel := context.WithCancelCause(ctx)
 
 	events := make(chan RelayEvent)
-	seenAlready := xsync.NewMapOf[string, bool]()
 	wg := sync.WaitGroup{}
 	wg.Add(len(urls))
 
+	opts = append(opts, wcd)
+
 	go func() {
 		// this will happen when all subscriptions get an eose (or when they die)
 		wg.Wait()
-		cancel()
+		cancel(errors.New("all subscriptions ended"))
 		close(events)
 	}()
@@ -411,19 +435,14 @@ func (pool *SimplePool) SubManyEose(
 		relay, err := pool.EnsureRelay(nm)
 		if err != nil {
+			debugLogf("error connecting to %s with %v: %s", relay, filters, err)
 			return
 		}
 
 		hasAuthed := false
 
 	subscribe:
-		sub, err := relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool {
-			_, exists := seenAlready.Load(id)
-			if exists && pool.duplicateMiddleware != nil {
-				pool.duplicateMiddleware(relay, id)
-			}
-			return exists
-		}))...)
+		sub, err := relay.Subscribe(ctx, filters, opts...)
 		if err != nil {
 			debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
 			return
@@ -508,47 +527,52 @@ func (pool *SimplePool) CountMany(
 // QuerySingle returns the first event returned by the first relay, cancels everything else.
 func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *RelayEvent {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	ctx, cancel := context.WithCancelCause(ctx)
 	for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) {
+		cancel(errors.New("got the first event and ended successfully"))
 		return &ievt
 	}
+	cancel(errors.New("SubManyEose() didn't get yield events"))
 	return nil
 }
 
-func (pool *SimplePool) batchedSubMany(
+func (pool *SimplePool) BatchedSubManyEose(
 	ctx context.Context,
-	dfs []DirectedFilters,
-	subFn func(context.Context, []string, Filters, ...SubscriptionOption) chan RelayEvent,
-	opts []SubscriptionOption,
+	dfs []DirectedFilter,
+	opts ...SubscriptionOption,
 ) chan RelayEvent {
 	res := make(chan RelayEvent)
+	wg := sync.WaitGroup{}
+	wg.Add(len(dfs))
+	seenAlready := xsync.NewMapOf[string, bool]()
 
 	for _, df := range dfs {
-		go func(df DirectedFilters) {
-			for ie := range subFn(ctx, []string{df.Relay}, df.Filters, opts...) {
+		go func(df DirectedFilter) {
+			for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx,
+				[]string{df.Relay},
+				Filters{df.Filter},
+				WithCheckDuplicate(func(id, relay string) bool {
+					_, exists := seenAlready.Load(id)
+					if exists && pool.duplicateMiddleware != nil {
+						pool.duplicateMiddleware(relay, id)
+					}
+					return exists
+				}), seenAlready, opts...) {
 				res <- ie
 			}
+
+			wg.Done()
 		}(df)
 	}
 
+	go func() {
+		wg.Wait()
+		close(res)
+	}()
+
 	return res
 }
 
-// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
-func (pool *SimplePool) BatchedSubMany(
-	ctx context.Context,
-	dfs []DirectedFilters,
-	opts ...SubscriptionOption,
-) chan RelayEvent {
-	return pool.batchedSubMany(ctx, dfs, pool.SubMany, opts)
-}
-
-// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
-func (pool *SimplePool) BatchedSubManyEose(
-	ctx context.Context,
-	dfs []DirectedFilters,
-	opts ...SubscriptionOption,
-) chan RelayEvent {
-	return pool.batchedSubMany(ctx, dfs, pool.SubManyEose, opts)
+func (pool *SimplePool) Close(reason string) {
+	pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))
 }


@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"crypto/tls"
+	"errors"
 	"fmt"
 	"log"
 	"net/http"
@@ -30,7 +31,7 @@ type Relay struct {
 	ConnectionError         error
 	connectionContext       context.Context // will be canceled when the connection closes
-	connectionContextCancel context.CancelFunc
+	connectionContextCancel context.CancelCauseFunc
 
 	challenge     string       // NIP-42 challenge, we only keep the last
 	noticeHandler func(string) // NIP-01 NOTICEs
@@ -51,7 +52,7 @@ type writeRequest struct {
 // NewRelay returns a new relay. The relay connection will be closed when the context is canceled.
 func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 	r := &Relay{
 		URL:               NormalizeURL(url),
 		connectionContext: ctx,
@@ -150,7 +151,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
 	if _, ok := ctx.Deadline(); !ok {
 		// if no timeout is set, force it to 7 seconds
 		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
+		ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("connection took too long"))
 		defer cancel()
 	}
@@ -175,7 +176,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
 		// close all subscriptions
 		for _, sub := range r.Subscriptions.Range {
-			sub.Unsub()
+			sub.unsub(fmt.Errorf("relay connection closed: %w / %w", context.Cause(r.connectionContext), r.ConnectionError))
 		}
 	}()
@@ -214,7 +215,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
 			buf.Reset()
 			if err := conn.ReadMessage(r.connectionContext, buf); err != nil {
 				r.ConnectionError = err
-				r.Close()
+				r.close(err)
 				break
 			}
@@ -407,7 +408,7 @@ func (r *Relay) Subscribe(ctx context.Context, filters Filters, opts ...Subscrip
 // Failure to do that will result in a huge number of halted goroutines being created.
 func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts ...SubscriptionOption) *Subscription {
 	current := subscriptionIDCounter.Add(1)
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
 
 	sub := &Subscription{
 		Relay: r,
@@ -431,7 +432,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
 		}
 	}
 
-	// subscription id calculation
+	// subscription id computation
 	buf := subIdPool.Get().([]byte)[:0]
 	buf = strconv.AppendInt(buf, sub.counter, 10)
 	buf = append(buf, ':')
@@ -462,7 +463,7 @@ func (r *Relay) QueryEvents(ctx context.Context, filter Filter) (chan *Event, er
 			case <-ctx.Done():
 			case <-r.Context().Done():
 			}
-			sub.Unsub()
+			sub.unsub(errors.New("QueryEvents() ended"))
 			return
 		}
 	}()
@@ -474,7 +475,7 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter) ([]*Event, error)
 	if _, ok := ctx.Deadline(); !ok {
 		// if no timeout is set, force it to 7 seconds
 		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
+		ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("QuerySync() took too long"))
 		defer cancel()
 	}
@@ -512,12 +513,12 @@ func (r *Relay) countInternal(ctx context.Context, filters Filters, opts ...Subs
 		return CountEnvelope{}, err
 	}
 
-	defer sub.Unsub()
+	defer sub.unsub(errors.New("countInternal() ended"))
 
 	if _, ok := ctx.Deadline(); !ok {
 		// if no timeout is set, force it to 7 seconds
 		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
+		ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, errors.New("countInternal took too long"))
 		defer cancel()
 	}
@@ -532,13 +533,17 @@ func (r *Relay) countInternal(ctx context.Context, filters Filters, opts ...Subs
 }
 
 func (r *Relay) Close() error {
+	return r.close(errors.New("Close() called"))
+}
+
+func (r *Relay) close(reason error) error {
 	r.closeMutex.Lock()
 	defer r.closeMutex.Unlock()
 
 	if r.connectionContextCancel == nil {
 		return fmt.Errorf("relay already closed")
 	}
-	r.connectionContextCancel()
+	r.connectionContextCancel(reason)
 	r.connectionContextCancel = nil
 
 	if r.Connection == nil {

@@ -2,6 +2,7 @@ package sdk
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strconv"
 	"sync"
@@ -37,6 +38,7 @@ func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[stri
 		},
 		dataloader.WithBatchCapacity[string, []*nostr.Event](60),
 		dataloader.WithClearCacheOnBatch[string, []*nostr.Event](),
+		dataloader.WithCache(&dataloader.NoCache[string, []*nostr.Event]{}),
 		dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350),
 	)
 }
@@ -45,13 +47,16 @@ func (sys *System) batchLoadAddressableEvents(
 	kind int,
 	pubkeys []string,
 ) []*dataloader.Result[[]*nostr.Event] {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
+	ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second*6,
+		errors.New("batch addressable load took too long"),
+	)
 	defer cancel()
 
 	batchSize := len(pubkeys)
 	results := make([]*dataloader.Result[[]*nostr.Event], batchSize)
 	keyPositions := make(map[string]int) // { [pubkey]: slice_index }
-	relayFilters := make(map[string]nostr.Filter) // { [relayUrl]: filter }
+	relayFilter := make([]nostr.DirectedFilter, 0, max(3, batchSize*2))
+	relayFilterIndex := make(map[string]int, max(3, batchSize*2))
 
 	wg := sync.WaitGroup{}
 	wg.Add(len(pubkeys))
@@ -91,49 +96,60 @@ func (sys *System) batchLoadAddressableEvents(
 			cm.Lock()
 			for _, relay := range relays {
 				// each relay will have a custom filter
-				filter, ok := relayFilters[relay]
-				if !ok {
-					filter = nostr.Filter{
-						Kinds:   []int{kind},
-						Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */),
+				idx, ok := relayFilterIndex[relay]
+				var dfilter nostr.DirectedFilter
+				if ok {
+					dfilter = relayFilter[idx]
+				} else {
+					dfilter = nostr.DirectedFilter{
+						Relay: relay,
+						Filter: nostr.Filter{
+							Kinds:   []int{kind},
+							Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */),
+						},
 					}
+					idx = len(relayFilter)
+					relayFilterIndex[relay] = idx
+					relayFilter = append(relayFilter, dfilter)
 				}
-				filter.Authors = append(filter.Authors, pubkey)
-				relayFilters[relay] = filter
+				dfilter.Authors = append(dfilter.Authors, pubkey)
+				relayFilter[idx] = dfilter
 			}
 			cm.Unlock()
 		}(i, pubkey)
 	}
 
-	// query all relays with the prepared filters
+	// wait for relay batches to be prepared
 	wg.Wait()
-	multiSubs := sys.batchAddressableRelayQueries(ctx, relayFilters)
+
+	// query all relays with the prepared filters
+	multiSubs := sys.Pool.BatchedSubManyEose(ctx, relayFilter)
 
 nextEvent:
 	for {
 		select {
-		case evt, more := <-multiSubs:
+		case ie, more := <-multiSubs:
 			if !more {
 				return results
 			}
 
 			// insert this event at the desired position
-			pos := keyPositions[evt.PubKey] // @unchecked: it must succeed because it must be a key we passed
+			pos := keyPositions[ie.PubKey] // @unchecked: it must succeed because it must be a key we passed
 			events := results[pos].Data
 			if events == nil {
 				// no events found, so just add this and end
-				results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{evt}}
+				results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{ie.Event}}
 				continue nextEvent
 			}
 
 			// there are events, so look for a match
-			d := evt.Tags.GetD()
+			d := ie.Tags.GetD()
 			for i, event := range events {
 				if event.Tags.GetD() == d {
 					// there is a match
-					if event.CreatedAt < evt.CreatedAt {
+					if event.CreatedAt < ie.CreatedAt {
 						// ...and this one is newer, so replace
-						events[i] = evt
+						events[i] = ie.Event
 					} else {
 						// ... but this one is older, so ignore
 					}
@@ -143,42 +159,10 @@ nextEvent:
 			}
 
 			// there is no match, so add to the end
-			events = append(events, evt)
+			events = append(events, ie.Event)
 			results[pos].Data = events
 		case <-ctx.Done():
 			return results
 		}
 	}
 }
-
-// batchAddressableRelayQueries is like batchReplaceableRelayQueries, except it doesn't count results to
-// try to exit early.
-func (sys *System) batchAddressableRelayQueries(
-	ctx context.Context,
-	relayFilters map[string]nostr.Filter,
-) <-chan *nostr.Event {
-	all := make(chan *nostr.Event)
-
-	wg := sync.WaitGroup{}
-	wg.Add(len(relayFilters))
-	for url, filter := range relayFilters {
-		go func(url string, filter nostr.Filter) {
-			defer wg.Done()
-
-			n := len(filter.Authors)
-			ctx, cancel := context.WithTimeout(ctx, time.Millisecond*450+time.Millisecond*50*time.Duration(n))
-			defer cancel()
-
-			for ie := range sys.Pool.SubManyEose(ctx, []string{url}, nostr.Filters{filter}, nostr.WithLabel("addr")) {
-				all <- ie.Event
-			}
-		}(url, filter)
-	}
-
-	go func() {
-		wg.Wait()
-		close(all)
-	}()
-
-	return all
-}


@@ -48,11 +48,11 @@ func parseProfileRef(tag nostr.Tag) (fw ProfileRef, ok bool) {
 		if _, err := url.Parse(tag[2]); err == nil {
 			fw.Relay = nostr.NormalizeURL(tag[2])
 		}
 		if len(tag) > 3 {
 			fw.Petname = strings.TrimSpace(tag[3])
 		}
-		return fw, true
 	}
 
-	return fw, false
+	return fw, true
 }


@@ -2,6 +2,7 @@ package sdk
 import (
 	"context"
+	"errors"
 	"fmt"
 	"slices"
 	"strconv"
@@ -12,6 +13,10 @@ import (
 	"github.com/nbd-wtf/go-nostr"
 )
 
+// this is used as a hack to signal that these replaceable loader queries shouldn't use the full
+// context timespan when they're being made from inside determineRelaysToQuery
+var contextForSub10002Query = context.WithValue(context.Background(), "", "")
+
 type replaceableIndex int
 
 const (
@@ -49,26 +54,39 @@ func (sys *System) initializeReplaceableDataloaders() {
 func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] {
 	return dataloader.NewBatchedLoader(
-		func(_ context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] {
-			return sys.batchLoadReplaceableEvents(kind, pubkeys)
+		func(ctx context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] {
+			var cancel context.CancelFunc
+			if ctx == contextForSub10002Query {
+				ctx, cancel = context.WithTimeoutCause(context.Background(), time.Millisecond*2300,
+					errors.New("fetching relays in subloader took too long"),
+				)
+			} else {
+				ctx, cancel = context.WithTimeoutCause(context.Background(), time.Second*6,
+					errors.New("batch replaceable load took too long"),
+				)
+				defer cancel()
+			}
+			return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys)
 		},
 		dataloader.WithBatchCapacity[string, *nostr.Event](60),
 		dataloader.WithClearCacheOnBatch[string, *nostr.Event](),
+		dataloader.WithCache(&dataloader.NoCache[string, *nostr.Event]{}),
 		dataloader.WithWait[string, *nostr.Event](time.Millisecond*350),
 	)
 }
 
 func (sys *System) batchLoadReplaceableEvents(
+	ctx context.Context,
 	kind int,
 	pubkeys []string,
 ) []*dataloader.Result[*nostr.Event] {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
-	defer cancel()
-
 	batchSize := len(pubkeys)
 	results := make([]*dataloader.Result[*nostr.Event], batchSize)
 	keyPositions := make(map[string]int) // { [pubkey]: slice_index }
-	relayFilters := make(map[string]nostr.Filter) // { [relayUrl]: filter }
+	relayFilter := make([]nostr.DirectedFilter, 0, max(3, batchSize*2))
+	relayFilterIndex := make(map[string]int, max(3, batchSize*2))
 
 	wg := sync.WaitGroup{}
 	wg.Add(len(pubkeys))
@@ -108,15 +126,24 @@ func (sys *System) batchLoadReplaceableEvents(
 			cm.Lock()
 			for _, relay := range relays {
 				// each relay will have a custom filter
-				filter, ok := relayFilters[relay]
-				if !ok {
-					filter = nostr.Filter{
-						Kinds:   []int{kind},
-						Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */),
+				idx, ok := relayFilterIndex[relay]
+				var dfilter nostr.DirectedFilter
+				if ok {
+					dfilter = relayFilter[idx]
+				} else {
+					dfilter = nostr.DirectedFilter{
+						Relay: relay,
+						Filter: nostr.Filter{
+							Kinds:   []int{kind},
+							Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */),
+						},
 					}
+					idx = len(relayFilter)
+					relayFilterIndex[relay] = idx
+					relayFilter = append(relayFilter, dfilter)
 				}
-				filter.Authors = append(filter.Authors, pubkey)
-				relayFilters[relay] = filter
+				dfilter.Authors = append(dfilter.Authors, pubkey)
+				relayFilter[idx] = dfilter
 			}
 			cm.Unlock()
 		}(i, pubkey)
@@ -124,18 +151,18 @@ func (sys *System) batchLoadReplaceableEvents(
 	// query all relays with the prepared filters
 	wg.Wait()
-	multiSubs := sys.batchReplaceableRelayQueries(ctx, relayFilters)
+	multiSubs := sys.Pool.BatchedSubManyEose(ctx, relayFilter, nostr.WithLabel("repl~"+strconv.Itoa(kind)))
 
 	for {
 		select {
-		case evt, more := <-multiSubs:
+		case ie, more := <-multiSubs:
 			if !more {
 				return results
 			}
 
 			// insert this event at the desired position
-			pos := keyPositions[evt.PubKey] // @unchecked: it must succeed because it must be a key we passed
-			if results[pos].Data == nil || results[pos].Data.CreatedAt < evt.CreatedAt {
-				results[pos] = &dataloader.Result[*nostr.Event]{Data: evt}
+			pos := keyPositions[ie.PubKey] // @unchecked: it must succeed because it must be a key we passed
+			if results[pos].Data == nil || results[pos].Data.CreatedAt < ie.CreatedAt {
+				results[pos] = &dataloader.Result[*nostr.Event]{Data: ie.Event}
 			}
 		case <-ctx.Done():
 			return results
@@ -153,11 +180,13 @@ func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, ki
 		if len(relays) == 0 {
 			relays = []string{"wss://relay.damus.io", "wss://nos.lol"}
 		}
-	} else if kind == 0 || kind == 3 {
-		// leave room for two hardcoded relays because people are stupid
-		relays = sys.FetchOutboxRelays(ctx, pubkey, 1)
 	} else {
-		relays = sys.FetchOutboxRelays(ctx, pubkey, 3)
+		if kind == 0 || kind == 3 {
+			// leave room for two hardcoded relays because people are stupid
+			relays = sys.FetchOutboxRelays(contextForSub10002Query, pubkey, 1)
+		} else {
+			relays = sys.FetchOutboxRelays(contextForSub10002Query, pubkey, 3)
+		}
 	}
 
 	// use a different set of extra relays depending on the kind
@@ -182,45 +211,3 @@ func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, ki
 
 	return relays
 }
-
-// batchReplaceableRelayQueries subscribes to multiple relays using a different filter for each and returns
-// a single channel with all results. it closes on EOSE or when all the expected events were returned.
-//
-// the number of expected events is given by the number of pubkeys in the .Authors filter field.
-// because of that, batchReplaceableRelayQueries is only suitable for querying replaceable events -- and
-// care must be taken to not include the same pubkey more than once in the filter .Authors array.
-func (sys *System) batchReplaceableRelayQueries(
-	ctx context.Context,
-	relayFilters map[string]nostr.Filter,
-) <-chan *nostr.Event {
-	all := make(chan *nostr.Event)
-
-	wg := sync.WaitGroup{}
-	wg.Add(len(relayFilters))
-	for url, filter := range relayFilters {
-		go func(url string, filter nostr.Filter) {
-			defer wg.Done()
-
-			n := len(filter.Authors)
-			ctx, cancel := context.WithTimeout(ctx, time.Millisecond*950+time.Millisecond*50*time.Duration(n))
-			defer cancel()
-
-			received := 0
-			for ie := range sys.Pool.SubManyEose(ctx, []string{url}, nostr.Filters{filter}, nostr.WithLabel("repl")) {
-				all <- ie.Event
-				received++
-				if received >= n {
-					// we got all events we asked for, unless the relay is shitty and sent us two from the same
-					return
-				}
-			}
-		}(url, filter)
-	}
-
-	go func() {
-		wg.Wait()
-		close(all)
-	}()
-
-	return all
-}


@@ -2,13 +2,15 @@ package sdk
 import (
 	"context"
+	"fmt"
+	"strings"
 	"testing"
 
 	"github.com/nbd-wtf/go-nostr"
 	"github.com/stretchr/testify/require"
 )
 
-func TestSystemFiatjaf(t *testing.T) {
+func TestMetadataAndEvents(t *testing.T) {
 	sys := NewSystem()
 	ctx := context.Background()
@@ -33,3 +35,53 @@ func TestSystemFiatjaf(t *testing.T) {
 	require.NotEmpty(t, events[meta.PubKey])
 	require.GreaterOrEqual(t, len(events[meta.PubKey]), 5)
 }
+
+func TestFollowListRecursion(t *testing.T) {
+	sys := NewSystem()
+	ctx := context.Background()
+
+	// fetch initial follow list
+	followList := sys.FetchFollowList(ctx, "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d")
+	fmt.Println("~", len(followList.Items))
+	require.Greater(t, len(followList.Items), 400, "should follow more than 400 accounts")
+
+	// fetch metadata and follow lists for each followed account concurrently
+	type result struct {
+		pubkey     string
+		followList GenericList[ProfileRef]
+		metadata   ProfileMetadata
+	}
+	results := make(chan result)
+	go func() {
+		for _, item := range followList.Items {
+			go func() {
+				fl := sys.FetchFollowList(ctx, item.Pubkey)
+				meta := sys.FetchProfileMetadata(ctx, item.Pubkey)
+				fmt.Println(" ~", item.Pubkey, meta.Name, len(fl.Items))
+				results <- result{item.Pubkey, fl, meta}
+			}()
+		}
+	}()
+
+	// collect results
+	var validAccounts int
+	var accountsWithManyFollows int
+	for i := 0; i < len(followList.Items); i++ {
+		r := <-results
+
+		// skip if metadata has "bot" in name
+		if strings.Contains(strings.ToLower(r.metadata.Name), "bot") {
+			continue
+		}
+
+		validAccounts++
+		if len(r.followList.Items) > 20 {
+			accountsWithManyFollows++
+		}
+	}
+
+	// check if at least 90% of non-bot accounts follow more than 20 accounts
+	ratio := float64(accountsWithManyFollows) / float64(validAccounts)
+	require.Greater(t, ratio, 0.9, "at least 90%% of accounts should follow more than 20 others (actual: %.2f%%)", ratio*100)
+}


@@ -2,6 +2,7 @@ package nostr
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -38,7 +39,7 @@ type Subscription struct {
 	match  func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
 	live   atomic.Bool
 	eosed  atomic.Bool
-	cancel context.CancelFunc
+	cancel context.CancelCauseFunc
 
 	// this keeps track of the events we've received before the EOSE that we must dispatch before
 	// closing the EndOfStoredEvents channel
@@ -74,8 +75,9 @@ var (
 func (sub *Subscription) start() {
 	<-sub.Context.Done()
 	// the subscription ends once the context is canceled (if not already)
-	sub.Unsub() // this will set sub.live to false
+	sub.unsub(errors.New("context done on start()")) // this will set sub.live to false
 
 	// do this so we don't have the possibility of closing the Events channel and then trying to send to it
 	sub.mu.Lock()
@@ -123,15 +125,19 @@ func (sub *Subscription) handleClosed(reason string) {
 	go func() {
 		sub.ClosedReason <- reason
 		sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay
-		sub.Unsub()
+		sub.unsub(fmt.Errorf("CLOSED received: %s", reason))
 	}()
 }
 
 // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
 // Unsub() also closes the channel sub.Events and makes a new one.
 func (sub *Subscription) Unsub() {
+	sub.unsub(errors.New("Unsub() called"))
+}
+
+func (sub *Subscription) unsub(err error) {
 	// cancel the context (if it's not canceled already)
-	sub.cancel()
+	sub.cancel(err)
 
 	// mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation)
 	if sub.live.CompareAndSwap(true, false) {
@@ -169,8 +175,9 @@ func (sub *Subscription) Fire() error {
 	sub.live.Store(true)
 	if err := <-sub.Relay.Write(reqb); err != nil {
-		sub.cancel()
-		return fmt.Errorf("failed to write: %w", err)
+		err := fmt.Errorf("failed to write: %w", err)
+		sub.cancel(err)
+		return err
 	}
 
 	return nil