From f5a466b051be68bf76404c6cb7ac3591cc7c0646 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 21 May 2025 08:07:40 +0200
Subject: [PATCH] batch+graph: update batch.Scheduler to be generic

In preparation for using the same logic for non-bbolt backends, we adapt
the batch.Scheduler to be more generic. The only user of the scheduler at
the moment is the KVStore in the `graph.db` package. This store
instantiates the bbolt implementation of the scheduler.
---
 batch/batch.go       | 25 ++++++++++++++-----------
 batch/batch_test.go  | 17 ++++++++++++-----
 batch/interface.go   | 20 +++++++++-----------
 batch/kvdb.go        | 41 +++++++++++++++++++++++++++++++++++++++++
 batch/scheduler.go   | 43 ++++++++++++++++++++++++++++---------------
 graph/db/kv_store.go | 28 ++++++++++++++++++----------
 6 files changed, 122 insertions(+), 52 deletions(-)
 create mode 100644 batch/kvdb.go

diff --git a/batch/batch.go b/batch/batch.go
index 9f4842c65..e2d8f1daa 100644
--- a/batch/batch.go
+++ b/batch/batch.go
@@ -1,10 +1,10 @@
 package batch
 
 import (
+	"context"
 	"errors"
 	"sync"
 
-	"github.com/lightningnetwork/lnd/kvdb"
 	"github.com/lightningnetwork/lnd/sqldb"
 )
 
@@ -14,28 +14,30 @@ var errSolo = errors.New(
 	"batch function returned an error and should be re-run solo",
 )
 
-type request struct {
-	*Request
+type request[Q any] struct {
+	*Request[Q]
 	errChan chan error
 }
 
-type batch struct {
-	db     kvdb.Backend
+type batch[Q any] struct {
+	db     sqldb.BatchedTx[Q]
 	start  sync.Once
-	reqs   []*request
-	clear  func(b *batch)
+	reqs   []*request[Q]
+	clear  func(b *batch[Q])
 	locker sync.Locker
 }
 
 // trigger is the entry point for the batch and ensures that run is started at
 // most once.
-func (b *batch) trigger() {
-	b.start.Do(b.run)
+func (b *batch[Q]) trigger(ctx context.Context) {
+	b.start.Do(func() {
+		b.run(ctx)
+	})
 }
 
 // run executes the current batch of requests. If any individual requests fail
 // alongside others they will be retried by the caller.
-func (b *batch) run() {
+func (b *batch[Q]) run(ctx context.Context) {
 	// Clear the batch from its scheduler, ensuring that no new requests are
 	// added to this batch.
 	b.clear(b)
@@ -50,9 +52,10 @@ func (b *batch) run() {
 
 	// Apply the batch until a subset succeeds or all of them fail. Requests
 	// that fail will be retried individually.
+	var writeTx writeOpts
 	for len(b.reqs) > 0 {
 		var failIdx = -1
-		err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+		err := b.db.ExecTx(ctx, &writeTx, func(tx Q) error {
 			for i, req := range b.reqs {
 				err := req.Update(tx)
 				if err != nil {
diff --git a/batch/batch_test.go b/batch/batch_test.go
index 7a73b8f4b..aecf67c2e 100644
--- a/batch/batch_test.go
+++ b/batch/batch_test.go
@@ -1,6 +1,7 @@
 package batch
 
 import (
+	"context"
 	"errors"
 	"path/filepath"
 	"sync"
@@ -12,7 +13,11 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+// TestRetry tests the retry logic of the batch scheduler.
 func TestRetry(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+
 	dbDir := t.TempDir()
 	dbName := filepath.Join(dbDir, "weks.db")
 
@@ -30,19 +35,21 @@ func TestRetry(t *testing.T) {
 		mu     sync.Mutex
 		called int
 	)
-	sched := NewTimeScheduler(db, &mu, time.Second)
+	sched := NewTimeScheduler[kvdb.RwTx](
+		NewBoltBackend[kvdb.RwTx](db), &mu, time.Second,
+	)
 
 	// First, we construct a request that should retry individually and
 	// execute it non-lazily. It should still return the error the second
 	// time.
-	req := &Request{
+	req := &Request[kvdb.RwTx]{
 		Update: func(tx kvdb.RwTx) error {
 			called++
 			return errors.New("test")
 		},
 	}
 
-	err = sched.Execute(req)
+	err = sched.Execute(ctx, req)
 
 	// Check and reset the called counter.
 	mu.Lock()
@@ -56,14 +63,14 @@ func TestRetry(t *testing.T) {
 	// a serialization error, which should cause the underlying postgres
 	// transaction to retry. Since we aren't using postgres, this will
 	// cause the transaction to not be retried at all.
-	req = &Request{
+	req = &Request[kvdb.RwTx]{
 		Update: func(tx kvdb.RwTx) error {
 			called++
 			return errors.New("could not serialize access")
 		},
 	}
 
-	err = sched.Execute(req)
+	err = sched.Execute(ctx, req)
 
 	// Check the called counter.
 	mu.Lock()
diff --git a/batch/interface.go b/batch/interface.go
index 2a92fbed2..aad02c8c3 100644
--- a/batch/interface.go
+++ b/batch/interface.go
@@ -1,10 +1,10 @@
 package batch
 
-import "github.com/lightningnetwork/lnd/kvdb"
+import "context"
 
 // Request defines an operation that can be batched into a single bbolt
 // transaction.
-type Request struct {
+type Request[Q any] struct {
 	// Opts holds various configuration options for a scheduled request.
 	Opts *SchedulerOptions
 
@@ -19,7 +19,7 @@ type Request struct {
 	// Update is applied alongside other operations in the batch.
 	//
 	// NOTE: This method MUST NOT acquire any mutexes.
-	Update func(tx kvdb.RwTx) error
+	Update func(tx Q) error
 
 	// OnCommit is called if the batch or a subset of the batch including
 	// this request all succeeded without failure. The passed error should
@@ -32,16 +32,16 @@ type Request struct {
 
 // SchedulerOptions holds various configuration options for a scheduled request.
 type SchedulerOptions struct {
-	// lazy should be true if we don't have to immediately execute this
+	// Lazy should be true if we don't have to immediately execute this
 	// request when it comes in. This means that it can be scheduled later,
 	// allowing larger batches.
-	lazy bool
+	Lazy bool
 }
 
 // NewDefaultSchedulerOpts returns a new SchedulerOptions with default values.
 func NewDefaultSchedulerOpts() *SchedulerOptions {
 	return &SchedulerOptions{
-		lazy: false,
+		Lazy: false,
 	}
 }
 
@@ -62,20 +62,18 @@ func NewDefaultSchedulerOpts() *SchedulerOptions {
 
 // LazyAdd will make the request be executed lazily, added to the next batch to
 // reduce db contention.
-//
-// NOTE: This is currently a no-op for any DB backend other than bbolt.
 func LazyAdd() SchedulerOption {
 	return func(opts *SchedulerOptions) {
-		opts.lazy = true
+		opts.Lazy = true
 	}
 }
 
 // Scheduler abstracts a generic batching engine that accumulates an incoming
 // set of Requests, executes them, and returns the error from the operation.
-type Scheduler interface {
+type Scheduler[Q any] interface {
 	// Execute schedules a Request for execution with the next available
 	// batch. This method blocks until the underlying closure has been
 	// run against the database. The resulting error is returned to the
 	// caller.
-	Execute(req *Request) error
+	Execute(ctx context.Context, req *Request[Q]) error
 }
diff --git a/batch/kvdb.go b/batch/kvdb.go
new file mode 100644
index 000000000..d9b969b78
--- /dev/null
+++ b/batch/kvdb.go
@@ -0,0 +1,41 @@
+package batch
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/sqldb"
+)
+
+// BoltBatcher is a bbolt implementation of the sqldb.BatchedTx interface.
+type BoltBatcher[Q any] struct {
+	db kvdb.Backend
+}
+
+// NewBoltBackend creates a new BoltBatcher instance.
+func NewBoltBackend[Q any](db kvdb.Backend) *BoltBatcher[Q] {
+	return &BoltBatcher[Q]{db: db}
+}
+
+// ExecTx will execute the passed txBody, operating upon generic
+// parameter Q (usually a storage interface) in a single transaction.
+//
+// NOTE: This is part of the sqldb.BatchedTx interface.
+func (t *BoltBatcher[Q]) ExecTx(_ context.Context, opts sqldb.TxOptions,
+	txBody func(Q) error, reset func()) error {
+
+	if opts.ReadOnly() {
+		return fmt.Errorf("read-only transactions not supported")
+	}
+
+	return kvdb.Update(t.db, func(tx kvdb.RwTx) error {
+		q, ok := any(tx).(Q)
+		if !ok {
+			return fmt.Errorf("unable to cast tx(%T) into the "+
+				"type expected by the BoltBatcher(%T)", tx, t)
+		}
+
+		return txBody(q)
+	}, reset)
+}
diff --git a/batch/scheduler.go b/batch/scheduler.go
index b91ee615c..af7a68a4b 100644
--- a/batch/scheduler.go
+++ b/batch/scheduler.go
@@ -1,10 +1,11 @@
 package batch
 
 import (
+	"context"
 	"sync"
 	"time"
 
-	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/sqldb"
 )
 
 // TimeScheduler is a batching engine that executes requests within a fixed
@@ -12,13 +13,13 @@
 // configurable duration for other concurrent requests to join the batch. Once
 // this time has elapsed, the batch is closed and executed. Subsequent requests
 // are then added to a new batch which undergoes the same process.
-type TimeScheduler struct {
-	db       kvdb.Backend
+type TimeScheduler[Q any] struct {
+	db       sqldb.BatchedTx[Q]
 	locker   sync.Locker
 	duration time.Duration
 
 	mu sync.Mutex
-	b  *batch
+	b  *batch[Q]
 }
 
 // NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
@@ -26,28 +27,34 @@ type TimeScheduler struct {
 // cache, the cache's lock should be provided to so that external consistency
 // can be maintained, as successful db operations will cause a request's
 // OnCommit method to be executed while holding this lock.
-func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
-	duration time.Duration) *TimeScheduler {
+func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
+	duration time.Duration) *TimeScheduler[Q] {
 
-	return &TimeScheduler{
+	return &TimeScheduler[Q]{
 		db:       db,
 		locker:   locker,
 		duration: duration,
 	}
 }
 
+type writeOpts struct{}
+
+func (*writeOpts) ReadOnly() bool {
+	return false
+}
+
 // Execute schedules the provided request for batch execution along with other
 // concurrent requests. The request will be executed within a fixed horizon,
 // parameterizeed by the duration of the scheduler. The error from the
 // underlying operation is returned to the caller.
 //
 // NOTE: Part of the Scheduler interface.
-func (s *TimeScheduler) Execute(r *Request) error {
+func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
 	if r.Opts == nil {
 		r.Opts = NewDefaultSchedulerOpts()
 	}
 
-	req := request{
+	req := request[Q]{
 		Request: r,
 		errChan: make(chan error, 1),
 	}
@@ -56,18 +63,21 @@ func (s *TimeScheduler) Execute(r *Request) error {
 	// or no batch exists, create a new one.
 	s.mu.Lock()
 	if s.b == nil {
-		s.b = &batch{
+		s.b = &batch[Q]{
 			db:     s.db,
 			clear:  s.clear,
 			locker: s.locker,
 		}
-		time.AfterFunc(s.duration, s.b.trigger)
+		trigger := s.b.trigger
+		time.AfterFunc(s.duration, func() {
+			trigger(ctx)
+		})
 	}
 	s.b.reqs = append(s.b.reqs, &req)
 
 	// If this is a non-lazy request, we'll execute the batch immediately.
-	if !r.Opts.lazy {
-		go s.b.trigger()
+	if !r.Opts.Lazy {
+		go s.b.trigger(ctx)
 	}
 	s.mu.Unlock()
 
@@ -87,7 +97,10 @@ func (s *TimeScheduler) Execute(r *Request) error {
 	}
 
 	// Otherwise, run the request on its own.
-	commitErr := kvdb.Update(s.db, req.Update, func() {
+	var writeTx writeOpts
+	commitErr := s.db.ExecTx(ctx, &writeTx, func(tx Q) error {
+		return req.Update(tx)
+	}, func() {
 		if req.Reset != nil {
 			req.Reset()
 		}
@@ -104,7 +117,7 @@ func (s *TimeScheduler) Execute(r *Request) error {
 
 // clear resets the scheduler's batch to nil so that no more requests can be
 // added.
-func (s *TimeScheduler) clear(b *batch) {
+func (s *TimeScheduler[Q]) clear(b *batch[Q]) {
 	s.mu.Lock()
 	if s.b == b {
 		s.b = nil
diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go
index e0593cb09..05ca6d435 100644
--- a/graph/db/kv_store.go
+++ b/graph/db/kv_store.go
@@ -2,6 +2,7 @@ package graphdb
 
 import (
 	"bytes"
+	"context"
 	"crypto/sha256"
 	"encoding/binary"
 	"errors"
@@ -192,8 +193,8 @@ type KVStore struct {
 	rejectCache *rejectCache
 	chanCache   *channelCache
 
-	chanScheduler batch.Scheduler
-	nodeScheduler batch.Scheduler
+	chanScheduler batch.Scheduler[kvdb.RwTx]
+	nodeScheduler batch.Scheduler[kvdb.RwTx]
 }
 
 // A compile-time assertion to ensure that the KVStore struct implements the
@@ -222,10 +223,12 @@ func NewKVStore(db kvdb.Backend, options ...KVStoreOptionModifier) (*KVStore,
 		chanCache:   newChannelCache(opts.ChannelCacheSize),
 	}
 	g.chanScheduler = batch.NewTimeScheduler(
-		db, &g.cacheMu, opts.BatchCommitInterval,
+		batch.NewBoltBackend[kvdb.RwTx](db), &g.cacheMu,
+		opts.BatchCommitInterval,
 	)
 	g.nodeScheduler = batch.NewTimeScheduler(
-		db, nil, opts.BatchCommitInterval,
+		batch.NewBoltBackend[kvdb.RwTx](db), nil,
+		opts.BatchCommitInterval,
 	)
 
 	return g, nil
@@ -854,14 +857,16 @@ func (c *KVStore) SetSourceNode(node *models.LightningNode) error {
 func (c *KVStore) AddLightningNode(node *models.LightningNode,
 	opts ...batch.SchedulerOption) error {
 
-	r := &batch.Request{
+	ctx := context.TODO()
+
+	r := &batch.Request[kvdb.RwTx]{
 		Opts: batch.NewSchedulerOptions(opts...),
 		Update: func(tx kvdb.RwTx) error {
 			return addLightningNode(tx, node)
 		},
 	}
 
-	return c.nodeScheduler.Execute(r)
+	return c.nodeScheduler.Execute(ctx, r)
 }
 
 func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
@@ -989,8 +994,10 @@ func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket,
 func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
 	opts ...batch.SchedulerOption) error {
 
+	ctx := context.TODO()
+
 	var alreadyExists bool
-	r := &batch.Request{
+	r := &batch.Request[kvdb.RwTx]{
 		Opts: batch.NewSchedulerOptions(opts...),
 		Reset: func() {
 			alreadyExists = false
@@ -1021,7 +1028,7 @@ func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
 		},
 	}
 
-	return c.chanScheduler.Execute(r)
+	return c.chanScheduler.Execute(ctx, r)
 }
 
 // addChannelEdge is the private form of AddChannelEdge that allows callers to
@@ -2693,12 +2700,13 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 	opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
 
 	var (
+		ctx          = context.TODO()
 		isUpdate1    bool
 		edgeNotFound bool
 		from, to     route.Vertex
 	)
 
-	r := &batch.Request{
+	r := &batch.Request[kvdb.RwTx]{
 		Opts: batch.NewSchedulerOptions(opts...),
 		Reset: func() {
 			isUpdate1 = false
@@ -2733,7 +2741,7 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 		},
 	}
 
-	err := c.chanScheduler.Execute(r)
+	err := c.chanScheduler.Execute(ctx, r)
 
 	return from, to, err
 }
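
Reviewer note (not part of the patch): below is a minimal sketch of how a caller wires the now-generic scheduler to the bbolt backend after this change. It only relies on identifiers introduced in the diff above (NewTimeScheduler, NewBoltBackend, Request, Execute); the wrapping package, function name and its parameters are illustrative assumptions, not code from this commit.

// Hypothetical wiring example; not part of this commit.
package batchsketch

import (
	"context"
	"sync"
	"time"

	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/kvdb"
)

// scheduleNodeWrite shows the post-patch API: the scheduler is parameterised
// by the transaction type handed to each Request's Update closure, which for
// the bbolt backend is kvdb.RwTx.
func scheduleNodeWrite(ctx context.Context, db kvdb.Backend,
	cacheMu *sync.Mutex) error {

	sched := batch.NewTimeScheduler[kvdb.RwTx](
		batch.NewBoltBackend[kvdb.RwTx](db), cacheMu,
		500*time.Millisecond,
	)

	r := &batch.Request[kvdb.RwTx]{
		// Update runs inside a single bbolt transaction, batched with
		// other concurrent requests that share this scheduler.
		Update: func(tx kvdb.RwTx) error {
			return nil
		},
	}

	// Execute blocks until the batch containing this request has been
	// applied, or until the request has been re-run on its own after a
	// failure in the shared batch.
	return sched.Execute(ctx, r)
}

Requests scheduled with batch.LazyAdd() still wait for the scheduler's timer rather than triggering the batch immediately; that option now simply sets the exported Lazy field on SchedulerOptions.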