From 8b876be3b6275895e1047f9c2322378cd95db182 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 26 Oct 2023 14:27:46 +0800 Subject: [PATCH] sweep: add fee info for published inputs This commit attaches RBFInfo to an input before it's been published or it's already been published. --- sweep/sweeper.go | 91 ++++++++++++++++++++++++++++++---- sweep/sweeper_test.go | 110 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 190 insertions(+), 11 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 1cc1a7209..d9f46dda6 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/labels" "github.com/lightningnetwork/lnd/lnwallet" @@ -157,6 +158,19 @@ func (s SweepState) String() string { } } +// RBFInfo stores the information required to perform a RBF bump on a pending +// sweeping tx. +type RBFInfo struct { + // Txid is the txid of the sweeping tx. + Txid chainhash.Hash + + // FeeRate is the fee rate of the sweeping tx. + FeeRate chainfee.SatPerKWeight + + // Fee is the total fee of the sweeping tx. + Fee btcutil.Amount +} + // pendingInput is created when an input reaches the main loop for the first // time. It wraps the input and tracks all relevant state that is needed for // sweeping. @@ -188,6 +202,9 @@ type pendingInput struct { // lastFeeRate is the most recent fee rate used for this input within a // transaction broadcast to the network. lastFeeRate chainfee.SatPerKWeight + + // rbf records the RBF constraints. + rbf fn.Option[RBFInfo] } // parameters returns the sweep parameters for this input. @@ -1012,11 +1029,16 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord, } // Update the input's state. - // - // TODO: also calculate the fees and fee rate of this tx to - // prepare possible RBF. pi.state = StatePendingPublish + // Record the fees and fee rate of this tx to prepare possible + // RBF. + pi.rbf = fn.Some(RBFInfo{ + Txid: tx.TxHash(), + FeeRate: chainfee.SatPerKWeight(tr.FeeRate), + Fee: btcutil.Amount(tr.Fee), + }) + // Record another publish attempt. pi.publishAttempts++ @@ -1440,12 +1462,9 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) { params: input.params, } - // If the input is already spent in the mempool, update its state to - // StatePublished. - _, spent := s.mempoolLookup(outpoint) - if spent { - pi.state = StatePublished - } + // Try to find fee info for possible RBF if this input has already been + // spent. + pi = s.attachAvailableRBFInfo(pi) s.pendingInputs[outpoint] = pi log.Tracef("input %v added to pendingInputs", outpoint) @@ -1466,6 +1485,60 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) { pi.ntfnRegCancel = cancel } +// attachAvailableRBFInfo queries the mempool to see whether the given input +// has already been spent. If so, it will query the sweeper store to fetch the +// fee info of the spending transction, hence preparing for possible RBF. +func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput { + // Check if we can find the spending tx of this input in mempool. + tx, spent := s.mempoolLookup(*pi.OutPoint()) + + // Exit early if it's not found. + // + // NOTE: this is not accurate for backends that don't support mempool + // lookup: + // - for neutrino we don't have a mempool. + // - for btcd below v0.24.1 we don't have `gettxspendingprevout`. + if !spent { + return pi + } + + // Otherwise the input is already spent in the mempool, update its + // state to StatePublished. + pi.state = StatePublished + + // We also need to update the RBF info for this input. If the sweeping + // transaction is broadcast by us, we can find the fee info in the + // sweeper store. + txid := tx.TxHash() + tr, err := s.cfg.Store.GetTx(txid) + + // If the tx is not found in the store, it means it's not broadcast by + // us, hence we can't find the fee info. This is fine as, later on when + // this tx is confirmed, we will remove the input from our + // pendingInputs. + if errors.Is(err, ErrTxNotFound) { + log.Warnf("Spending tx %v not found in sweeper store", txid) + return pi + } + + // Exit if we get an db error. + if err != nil { + log.Errorf("Unable to get tx %v from sweeper store: %v", + txid, err) + + return pi + } + + // Attach the fee info and return it. + pi.rbf = fn.Some(RBFInfo{ + Txid: txid, + Fee: btcutil.Amount(tr.Fee), + FeeRate: chainfee.SatPerKWeight(tr.FeeRate), + }) + + return pi +} + // handleExistingInput processes an input that is already known to the sweeper. // It will overwrite the params of the old input with the new ones. func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 6e43e0af2..da9357895 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -15,11 +15,13 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" - "github.com/lightningnetwork/lnd/lntest/mock" + lnmock "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -137,7 +139,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { Wallet: backend, TickerDuration: 100 * time.Millisecond, Store: store, - Signer: &mock.DummySigner{}, + Signer: &lnmock.DummySigner{}, GenSweepScript: func() ([]byte, error) { script := make([]byte, input.P2WPKHSize) script[0] = 0 @@ -2308,3 +2310,107 @@ func TestUpdateSweeperInputs(t *testing.T) { // Assert the sweeper inputs are as expected. require.Equal(expectedInputs, s.pendingInputs) } + +// TestAttachAvailableRBFInfo checks that the RBF info is attached to the +// pending input, along with the state being marked as published, when this +// input can be found both in mempool and the sweeper store. +func TestAttachAvailableRBFInfo(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // Create a test outpoint. + op := wire.OutPoint{Index: 1} + + // Create a mock input. + testInput := &input.MockInput{} + testInput.On("OutPoint").Return(&op) + pi := &pendingInput{ + Input: testInput, + state: StateInit, + } + + // Create a mock mempool watcher and a mock sweeper store. + mockMempool := chainntnfs.NewMockMempoolWatcher() + mockStore := NewMockSweeperStore() + + // Create a mempool spend event to be returned by the mempool watcher. + spendChan := make(chan *chainntnfs.SpendDetail, 1) + spendEvent := &chainntnfs.MempoolSpendEvent{ + Spend: spendChan, + } + + // Mock the cancel subscription calls. + mockMempool.On("CancelMempoolSpendEvent", spendEvent) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: mockStore, + Mempool: mockMempool, + }) + + // First, mock the mempool to return an error. + dummyErr := errors.New("dummy err") + mockMempool.On("SubscribeMempoolSpent", op).Return(nil, dummyErr).Once() + + // Since the mempool lookup failed, we exepect the original pending + // input to stay unchanged. + result := s.attachAvailableRBFInfo(pi) + require.True(result.rbf.IsNone()) + require.Equal(StateInit, result.state) + + // Mock the mempool lookup to return a tx three times. + tx := &wire.MsgTx{} + mockMempool.On("SubscribeMempoolSpent", op).Return( + spendEvent, nil).Times(3).Run(func(_ mock.Arguments) { + // Eeac time the method is called, we send a tx to the spend + // channel. + spendChan <- &chainntnfs.SpendDetail{ + SpendingTx: tx, + } + }) + + // Mock the store to return an error saying the tx cannot be found. + mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once() + + // Although the db lookup failed, the pending input should have been + // marked as published without attaching any RBF info. + result = s.attachAvailableRBFInfo(pi) + require.True(result.rbf.IsNone()) + require.Equal(StatePublished, result.state) + + // Mock the store to return a db error. + mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once() + + // Although the db lookup failed, the pending input should have been + // marked as published without attaching any RBF info. + result = s.attachAvailableRBFInfo(pi) + require.True(result.rbf.IsNone()) + require.Equal(StatePublished, result.state) + + // Mock the store to return a record. + tr := &TxRecord{ + Fee: 100, + FeeRate: 100, + } + mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once() + + // Call the method again. + result = s.attachAvailableRBFInfo(pi) + + // Assert that the RBF info is attached to the pending input. + rbfInfo := fn.Some(RBFInfo{ + Txid: tx.TxHash(), + Fee: btcutil.Amount(tr.Fee), + FeeRate: chainfee.SatPerKWeight(tr.FeeRate), + }) + require.Equal(rbfInfo, result.rbf) + + // Assert the state is updated. + require.Equal(StatePublished, result.state) + + // Assert mocked statements. + testInput.AssertExpectations(t) + mockMempool.AssertExpectations(t) + mockStore.AssertExpectations(t) +}