From 11b27f07da8d3773fb0743bcf0c5704995991581 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Sun, 30 Mar 2025 11:26:02 +0200 Subject: [PATCH] batch: dont expose kvdb.RwTx in batch.SchedulerOptions Currently, a few of the graph KVStore methods take the `batch.SchedulerOptions` param. This is only used to set the LazyAdd option. A SchedulerOption is a functional option that takes a `batch.Request` which has bolt-specific fields in it. This commit restructures things a bit such that the `batch.Request` type is no longer part of the `batch.SchedulerOptions` - this will make it easier to implement the graph store with a different DB backend. --- batch/interface.go | 32 +++++++++++++++++++++++++++++--- batch/scheduler.go | 6 +++++- graph/db/kv_store.go | 25 ++++++------------------- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/batch/interface.go b/batch/interface.go index cd58a148c..2a92fbed2 100644 --- a/batch/interface.go +++ b/batch/interface.go @@ -5,6 +5,9 @@ import "github.com/lightningnetwork/lnd/kvdb" // Request defines an operation that can be batched into a single bbolt // transaction. type Request struct { + // Opts holds various configuration options for a scheduled request. + Opts *SchedulerOptions + // Reset is called before each invocation of Update and is used to clear // any possible modifications to local state as a result of previous // calls to Update that were not committed due to a concurrent batch @@ -25,22 +28,45 @@ type Request struct { // // NOTE: This field is optional. OnCommit func(commitErr error) error +} +// SchedulerOptions holds various configuration options for a scheduled request. +type SchedulerOptions struct { // lazy should be true if we don't have to immediately execute this // request when it comes in. This means that it can be scheduled later, // allowing larger batches. lazy bool } +// NewDefaultSchedulerOpts returns a new SchedulerOptions with default values. +func NewDefaultSchedulerOpts() *SchedulerOptions { + return &SchedulerOptions{ + lazy: false, + } +} + +// NewSchedulerOptions returns a new SchedulerOptions with the given options +// applied on top of the default options. +func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions { + opts := NewDefaultSchedulerOpts() + for _, o := range options { + o(opts) + } + + return opts +} + // SchedulerOption is a type that can be used to supply options to a scheduled // request. -type SchedulerOption func(r *Request) +type SchedulerOption func(*SchedulerOptions) // LazyAdd will make the request be executed lazily, added to the next batch to // reduce db contention. +// +// NOTE: This is currently a no-op for any DB backend other than bbolt. func LazyAdd() SchedulerOption { - return func(r *Request) { - r.lazy = true + return func(opts *SchedulerOptions) { + opts.lazy = true } } diff --git a/batch/scheduler.go b/batch/scheduler.go index 4a01adda7..b91ee615c 100644 --- a/batch/scheduler.go +++ b/batch/scheduler.go @@ -43,6 +43,10 @@ func NewTimeScheduler(db kvdb.Backend, locker sync.Locker, // // NOTE: Part of the Scheduler interface. func (s *TimeScheduler) Execute(r *Request) error { + if r.Opts == nil { + r.Opts = NewDefaultSchedulerOpts() + } + req := request{ Request: r, errChan: make(chan error, 1), @@ -62,7 +66,7 @@ func (s *TimeScheduler) Execute(r *Request) error { s.b.reqs = append(s.b.reqs, &req) // If this is a non-lazy request, we'll execute the batch immediately. - if !r.lazy { + if !r.Opts.lazy { go s.b.trigger() } diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 104c65b36..34be8cfc0 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -848,18 +848,15 @@ func (c *KVStore) SetSourceNode(node *models.LightningNode) error { // // TODO(roasbeef): also need sig of announcement. func (c *KVStore) AddLightningNode(node *models.LightningNode, - op ...batch.SchedulerOption) error { + opts ...batch.SchedulerOption) error { r := &batch.Request{ + Opts: batch.NewSchedulerOptions(opts...), Update: func(tx kvdb.RwTx) error { return addLightningNode(tx, node) }, } - for _, f := range op { - f(r) - } - return c.nodeScheduler.Execute(r) } @@ -986,10 +983,11 @@ func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket, // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo, - op ...batch.SchedulerOption) error { + opts ...batch.SchedulerOption) error { var alreadyExists bool r := &batch.Request{ + Opts: batch.NewSchedulerOptions(opts...), Reset: func() { alreadyExists = false }, @@ -1019,14 +1017,6 @@ func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo, }, } - for _, f := range op { - if f == nil { - return fmt.Errorf("nil scheduler option was used") - } - - f(r) - } - return c.chanScheduler.Execute(r) } @@ -2696,7 +2686,7 @@ func makeZombiePubkeys(info *models.ChannelEdgeInfo, // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) { + opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) { var ( isUpdate1 bool @@ -2705,6 +2695,7 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, ) r := &batch.Request{ + Opts: batch.NewSchedulerOptions(opts...), Reset: func() { isUpdate1 = false edgeNotFound = false @@ -2738,10 +2729,6 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, }, } - for _, f := range op { - f(r) - } - err := c.chanScheduler.Execute(r) return from, to, err