From 78dbf9def53916a20882c849b7f5371cbb9b0a9f Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 20 Mar 2025 19:37:37 -0300 Subject: [PATCH] use our own dataloader and simplify batch replaceable requests, removing bugs. --- go.mod | 3 +- go.sum | 6 +- sdk/addressable_loader.go | 75 ++++------- sdk/dataloader/dataloader.go | 255 +++++++++++++++++++++++++++++++++++ sdk/helpers.go | 37 ----- sdk/list.go | 5 +- sdk/metadata.go | 3 +- sdk/replaceable_loader.go | 98 +++++--------- sdk/set.go | 3 +- sdk/system.go | 2 +- sdk/utils.go | 7 - 11 files changed, 321 insertions(+), 173 deletions(-) create mode 100644 sdk/dataloader/dataloader.go diff --git a/go.mod b/go.mod index 6283595..c06393a 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,8 @@ require ( github.com/dgraph-io/ristretto v1.0.0 github.com/elnosh/gonuts v0.3.1-0.20250123162555-7c0381a585e3 github.com/fiatjaf/eventstore v0.16.2 - github.com/fiatjaf/khatru v0.17.3-0.20250312035319-596bca93c3ff + github.com/fiatjaf/khatru v0.17.4 github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 - github.com/graph-gophers/dataloader/v7 v7.1.0 github.com/jmoiron/sqlx v1.4.0 github.com/json-iterator/go v1.1.12 github.com/mailru/easyjson v0.9.0 diff --git a/go.sum b/go.sum index 6e6b705..eed1f67 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOU github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg= github.com/fiatjaf/eventstore v0.16.2 h1:h4rHwSwPcqAKqWUsAbYWUhDeSgm2Kp+PBkJc3FgBYu4= github.com/fiatjaf/eventstore v0.16.2/go.mod h1:0gU8fzYO/bG+NQAVlHtJWOlt3JKKFefh5Xjj2d1dLIs= -github.com/fiatjaf/khatru v0.17.3-0.20250312035319-596bca93c3ff h1:b6LYwWlc8zAW6aoZpXYC3Gx/zkP4XW5amDx0VwyeREs= -github.com/fiatjaf/khatru v0.17.3-0.20250312035319-596bca93c3ff/go.mod h1:dAaXV6QZwuMVYlXQigp/0Uyl/m1nKOhtRssjQYsgMu0= +github.com/fiatjaf/khatru v0.17.4 h1:VzcLUyBKMlP/CAG4iHJbDJmnZgzhbGLKLxJAUuLRogg= +github.com/fiatjaf/khatru v0.17.4/go.mod h1:VYQ7ZNhs3C1+E4gBnx+DtEgU0BrPdrl3XYF3H+mq6fg= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= @@ -143,8 +143,6 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8= github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/graph-gophers/dataloader/v7 v7.1.0 h1:Wn8HGF/q7MNXcvfaBnLEPEFJttVHR8zuEqP1obys/oc= -github.com/graph-gophers/dataloader/v7 v7.1.0/go.mod h1:1bKE0Dm6OUcTB/OAuYVOZctgIz7Q3d0XrYtlIzTgg6Q= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/sdk/addressable_loader.go b/sdk/addressable_loader.go index 1994247..01fe435 100644 --- a/sdk/addressable_loader.go +++ b/sdk/addressable_loader.go @@ -2,14 +2,11 @@ package sdk import ( "context" - "errors" - "fmt" - "strconv" "sync" "time" - "github.com/graph-gophers/dataloader/v7" "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/sdk/dataloader" ) // this is similar to replaceable_loader and reuses logic from that. @@ -33,28 +30,21 @@ func (sys *System) initializeAddressableDataloaders() { func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[string, []*nostr.Event] { return dataloader.NewBatchedLoader( - func(_ context.Context, pubkeys []string) []*dataloader.Result[[]*nostr.Event] { - return sys.batchLoadAddressableEvents(kind, pubkeys) + func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[[]*nostr.Event] { + return sys.batchLoadAddressableEvents(ctxs, kind, pubkeys) }, dataloader.WithBatchCapacity[string, []*nostr.Event](30), - dataloader.WithClearCacheOnBatch[string, []*nostr.Event](), - dataloader.WithCache(&dataloader.NoCache[string, []*nostr.Event]{}), dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350), ) } func (sys *System) batchLoadAddressableEvents( + ctxs []context.Context, kind int, pubkeys []string, -) []*dataloader.Result[[]*nostr.Event] { - ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second*6, - errors.New("batch addressable load took too long"), - ) - defer cancel() - +) map[string]dataloader.Result[[]*nostr.Event] { batchSize := len(pubkeys) - results := make([]*dataloader.Result[[]*nostr.Event], batchSize) - keyPositions := make(map[string]int) // { [pubkey]: slice_index } + results := make(map[string]dataloader.Result[[]*nostr.Event], batchSize) relayFilter := make([]nostr.DirectedFilter, 0, max(3, batchSize*2)) relayFilterIndex := make(map[string]int, max(3, batchSize*2)) @@ -62,36 +52,16 @@ func (sys *System) batchLoadAddressableEvents( wg.Add(len(pubkeys)) cm := sync.Mutex{} + aggregatedContext, aggregatedCancel := context.WithCancel(context.Background()) + waiting := len(pubkeys) + for i, pubkey := range pubkeys { + ctx := ctxs[i] + // build batched queries for the external relays - keyPositions[pubkey] = i // this is to help us know where to save the result later - go func(i int, pubkey string) { - defer wg.Done() - - // if we're attempting this query with a short key (last 8 characters), stop here - if len(pubkey) != 64 { - results[i] = &dataloader.Result[[]*nostr.Event]{ - Error: fmt.Errorf("won't proceed to query relays with a shortened key (%d)", kind), - } - return - } - - // save attempts here so we don't try the same failed query over and over - if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow { - results[i] = &dataloader.Result[[]*nostr.Event]{ - Error: fmt.Errorf("last attempt failed, waiting more to try again"), - } - return - } - // gather relays we'll use for this pubkey - relays := sys.determineRelaysToQuery(pubkey, kind) - - // by default we will return an error (this will be overwritten when we find an event) - results[i] = &dataloader.Result[[]*nostr.Event]{ - Error: fmt.Errorf("couldn't find a kind %d event anywhere %v", kind, relays), - } + relays := sys.determineRelaysToQuery(ctx, pubkey, kind) cm.Lock() for _, relay := range relays { @@ -116,6 +86,13 @@ func (sys *System) batchLoadAddressableEvents( relayFilter[idx] = dfilter } cm.Unlock() + wg.Done() + + <-ctx.Done() + waiting-- + if waiting == 0 { + aggregatedCancel() + } }(i, pubkey) } @@ -123,7 +100,7 @@ func (sys *System) batchLoadAddressableEvents( wg.Wait() // query all relays with the prepared filters - multiSubs := sys.Pool.BatchedSubManyEose(ctx, relayFilter) + multiSubs := sys.Pool.BatchedSubManyEose(aggregatedContext, relayFilter) nextEvent: for { select { @@ -132,13 +109,10 @@ nextEvent: return results } - // insert this event at the desired position - pos := keyPositions[ie.PubKey] // @unchecked: it must succeed because it must be a key we passed - - events := results[pos].Data + events := results[ie.PubKey].Data if events == nil { // no events found, so just add this and end - results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{ie.Event}} + results[ie.PubKey] = dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{ie.Event}} continue nextEvent } @@ -158,10 +132,9 @@ nextEvent: } } - // there is no match, so add to the end events = append(events, ie.Event) - results[pos].Data = events - case <-ctx.Done(): + results[ie.PubKey] = dataloader.Result[[]*nostr.Event]{Data: events} + case <-aggregatedContext.Done(): return results } } diff --git a/sdk/dataloader/dataloader.go b/sdk/dataloader/dataloader.go new file mode 100644 index 0000000..10f19b3 --- /dev/null +++ b/sdk/dataloader/dataloader.go @@ -0,0 +1,255 @@ +package dataloader + +import ( + "context" + "errors" + "sync" + "time" +) + +var NoValueError = errors.New("") + +// BatchFunc is a function, which when given a slice of keys (string), returns a map of `results` indexed by keys. +// +// The keys passed to this function are guaranteed to be unique. +type BatchFunc[K comparable, V any] func([]context.Context, []K) map[K]Result[V] + +// Result is the data structure that a BatchFunc returns. +// It contains the resolved data, and any errors that may have occurred while fetching the data. +type Result[V any] struct { + Data V + Error error +} + +// ResultMany is used by the LoadMany method. +// It contains a list of resolved data and a list of errors. +// The lengths of the data list and error list will match, and elements at each index correspond to each other. +type ResultMany[V any] struct { + Data []V + Error []error +} + +// PanicErrorWrapper wraps the error interface. +// This is used to check if the error is a panic error. +// We should not cache panic errors. +type PanicErrorWrapper struct { + panicError error +} + +func (p *PanicErrorWrapper) Error() string { + return p.panicError.Error() +} + +// Loader implements the dataloader.Interface. +type Loader[K comparable, V any] struct { + // the batch function to be used by this loader + batchFn BatchFunc[K, V] + + // the maximum batch size. Set to 0 if you want it to be unbounded. + batchCap int + + // count of queued up items + count int + + // the maximum input queue size. Set to 0 if you want it to be unbounded. + inputCap int + + // the amount of time to wait before triggering a batch + wait time.Duration + + // lock to protect the batching operations + batchLock sync.Mutex + + // current batcher + curBatcher *batcher[K, V] + + // used to close the sleeper of the current batcher + endSleeper chan bool + + // used by tests to prevent logs + silent bool +} + +// type used to on input channel +type batchRequest[K comparable, V any] struct { + ctx context.Context + key K + channel chan Result[V] +} + +// Option allows for configuration of Loader fields. +type Option[K comparable, V any] func(*Loader[K, V]) + +// WithBatchCapacity sets the batch capacity. Default is 0 (unbounded). +func WithBatchCapacity[K comparable, V any](c int) Option[K, V] { + return func(l *Loader[K, V]) { + l.batchCap = c + } +} + +// WithInputCapacity sets the input capacity. Default is 1000. +func WithInputCapacity[K comparable, V any](c int) Option[K, V] { + return func(l *Loader[K, V]) { + l.inputCap = c + } +} + +// WithWait sets the amount of time to wait before triggering a batch. +// Default duration is 16 milliseconds. +func WithWait[K comparable, V any](d time.Duration) Option[K, V] { + return func(l *Loader[K, V]) { + l.wait = d + } +} + +// withSilentLogger turns of log messages. It's used by the tests +func withSilentLogger[K comparable, V any]() Option[K, V] { + return func(l *Loader[K, V]) { + l.silent = true + } +} + +// NewBatchedLoader constructs a new Loader with given options. +func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Option[K, V]) *Loader[K, V] { + loader := &Loader[K, V]{ + batchFn: batchFn, + inputCap: 1000, + wait: 16 * time.Millisecond, + } + + // Apply options + for _, apply := range opts { + apply(loader) + } + + return loader +} + +// Load load/resolves the given key, returning a channel that will contain the value and error. +// The first context passed to this function within a given batch window will be provided to +// the registered BatchFunc. +func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) { + c := make(chan Result[V], 1) + + // this is sent to batch fn. It contains the key and the channel to return + // the result on + req := &batchRequest[K, V]{ctx, key, c} + + l.batchLock.Lock() + // start the batch window if it hasn't already started. + if l.curBatcher == nil { + l.curBatcher = l.newBatcher(l.silent) + // start the current batcher batch function + go l.curBatcher.batch() + // start a sleeper for the current batcher + l.endSleeper = make(chan bool) + go l.sleeper(l.curBatcher, l.endSleeper) + } + + l.curBatcher.input <- req + + // if we need to keep track of the count (max batch), then do so. + if l.batchCap > 0 { + l.count++ + // if we hit our limit, force the batch to start + if l.count == l.batchCap { + // end the batcher synchronously here because another call to Load + // may concurrently happen and needs to go to a new batcher. + l.curBatcher.end() + // end the sleeper for the current batcher. + // this is to stop the goroutine without waiting for the + // sleeper timeout. + close(l.endSleeper) + l.reset() + } + } + l.batchLock.Unlock() + + if v, ok := <-c; ok { + return v.Data, v.Error + } + + return value, NoValueError +} + +func (l *Loader[K, V]) reset() { + l.count = 0 + l.curBatcher = nil +} + +type batcher[K comparable, V any] struct { + input chan *batchRequest[K, V] + batchFn BatchFunc[K, V] + finished bool + silent bool +} + +// newBatcher returns a batcher for the current requests +// all the batcher methods must be protected by a global batchLock +func (l *Loader[K, V]) newBatcher(silent bool) *batcher[K, V] { + return &batcher[K, V]{ + input: make(chan *batchRequest[K, V], l.inputCap), + batchFn: l.batchFn, + silent: silent, + } +} + +// stop receiving input and process batch function +func (b *batcher[K, V]) end() { + if !b.finished { + close(b.input) + b.finished = true + } +} + +// execute the batch of all items in queue +func (b *batcher[K, V]) batch() { + var ( + ctxs = make([]context.Context, 0, 30) + keys = make([]K, 0, 30) + reqs = make([]*batchRequest[K, V], 0, 30) + res map[K]Result[V] + ) + + for item := range b.input { + ctxs = append(ctxs, item.ctx) + keys = append(keys, item.key) + reqs = append(reqs, item) + } + + func() { + res = b.batchFn(ctxs, keys) + }() + + for _, req := range reqs { + if r, ok := res[req.key]; ok { + req.channel <- r + } + close(req.channel) + } +} + +// wait the appropriate amount of time for the provided batcher +func (l *Loader[K, V]) sleeper(b *batcher[K, V], close chan bool) { + select { + // used by batch to close early. usually triggered by max batch size + case <-close: + return + // this will move this goroutine to the back of the callstack? + case <-time.After(l.wait): + } + + // reset + // this is protected by the batchLock to avoid closing the batcher input + // channel while Load is inserting a request + l.batchLock.Lock() + b.end() + + // We can end here also if the batcher has already been closed and a + // new one has been created. So reset the loader state only if the batcher + // is the current one + if l.curBatcher == b { + l.reset() + } + l.batchLock.Unlock() +} diff --git a/sdk/helpers.go b/sdk/helpers.go index 91b54ea..9a49e1b 100644 --- a/sdk/helpers.go +++ b/sdk/helpers.go @@ -2,7 +2,6 @@ package sdk import ( "slices" - "time" jsoniter "github.com/json-iterator/go" ) @@ -20,39 +19,3 @@ func appendUnique[I comparable](arr []I, item ...I) []I { } return arr } - -// doThisNotMoreThanOnceAnHour checks if an operation with the given key -// has been performed in the last hour. If not, it returns true and records -// the operation to prevent it from running again within the hour. -func doThisNotMoreThanOnceAnHour(key string) (doItNow bool) { - _dtnmtoahLock.Lock() - defer _dtnmtoahLock.Unlock() - - if _dtnmtoah == nil { - // this runs only once for the lifetime of this library and - // starts a long-running process of checking for expired items - // and deleting them from this map every 10 minutes. - _dtnmtoah = make(map[string]time.Time) - go func() { - for { - time.Sleep(time.Minute * 10) - _dtnmtoahLock.Lock() - now := time.Now() - for k, v := range _dtnmtoah { - if v.Before(now) { - delete(_dtnmtoah, k) - } - } - _dtnmtoahLock.Unlock() - } - }() - } - - _, hasBeenPerformedInTheLastHour := _dtnmtoah[key] - if hasBeenPerformedInTheLastHour { - return false - } - - _dtnmtoah[key] = time.Now() - return true -} diff --git a/sdk/list.go b/sdk/list.go index 95db850..54155e2 100644 --- a/sdk/list.go +++ b/sdk/list.go @@ -74,7 +74,7 @@ func fetchGenericList[I TagItemWithValue]( v = *newV } - // even if we didn't find anything register this because we tried + // register this even if we didn't find anything because we tried // (and we still have the previous event in our local store) sys.KVStore.Set(lastFetchKey, encodeTimestamp(nostr.Now())) } @@ -108,8 +108,7 @@ func tryFetchListFromNetwork[I TagItemWithValue]( replaceableIndex replaceableIndex, parseTag func(nostr.Tag) (I, bool), ) *GenericList[I] { - thunk := sys.replaceableLoaders[replaceableIndex].Load(ctx, pubkey) - evt, err := thunk() + evt, err := sys.replaceableLoaders[replaceableIndex].Load(ctx, pubkey) if err != nil { return nil } diff --git a/sdk/metadata.go b/sdk/metadata.go index fa3569c..3337bc2 100644 --- a/sdk/metadata.go +++ b/sdk/metadata.go @@ -154,8 +154,7 @@ func (sys *System) FetchProfileMetadata(ctx context.Context, pubkey string) (pm } func (sys *System) tryFetchMetadataFromNetwork(ctx context.Context, pubkey string) *ProfileMetadata { - thunk0 := sys.replaceableLoaders[kind_0].Load(ctx, pubkey) - evt, err := thunk0() + evt, err := sys.replaceableLoaders[kind_0].Load(ctx, pubkey) if err != nil { return nil } diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index 3fe23b5..8f62697 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -3,20 +3,15 @@ package sdk import ( "context" "errors" - "fmt" "slices" "strconv" "sync" "time" - "github.com/graph-gophers/dataloader/v7" "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/sdk/dataloader" ) -// this is used as a hack to signal that these replaceable loader queries shouldn't use the full -// context timespan when they're being made from inside determineRelaysToQuery -var contextForSub10002Query = context.WithValue(context.Background(), "", "") - type replaceableIndex int const ( @@ -54,37 +49,21 @@ func (sys *System) initializeReplaceableDataloaders() { func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] { return dataloader.NewBatchedLoader( - func(ctx context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] { - var cancel context.CancelFunc - - if ctx == contextForSub10002Query { - ctx, cancel = context.WithTimeoutCause(context.Background(), time.Millisecond*2300, - errors.New("fetching relays in subloader took too long"), - ) - } else { - ctx, cancel = context.WithTimeoutCause(context.Background(), time.Second*6, - errors.New("batch replaceable load took too long"), - ) - defer cancel() - } - - return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys) + func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[*nostr.Event] { + return sys.batchLoadReplaceableEvents(ctxs, kind, pubkeys) }, dataloader.WithBatchCapacity[string, *nostr.Event](30), - dataloader.WithClearCacheOnBatch[string, *nostr.Event](), - dataloader.WithCache(&dataloader.NoCache[string, *nostr.Event]{}), dataloader.WithWait[string, *nostr.Event](time.Millisecond*350), ) } func (sys *System) batchLoadReplaceableEvents( - ctx context.Context, + ctxs []context.Context, kind int, pubkeys []string, -) []*dataloader.Result[*nostr.Event] { +) map[string]dataloader.Result[*nostr.Event] { batchSize := len(pubkeys) - results := make([]*dataloader.Result[*nostr.Event], batchSize) - keyPositions := make(map[string]int) // { [pubkey]: slice_index } + results := make(map[string]dataloader.Result[*nostr.Event], batchSize) relayFilter := make([]nostr.DirectedFilter, 0, max(3, batchSize*2)) relayFilterIndex := make(map[string]int, max(3, batchSize*2)) @@ -92,36 +71,16 @@ func (sys *System) batchLoadReplaceableEvents( wg.Add(len(pubkeys)) cm := sync.Mutex{} + aggregatedContext, aggregatedCancel := context.WithCancel(context.Background()) + waiting := len(pubkeys) + for i, pubkey := range pubkeys { + ctx := ctxs[i] + // build batched queries for the external relays - keyPositions[pubkey] = i // this is to help us know where to save the result later - go func(i int, pubkey string) { - defer wg.Done() - - // if we're attempting this query with a short key (last 8 characters), stop here - if len(pubkey) != 64 { - results[i] = &dataloader.Result[*nostr.Event]{ - Error: fmt.Errorf("won't proceed to query relays with a shortened key (%d)", kind), - } - return - } - - // save attempts here so we don't try the same failed query over and over - if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow { - results[i] = &dataloader.Result[*nostr.Event]{ - Error: fmt.Errorf("last attempt failed, waiting more to try again"), - } - return - } - // gather relays we'll use for this pubkey - relays := sys.determineRelaysToQuery(pubkey, kind) - - // by default we will return an error (this will be overwritten when we find an event) - results[i] = &dataloader.Result[*nostr.Event]{ - Error: fmt.Errorf("couldn't find a kind %d event anywhere %v", kind, relays), - } + relays := sys.determineRelaysToQuery(ctx, pubkey, kind) cm.Lock() for _, relay := range relays { @@ -146,12 +105,21 @@ func (sys *System) batchLoadReplaceableEvents( relayFilter[idx] = dfilter } cm.Unlock() + wg.Done() + + <-ctx.Done() + waiting-- + if waiting == 0 { + aggregatedCancel() + } }(i, pubkey) } // query all relays with the prepared filters wg.Wait() - multiSubs := sys.Pool.BatchedSubManyEose(ctx, relayFilter, nostr.WithLabel("repl~"+strconv.Itoa(kind))) + multiSubs := sys.Pool.BatchedSubManyEose(aggregatedContext, relayFilter, + nostr.WithLabel("repl~"+strconv.Itoa(kind)), + ) for { select { case ie, more := <-multiSubs: @@ -160,33 +128,35 @@ func (sys *System) batchLoadReplaceableEvents( } // insert this event at the desired position - pos := keyPositions[ie.PubKey] // @unchecked: it must succeed because it must be a key we passed - if results[pos].Data == nil || results[pos].Data.CreatedAt < ie.CreatedAt { - results[pos] = &dataloader.Result[*nostr.Event]{Data: ie.Event} + if results[ie.PubKey].Data == nil || results[ie.PubKey].Data.CreatedAt < ie.CreatedAt { + results[ie.PubKey] = dataloader.Result[*nostr.Event]{Data: ie.Event} } - case <-ctx.Done(): + case <-aggregatedContext.Done(): return results } } } -func (sys *System) determineRelaysToQuery(pubkey string, kind int) []string { +func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, kind int) []string { var relays []string // search in specific relays for user if kind == 10002 { // prevent infinite loops by jumping directly to this relays = sys.Hints.TopN(pubkey, 3) - if len(relays) == 0 { - relays = []string{"wss://relay.damus.io", "wss://nos.lol"} - } } else { + ctx, cancel := context.WithTimeoutCause(ctx, time.Millisecond*2300, + errors.New("fetching relays in subloader took too long"), + ) + if kind == 0 || kind == 3 { // leave room for two hardcoded relays because people are stupid - relays = sys.FetchOutboxRelays(contextForSub10002Query, pubkey, 1) + relays = sys.FetchOutboxRelays(ctx, pubkey, 1) } else { - relays = sys.FetchOutboxRelays(contextForSub10002Query, pubkey, 3) + relays = sys.FetchOutboxRelays(ctx, pubkey, 3) } + + cancel() } // use a different set of extra relays depending on the kind diff --git a/sdk/set.go b/sdk/set.go index 2d6a98b..904306d 100644 --- a/sdk/set.go +++ b/sdk/set.go @@ -97,8 +97,7 @@ func tryFetchSetsFromNetwork[I TagItemWithValue]( addressableIndex addressableIndex, parseTag func(nostr.Tag) (I, bool), ) *GenericSets[I] { - thunk := sys.addressableLoaders[addressableIndex].Load(ctx, pubkey) - events, err := thunk() + events, err := sys.addressableLoaders[addressableIndex].Load(ctx, pubkey) if err != nil { return nil } diff --git a/sdk/system.go b/sdk/system.go index 5f2183c..b05cabc 100644 --- a/sdk/system.go +++ b/sdk/system.go @@ -6,10 +6,10 @@ import ( "github.com/fiatjaf/eventstore" "github.com/fiatjaf/eventstore/nullstore" - "github.com/graph-gophers/dataloader/v7" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/sdk/cache" cache_memory "github.com/nbd-wtf/go-nostr/sdk/cache/memory" + "github.com/nbd-wtf/go-nostr/sdk/dataloader" "github.com/nbd-wtf/go-nostr/sdk/hints" "github.com/nbd-wtf/go-nostr/sdk/hints/memoryh" "github.com/nbd-wtf/go-nostr/sdk/kvstore" diff --git a/sdk/utils.go b/sdk/utils.go index 34388ec..8bb3d05 100644 --- a/sdk/utils.go +++ b/sdk/utils.go @@ -3,14 +3,7 @@ package sdk import ( "math" "strings" - "sync" "testing" - "time" -) - -var ( - _dtnmtoah map[string]time.Time = make(map[string]time.Time) - _dtnmtoahLock sync.Mutex ) // IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations.