batch+graph: update batch.Scheduler to be generic

In preparation for using the same logic for non-bbolt backends, we adapt
the batch.Scheduler to be more generic.

The only user of the scheduler at the moment is the KVStore in the
`graph/db` package. This store instantiates the bbolt implementation of
the scheduler.
Author: Elle Mouton
Date:   2025-05-21 08:07:40 +02:00
Parent: 9b9a964975
Commit: f5a466b051

6 changed files with 122 additions and 52 deletions
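
To make the shape of the change concrete, here is a minimal sketch (not part
of the diff) of how the generic API is used with the bbolt backend, mirroring
what the test and the KVStore below now do. The db handle is an assumed open
kvdb.Backend; everything else comes from the batch package itself.

package batchexample

import (
	"context"
	"sync"
	"time"

	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/kvdb"
)

// addToBatch schedules a single write. The scheduler is now parameterized
// over the transaction type Q; for bbolt, Q is kvdb.RwTx, supplied by the
// new BoltBatcher wrapper.
func addToBatch(db kvdb.Backend) error {
	var mu sync.Mutex
	sched := batch.NewTimeScheduler[kvdb.RwTx](
		batch.NewBoltBackend[kvdb.RwTx](db), &mu, time.Second,
	)

	r := &batch.Request[kvdb.RwTx]{
		Update: func(tx kvdb.RwTx) error {
			// The write to apply alongside other batched requests.
			return nil
		},
	}

	// Execute now takes a context, which is threaded through to the
	// backend's ExecTx call.
	return sched.Execute(context.TODO(), r)
}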

batch/batch.go

@@ -1,10 +1,10 @@
 package batch
 
 import (
+	"context"
 	"errors"
 	"sync"
 
-	"github.com/lightningnetwork/lnd/kvdb"
 	"github.com/lightningnetwork/lnd/sqldb"
 )
@@ -14,28 +14,30 @@ var errSolo = errors.New(
 	"batch function returned an error and should be re-run solo",
 )
 
-type request struct {
-	*Request
+type request[Q any] struct {
+	*Request[Q]
 	errChan chan error
 }
 
-type batch struct {
-	db     kvdb.Backend
+type batch[Q any] struct {
+	db     sqldb.BatchedTx[Q]
 	start  sync.Once
-	reqs   []*request
-	clear  func(b *batch)
+	reqs   []*request[Q]
+	clear  func(b *batch[Q])
 	locker sync.Locker
 }
 
 // trigger is the entry point for the batch and ensures that run is started at
 // most once.
-func (b *batch) trigger() {
-	b.start.Do(b.run)
+func (b *batch[Q]) trigger(ctx context.Context) {
+	b.start.Do(func() {
+		b.run(ctx)
+	})
 }
 
 // run executes the current batch of requests. If any individual requests fail
 // alongside others they will be retried by the caller.
-func (b *batch) run() {
+func (b *batch[Q]) run(ctx context.Context) {
 	// Clear the batch from its scheduler, ensuring that no new requests are
 	// added to this batch.
 	b.clear(b)
@@ -50,9 +52,10 @@ func (b *batch) run() {
 	// Apply the batch until a subset succeeds or all of them fail. Requests
 	// that fail will be retried individually.
+	var writeTx writeOpts
 	for len(b.reqs) > 0 {
 		var failIdx = -1
-		err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+		err := b.db.ExecTx(ctx, &writeTx, func(tx Q) error {
 			for i, req := range b.reqs {
 				err := req.Update(tx)
 				if err != nil {

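The retry contract of run is unchanged by the generics: a request that fails
alongside others is stripped from the batch, handed errSolo, and re-executed
on its own by Execute. A sketch of the observable behaviour, assuming the
sched value from the sketch above (this mirrors the TestRetry case below):

	var called int
	req := &batch.Request[kvdb.RwTx]{
		Update: func(tx kvdb.RwTx) error {
			called++
			return errors.New("test")
		},
	}

	// The first attempt fails inside the batch, and the solo retry fails
	// again: Execute returns the error and Update has run twice.
	err := sched.Execute(context.TODO(), req)
	// err.Error() == "test", called == 2
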
batch/batch_test.go

@@ -1,6 +1,7 @@
 package batch
 
 import (
+	"context"
 	"errors"
 	"path/filepath"
 	"sync"
@@ -12,7 +13,11 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+// TestRetry tests the retry logic of the batch scheduler.
 func TestRetry(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
 	dbDir := t.TempDir()
 	dbName := filepath.Join(dbDir, "weks.db")
@@ -30,19 +35,21 @@ func TestRetry(t *testing.T) {
 		mu     sync.Mutex
 		called int
 	)
-	sched := NewTimeScheduler(db, &mu, time.Second)
+	sched := NewTimeScheduler[kvdb.RwTx](
+		NewBoltBackend[kvdb.RwTx](db), &mu, time.Second,
+	)
 
 	// First, we construct a request that should retry individually and
 	// execute it non-lazily. It should still return the error the second
 	// time.
-	req := &Request{
+	req := &Request[kvdb.RwTx]{
 		Update: func(tx kvdb.RwTx) error {
 			called++
 			return errors.New("test")
 		},
 	}
-	err = sched.Execute(req)
+	err = sched.Execute(ctx, req)
 
 	// Check and reset the called counter.
 	mu.Lock()
@@ -56,14 +63,14 @@ func TestRetry(t *testing.T) {
 	// a serialization error, which should cause the underlying postgres
 	// transaction to retry. Since we aren't using postgres, this will
 	// cause the transaction to not be retried at all.
-	req = &Request{
+	req = &Request[kvdb.RwTx]{
 		Update: func(tx kvdb.RwTx) error {
 			called++
 			return errors.New("could not serialize access")
 		},
 	}
-	err = sched.Execute(req)
+	err = sched.Execute(ctx, req)
 
 	// Check the called counter.
 	mu.Lock()

batch/interface.go

@@ -1,10 +1,10 @@
 package batch
 
-import "github.com/lightningnetwork/lnd/kvdb"
+import "context"
 
 // Request defines an operation that can be batched into a single bbolt
 // transaction.
-type Request struct {
+type Request[Q any] struct {
 	// Opts holds various configuration options for a scheduled request.
 	Opts *SchedulerOptions
@@ -19,7 +19,7 @@ type Request struct {
 	// Update is applied alongside other operations in the batch.
 	//
 	// NOTE: This method MUST NOT acquire any mutexes.
-	Update func(tx kvdb.RwTx) error
+	Update func(tx Q) error
 
 	// OnCommit is called if the batch or a subset of the batch including
 	// this request all succeeded without failure. The passed error should
@@ -32,16 +32,16 @@ type Request struct {
 // SchedulerOptions holds various configuration options for a scheduled request.
 type SchedulerOptions struct {
-	// lazy should be true if we don't have to immediately execute this
+	// Lazy should be true if we don't have to immediately execute this
 	// request when it comes in. This means that it can be scheduled later,
 	// allowing larger batches.
-	lazy bool
+	Lazy bool
 }
 
 // NewDefaultSchedulerOpts returns a new SchedulerOptions with default values.
 func NewDefaultSchedulerOpts() *SchedulerOptions {
 	return &SchedulerOptions{
-		lazy: false,
+		Lazy: false,
	}
 }
@@ -62,20 +62,18 @@ type SchedulerOption func(*SchedulerOptions)
 // LazyAdd will make the request be executed lazily, added to the next batch to
 // reduce db contention.
-//
-// NOTE: This is currently a no-op for any DB backend other than bbolt.
 func LazyAdd() SchedulerOption {
 	return func(opts *SchedulerOptions) {
-		opts.lazy = true
+		opts.Lazy = true
 	}
 }
 
 // Scheduler abstracts a generic batching engine that accumulates an incoming
 // set of Requests, executes them, and returns the error from the operation.
-type Scheduler interface {
+type Scheduler[Q any] interface {
 	// Execute schedules a Request for execution with the next available
 	// batch. This method blocks until the underlying closure has been
 	// run against the database. The resulting error is returned to the
 	// caller.
-	Execute(req *Request) error
+	Execute(ctx context.Context, req *Request[Q]) error
 }

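Exporting Lazy (previously the unexported lazy) is what allows Scheduler
implementations outside the batch package to honour the option. A hedged
sketch of how a hypothetical external implementation might read it:

	// mySched is a hypothetical Scheduler[Q] implementation.
	func (s *mySched[Q]) Execute(ctx context.Context,
		r *batch.Request[Q]) error {

		if r.Opts == nil {
			r.Opts = batch.NewDefaultSchedulerOpts()
		}

		// Non-lazy requests flush the pending batch immediately
		// instead of waiting for the batching window to elapse.
		if !r.Opts.Lazy {
			s.flush(ctx) // flush is a hypothetical helper.
		}

		return s.wait(r) // wait is a hypothetical helper.
	}
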
batch/kvdb.go (new file, 41 additions)

@@ -0,0 +1,41 @@
+package batch
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/sqldb"
+)
+
+// BoltBatcher is a bbolt implementation of the sqldb.BatchedTx interface.
+type BoltBatcher[Q any] struct {
+	db kvdb.Backend
+}
+
+// NewBoltBackend creates a new BoltBackend instance.
+func NewBoltBackend[Q any](db kvdb.Backend) *BoltBatcher[Q] {
+	return &BoltBatcher[Q]{db: db}
+}
+
+// ExecTx will execute the passed txBody, operating upon generic
+// parameter Q (usually a storage interface) in a single transaction.
+//
+// NOTE: This is part of the sqldb.BatchedTx interface.
+func (t *BoltBatcher[Q]) ExecTx(_ context.Context, opts sqldb.TxOptions,
+	txBody func(Q) error, reset func()) error {
+
+	if opts.ReadOnly() {
+		return fmt.Errorf("read-only transactions not supported")
+	}
+
+	return kvdb.Update(t.db, func(tx kvdb.RwTx) error {
+		q, ok := any(tx).(Q)
+		if !ok {
+			return fmt.Errorf("unable to cast tx(%T) into the "+
+				"type expected by the BoltBatcher(%T)", tx, t)
+		}
+
+		return txBody(q)
+	}, reset)
+}
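
Note that the any(tx).(Q) assertion ties Q to kvdb.RwTx at runtime rather
than at compile time: a BoltBatcher instantiated with any other Q still
compiles, but every transaction fails with the cast error. A short sketch,
with db again an assumed open kvdb.Backend:

	good := batch.NewBoltBackend[kvdb.RwTx](db) // ExecTx's cast succeeds.
	bad := batch.NewBoltBackend[string](db)     // Compiles, but every
	                                            // ExecTx call returns the
	                                            // "unable to cast" error.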

batch/scheduler.go

@@ -1,10 +1,11 @@
 package batch
 
 import (
+	"context"
 	"sync"
 	"time"
 
-	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/sqldb"
 )
 
 // TimeScheduler is a batching engine that executes requests within a fixed
@@ -12,13 +13,13 @@ import (
 // configurable duration for other concurrent requests to join the batch. Once
 // this time has elapsed, the batch is closed and executed. Subsequent requests
 // are then added to a new batch which undergoes the same process.
-type TimeScheduler struct {
-	db       kvdb.Backend
+type TimeScheduler[Q any] struct {
+	db       sqldb.BatchedTx[Q]
 	locker   sync.Locker
 	duration time.Duration
 
 	mu sync.Mutex
-	b  *batch
+	b  *batch[Q]
 }
 
 // NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
@@ -26,28 +27,34 @@ type TimeScheduler struct {
 // cache, the cache's lock should be provided to so that external consistency
 // can be maintained, as successful db operations will cause a request's
 // OnCommit method to be executed while holding this lock.
-func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
-	duration time.Duration) *TimeScheduler {
+func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
+	duration time.Duration) *TimeScheduler[Q] {
 
-	return &TimeScheduler{
+	return &TimeScheduler[Q]{
 		db:       db,
 		locker:   locker,
 		duration: duration,
 	}
 }
 
+type writeOpts struct{}
+
+func (*writeOpts) ReadOnly() bool {
+	return false
+}
+
 // Execute schedules the provided request for batch execution along with other
 // concurrent requests. The request will be executed within a fixed horizon,
 // parameterized by the duration of the scheduler. The error from the
 // underlying operation is returned to the caller.
 //
 // NOTE: Part of the Scheduler interface.
-func (s *TimeScheduler) Execute(r *Request) error {
+func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
 	if r.Opts == nil {
 		r.Opts = NewDefaultSchedulerOpts()
 	}
 
-	req := request{
+	req := request[Q]{
 		Request: r,
 		errChan: make(chan error, 1),
 	}
@@ -56,18 +63,21 @@ func (s *TimeScheduler) Execute(r *Request) error {
 	// or no batch exists, create a new one.
 	s.mu.Lock()
 	if s.b == nil {
-		s.b = &batch{
+		s.b = &batch[Q]{
 			db:     s.db,
 			clear:  s.clear,
 			locker: s.locker,
 		}
-		time.AfterFunc(s.duration, s.b.trigger)
+		trigger := s.b.trigger
+		time.AfterFunc(s.duration, func() {
+			trigger(ctx)
+		})
 	}
 	s.b.reqs = append(s.b.reqs, &req)
 
 	// If this is a non-lazy request, we'll execute the batch immediately.
-	if !r.Opts.lazy {
-		go s.b.trigger()
+	if !r.Opts.Lazy {
+		go s.b.trigger(ctx)
 	}
 
 	s.mu.Unlock()
@@ -87,7 +97,10 @@ func (s *TimeScheduler) Execute(r *Request) error {
 	}
 
 	// Otherwise, run the request on its own.
-	commitErr := kvdb.Update(s.db, req.Update, func() {
+	var writeTx writeOpts
+	commitErr := s.db.ExecTx(ctx, &writeTx, func(tx Q) error {
+		return req.Update(tx)
+	}, func() {
 		if req.Reset != nil {
 			req.Reset()
 		}
@@ -104,7 +117,7 @@ func (s *TimeScheduler) Execute(r *Request) error {
 // clear resets the scheduler's batch to nil so that no more requests can be
 // added.
-func (s *TimeScheduler) clear(b *batch) {
+func (s *TimeScheduler[Q]) clear(b *batch[Q]) {
 	s.mu.Lock()
 	if s.b == b {
 		s.b = nil
graph/db/graph.go

@@ -2,6 +2,7 @@ package graphdb
 
 import (
 	"bytes"
+	"context"
 	"crypto/sha256"
 	"encoding/binary"
 	"errors"
@@ -192,8 +193,8 @@ type KVStore struct {
 	rejectCache *rejectCache
 	chanCache   *channelCache
 
-	chanScheduler batch.Scheduler
-	nodeScheduler batch.Scheduler
+	chanScheduler batch.Scheduler[kvdb.RwTx]
+	nodeScheduler batch.Scheduler[kvdb.RwTx]
 }
 
 // A compile-time assertion to ensure that the KVStore struct implements the
@@ -222,10 +223,12 @@ func NewKVStore(db kvdb.Backend, options ...KVStoreOptionModifier) (*KVStore,
 		chanCache:   newChannelCache(opts.ChannelCacheSize),
 	}
 	g.chanScheduler = batch.NewTimeScheduler(
-		db, &g.cacheMu, opts.BatchCommitInterval,
+		batch.NewBoltBackend[kvdb.RwTx](db), &g.cacheMu,
+		opts.BatchCommitInterval,
 	)
 	g.nodeScheduler = batch.NewTimeScheduler(
-		db, nil, opts.BatchCommitInterval,
+		batch.NewBoltBackend[kvdb.RwTx](db), nil,
+		opts.BatchCommitInterval,
 	)
 
 	return g, nil
@@ -854,14 +857,16 @@ func (c *KVStore) SetSourceNode(node *models.LightningNode) error {
 func (c *KVStore) AddLightningNode(node *models.LightningNode,
 	opts ...batch.SchedulerOption) error {
 
-	r := &batch.Request{
+	ctx := context.TODO()
+
+	r := &batch.Request[kvdb.RwTx]{
 		Opts: batch.NewSchedulerOptions(opts...),
 		Update: func(tx kvdb.RwTx) error {
 			return addLightningNode(tx, node)
 		},
 	}
 
-	return c.nodeScheduler.Execute(r)
+	return c.nodeScheduler.Execute(ctx, r)
 }
 
 func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
@@ -989,8 +994,10 @@ func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
 	opts ...batch.SchedulerOption) error {
 
+	ctx := context.TODO()
+
 	var alreadyExists bool
-	r := &batch.Request{
+	r := &batch.Request[kvdb.RwTx]{
 		Opts: batch.NewSchedulerOptions(opts...),
 		Reset: func() {
 			alreadyExists = false
@@ -1021,7 +1028,7 @@ func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
 		},
 	}
 
-	return c.chanScheduler.Execute(r)
+	return c.chanScheduler.Execute(ctx, r)
 }
 
 // addChannelEdge is the private form of AddChannelEdge that allows callers to
@@ -2693,12 +2700,13 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 	opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
 
 	var (
+		ctx          = context.TODO()
 		isUpdate1    bool
 		edgeNotFound bool
 		from, to     route.Vertex
 	)
-	r := &batch.Request{
+	r := &batch.Request[kvdb.RwTx]{
 		Opts: batch.NewSchedulerOptions(opts...),
 		Reset: func() {
 			isUpdate1 = false
@@ -2733,7 +2741,7 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 		},
 	}
 
-	err := c.chanScheduler.Execute(r)
+	err := c.chanScheduler.Execute(ctx, r)
 
 	return from, to, err
 }