diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index ea2a56cf0..530cac823 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -7,6 +7,7 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcwallet/walletdb" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/kvdb" @@ -226,7 +227,8 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, } store, err := newMissionControlStore( - db, cfg.MaxMcHistory, cfg.McFlushInterval, + newNamespacedDB(db), cfg.MaxMcHistory, + cfg.McFlushInterval, ) if err != nil { return nil, err @@ -545,3 +547,74 @@ func (m *MissionControl) applyPaymentResult( return i.finalFailureReason } + +// namespacedDB is an implementation of the missionControlDB that gives a user +// of the interface access to the top level mission control bucket. In a +// follow-up commit (accompanied by a migration), this will change to giving +// the user of the interface access to a namespaced sub-bucket instead. +type namespacedDB struct { + topLevelBucketKey []byte + db kvdb.Backend +} + +// A compile-time check to ensure that namespacedDB implements missionControlDB. +var _ missionControlDB = (*namespacedDB)(nil) + +// newNamespacedDB creates a new instance of missionControlDB where the DB will +// have access to the top level bucket. +func newNamespacedDB(db kvdb.Backend) missionControlDB { + return &namespacedDB{ + db: db, + topLevelBucketKey: resultsKey, + } +} + +// update can be used to perform reads and writes on the given bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error, + reset func()) error { + + return n.db.Update(func(tx kvdb.RwTx) error { + mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey) + if err != nil { + return fmt.Errorf("cannot create top level mission "+ + "control bucket: %w", err) + } + + return f(mcStoreBkt) + }, reset) +} + +// view can be used to perform reads on the given bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error, + reset func()) error { + + return n.db.View(func(tx kvdb.RTx) error { + mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey) + if mcStoreBkt == nil { + return fmt.Errorf("top level mission control bucket " + + "not found") + } + + return f(mcStoreBkt) + }, reset) +} + +// purge will delete all the contents in the namespace. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) purge() error { + return n.db.Update(func(tx kvdb.RwTx) error { + err := tx.DeleteTopLevelBucket(n.topLevelBucketKey) + if err != nil { + return err + } + + _, err = tx.CreateTopLevelBucket(n.topLevelBucketKey) + + return err + }, func() {}) +} diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index 9f1dc8d31..c40f24697 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -32,6 +32,21 @@ const ( unknownFailureSourceIdx = -1 ) +// missionControlDB is an interface that defines the database methods that a +// single missionControlStore has access to. It allows the missionControlStore +// to be unaware of the overall DB structure and restricts its access to the DB +// by only providing it the bucket that it needs to care about. +type missionControlDB interface { + // update can be used to perform reads and writes on the given bucket. + update(f func(bkt kvdb.RwBucket) error, reset func()) error + + // view can be used to perform reads on the given bucket. + view(f func(bkt kvdb.RBucket) error, reset func()) error + + // purge will delete all the contents in this store. + purge() error +} + // missionControlStore is a bolt db based implementation of a mission control // store. It stores the raw payment attempt data from which the internal mission // controls state can be rederived on startup. This allows the mission control @@ -41,7 +56,7 @@ const ( type missionControlStore struct { done chan struct{} wg sync.WaitGroup - db kvdb.Backend + db missionControlDB // queueCond is signalled when items are put into the queue. queueCond *sync.Cond @@ -67,7 +82,7 @@ type missionControlStore struct { flushInterval time.Duration } -func newMissionControlStore(db kvdb.Backend, maxRecords int, +func newMissionControlStore(db missionControlDB, maxRecords int, flushInterval time.Duration) (*missionControlStore, error) { var ( @@ -76,13 +91,7 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int, ) // Create buckets if not yet existing. - err := kvdb.Update(db, func(tx kvdb.RwTx) error { - resultsBucket, err := tx.CreateTopLevelBucket(resultsKey) - if err != nil { - return fmt.Errorf("cannot create results bucket: %w", - err) - } - + err := db.update(func(resultsBucket kvdb.RwBucket) error { // Collect all keys to be able to quickly calculate the // difference when updating the DB state. c := resultsBucket.ReadCursor() @@ -119,20 +128,12 @@ func (b *missionControlStore) clear() error { b.queueCond.L.Lock() defer b.queueCond.L.Unlock() - err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { - if err := tx.DeleteTopLevelBucket(resultsKey); err != nil { - return err - } - - _, err := tx.CreateTopLevelBucket(resultsKey) - return err - }, func() {}) - - if err != nil { + if err := b.db.purge(); err != nil { return err } b.queue = list.New() + return nil } @@ -140,8 +141,7 @@ func (b *missionControlStore) clear() error { func (b *missionControlStore) fetchAll() ([]*paymentResult, error) { var results []*paymentResult - err := kvdb.View(b.db, func(tx kvdb.RTx) error { - resultBucket := tx.ReadBucket(resultsKey) + err := b.db.view(func(resultBucket kvdb.RBucket) error { results = make([]*paymentResult, 0) return resultBucket.ForEach(func(k, v []byte) error { @@ -511,9 +511,7 @@ func (b *missionControlStore) storeResults() error { } } - err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { - bucket := tx.ReadWriteBucket(resultsKey) - + err := b.db.update(func(bucket kvdb.RwBucket) error { for e := l.Front(); e != nil; e = e.Next() { pr, ok := e.Value.(*paymentResult) if !ok { diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go index 804824f64..74cc72309 100644 --- a/routing/missioncontrol_store_test.go +++ b/routing/missioncontrol_store_test.go @@ -62,7 +62,9 @@ func newMCStoreTestHarness(t testing.TB, maxRecords int, require.NoError(t, db.Close()) }) - store, err := newMissionControlStore(db, maxRecords, flushInterval) + store, err := newMissionControlStore( + newNamespacedDB(db), maxRecords, flushInterval, + ) require.NoError(t, err) return mcStoreTestHarness{db: db, store: store} @@ -115,7 +117,9 @@ func TestMissionControlStore(t *testing.T) { require.Equal(t, &result2, results[1]) // Recreate store to test pruning. - store, err = newMissionControlStore(db, testMaxRecords, time.Second) + store, err = newMissionControlStore( + newNamespacedDB(db), testMaxRecords, time.Second, + ) require.NoError(t, err) // Add a newer result which failed due to mpp timeout. @@ -213,7 +217,9 @@ func TestMissionControlStoreFlushing(t *testing.T) { store.stop() // Recreate store. - store, err := newMissionControlStore(db, testMaxRecords, flushInterval) + store, err := newMissionControlStore( + newNamespacedDB(db), testMaxRecords, flushInterval, + ) require.NoError(t, err) store.run() defer store.stop()