sdk: simplified dataloader even more. should be faster.

This commit is contained in:
fiatjaf
2025-03-20 20:54:51 -03:00
parent 78dbf9def5
commit 3ebfc7812b
3 changed files with 75 additions and 161 deletions

View File

@@ -33,8 +33,10 @@ func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[stri
func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[[]*nostr.Event] { func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[[]*nostr.Event] {
return sys.batchLoadAddressableEvents(ctxs, kind, pubkeys) return sys.batchLoadAddressableEvents(ctxs, kind, pubkeys)
}, },
dataloader.WithBatchCapacity[string, []*nostr.Event](30), dataloader.Options{
dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350), Wait: time.Millisecond * 110,
MaxThreshold: 30,
},
) )
} }
@@ -56,7 +58,8 @@ func (sys *System) batchLoadAddressableEvents(
waiting := len(pubkeys) waiting := len(pubkeys)
for i, pubkey := range pubkeys { for i, pubkey := range pubkeys {
ctx := ctxs[i] ctx, cancel := context.WithCancel(ctxs[i])
defer cancel()
// build batched queries for the external relays // build batched queries for the external relays
go func(i int, pubkey string) { go func(i int, pubkey string) {

View File

@@ -21,38 +21,16 @@ type Result[V any] struct {
Error error Error error
} }
// ResultMany is used by the LoadMany method.
// It contains a list of resolved data and a list of errors.
// The lengths of the data list and error list will match, and elements at each index correspond to each other.
type ResultMany[V any] struct {
Data []V
Error []error
}
// PanicErrorWrapper wraps the error interface.
// This is used to check if the error is a panic error.
// We should not cache panic errors.
type PanicErrorWrapper struct {
panicError error
}
func (p *PanicErrorWrapper) Error() string {
return p.panicError.Error()
}
// Loader implements the dataloader.Interface. // Loader implements the dataloader.Interface.
type Loader[K comparable, V any] struct { type Loader[K comparable, V any] struct {
// the batch function to be used by this loader // the batch function to be used by this loader
batchFn BatchFunc[K, V] batchFn BatchFunc[K, V]
// the maximum batch size. Set to 0 if you want it to be unbounded. // the maximum batch size. Set to 0 if you want it to be unbounded.
batchCap int batchCap uint
// count of queued up items // count of queued up items
count int count uint
// the maximum input queue size. Set to 0 if you want it to be unbounded.
inputCap int
// the amount of time to wait before triggering a batch // the amount of time to wait before triggering a batch
wait time.Duration wait time.Duration
@@ -64,10 +42,7 @@ type Loader[K comparable, V any] struct {
curBatcher *batcher[K, V] curBatcher *batcher[K, V]
// used to close the sleeper of the current batcher // used to close the sleeper of the current batcher
endSleeper chan bool thresholdReached chan bool
// used by tests to prevent logs
silent bool
} }
// type used to on input channel // type used to on input channel
@@ -77,49 +52,18 @@ type batchRequest[K comparable, V any] struct {
channel chan Result[V] channel chan Result[V]
} }
// Option allows for configuration of Loader fields. type Options struct {
type Option[K comparable, V any] func(*Loader[K, V]) Wait time.Duration
MaxThreshold uint
// WithBatchCapacity sets the batch capacity. Default is 0 (unbounded).
func WithBatchCapacity[K comparable, V any](c int) Option[K, V] {
return func(l *Loader[K, V]) {
l.batchCap = c
}
}
// WithInputCapacity sets the input capacity. Default is 1000.
func WithInputCapacity[K comparable, V any](c int) Option[K, V] {
return func(l *Loader[K, V]) {
l.inputCap = c
}
}
// WithWait sets the amount of time to wait before triggering a batch.
// Default duration is 16 milliseconds.
func WithWait[K comparable, V any](d time.Duration) Option[K, V] {
return func(l *Loader[K, V]) {
l.wait = d
}
}
// withSilentLogger turns of log messages. It's used by the tests
func withSilentLogger[K comparable, V any]() Option[K, V] {
return func(l *Loader[K, V]) {
l.silent = true
}
} }
// NewBatchedLoader constructs a new Loader with given options. // NewBatchedLoader constructs a new Loader with given options.
func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Option[K, V]) *Loader[K, V] { func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts Options) *Loader[K, V] {
loader := &Loader[K, V]{ loader := &Loader[K, V]{
batchFn: batchFn, batchFn: batchFn,
inputCap: 1000, batchCap: opts.MaxThreshold,
wait: 16 * time.Millisecond, count: 0,
} wait: opts.Wait,
// Apply options
for _, apply := range opts {
apply(loader)
} }
return loader return loader
@@ -133,36 +77,64 @@ func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) {
// this is sent to batch fn. It contains the key and the channel to return // this is sent to batch fn. It contains the key and the channel to return
// the result on // the result on
req := &batchRequest[K, V]{ctx, key, c} req := batchRequest[K, V]{ctx, key, c}
l.batchLock.Lock() l.batchLock.Lock()
// start the batch window if it hasn't already started. // start the batch window if it hasn't already started.
if l.curBatcher == nil { if l.curBatcher == nil {
l.curBatcher = l.newBatcher(l.silent) l.curBatcher = l.newBatcher()
// start the current batcher batch function
go l.curBatcher.batch()
// start a sleeper for the current batcher // start a sleeper for the current batcher
l.endSleeper = make(chan bool) l.thresholdReached = make(chan bool)
go l.sleeper(l.curBatcher, l.endSleeper)
// we will run the batch function either after some time or after a threshold has been reached
b := l.curBatcher
go func() {
select {
case <-l.thresholdReached:
case <-time.After(l.wait):
}
// We can end here also if the batcher has already been closed and a
// new one has been created. So reset the loader state only if the batcher
// is the current one
if l.curBatcher == b {
l.reset()
}
var (
ctxs = make([]context.Context, 0, len(b.requests))
keys = make([]K, 0, len(b.requests))
res map[K]Result[V]
)
for _, item := range b.requests {
ctxs = append(ctxs, item.ctx)
keys = append(keys, item.key)
}
res = l.batchFn(ctxs, keys)
for _, req := range b.requests {
if r, ok := res[req.key]; ok {
req.channel <- r
}
close(req.channel)
}
}()
} }
l.curBatcher.input <- req l.curBatcher.requests = append(l.curBatcher.requests, req)
// if we need to keep track of the count (max batch), then do so. l.count++
if l.batchCap > 0 { if l.count == l.batchCap {
l.count++ close(l.thresholdReached)
// if we hit our limit, force the batch to start
if l.count == l.batchCap { // end the batcher synchronously here because another call to Load
// end the batcher synchronously here because another call to Load // may concurrently happen and needs to go to a new batcher.
// may concurrently happen and needs to go to a new batcher. l.reset()
l.curBatcher.end()
// end the sleeper for the current batcher.
// this is to stop the goroutine without waiting for the
// sleeper timeout.
close(l.endSleeper)
l.reset()
}
} }
l.batchLock.Unlock() l.batchLock.Unlock()
if v, ok := <-c; ok { if v, ok := <-c; ok {
@@ -178,78 +150,14 @@ func (l *Loader[K, V]) reset() {
} }
type batcher[K comparable, V any] struct { type batcher[K comparable, V any] struct {
input chan *batchRequest[K, V] requests []batchRequest[K, V]
batchFn BatchFunc[K, V] batchFn BatchFunc[K, V]
finished bool
silent bool
} }
// newBatcher returns a batcher for the current requests // newBatcher returns a batcher for the current requests
// all the batcher methods must be protected by a global batchLock func (l *Loader[K, V]) newBatcher() *batcher[K, V] {
func (l *Loader[K, V]) newBatcher(silent bool) *batcher[K, V] {
return &batcher[K, V]{ return &batcher[K, V]{
input: make(chan *batchRequest[K, V], l.inputCap), requests: make([]batchRequest[K, V], 0, l.batchCap),
batchFn: l.batchFn, batchFn: l.batchFn,
silent: silent,
} }
} }
// stop receiving input and process batch function
func (b *batcher[K, V]) end() {
if !b.finished {
close(b.input)
b.finished = true
}
}
// execute the batch of all items in queue
func (b *batcher[K, V]) batch() {
var (
ctxs = make([]context.Context, 0, 30)
keys = make([]K, 0, 30)
reqs = make([]*batchRequest[K, V], 0, 30)
res map[K]Result[V]
)
for item := range b.input {
ctxs = append(ctxs, item.ctx)
keys = append(keys, item.key)
reqs = append(reqs, item)
}
func() {
res = b.batchFn(ctxs, keys)
}()
for _, req := range reqs {
if r, ok := res[req.key]; ok {
req.channel <- r
}
close(req.channel)
}
}
// wait the appropriate amount of time for the provided batcher
func (l *Loader[K, V]) sleeper(b *batcher[K, V], close chan bool) {
select {
// used by batch to close early. usually triggered by max batch size
case <-close:
return
// this will move this goroutine to the back of the callstack?
case <-time.After(l.wait):
}
// reset
// this is protected by the batchLock to avoid closing the batcher input
// channel while Load is inserting a request
l.batchLock.Lock()
b.end()
// We can end here also if the batcher has already been closed and a
// new one has been created. So reset the loader state only if the batcher
// is the current one
if l.curBatcher == b {
l.reset()
}
l.batchLock.Unlock()
}

View File

@@ -52,8 +52,10 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[stri
func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[*nostr.Event] { func(ctxs []context.Context, pubkeys []string) map[string]dataloader.Result[*nostr.Event] {
return sys.batchLoadReplaceableEvents(ctxs, kind, pubkeys) return sys.batchLoadReplaceableEvents(ctxs, kind, pubkeys)
}, },
dataloader.WithBatchCapacity[string, *nostr.Event](30), dataloader.Options{
dataloader.WithWait[string, *nostr.Event](time.Millisecond*350), Wait: time.Millisecond * 110,
MaxThreshold: 30,
},
) )
} }
@@ -75,7 +77,8 @@ func (sys *System) batchLoadReplaceableEvents(
waiting := len(pubkeys) waiting := len(pubkeys)
for i, pubkey := range pubkeys { for i, pubkey := range pubkeys {
ctx := ctxs[i] ctx, cancel := context.WithCancel(ctxs[i])
defer cancel()
// build batched queries for the external relays // build batched queries for the external relays
go func(i int, pubkey string) { go func(i int, pubkey string) {