diff --git a/channeldb/db.go b/channeldb/db.go index 793eb7018..715b90686 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -352,7 +352,6 @@ type DB struct { dbPath string clock clock.Clock dryRun bool - keepFailedPaymentAttempts bool storeFinalHtlcResolutions bool // noRevLogAmtData if true, means that commitment transaction amount @@ -413,7 +412,6 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, }, clock: opts.clock, dryRun: opts.dryRun, - keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts, storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions, noRevLogAmtData: opts.NoRevLogAmtData, } diff --git a/channeldb/options.go b/channeldb/options.go index b00ba1f59..a8ec8cfd6 100644 --- a/channeldb/options.go +++ b/channeldb/options.go @@ -68,10 +68,6 @@ type Options struct { // database if set to true. dryRun bool - // keepFailedPaymentAttempts determines whether failed htlc attempts - // are kept on disk or removed to save space. - keepFailedPaymentAttempts bool - // storeFinalHtlcResolutions determines whether to persistently store // the final resolution of incoming htlcs. storeFinalHtlcResolutions bool @@ -120,14 +116,6 @@ func OptionDryRunMigration(dryRun bool) OptionModifier { } } -// OptionKeepFailedPaymentAttempts controls whether failed payment attempts are -// kept on disk after a payment settles. -func OptionKeepFailedPaymentAttempts(keepFailedPaymentAttempts bool) OptionModifier { - return func(o *Options) { - o.keepFailedPaymentAttempts = keepFailedPaymentAttempts - } -} - // OptionStoreFinalHtlcResolutions controls whether to persistently store the // final resolution of incoming htlcs. func OptionStoreFinalHtlcResolutions( diff --git a/channeldb/payments_kv_store.go b/channeldb/payments_kv_store.go index 4c1880a8b..5474f60fa 100644 --- a/channeldb/payments_kv_store.go +++ b/channeldb/payments_kv_store.go @@ -119,19 +119,68 @@ var ( // KVPaymentsDB implements persistence for payments and payment attempts. type KVPaymentsDB struct { + // Sequence management for the kv store. paymentSeqMx sync.Mutex currPaymentSeq uint64 storedPaymentSeq uint64 - db *DB + + // db is the underlying database implementation. + db kvdb.Backend + + keepFailedPaymentAttempts bool } -// NewKVPaymentsDB creates a new instance of the KVPaymentsDB. -func NewKVPaymentsDB(db *DB) *KVPaymentsDB { - return &KVPaymentsDB{ - db: db, +// defaultKVStoreOptions returns the default options for the KV store. +func defaultKVStoreOptions() *paymentsdb.StoreOptions { + return &paymentsdb.StoreOptions{ + KeepFailedPaymentAttempts: false, } } +// NewKVPaymentsDB creates a new KVStore for payments. +func NewKVPaymentsDB(db kvdb.Backend, + options ...paymentsdb.OptionModifier) (*KVPaymentsDB, error) { + + opts := defaultKVStoreOptions() + for _, applyOption := range options { + applyOption(opts) + } + + if !opts.NoMigration { + if err := initKVStore(db); err != nil { + return nil, err + } + } + + return &KVPaymentsDB{ + db: db, + keepFailedPaymentAttempts: opts.KeepFailedPaymentAttempts, + }, nil +} + +var paymentsTopLevelBuckets = [][]byte{ + paymentsRootBucket, + paymentsIndexBucket, +} + +// initKVStore creates and initializes the top-level buckets for the payment db. +func initKVStore(db kvdb.Backend) error { + err := kvdb.Update(db, func(tx kvdb.RwTx) error { + for _, tlb := range paymentsTopLevelBuckets { + if _, err := tx.CreateTopLevelBucket(tlb); err != nil { + return err + } + } + + return nil + }, func() {}) + if err != nil { + return fmt.Errorf("unable to create new payments db: %w", err) + } + + return nil +} + // InitPayment checks or records the given PaymentCreationInfo with the DB, // making sure it does not already exist as an in-flight payment. When this // method returns successfully, the payment is guaranteed to be in the InFlight @@ -154,7 +203,7 @@ func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash, infoBytes := b.Bytes() var updateErr error - err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error { + err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error { // Reset the update error, to avoid carrying over an error // from a previous execution of the batched db transaction. updateErr = nil @@ -241,7 +290,7 @@ func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash, // DeleteFailedAttempts deletes all failed htlcs for a payment if configured // by the KVPaymentsDB db. func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error { - if !p.db.keepFailedPaymentAttempts { + if !p.keepFailedPaymentAttempts { const failedHtlcsOnly = true err := p.DeletePayment(hash, failedHtlcsOnly) if err != nil { @@ -322,7 +371,7 @@ func (p *KVPaymentsDB) RegisterAttempt(paymentHash lntypes.Hash, binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID) var payment *MPPayment - err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error { + err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error { prefetchPayment(tx, paymentHash) bucket, err := fetchPaymentBucketUpdate(tx, paymentHash) if err != nil { @@ -490,7 +539,7 @@ func (p *KVPaymentsDB) updateHtlcKey(paymentHash lntypes.Hash, binary.BigEndian.PutUint64(aid, attemptID) var payment *MPPayment - err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error { + err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error { payment = nil prefetchPayment(tx, paymentHash) @@ -562,7 +611,7 @@ func (p *KVPaymentsDB) Fail(paymentHash lntypes.Hash, updateErr error payment *MPPayment ) - err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error { + err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error { // Reset the update error, to avoid carrying over an error // from a previous execution of the batched db transaction. updateErr = nil @@ -715,7 +764,7 @@ func (p *KVPaymentsDB) nextPaymentSequence() ([]byte, error) { // conflicts on the sequence when using etcd. if p.currPaymentSeq == p.storedPaymentSeq { var currPaymentSeq, newUpperBound uint64 - if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error { + if err := kvdb.Update(p.db, func(tx kvdb.RwTx) error { paymentsBucket, err := tx.CreateTopLevelBucket( paymentsRootBucket, ) diff --git a/channeldb/payments_kv_store_test.go b/channeldb/payments_kv_store_test.go index 4083fa327..d1953fa95 100644 --- a/channeldb/payments_kv_store_test.go +++ b/channeldb/payments_kv_store_test.go @@ -67,7 +67,8 @@ func TestKVPaymentsDBSwitchFail(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) info, attempt, preimg, err := genInfo(t) require.NoError(t, err, "unable to generate htlc message") @@ -207,7 +208,8 @@ func TestKVPaymentsDBSwitchDoubleSend(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) info, attempt, preimg, err := genInfo(t) require.NoError(t, err, "unable to generate htlc message") @@ -283,7 +285,8 @@ func TestKVPaymentsDBSuccessesWithoutInFlight(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) info, _, preimg, err := genInfo(t) require.NoError(t, err, "unable to generate htlc message") @@ -306,7 +309,8 @@ func TestKVPaymentsDBFailsWithoutInFlight(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) info, _, _, err := genInfo(t) require.NoError(t, err, "unable to generate htlc message") @@ -329,7 +333,8 @@ func TestKVPaymentsDBDeleteNonInFlight(t *testing.T) { // start at 1, so 9999 is a safe bet for this test. var duplicateSeqNr = 9999 - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) payments := []struct { failed bool @@ -547,7 +552,8 @@ func TestKVPaymentsDBDeletePayments(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) // Register three payments: // 1. A payment with two failed attempts. @@ -608,7 +614,8 @@ func TestKVPaymentsDBDeleteSinglePayment(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) // Register four payments: // All payments will have one failed HTLC attempt and one HTLC attempt @@ -714,7 +721,8 @@ func TestKVPaymentsDBMultiShard(t *testing.T) { t.Fatalf("unable to init db: %v", err) } - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) info, attempt, preimg, err := genInfo(t) if err != nil { @@ -995,7 +1003,8 @@ func TestKVPaymentsDBMPPRecordValidation(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err, "unable to init db") - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) info, attempt, _, err := genInfo(t) require.NoError(t, err, "unable to generate htlc message") @@ -1076,11 +1085,15 @@ func TestDeleteFailedAttempts(t *testing.T) { func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { db, err := MakeTestDB(t) - require.NoError(t, err, "unable to init db") - db.keepFailedPaymentAttempts = keepFailedPaymentAttempts - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB( + db, + paymentsdb.WithKeepFailedPaymentAttempts( + keepFailedPaymentAttempts, + ), + ) + require.NoError(t, err) // Register three payments: // All payments will have one failed HTLC attempt and one HTLC attempt @@ -1577,7 +1590,8 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { db, err := MakeTestDB(t) require.NoError(t, err) - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) // Generate a test payment which does not have duplicates. noDuplicates, _, _, err := genInfo(t) @@ -1703,8 +1717,8 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { // // This code is *only* intended to replicate legacy duplicate payments in lnd, // our current schema does not allow duplicates. -func appendDuplicatePayment(t *testing.T, db *DB, paymentHash lntypes.Hash, - seqNr uint64, preImg lntypes.Preimage) { +func appendDuplicatePayment(t *testing.T, db kvdb.Backend, + paymentHash lntypes.Hash, seqNr uint64, preImg lntypes.Preimage) { err := kvdb.Update(db, func(tx walletdb.ReadWriteTx) error { bucket, err := fetchPaymentBucketUpdate( diff --git a/channeldb/payments_test.go b/channeldb/payments_test.go index 727bd91fa..dce993ffe 100644 --- a/channeldb/payments_test.go +++ b/channeldb/payments_test.go @@ -358,7 +358,8 @@ func TestQueryPayments(t *testing.T) { require.NoError(t, err) // Initialize the payment database. - paymentDB := NewKVPaymentsDB(db) + paymentDB, err := NewKVPaymentsDB(db) + require.NoError(t, err) // Make a preliminary query to make sure it's ok to // query when we have no payments. diff --git a/config_builder.go b/config_builder.go index 35a44e2c6..47eb96b50 100644 --- a/config_builder.go +++ b/config_builder.go @@ -48,6 +48,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet/rpcwallet" "github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/msgmux" + paymentsdb "github.com/lightningnetwork/lnd/payments/db" "github.com/lightningnetwork/lnd/rpcperms" "github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/sqldb" @@ -1052,9 +1053,6 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( dbOptions := []channeldb.OptionModifier{ channeldb.OptionDryRunMigration(cfg.DryRunMigration), - channeldb.OptionKeepFailedPaymentAttempts( - cfg.KeepFailedPaymentAttempts, - ), channeldb.OptionStoreFinalHtlcResolutions( cfg.StoreFinalHtlcResolutions, ), @@ -1222,9 +1220,24 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( // Mount the payments DB which is only KV for now. // // TODO(ziggie): Add support for SQL payments DB. - kvPaymentsDB := channeldb.NewKVPaymentsDB( + // Mount the payments DB for the KV store. + paymentsDBOptions := []paymentsdb.OptionModifier{ + paymentsdb.WithKeepFailedPaymentAttempts( + cfg.KeepFailedPaymentAttempts, + ), + } + kvPaymentsDB, err := channeldb.NewKVPaymentsDB( dbs.ChanStateDB, + paymentsDBOptions..., ) + if err != nil { + cleanUp() + + err = fmt.Errorf("unable to open payments DB: %w", err) + d.logger.Error(err) + + return nil, nil, err + } dbs.KVPaymentsDB = kvPaymentsDB // Wrap the watchtower client DB and make sure we clean up. diff --git a/payments/db/options.go b/payments/db/options.go new file mode 100644 index 000000000..4f6fa573e --- /dev/null +++ b/payments/db/options.go @@ -0,0 +1,30 @@ +package paymentsdb + +// StoreOptions holds parameters for the KVStore. +type StoreOptions struct { + // NoMigration allows to open the database in readonly mode + NoMigration bool + + // KeepFailedPaymentAttempts is a flag that determines whether to keep + // failed payment attempts for a settled payment in the db. + KeepFailedPaymentAttempts bool +} + +// OptionModifier is a function signature for modifying the default +// StoreOptions. +type OptionModifier func(*StoreOptions) + +// WithKeepFailedPaymentAttempts sets the KeepFailedPaymentAttempts to n. +func WithKeepFailedPaymentAttempts(n bool) OptionModifier { + return func(o *StoreOptions) { + o.KeepFailedPaymentAttempts = n + } +} + +// WithNoMigration allows the database to be opened in read only mode by +// disabling migrations. +func WithNoMigration(b bool) OptionModifier { + return func(o *StoreOptions) { + o.NoMigration = b + } +} diff --git a/routing/control_tower_test.go b/routing/control_tower_test.go index ceb5cb64f..f45c2de38 100644 --- a/routing/control_tower_test.go +++ b/routing/control_tower_test.go @@ -48,12 +48,18 @@ var ( func TestControlTowerSubscribeUnknown(t *testing.T) { t.Parallel() - db := initDB(t, false) + db := initDB(t) - pControl := NewControlTower(channeldb.NewKVPaymentsDB(db)) + paymentDB, err := channeldb.NewKVPaymentsDB( + db, + paymentsdb.WithKeepFailedPaymentAttempts(true), + ) + require.NoError(t, err) + + pControl := NewControlTower(paymentDB) // Subscription should fail when the payment is not known. - _, err := pControl.SubscribePayment(lntypes.Hash{1}) + _, err = pControl.SubscribePayment(lntypes.Hash{1}) require.ErrorIs(t, err, paymentsdb.ErrPaymentNotInitiated) } @@ -62,9 +68,12 @@ func TestControlTowerSubscribeUnknown(t *testing.T) { func TestControlTowerSubscribeSuccess(t *testing.T) { t.Parallel() - db := initDB(t, false) + db := initDB(t) - pControl := NewControlTower(channeldb.NewKVPaymentsDB(db)) + paymentDB, err := channeldb.NewKVPaymentsDB(db) + require.NoError(t, err) + + pControl := NewControlTower(paymentDB) // Initiate a payment. info, attempt, preimg, err := genInfo() @@ -181,9 +190,15 @@ func TestKVPaymentsDBSubscribeFail(t *testing.T) { func TestKVPaymentsDBSubscribeAllSuccess(t *testing.T) { t.Parallel() - db := initDB(t, true) + db := initDB(t) - pControl := NewControlTower(channeldb.NewKVPaymentsDB(db)) + paymentDB, err := channeldb.NewKVPaymentsDB( + db, + paymentsdb.WithKeepFailedPaymentAttempts(true), + ) + require.NoError(t, err) + + pControl := NewControlTower(paymentDB) // Initiate a payment. info1, attempt1, preimg1, err := genInfo() @@ -294,9 +309,15 @@ func TestKVPaymentsDBSubscribeAllSuccess(t *testing.T) { func TestKVPaymentsDBSubscribeAllImmediate(t *testing.T) { t.Parallel() - db := initDB(t, true) + db := initDB(t) - pControl := NewControlTower(channeldb.NewKVPaymentsDB(db)) + paymentDB, err := channeldb.NewKVPaymentsDB( + db, + paymentsdb.WithKeepFailedPaymentAttempts(true), + ) + require.NoError(t, err) + + pControl := NewControlTower(paymentDB) // Initiate a payment. info, attempt, _, err := genInfo() @@ -331,9 +352,15 @@ func TestKVPaymentsDBSubscribeAllImmediate(t *testing.T) { func TestKVPaymentsDBUnsubscribeSuccess(t *testing.T) { t.Parallel() - db := initDB(t, true) + db := initDB(t) - pControl := NewControlTower(channeldb.NewKVPaymentsDB(db)) + paymentDB, err := channeldb.NewKVPaymentsDB( + db, + paymentsdb.WithKeepFailedPaymentAttempts(true), + ) + require.NoError(t, err) + + pControl := NewControlTower(paymentDB) subscription1, err := pControl.SubscribeAllPayments() require.NoError(t, err, "expected subscribe to succeed, but got: %v") @@ -400,9 +427,17 @@ func TestKVPaymentsDBUnsubscribeSuccess(t *testing.T) { func testKVPaymentsDBSubscribeFail(t *testing.T, registerAttempt, keepFailedPaymentAttempts bool) { - db := initDB(t, keepFailedPaymentAttempts) + db := initDB(t) - pControl := NewControlTower(channeldb.NewKVPaymentsDB(db)) + paymentDB, err := channeldb.NewKVPaymentsDB( + db, + paymentsdb.WithKeepFailedPaymentAttempts( + keepFailedPaymentAttempts, + ), + ) + require.NoError(t, err) + + pControl := NewControlTower(paymentDB) // Initiate a payment. info, attempt, _, err := genInfo() @@ -518,11 +553,9 @@ func testKVPaymentsDBSubscribeFail(t *testing.T, registerAttempt, } } -func initDB(t *testing.T, keepFailedPaymentAttempts bool) *channeldb.DB { +func initDB(t *testing.T) *channeldb.DB { return channeldb.OpenForTesting( - t, t.TempDir(), channeldb.OptionKeepFailedPaymentAttempts( - keepFailedPaymentAttempts, - ), + t, t.TempDir(), ) }