mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-14 18:30:52 +02:00
mulit: use kvdb.Backend for the kv payment db
Instead of the ChannelState struct we now use the kv backend interface for the payment kv database.
This commit is contained in:
@@ -352,7 +352,6 @@ type DB struct {
|
|||||||
dbPath string
|
dbPath string
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
dryRun bool
|
dryRun bool
|
||||||
keepFailedPaymentAttempts bool
|
|
||||||
storeFinalHtlcResolutions bool
|
storeFinalHtlcResolutions bool
|
||||||
|
|
||||||
// noRevLogAmtData if true, means that commitment transaction amount
|
// noRevLogAmtData if true, means that commitment transaction amount
|
||||||
@@ -413,7 +412,6 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
|
|||||||
},
|
},
|
||||||
clock: opts.clock,
|
clock: opts.clock,
|
||||||
dryRun: opts.dryRun,
|
dryRun: opts.dryRun,
|
||||||
keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
|
|
||||||
storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
|
storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
|
||||||
noRevLogAmtData: opts.NoRevLogAmtData,
|
noRevLogAmtData: opts.NoRevLogAmtData,
|
||||||
}
|
}
|
||||||
|
@@ -68,10 +68,6 @@ type Options struct {
|
|||||||
// database if set to true.
|
// database if set to true.
|
||||||
dryRun bool
|
dryRun bool
|
||||||
|
|
||||||
// keepFailedPaymentAttempts determines whether failed htlc attempts
|
|
||||||
// are kept on disk or removed to save space.
|
|
||||||
keepFailedPaymentAttempts bool
|
|
||||||
|
|
||||||
// storeFinalHtlcResolutions determines whether to persistently store
|
// storeFinalHtlcResolutions determines whether to persistently store
|
||||||
// the final resolution of incoming htlcs.
|
// the final resolution of incoming htlcs.
|
||||||
storeFinalHtlcResolutions bool
|
storeFinalHtlcResolutions bool
|
||||||
@@ -120,14 +116,6 @@ func OptionDryRunMigration(dryRun bool) OptionModifier {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OptionKeepFailedPaymentAttempts controls whether failed payment attempts are
|
|
||||||
// kept on disk after a payment settles.
|
|
||||||
func OptionKeepFailedPaymentAttempts(keepFailedPaymentAttempts bool) OptionModifier {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.keepFailedPaymentAttempts = keepFailedPaymentAttempts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OptionStoreFinalHtlcResolutions controls whether to persistently store the
|
// OptionStoreFinalHtlcResolutions controls whether to persistently store the
|
||||||
// final resolution of incoming htlcs.
|
// final resolution of incoming htlcs.
|
||||||
func OptionStoreFinalHtlcResolutions(
|
func OptionStoreFinalHtlcResolutions(
|
||||||
|
@@ -119,19 +119,68 @@ var (
|
|||||||
|
|
||||||
// KVPaymentsDB implements persistence for payments and payment attempts.
|
// KVPaymentsDB implements persistence for payments and payment attempts.
|
||||||
type KVPaymentsDB struct {
|
type KVPaymentsDB struct {
|
||||||
|
// Sequence management for the kv store.
|
||||||
paymentSeqMx sync.Mutex
|
paymentSeqMx sync.Mutex
|
||||||
currPaymentSeq uint64
|
currPaymentSeq uint64
|
||||||
storedPaymentSeq uint64
|
storedPaymentSeq uint64
|
||||||
db *DB
|
|
||||||
|
// db is the underlying database implementation.
|
||||||
|
db kvdb.Backend
|
||||||
|
|
||||||
|
keepFailedPaymentAttempts bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKVPaymentsDB creates a new instance of the KVPaymentsDB.
|
// defaultKVStoreOptions returns the default options for the KV store.
|
||||||
func NewKVPaymentsDB(db *DB) *KVPaymentsDB {
|
func defaultKVStoreOptions() *paymentsdb.StoreOptions {
|
||||||
return &KVPaymentsDB{
|
return &paymentsdb.StoreOptions{
|
||||||
db: db,
|
KeepFailedPaymentAttempts: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewKVPaymentsDB creates a new KVStore for payments.
|
||||||
|
func NewKVPaymentsDB(db kvdb.Backend,
|
||||||
|
options ...paymentsdb.OptionModifier) (*KVPaymentsDB, error) {
|
||||||
|
|
||||||
|
opts := defaultKVStoreOptions()
|
||||||
|
for _, applyOption := range options {
|
||||||
|
applyOption(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !opts.NoMigration {
|
||||||
|
if err := initKVStore(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &KVPaymentsDB{
|
||||||
|
db: db,
|
||||||
|
keepFailedPaymentAttempts: opts.KeepFailedPaymentAttempts,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var paymentsTopLevelBuckets = [][]byte{
|
||||||
|
paymentsRootBucket,
|
||||||
|
paymentsIndexBucket,
|
||||||
|
}
|
||||||
|
|
||||||
|
// initKVStore creates and initializes the top-level buckets for the payment db.
|
||||||
|
func initKVStore(db kvdb.Backend) error {
|
||||||
|
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||||
|
for _, tlb := range paymentsTopLevelBuckets {
|
||||||
|
if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, func() {})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create new payments db: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// InitPayment checks or records the given PaymentCreationInfo with the DB,
|
// InitPayment checks or records the given PaymentCreationInfo with the DB,
|
||||||
// making sure it does not already exist as an in-flight payment. When this
|
// making sure it does not already exist as an in-flight payment. When this
|
||||||
// method returns successfully, the payment is guaranteed to be in the InFlight
|
// method returns successfully, the payment is guaranteed to be in the InFlight
|
||||||
@@ -154,7 +203,7 @@ func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash,
|
|||||||
infoBytes := b.Bytes()
|
infoBytes := b.Bytes()
|
||||||
|
|
||||||
var updateErr error
|
var updateErr error
|
||||||
err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
|
err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
|
||||||
// Reset the update error, to avoid carrying over an error
|
// Reset the update error, to avoid carrying over an error
|
||||||
// from a previous execution of the batched db transaction.
|
// from a previous execution of the batched db transaction.
|
||||||
updateErr = nil
|
updateErr = nil
|
||||||
@@ -241,7 +290,7 @@ func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash,
|
|||||||
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
|
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
|
||||||
// by the KVPaymentsDB db.
|
// by the KVPaymentsDB db.
|
||||||
func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error {
|
func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error {
|
||||||
if !p.db.keepFailedPaymentAttempts {
|
if !p.keepFailedPaymentAttempts {
|
||||||
const failedHtlcsOnly = true
|
const failedHtlcsOnly = true
|
||||||
err := p.DeletePayment(hash, failedHtlcsOnly)
|
err := p.DeletePayment(hash, failedHtlcsOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -322,7 +371,7 @@ func (p *KVPaymentsDB) RegisterAttempt(paymentHash lntypes.Hash,
|
|||||||
binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
|
binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
|
||||||
|
|
||||||
var payment *MPPayment
|
var payment *MPPayment
|
||||||
err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
|
err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
|
||||||
prefetchPayment(tx, paymentHash)
|
prefetchPayment(tx, paymentHash)
|
||||||
bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
|
bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -490,7 +539,7 @@ func (p *KVPaymentsDB) updateHtlcKey(paymentHash lntypes.Hash,
|
|||||||
binary.BigEndian.PutUint64(aid, attemptID)
|
binary.BigEndian.PutUint64(aid, attemptID)
|
||||||
|
|
||||||
var payment *MPPayment
|
var payment *MPPayment
|
||||||
err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
|
err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
|
||||||
payment = nil
|
payment = nil
|
||||||
|
|
||||||
prefetchPayment(tx, paymentHash)
|
prefetchPayment(tx, paymentHash)
|
||||||
@@ -562,7 +611,7 @@ func (p *KVPaymentsDB) Fail(paymentHash lntypes.Hash,
|
|||||||
updateErr error
|
updateErr error
|
||||||
payment *MPPayment
|
payment *MPPayment
|
||||||
)
|
)
|
||||||
err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
|
err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
|
||||||
// Reset the update error, to avoid carrying over an error
|
// Reset the update error, to avoid carrying over an error
|
||||||
// from a previous execution of the batched db transaction.
|
// from a previous execution of the batched db transaction.
|
||||||
updateErr = nil
|
updateErr = nil
|
||||||
@@ -715,7 +764,7 @@ func (p *KVPaymentsDB) nextPaymentSequence() ([]byte, error) {
|
|||||||
// conflicts on the sequence when using etcd.
|
// conflicts on the sequence when using etcd.
|
||||||
if p.currPaymentSeq == p.storedPaymentSeq {
|
if p.currPaymentSeq == p.storedPaymentSeq {
|
||||||
var currPaymentSeq, newUpperBound uint64
|
var currPaymentSeq, newUpperBound uint64
|
||||||
if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
|
if err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
|
||||||
paymentsBucket, err := tx.CreateTopLevelBucket(
|
paymentsBucket, err := tx.CreateTopLevelBucket(
|
||||||
paymentsRootBucket,
|
paymentsRootBucket,
|
||||||
)
|
)
|
||||||
|
@@ -67,7 +67,8 @@ func TestKVPaymentsDBSwitchFail(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, attempt, preimg, err := genInfo(t)
|
info, attempt, preimg, err := genInfo(t)
|
||||||
require.NoError(t, err, "unable to generate htlc message")
|
require.NoError(t, err, "unable to generate htlc message")
|
||||||
@@ -207,7 +208,8 @@ func TestKVPaymentsDBSwitchDoubleSend(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, attempt, preimg, err := genInfo(t)
|
info, attempt, preimg, err := genInfo(t)
|
||||||
require.NoError(t, err, "unable to generate htlc message")
|
require.NoError(t, err, "unable to generate htlc message")
|
||||||
@@ -283,7 +285,8 @@ func TestKVPaymentsDBSuccessesWithoutInFlight(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, _, preimg, err := genInfo(t)
|
info, _, preimg, err := genInfo(t)
|
||||||
require.NoError(t, err, "unable to generate htlc message")
|
require.NoError(t, err, "unable to generate htlc message")
|
||||||
@@ -306,7 +309,8 @@ func TestKVPaymentsDBFailsWithoutInFlight(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, _, _, err := genInfo(t)
|
info, _, _, err := genInfo(t)
|
||||||
require.NoError(t, err, "unable to generate htlc message")
|
require.NoError(t, err, "unable to generate htlc message")
|
||||||
@@ -329,7 +333,8 @@ func TestKVPaymentsDBDeleteNonInFlight(t *testing.T) {
|
|||||||
// start at 1, so 9999 is a safe bet for this test.
|
// start at 1, so 9999 is a safe bet for this test.
|
||||||
var duplicateSeqNr = 9999
|
var duplicateSeqNr = 9999
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
payments := []struct {
|
payments := []struct {
|
||||||
failed bool
|
failed bool
|
||||||
@@ -547,7 +552,8 @@ func TestKVPaymentsDBDeletePayments(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Register three payments:
|
// Register three payments:
|
||||||
// 1. A payment with two failed attempts.
|
// 1. A payment with two failed attempts.
|
||||||
@@ -608,7 +614,8 @@ func TestKVPaymentsDBDeleteSinglePayment(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Register four payments:
|
// Register four payments:
|
||||||
// All payments will have one failed HTLC attempt and one HTLC attempt
|
// All payments will have one failed HTLC attempt and one HTLC attempt
|
||||||
@@ -714,7 +721,8 @@ func TestKVPaymentsDBMultiShard(t *testing.T) {
|
|||||||
t.Fatalf("unable to init db: %v", err)
|
t.Fatalf("unable to init db: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, attempt, preimg, err := genInfo(t)
|
info, attempt, preimg, err := genInfo(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -995,7 +1003,8 @@ func TestKVPaymentsDBMPPRecordValidation(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, attempt, _, err := genInfo(t)
|
info, attempt, _, err := genInfo(t)
|
||||||
require.NoError(t, err, "unable to generate htlc message")
|
require.NoError(t, err, "unable to generate htlc message")
|
||||||
@@ -1076,11 +1085,15 @@ func TestDeleteFailedAttempts(t *testing.T) {
|
|||||||
|
|
||||||
func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) {
|
func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) {
|
||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
|
|
||||||
require.NoError(t, err, "unable to init db")
|
require.NoError(t, err, "unable to init db")
|
||||||
db.keepFailedPaymentAttempts = keepFailedPaymentAttempts
|
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(
|
||||||
|
db,
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(
|
||||||
|
keepFailedPaymentAttempts,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Register three payments:
|
// Register three payments:
|
||||||
// All payments will have one failed HTLC attempt and one HTLC attempt
|
// All payments will have one failed HTLC attempt and one HTLC attempt
|
||||||
@@ -1577,7 +1590,8 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) {
|
|||||||
db, err := MakeTestDB(t)
|
db, err := MakeTestDB(t)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Generate a test payment which does not have duplicates.
|
// Generate a test payment which does not have duplicates.
|
||||||
noDuplicates, _, _, err := genInfo(t)
|
noDuplicates, _, _, err := genInfo(t)
|
||||||
@@ -1703,8 +1717,8 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) {
|
|||||||
//
|
//
|
||||||
// This code is *only* intended to replicate legacy duplicate payments in lnd,
|
// This code is *only* intended to replicate legacy duplicate payments in lnd,
|
||||||
// our current schema does not allow duplicates.
|
// our current schema does not allow duplicates.
|
||||||
func appendDuplicatePayment(t *testing.T, db *DB, paymentHash lntypes.Hash,
|
func appendDuplicatePayment(t *testing.T, db kvdb.Backend,
|
||||||
seqNr uint64, preImg lntypes.Preimage) {
|
paymentHash lntypes.Hash, seqNr uint64, preImg lntypes.Preimage) {
|
||||||
|
|
||||||
err := kvdb.Update(db, func(tx walletdb.ReadWriteTx) error {
|
err := kvdb.Update(db, func(tx walletdb.ReadWriteTx) error {
|
||||||
bucket, err := fetchPaymentBucketUpdate(
|
bucket, err := fetchPaymentBucketUpdate(
|
||||||
|
@@ -358,7 +358,8 @@ func TestQueryPayments(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Initialize the payment database.
|
// Initialize the payment database.
|
||||||
paymentDB := NewKVPaymentsDB(db)
|
paymentDB, err := NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Make a preliminary query to make sure it's ok to
|
// Make a preliminary query to make sure it's ok to
|
||||||
// query when we have no payments.
|
// query when we have no payments.
|
||||||
|
@@ -48,6 +48,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
|
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
|
||||||
"github.com/lightningnetwork/lnd/macaroons"
|
"github.com/lightningnetwork/lnd/macaroons"
|
||||||
"github.com/lightningnetwork/lnd/msgmux"
|
"github.com/lightningnetwork/lnd/msgmux"
|
||||||
|
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
|
||||||
"github.com/lightningnetwork/lnd/rpcperms"
|
"github.com/lightningnetwork/lnd/rpcperms"
|
||||||
"github.com/lightningnetwork/lnd/signal"
|
"github.com/lightningnetwork/lnd/signal"
|
||||||
"github.com/lightningnetwork/lnd/sqldb"
|
"github.com/lightningnetwork/lnd/sqldb"
|
||||||
@@ -1052,9 +1053,6 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
|
|||||||
|
|
||||||
dbOptions := []channeldb.OptionModifier{
|
dbOptions := []channeldb.OptionModifier{
|
||||||
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
|
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
|
||||||
channeldb.OptionKeepFailedPaymentAttempts(
|
|
||||||
cfg.KeepFailedPaymentAttempts,
|
|
||||||
),
|
|
||||||
channeldb.OptionStoreFinalHtlcResolutions(
|
channeldb.OptionStoreFinalHtlcResolutions(
|
||||||
cfg.StoreFinalHtlcResolutions,
|
cfg.StoreFinalHtlcResolutions,
|
||||||
),
|
),
|
||||||
@@ -1222,9 +1220,24 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
|
|||||||
// Mount the payments DB which is only KV for now.
|
// Mount the payments DB which is only KV for now.
|
||||||
//
|
//
|
||||||
// TODO(ziggie): Add support for SQL payments DB.
|
// TODO(ziggie): Add support for SQL payments DB.
|
||||||
kvPaymentsDB := channeldb.NewKVPaymentsDB(
|
// Mount the payments DB for the KV store.
|
||||||
|
paymentsDBOptions := []paymentsdb.OptionModifier{
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(
|
||||||
|
cfg.KeepFailedPaymentAttempts,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
kvPaymentsDB, err := channeldb.NewKVPaymentsDB(
|
||||||
dbs.ChanStateDB,
|
dbs.ChanStateDB,
|
||||||
|
paymentsDBOptions...,
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
cleanUp()
|
||||||
|
|
||||||
|
err = fmt.Errorf("unable to open payments DB: %w", err)
|
||||||
|
d.logger.Error(err)
|
||||||
|
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
dbs.KVPaymentsDB = kvPaymentsDB
|
dbs.KVPaymentsDB = kvPaymentsDB
|
||||||
|
|
||||||
// Wrap the watchtower client DB and make sure we clean up.
|
// Wrap the watchtower client DB and make sure we clean up.
|
||||||
|
30
payments/db/options.go
Normal file
30
payments/db/options.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package paymentsdb
|
||||||
|
|
||||||
|
// StoreOptions holds parameters for the KVStore.
|
||||||
|
type StoreOptions struct {
|
||||||
|
// NoMigration allows to open the database in readonly mode
|
||||||
|
NoMigration bool
|
||||||
|
|
||||||
|
// KeepFailedPaymentAttempts is a flag that determines whether to keep
|
||||||
|
// failed payment attempts for a settled payment in the db.
|
||||||
|
KeepFailedPaymentAttempts bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// OptionModifier is a function signature for modifying the default
|
||||||
|
// StoreOptions.
|
||||||
|
type OptionModifier func(*StoreOptions)
|
||||||
|
|
||||||
|
// WithKeepFailedPaymentAttempts sets the KeepFailedPaymentAttempts to n.
|
||||||
|
func WithKeepFailedPaymentAttempts(n bool) OptionModifier {
|
||||||
|
return func(o *StoreOptions) {
|
||||||
|
o.KeepFailedPaymentAttempts = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNoMigration allows the database to be opened in read only mode by
|
||||||
|
// disabling migrations.
|
||||||
|
func WithNoMigration(b bool) OptionModifier {
|
||||||
|
return func(o *StoreOptions) {
|
||||||
|
o.NoMigration = b
|
||||||
|
}
|
||||||
|
}
|
@@ -48,12 +48,18 @@ var (
|
|||||||
func TestControlTowerSubscribeUnknown(t *testing.T) {
|
func TestControlTowerSubscribeUnknown(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
db := initDB(t, false)
|
db := initDB(t)
|
||||||
|
|
||||||
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))
|
paymentDB, err := channeldb.NewKVPaymentsDB(
|
||||||
|
db,
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(true),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pControl := NewControlTower(paymentDB)
|
||||||
|
|
||||||
// Subscription should fail when the payment is not known.
|
// Subscription should fail when the payment is not known.
|
||||||
_, err := pControl.SubscribePayment(lntypes.Hash{1})
|
_, err = pControl.SubscribePayment(lntypes.Hash{1})
|
||||||
require.ErrorIs(t, err, paymentsdb.ErrPaymentNotInitiated)
|
require.ErrorIs(t, err, paymentsdb.ErrPaymentNotInitiated)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -62,9 +68,12 @@ func TestControlTowerSubscribeUnknown(t *testing.T) {
|
|||||||
func TestControlTowerSubscribeSuccess(t *testing.T) {
|
func TestControlTowerSubscribeSuccess(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
db := initDB(t, false)
|
db := initDB(t)
|
||||||
|
|
||||||
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))
|
paymentDB, err := channeldb.NewKVPaymentsDB(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pControl := NewControlTower(paymentDB)
|
||||||
|
|
||||||
// Initiate a payment.
|
// Initiate a payment.
|
||||||
info, attempt, preimg, err := genInfo()
|
info, attempt, preimg, err := genInfo()
|
||||||
@@ -181,9 +190,15 @@ func TestKVPaymentsDBSubscribeFail(t *testing.T) {
|
|||||||
func TestKVPaymentsDBSubscribeAllSuccess(t *testing.T) {
|
func TestKVPaymentsDBSubscribeAllSuccess(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
db := initDB(t, true)
|
db := initDB(t)
|
||||||
|
|
||||||
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))
|
paymentDB, err := channeldb.NewKVPaymentsDB(
|
||||||
|
db,
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(true),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pControl := NewControlTower(paymentDB)
|
||||||
|
|
||||||
// Initiate a payment.
|
// Initiate a payment.
|
||||||
info1, attempt1, preimg1, err := genInfo()
|
info1, attempt1, preimg1, err := genInfo()
|
||||||
@@ -294,9 +309,15 @@ func TestKVPaymentsDBSubscribeAllSuccess(t *testing.T) {
|
|||||||
func TestKVPaymentsDBSubscribeAllImmediate(t *testing.T) {
|
func TestKVPaymentsDBSubscribeAllImmediate(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
db := initDB(t, true)
|
db := initDB(t)
|
||||||
|
|
||||||
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))
|
paymentDB, err := channeldb.NewKVPaymentsDB(
|
||||||
|
db,
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(true),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pControl := NewControlTower(paymentDB)
|
||||||
|
|
||||||
// Initiate a payment.
|
// Initiate a payment.
|
||||||
info, attempt, _, err := genInfo()
|
info, attempt, _, err := genInfo()
|
||||||
@@ -331,9 +352,15 @@ func TestKVPaymentsDBSubscribeAllImmediate(t *testing.T) {
|
|||||||
func TestKVPaymentsDBUnsubscribeSuccess(t *testing.T) {
|
func TestKVPaymentsDBUnsubscribeSuccess(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
db := initDB(t, true)
|
db := initDB(t)
|
||||||
|
|
||||||
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))
|
paymentDB, err := channeldb.NewKVPaymentsDB(
|
||||||
|
db,
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(true),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pControl := NewControlTower(paymentDB)
|
||||||
|
|
||||||
subscription1, err := pControl.SubscribeAllPayments()
|
subscription1, err := pControl.SubscribeAllPayments()
|
||||||
require.NoError(t, err, "expected subscribe to succeed, but got: %v")
|
require.NoError(t, err, "expected subscribe to succeed, but got: %v")
|
||||||
@@ -400,9 +427,17 @@ func TestKVPaymentsDBUnsubscribeSuccess(t *testing.T) {
|
|||||||
func testKVPaymentsDBSubscribeFail(t *testing.T, registerAttempt,
|
func testKVPaymentsDBSubscribeFail(t *testing.T, registerAttempt,
|
||||||
keepFailedPaymentAttempts bool) {
|
keepFailedPaymentAttempts bool) {
|
||||||
|
|
||||||
db := initDB(t, keepFailedPaymentAttempts)
|
db := initDB(t)
|
||||||
|
|
||||||
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))
|
paymentDB, err := channeldb.NewKVPaymentsDB(
|
||||||
|
db,
|
||||||
|
paymentsdb.WithKeepFailedPaymentAttempts(
|
||||||
|
keepFailedPaymentAttempts,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pControl := NewControlTower(paymentDB)
|
||||||
|
|
||||||
// Initiate a payment.
|
// Initiate a payment.
|
||||||
info, attempt, _, err := genInfo()
|
info, attempt, _, err := genInfo()
|
||||||
@@ -518,11 +553,9 @@ func testKVPaymentsDBSubscribeFail(t *testing.T, registerAttempt,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func initDB(t *testing.T, keepFailedPaymentAttempts bool) *channeldb.DB {
|
func initDB(t *testing.T) *channeldb.DB {
|
||||||
return channeldb.OpenForTesting(
|
return channeldb.OpenForTesting(
|
||||||
t, t.TempDir(), channeldb.OptionKeepFailedPaymentAttempts(
|
t, t.TempDir(),
|
||||||
keepFailedPaymentAttempts,
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user