mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-08 14:57:38 +02:00
batch: dont expose kvdb.RwTx in batch.SchedulerOptions
Currently, a few of the graph KVStore methods take the `batch.SchedulerOptions` param. This is only used to set the LazyAdd option. A SchedulerOption is a functional option that takes a `batch.Request` which has bolt-specific fields in it. This commit restructures things a bit such that the `batch.Request` type is no longer part of the `batch.SchedulerOptions` - this will make it easier to implement the graph store with a different DB backend.
This commit is contained in:
@@ -5,6 +5,9 @@ import "github.com/lightningnetwork/lnd/kvdb"
|
|||||||
// Request defines an operation that can be batched into a single bbolt
|
// Request defines an operation that can be batched into a single bbolt
|
||||||
// transaction.
|
// transaction.
|
||||||
type Request struct {
|
type Request struct {
|
||||||
|
// Opts holds various configuration options for a scheduled request.
|
||||||
|
Opts *SchedulerOptions
|
||||||
|
|
||||||
// Reset is called before each invocation of Update and is used to clear
|
// Reset is called before each invocation of Update and is used to clear
|
||||||
// any possible modifications to local state as a result of previous
|
// any possible modifications to local state as a result of previous
|
||||||
// calls to Update that were not committed due to a concurrent batch
|
// calls to Update that were not committed due to a concurrent batch
|
||||||
@@ -25,22 +28,45 @@ type Request struct {
|
|||||||
//
|
//
|
||||||
// NOTE: This field is optional.
|
// NOTE: This field is optional.
|
||||||
OnCommit func(commitErr error) error
|
OnCommit func(commitErr error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// SchedulerOptions holds various configuration options for a scheduled request.
|
||||||
|
type SchedulerOptions struct {
|
||||||
// lazy should be true if we don't have to immediately execute this
|
// lazy should be true if we don't have to immediately execute this
|
||||||
// request when it comes in. This means that it can be scheduled later,
|
// request when it comes in. This means that it can be scheduled later,
|
||||||
// allowing larger batches.
|
// allowing larger batches.
|
||||||
lazy bool
|
lazy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewDefaultSchedulerOpts returns a new SchedulerOptions with default values.
|
||||||
|
func NewDefaultSchedulerOpts() *SchedulerOptions {
|
||||||
|
return &SchedulerOptions{
|
||||||
|
lazy: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSchedulerOptions returns a new SchedulerOptions with the given options
|
||||||
|
// applied on top of the default options.
|
||||||
|
func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions {
|
||||||
|
opts := NewDefaultSchedulerOpts()
|
||||||
|
for _, o := range options {
|
||||||
|
o(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return opts
|
||||||
|
}
|
||||||
|
|
||||||
// SchedulerOption is a type that can be used to supply options to a scheduled
|
// SchedulerOption is a type that can be used to supply options to a scheduled
|
||||||
// request.
|
// request.
|
||||||
type SchedulerOption func(r *Request)
|
type SchedulerOption func(*SchedulerOptions)
|
||||||
|
|
||||||
// LazyAdd will make the request be executed lazily, added to the next batch to
|
// LazyAdd will make the request be executed lazily, added to the next batch to
|
||||||
// reduce db contention.
|
// reduce db contention.
|
||||||
|
//
|
||||||
|
// NOTE: This is currently a no-op for any DB backend other than bbolt.
|
||||||
func LazyAdd() SchedulerOption {
|
func LazyAdd() SchedulerOption {
|
||||||
return func(r *Request) {
|
return func(opts *SchedulerOptions) {
|
||||||
r.lazy = true
|
opts.lazy = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -43,6 +43,10 @@ func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
|
|||||||
//
|
//
|
||||||
// NOTE: Part of the Scheduler interface.
|
// NOTE: Part of the Scheduler interface.
|
||||||
func (s *TimeScheduler) Execute(r *Request) error {
|
func (s *TimeScheduler) Execute(r *Request) error {
|
||||||
|
if r.Opts == nil {
|
||||||
|
r.Opts = NewDefaultSchedulerOpts()
|
||||||
|
}
|
||||||
|
|
||||||
req := request{
|
req := request{
|
||||||
Request: r,
|
Request: r,
|
||||||
errChan: make(chan error, 1),
|
errChan: make(chan error, 1),
|
||||||
@@ -62,7 +66,7 @@ func (s *TimeScheduler) Execute(r *Request) error {
|
|||||||
s.b.reqs = append(s.b.reqs, &req)
|
s.b.reqs = append(s.b.reqs, &req)
|
||||||
|
|
||||||
// If this is a non-lazy request, we'll execute the batch immediately.
|
// If this is a non-lazy request, we'll execute the batch immediately.
|
||||||
if !r.lazy {
|
if !r.Opts.lazy {
|
||||||
go s.b.trigger()
|
go s.b.trigger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -848,18 +848,15 @@ func (c *KVStore) SetSourceNode(node *models.LightningNode) error {
|
|||||||
//
|
//
|
||||||
// TODO(roasbeef): also need sig of announcement.
|
// TODO(roasbeef): also need sig of announcement.
|
||||||
func (c *KVStore) AddLightningNode(node *models.LightningNode,
|
func (c *KVStore) AddLightningNode(node *models.LightningNode,
|
||||||
op ...batch.SchedulerOption) error {
|
opts ...batch.SchedulerOption) error {
|
||||||
|
|
||||||
r := &batch.Request{
|
r := &batch.Request{
|
||||||
|
Opts: batch.NewSchedulerOptions(opts...),
|
||||||
Update: func(tx kvdb.RwTx) error {
|
Update: func(tx kvdb.RwTx) error {
|
||||||
return addLightningNode(tx, node)
|
return addLightningNode(tx, node)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, f := range op {
|
|
||||||
f(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.nodeScheduler.Execute(r)
|
return c.nodeScheduler.Execute(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -986,10 +983,11 @@ func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket,
|
|||||||
// supports. The chanPoint and chanID are used to uniquely identify the edge
|
// supports. The chanPoint and chanID are used to uniquely identify the edge
|
||||||
// globally within the database.
|
// globally within the database.
|
||||||
func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
|
func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
|
||||||
op ...batch.SchedulerOption) error {
|
opts ...batch.SchedulerOption) error {
|
||||||
|
|
||||||
var alreadyExists bool
|
var alreadyExists bool
|
||||||
r := &batch.Request{
|
r := &batch.Request{
|
||||||
|
Opts: batch.NewSchedulerOptions(opts...),
|
||||||
Reset: func() {
|
Reset: func() {
|
||||||
alreadyExists = false
|
alreadyExists = false
|
||||||
},
|
},
|
||||||
@@ -1019,14 +1017,6 @@ func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, f := range op {
|
|
||||||
if f == nil {
|
|
||||||
return fmt.Errorf("nil scheduler option was used")
|
|
||||||
}
|
|
||||||
|
|
||||||
f(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.chanScheduler.Execute(r)
|
return c.chanScheduler.Execute(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2696,7 +2686,7 @@ func makeZombiePubkeys(info *models.ChannelEdgeInfo,
|
|||||||
// determined by the lexicographical ordering of the identity public keys of the
|
// determined by the lexicographical ordering of the identity public keys of the
|
||||||
// nodes on either side of the channel.
|
// nodes on either side of the channel.
|
||||||
func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
|
func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
|
||||||
op ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
|
opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
isUpdate1 bool
|
isUpdate1 bool
|
||||||
@@ -2705,6 +2695,7 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
|
|||||||
)
|
)
|
||||||
|
|
||||||
r := &batch.Request{
|
r := &batch.Request{
|
||||||
|
Opts: batch.NewSchedulerOptions(opts...),
|
||||||
Reset: func() {
|
Reset: func() {
|
||||||
isUpdate1 = false
|
isUpdate1 = false
|
||||||
edgeNotFound = false
|
edgeNotFound = false
|
||||||
@@ -2738,10 +2729,6 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, f := range op {
|
|
||||||
f(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.chanScheduler.Execute(r)
|
err := c.chanScheduler.Execute(r)
|
||||||
|
|
||||||
return from, to, err
|
return from, to, err
|
||||||
|
Reference in New Issue
Block a user