From 8c9ba327cc3b2a12860b25c4b78594caad53cb8e Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 08:35:40 +0800 Subject: [PATCH 01/19] sweep: add method `getSpentInputs` To track the input and its spending tx, which will be used later to detect unknown spends. --- sweep/fee_bumper.go | 72 +++++++++++++++------- sweep/fee_bumper_test.go | 125 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 175 insertions(+), 22 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 620ad7554..f215df71e 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -908,7 +908,7 @@ func (t *TxPublisher) processRecords() { // Check whether the inputs has been spent by a third party. // // NOTE: this check is only done for neutrino backend. - if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) { + if t.isThirdPartySpent(r) { failedRecords[requestID] = r // Move to the next record. @@ -1253,26 +1253,59 @@ func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool { // // NOTE: this check is only performed for neutrino backend as it has no // reliable way to tell a tx has been replaced. -func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash, - inputs []input.Input) bool { - +func (t *TxPublisher) isThirdPartySpent(r *monitorRecord) bool { // Skip this check for if this is not neutrino backend. if !t.isNeutrinoBackend() { return false } + txid := r.tx.TxHash() + spends := t.getSpentInputs(r) + + // Iterate all the spending txns and check if they match the sweeping + // tx. + for op, spendingTx := range spends { + spendingTxID := spendingTx.TxHash() + + // If the spending tx is the same as the sweeping tx + // then we are good. + if spendingTxID == txid { + continue + } + + log.Warnf("Detected third party spent of output=%v "+ + "in tx=%v", op, spendingTx.TxHash()) + + return true + } + + return false +} + +// getSpentInputs performs a non-blocking read on the spending subscriptions to +// see whether any of the monitored inputs has been spent. A map of inputs with +// their spending txns are returned if found. +func (t *TxPublisher) getSpentInputs( + r *monitorRecord) map[wire.OutPoint]*wire.MsgTx { + + // Create a slice to record the inputs spent. + spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs)) + // Iterate all the inputs and check if they have been spent already. - for _, inp := range inputs { + for _, inp := range r.req.Inputs { op := inp.OutPoint() // For wallet utxos, the height hint is not set - we don't need // to monitor them for third party spend. + // + // TODO(yy): We need to properly lock wallet utxos before + // skipping this check as the same wallet utxo can be used by + // different sweeping txns. heightHint := inp.HeightHint() if heightHint == 0 { - log.Debugf("Skipped third party check for wallet "+ - "input %v", op) - - continue + heightHint = uint32(t.currentHeight.Load()) + log.Debugf("Checking wallet input %v using heightHint "+ + "%v", op, heightHint) } // If the input has already been spent after the height hint, a @@ -1283,7 +1316,8 @@ func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash, if err != nil { log.Criticalf("Failed to register spend ntfn for "+ "input=%v: %v", op, err) - return false + + return nil } // Remove the subscription when exit. @@ -1294,28 +1328,24 @@ func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash, case spend, ok := <-spendEvent.Spend: if !ok { log.Debugf("Spend ntfn for %v canceled", op) - return false - } - spendingTxID := spend.SpendingTx.TxHash() - - // If the spending tx is the same as the sweeping tx - // then we are good. - if spendingTxID == txid { continue } - log.Warnf("Detected third party spent of output=%v "+ - "in tx=%v", op, spend.SpendingTx.TxHash()) + spendingTx := spend.SpendingTx - return true + log.Debugf("Detected spent of input=%v in tx=%v", op, + spendingTx.TxHash()) + + spentInputs[op] = spendingTx // Move to the next input. default: + log.Tracef("Input %v not spent yet", op) } } - return false + return spentInputs } // calcCurrentConfTarget calculates the current confirmation target based on diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 0531dec8d..1d53432d9 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -55,7 +55,7 @@ func createTestInput(value int64, PubKey: testPubKey, }, }, - 0, + 1, nil, ) @@ -1776,3 +1776,126 @@ func TestHandleInitialBroadcastFail(t *testing.T) { require.Equal(t, 0, tp.records.Len()) require.Equal(t, 0, tp.subscriberChans.Len()) } + +// TestHasInputsSpent checks the expected outpoint:tx map is returned. +func TestHasInputsSpent(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create mock inputs. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 1, + } + inp1 := &input.MockInput{} + heightHint1 := uint32(1) + defer inp1.AssertExpectations(t) + + op2 := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 2, + } + inp2 := &input.MockInput{} + heightHint2 := uint32(2) + defer inp2.AssertExpectations(t) + + op3 := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 3, + } + walletInp := &input.MockInput{} + heightHint3 := uint32(0) + defer walletInp.AssertExpectations(t) + + // We expect all the inputs to call OutPoint and HeightHint. + inp1.On("OutPoint").Return(op1).Once() + inp2.On("OutPoint").Return(op2).Once() + walletInp.On("OutPoint").Return(op3).Once() + inp1.On("HeightHint").Return(heightHint1).Once() + inp2.On("HeightHint").Return(heightHint2).Once() + walletInp.On("HeightHint").Return(heightHint3).Once() + + // We expect the normal inputs to call SignDesc. + pkScript1 := []byte{1} + sd1 := &input.SignDescriptor{ + Output: &wire.TxOut{ + PkScript: pkScript1, + }, + } + inp1.On("SignDesc").Return(sd1).Once() + + pkScript2 := []byte{1} + sd2 := &input.SignDescriptor{ + Output: &wire.TxOut{ + PkScript: pkScript2, + }, + } + inp2.On("SignDesc").Return(sd2).Once() + + pkScript3 := []byte{3} + sd3 := &input.SignDescriptor{ + Output: &wire.TxOut{ + PkScript: pkScript3, + }, + } + walletInp.On("SignDesc").Return(sd3).Once() + + // Mock RegisterSpendNtfn. + // + // spendingTx1 is the tx spending op1. + spendingTx1 := &wire.MsgTx{} + se1 := createTestSpendEvent(spendingTx1) + m.notifier.On("RegisterSpendNtfn", + &op1, pkScript1, heightHint1).Return(se1, nil).Once() + + // Create the spending event that doesn't send an event. + se2 := &chainntnfs.SpendEvent{ + Cancel: func() {}, + } + m.notifier.On("RegisterSpendNtfn", + &op2, pkScript2, heightHint2).Return(se2, nil).Once() + + se3 := &chainntnfs.SpendEvent{ + Cancel: func() {}, + } + m.notifier.On("RegisterSpendNtfn", + &op3, pkScript3, heightHint3).Return(se3, nil).Once() + + // Prepare the test inputs. + inputs := []input.Input{inp1, inp2, walletInp} + + // Prepare the test record. + record := &monitorRecord{ + req: &BumpRequest{ + Inputs: inputs, + }, + } + + // Call the method under test. + result := tp.getSpentInputs(record) + + // Assert the expected map is created. + expected := map[wire.OutPoint]*wire.MsgTx{ + op1: spendingTx1, + } + require.Equal(t, expected, result) +} + +// createTestSpendEvent creates a SpendEvent which places the specified tx in +// the channel, which can be read by a spending subscriber. +func createTestSpendEvent(tx *wire.MsgTx) *chainntnfs.SpendEvent { + // Create a monitor record that's confirmed. + spendDetails := chainntnfs.SpendDetail{ + SpendingTx: tx, + } + spendChan1 := make(chan *chainntnfs.SpendDetail, 1) + spendChan1 <- &spendDetails + + // Create the spend events. + return &chainntnfs.SpendEvent{ + Spend: spendChan1, + Cancel: func() {}, + } +} From 61cec43951d59a78d08699e9b64c4bbc73c374bb Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 03:17:42 +0800 Subject: [PATCH 02/19] sweep: add a new event `TxUnknownSpend` --- sweep/fee_bumper.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index f215df71e..8dc30d230 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -95,6 +95,17 @@ const ( // TxConfirmed is sent when the tx is confirmed. TxConfirmed + // TxUnknownSpend is sent when at least one of the inputs is spent but + // not by the current sweeping tx, this can happen when, + // - a remote party has replaced our sweeping tx by spending the + // input(s), e.g., via the direct preimage spend on our outgoing HTLC. + // - a third party has replaced our sweeping tx, e.g., the anchor output + // after 16 blocks. + // - A previous sweeping tx has confirmed but the fee bumper is not + // aware of it, e.g., a restart happens right after the sweeping tx is + // broadcast and confirmed. + TxUnknownSpend + // TxFatal is sent when the inputs in this tx cannot be retried. Txns // will end up in this state if they have encountered a non-fee related // error, which means they cannot be retried with increased budget. @@ -115,6 +126,8 @@ func (e BumpEvent) String() string { return "Replaced" case TxConfirmed: return "Confirmed" + case TxUnknownSpend: + return "UnknownSpend" case TxFatal: return "Fatal" default: @@ -280,7 +293,8 @@ func (b *BumpResult) String() string { // Validate validates the BumpResult so it's safe to use. func (b *BumpResult) Validate() error { - isFailureEvent := b.Event == TxFailed || b.Event == TxFatal + isFailureEvent := b.Event == TxFailed || b.Event == TxFatal || + b.Event == TxUnknownSpend // Every result must have a tx except the fatal or failed case. if b.Tx == nil && !isFailureEvent { @@ -754,6 +768,11 @@ func (t *TxPublisher) removeResult(result *BumpResult) { log.Debugf("Removing monitor record=%v due to fatal err: %v", id, result.Err) + case TxUnknownSpend: + // Remove the record if there's an unknown spend. + log.Debugf("Removing monitor record=%v due unknown spent: "+ + "%v", id, result.Err) + // Do nothing if it's neither failed or confirmed. default: log.Tracef("Skipping record removal for id=%v, event=%v", id, @@ -1120,12 +1139,8 @@ func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) { // Create a result that will be sent to the resultChan which is // listened by the caller. - // - // TODO(yy): create a new state `TxThirdPartySpent` to notify the - // sweeper to remove the input, hence moving the monitoring of inputs - // spent inside the fee bumper. result := &BumpResult{ - Event: TxFailed, + Event: TxUnknownSpend, Tx: r.tx, requestID: r.requestID, Err: ErrThirdPartySpent, From 50bc191feb3a863f6e7c8361491ec4d23a39cd9b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 04:45:10 +0800 Subject: [PATCH 03/19] sweep: handle unknown spent in `processRecords` This commit refactors the `processRecords` to always handle the inputs spent when processing the records. We now make sure to handle unknown spends for all backends (previously only neutrino), and rely solely on the spending notification to give us the onchain status of inputs. --- sweep/fee_bumper.go | 64 +++++--- sweep/fee_bumper_test.go | 322 +++++++++++++++++++++++++++++++-------- 2 files changed, 297 insertions(+), 89 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 8dc30d230..6ea7d43f1 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -897,8 +897,6 @@ func (t *TxPublisher) processRecords() { // failedRecords stores a map of records which has inputs being spent // by a third party. - // - // NOTE: this is only used for neutrino backend. failedRecords := make(map[uint64]*monitorRecord) // initialRecords stores a map of records which are being created and @@ -908,32 +906,55 @@ func (t *TxPublisher) processRecords() { // visitor is a helper closure that visits each record and divides them // into two groups. visitor := func(requestID uint64, r *monitorRecord) error { - if r.tx == nil { - initialRecords[requestID] = r - return nil - } + log.Tracef("Checking monitor recordID=%v", requestID) - log.Tracef("Checking monitor recordID=%v for tx=%v", requestID, - r.tx.TxHash()) + // Check whether the inputs have already been spent. + spends := t.getSpentInputs(r) - // If the tx is already confirmed, we can stop monitoring it. - if t.isConfirmed(r.tx.TxHash()) { + // If the any of the inputs has been spent, the record will be + // marked as failed or confirmed. + if len(spends) != 0 { + // When tx is nil, it means we haven't tried the initial + // broadcast yet the input is already spent. This could + // happen when the node shuts down, a previous sweeping + // tx confirmed, then the node comes back online and + // reoffers the inputs. Another case is the remote node + // spends the input quickly before we even attempt the + // sweep. In either case we will fail the record and let + // the sweeper handles it. + if r.tx == nil { + failedRecords[requestID] = r + return nil + } + + // Check whether the inputs has been spent by a unknown + // tx. + if t.isThirdPartySpent(r, spends) { + failedRecords[requestID] = r + + // Move to the next record. + return nil + } + + // The tx is ours, we can move it to the confirmed queue + // and stop monitoring it. confirmedRecords[requestID] = r // Move to the next record. return nil } - // Check whether the inputs has been spent by a third party. - // - // NOTE: this check is only done for neutrino backend. - if t.isThirdPartySpent(r) { - failedRecords[requestID] = r + // This is the first time we see this record, so we put it in + // the initial queue. + if r.tx == nil { + initialRecords[requestID] = r - // Move to the next record. return nil } + // We can only get here when the inputs are not spent and a + // previous sweeping tx has been attempted. In this case we will + // perform an RBF on it in the current block. feeBumpRecords[requestID] = r // Return nil to move to the next record. @@ -1265,17 +1286,10 @@ func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool { // isThirdPartySpent checks whether the inputs of the tx has already been spent // by a third party. When a tx is not confirmed, yet its inputs has been spent, // then it must be spent by a different tx other than the sweeping tx here. -// -// NOTE: this check is only performed for neutrino backend as it has no -// reliable way to tell a tx has been replaced. -func (t *TxPublisher) isThirdPartySpent(r *monitorRecord) bool { - // Skip this check for if this is not neutrino backend. - if !t.isNeutrinoBackend() { - return false - } +func (t *TxPublisher) isThirdPartySpent(r *monitorRecord, + spends map[wire.OutPoint]*wire.MsgTx) bool { txid := r.tx.TxHash() - spends := t.getSpentInputs(r) // Iterate all the spending txns and check if they match the sweeping // tx. diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 1d53432d9..582d737b4 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -1481,60 +1481,163 @@ func TestHandleFeeBumpTx(t *testing.T) { require.True(t, found) } -// TestProcessRecords validates processRecords behaves as expected. -func TestProcessRecords(t *testing.T) { +// TestProcessRecordsInitial validates processRecords behaves as expected when +// processing the initial broadcast. +func TestProcessRecordsInitial(t *testing.T) { t.Parallel() // Create a publisher using the mocks. tp, m := createTestPublisher(t) // Create testing objects. - requestID1 := uint64(1) - req1 := createTestBumpRequest() - tx1 := &wire.MsgTx{LockTime: 1} - txid1 := tx1.TxHash() + requestID := uint64(1) + req := createTestBumpRequest() + op := req.Inputs[0].OutPoint() - requestID2 := uint64(2) - req2 := createTestBumpRequest() - tx2 := &wire.MsgTx{LockTime: 2} - txid2 := tx2.TxHash() - - // Create a monitor record that's confirmed. - recordConfirmed := &monitorRecord{ - requestID: requestID1, - req: req1, - feeFunction: m.feeFunc, - tx: tx1, + // Mock RegisterSpendNtfn. + // + // Create the spending event that doesn't send an event. + se := &chainntnfs.SpendEvent{ + Cancel: func() {}, } - m.wallet.On("GetTransactionDetails", &txid1).Return( - &lnwallet.TransactionDetail{ - NumConfirmations: 1, - }, nil, - ).Once() + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() - // Create a monitor record that's not confirmed. We know it's not - // confirmed because the num of confirms is zero. - recordFeeBump := &monitorRecord{ - requestID: requestID2, - req: req2, - feeFunction: m.feeFunc, - tx: tx2, + // Create a monitor record that's broadcast the first time. + record := &monitorRecord{ + requestID: requestID, + req: req, } - m.wallet.On("GetTransactionDetails", &txid2).Return( - &lnwallet.TransactionDetail{ - NumConfirmations: 0, - }, nil, - ).Once() - m.wallet.On("BackEnd").Return("test-backend").Once() // Setup the initial publisher state by adding the records to the maps. - subscriberConfirmed := make(chan *BumpResult, 1) - tp.subscriberChans.Store(requestID1, subscriberConfirmed) - tp.records.Store(requestID1, recordConfirmed) + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) - subscriberReplaced := make(chan *BumpResult, 1) - tp.subscriberChans.Store(requestID2, subscriberReplaced) - tp.records.Store(requestID2, recordFeeBump) + // The following methods should only be called once when creating the + // initial broadcast tx. + // + // Mock the signer to always return a valid script. + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Once() + + // Mock the testmempoolaccept to return nil. + m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once() + + // Mock the wallet to publish successfully. + m.wallet.On("PublishTransaction", + mock.Anything, mock.Anything).Return(nil).Once() + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // We expect the published tx to be notified back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxPublished. + require.Equal(t, TxPublished, result.Event) + + // Expect the tx to be set but not the replaced tx. + require.NotNil(t, result.Tx) + require.Nil(t, result.ReplacedTx) + + // No error should be set. + require.Nil(t, result.Err) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsInitialSpent validates processRecords behaves as expected +// when processing the initial broadcast when the input is spent. +func TestProcessRecordsInitialSpent(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Mock RegisterSpendNtfn. + se := createTestSpendEvent(tx) + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's broadcast the first time. + record := &monitorRecord{ + requestID: requestID, + req: req, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // We expect the published tx to be notified back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxUnknownSpend. + require.Equal(t, TxUnknownSpend, result.Event) + + // Expect the tx and the replaced tx to be nil. + require.Nil(t, result.Tx) + require.Nil(t, result.ReplacedTx) + + // The error should be set. + require.ErrorIs(t, result.Err, ErrThirdPartySpent) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsFeeBump validates processRecords behaves as expected when +// processing fee bump records. +func TestProcessRecordsFeeBump(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Mock RegisterSpendNtfn. + // + // Create the spending event that doesn't send an event. + se := &chainntnfs.SpendEvent{ + Cancel: func() {}, + } + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's not confirmed. We know it's not + // confirmed because the `SpendEvent` is empty. + record := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + tx: tx, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) // Create a test feerate and return it from the mock fee function. feerate := chainfee.SatPerKWeight(1000) @@ -1560,40 +1663,131 @@ func TestProcessRecords(t *testing.T) { // Call processRecords and expect the results are notified back. tp.processRecords() - // We expect two results to be received. One for the confirmed tx and - // one for the replaced tx. - // - // Check the confirmed tx result. - select { - case <-time.After(time.Second): - t.Fatal("timeout waiting for subscriberConfirmed") - - case result := <-subscriberConfirmed: - // We expect the result to be TxConfirmed. - require.Equal(t, TxConfirmed, result.Event) - require.Equal(t, tx1, result.Tx) - - // No error should be set. - require.Nil(t, result.Err) - require.Equal(t, requestID1, result.requestID) - } - - // Now check the replaced tx result. + // We expect the replaced tx to be notified back. select { case <-time.After(time.Second): t.Fatal("timeout waiting for subscriberReplaced") - case result := <-subscriberReplaced: + case result := <-subscriber: // We expect the result to be TxReplaced. require.Equal(t, TxReplaced, result.Event) // The new tx and old tx should be properly set. - require.NotEqual(t, tx2, result.Tx) - require.Equal(t, tx2, result.ReplacedTx) + require.NotEqual(t, tx, result.Tx) + require.Equal(t, tx, result.ReplacedTx) // No error should be set. require.Nil(t, result.Err) - require.Equal(t, requestID2, result.requestID) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsConfirmed validates processRecords behaves as expected when +// processing confirmed records. +func TestProcessRecordsConfirmed(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Mock RegisterSpendNtfn. + se := createTestSpendEvent(tx) + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's confirmed. + recordConfirmed := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + tx: tx, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, recordConfirmed) + + // Create a test feerate and return it from the mock fee function. + feerate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feerate) + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // Check the confirmed tx result. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxConfirmed. + require.Equal(t, TxConfirmed, result.Event) + require.Equal(t, tx, result.Tx) + + // No error should be set. + require.Nil(t, result.Err) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsSpent validates processRecords behaves as expected when +// processing unknown spent records. +func TestProcessRecordsSpent(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Create a unknown tx. + txUnknown := &wire.MsgTx{LockTime: 2} + + // Mock RegisterSpendNtfn. + se := createTestSpendEvent(txUnknown) + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's spent by txUnknown. + recordConfirmed := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + tx: tx, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, recordConfirmed) + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // Check the unknown tx result. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxUnknownSpend. + require.Equal(t, TxUnknownSpend, result.Event) + require.Equal(t, tx, result.Tx) + + // No error should be set. + require.ErrorIs(t, result.Err, ErrThirdPartySpent) + require.Equal(t, requestID, result.requestID) } } From 121116cff7ecfa8a583681a0eac2dadb8f062465 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 04:50:08 +0800 Subject: [PATCH 04/19] sweep: remove dead code and add better logging --- chainntnfs/mempool.go | 2 +- sweep/fee_bumper.go | 24 +++++++++--------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/chainntnfs/mempool.go b/chainntnfs/mempool.go index 4cd181df8..2e31751fc 100644 --- a/chainntnfs/mempool.go +++ b/chainntnfs/mempool.go @@ -211,7 +211,7 @@ func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) (inputsWithTx, // If found, save it to watchedInputs to notify the // subscriber later. - Log.Infof("Found input %s, spent in %s", op, txid) + Log.Debugf("Found input %s, spent in %s", op, txid) // Construct the spend details. details := &SpendDetail{ diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 6ea7d43f1..8d6d04fd6 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -972,7 +972,6 @@ func (t *TxPublisher) processRecords() { // For records that are confirmed, we'll notify the caller about this // result. for _, r := range confirmedRecords { - log.Debugf("Tx=%v is confirmed", r.tx.TxHash()) t.wg.Add(1) go t.handleTxConfirmed(r) } @@ -982,7 +981,6 @@ func (t *TxPublisher) processRecords() { // For records that are not confirmed, we perform a fee bump if needed. for _, r := range feeBumpRecords { - log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash()) t.wg.Add(1) go t.handleFeeBumpTx(r, currentHeight) } @@ -990,8 +988,6 @@ func (t *TxPublisher) processRecords() { // For records that are failed, we'll notify the caller about this // result. for _, r := range failedRecords { - log.Debugf("Tx=%v has inputs been spent by a third party, "+ - "failing it now", r.tx.TxHash()) t.wg.Add(1) go t.handleThirdPartySpent(r) } @@ -1004,6 +1000,8 @@ func (t *TxPublisher) processRecords() { func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) { defer t.wg.Done() + log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash()) + // Create a result that will be sent to the resultChan which is // listened by the caller. result := &BumpResult{ @@ -1113,6 +1111,9 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) { func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) { defer t.wg.Done() + log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(), + r.requestID) + oldTxid := r.tx.TxHash() // Get the current conf target for this record. @@ -1158,6 +1159,10 @@ func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) { func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) { defer t.wg.Done() + log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+ + "bumper, failing it now:\n%v", r.requestID, + inputTypeSummary(r.req.Inputs)) + // Create a result that will be sent to the resultChan which is // listened by the caller. result := &BumpResult{ @@ -1272,17 +1277,6 @@ func (t *TxPublisher) createAndPublishTx( return fn.Some(*result) } -// isConfirmed checks the btcwallet to see whether the tx is confirmed. -func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool { - details, err := t.cfg.Wallet.GetTransactionDetails(&txid) - if err != nil { - log.Warnf("Failed to get tx details for %v: %v", txid, err) - return false - } - - return details.NumConfirmations > 0 -} - // isThirdPartySpent checks whether the inputs of the tx has already been spent // by a third party. When a tx is not confirmed, yet its inputs has been spent, // then it must be spent by a different tx other than the sweeping tx here. From db351e1908fd1a7c24814b5c85d7210ae66173d5 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 04:56:36 +0800 Subject: [PATCH 05/19] sweep: rename methods for clarity We now rename "third party" to "unknown" as the inputs can be spent via an older sweeping tx, a third party (anchor), or a remote party (pin). In fee bumper we don't have the info to distinguish the above cases, and leave them to be further handled by the sweeper as it has more context. --- sweep/fee_bumper.go | 41 +++++++++++++++++++--------------------- sweep/fee_bumper_test.go | 4 ++-- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 8d6d04fd6..5cd08fc88 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -40,9 +40,9 @@ var ( // preparation, usually due to the output being dust. ErrTxNoOutput = errors.New("tx has no output") - // ErrThirdPartySpent is returned when a third party has spent the - // input in the sweeping tx. - ErrThirdPartySpent = errors.New("third party spent the output") + // ErrUnknownSpent is returned when an unknown tx has spent an input in + // the sweeping tx. + ErrUnknownSpent = errors.New("unknown spend of input") ) var ( @@ -81,10 +81,6 @@ const ( // bumper. In either case the inputs in this tx should be retried with // either a different grouping strategy or an increased budget. // - // NOTE: We also send this event when there's a third party spend - // event, and the sweeper will handle cleaning this up once it's - // confirmed. - // // TODO(yy): Remove the above usage once we remove sweeping non-CPFP // anchors. TxFailed @@ -929,7 +925,7 @@ func (t *TxPublisher) processRecords() { // Check whether the inputs has been spent by a unknown // tx. - if t.isThirdPartySpent(r, spends) { + if t.isUnknownSpent(r, spends) { failedRecords[requestID] = r // Move to the next record. @@ -989,7 +985,7 @@ func (t *TxPublisher) processRecords() { // result. for _, r := range failedRecords { t.wg.Add(1) - go t.handleThirdPartySpent(r) + go t.handleUnknownSpent(r) } } @@ -1151,12 +1147,12 @@ func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) { }) } -// handleThirdPartySpent is called when the inputs in an unconfirmed tx is -// spent. It will notify the subscriber then remove the record from the maps -// and send a TxFailed event to the subscriber. +// handleUnknownSpent is called when the inputs are spent by a unknown tx. It +// will notify the subscriber then remove the record from the maps and send a +// TxUnknownSpend event to the subscriber. // // NOTE: Must be run as a goroutine to avoid blocking on sending the result. -func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) { +func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) { defer t.wg.Done() log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+ @@ -1169,7 +1165,7 @@ func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) { Event: TxUnknownSpend, Tx: r.tx, requestID: r.requestID, - Err: ErrThirdPartySpent, + Err: ErrUnknownSpent, } // Notify that this tx is confirmed and remove the record from the map. @@ -1277,10 +1273,11 @@ func (t *TxPublisher) createAndPublishTx( return fn.Some(*result) } -// isThirdPartySpent checks whether the inputs of the tx has already been spent -// by a third party. When a tx is not confirmed, yet its inputs has been spent, -// then it must be spent by a different tx other than the sweeping tx here. -func (t *TxPublisher) isThirdPartySpent(r *monitorRecord, +// isUnknownSpent checks whether the inputs of the tx has already been spent by +// a tx not known to us. When a tx is not confirmed, yet its inputs has been +// spent, then it must be spent by a different tx other than the sweeping tx +// here. +func (t *TxPublisher) isUnknownSpent(r *monitorRecord, spends map[wire.OutPoint]*wire.MsgTx) bool { txid := r.tx.TxHash() @@ -1290,14 +1287,14 @@ func (t *TxPublisher) isThirdPartySpent(r *monitorRecord, for op, spendingTx := range spends { spendingTxID := spendingTx.TxHash() - // If the spending tx is the same as the sweeping tx - // then we are good. + // If the spending tx is the same as the sweeping tx then we are + // good. if spendingTxID == txid { continue } - log.Warnf("Detected third party spent of output=%v "+ - "in tx=%v", op, spendingTx.TxHash()) + log.Warnf("Detected unknown spend of input=%v in tx=%v", op, + spendingTx.TxHash()) return true } diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 582d737b4..ea694281b 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -1597,7 +1597,7 @@ func TestProcessRecordsInitialSpent(t *testing.T) { require.Nil(t, result.ReplacedTx) // The error should be set. - require.ErrorIs(t, result.Err, ErrThirdPartySpent) + require.ErrorIs(t, result.Err, ErrUnknownSpent) require.Equal(t, requestID, result.requestID) } } @@ -1786,7 +1786,7 @@ func TestProcessRecordsSpent(t *testing.T) { require.Equal(t, tx, result.Tx) // No error should be set. - require.ErrorIs(t, result.Err, ErrThirdPartySpent) + require.ErrorIs(t, result.Err, ErrUnknownSpent) require.Equal(t, requestID, result.requestID) } } From 388183e173e86ff3ac861243219a5f106a3c7b48 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 19 Jan 2025 00:36:23 +0800 Subject: [PATCH 06/19] itest: add fee replacement test --- itest/list_on_test.go | 4 + itest/lnd_sweep_test.go | 253 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 257 insertions(+) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 2cf71feec..9bb611935 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -662,6 +662,10 @@ var allTestCases = []*lntest.TestCase{ Name: "invoice migration", TestFunc: testInvoiceMigration, }, + { + Name: "fee replacement", + TestFunc: testFeeReplacement, + }, } // appendPrefixed is used to add a prefix to each test name in the subtests diff --git a/itest/lnd_sweep_test.go b/itest/lnd_sweep_test.go index b911b4a27..377915412 100644 --- a/itest/lnd_sweep_test.go +++ b/itest/lnd_sweep_test.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/node" + "github.com/lightningnetwork/lnd/lntest/rpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" @@ -2081,3 +2082,255 @@ func testBumpForceCloseFee(ht *lntest.HarnessTest) { // This is needed to clean up the mempool. ht.MineBlocksAndAssertNumTxes(1, 2) } + +// testFeeReplacement tests that when a sweeping txns aggregates multiple +// outgoing HTLCs, and one of the outgoing HTLCs has been spent via the direct +// preimage path by the remote peer, the remaining HTLCs will be grouped again +// and swept immediately. +// +// Setup: +// 1. Fund Alice with 1 UTXOs - she only needs one for the funding process, +// 2. Fund Bob with 3 UTXOs - he needs one for the funding process, one for +// his CPFP anchor sweeping, and one for sweeping his outgoing HTLCs. +// 3. Create a linear network from Alice -> Bob -> Carol. +// 4. Alice pays two invoices to Carol, with Carol holding the settlement. +// 5. Bob goes offline. +// 6. Carol settles one of the invoices, so she can later spend Bob's outgoing +// HTLC via the direct preimage path. +// 7. Carol goes offline and Bob comes online. +// 8. Mine enough blocks so Bob will force close Bob=>Carol to claim his +// outgoing HTLCs. +// 9. Carol comes online, sweeps one of Bob's outgoing HTLCs and it confirms. +// 10. Bob creates a new sweeping tx to sweep his remaining HTLC with a +// previous fee rate. +// +// Test: +// 1. Bob will immediately sweeps his remaining outgoing HTLC given that the +// other one has been spent by Carol. +// 2. Bob's new sweeping tx will use the previous fee rate instead of +// initializing a new starting fee rate. +func testFeeReplacement(ht *lntest.HarnessTest) { + // Set the min relay feerate to be 10 sat/vbyte so the non-CPFP anchor + // is never swept. + // + // TODO(yy): delete this line once the normal anchor sweeping is + // removed. + ht.SetMinRelayFeerate(10_000) + + // Setup testing params. + // + // Invoice is 100k sats. + invoiceAmt := btcutil.Amount(100_000) + + // Alice will send two payments. + numPayments := 2 + + // Use the smallest CLTV so we can mine fewer blocks. + cltvDelta := routing.MinCLTVDelta + + // Prepare params. + cfg := []string{ + "--protocol.anchors", + // Use a small CLTV to mine less blocks. + fmt.Sprintf("--bitcoin.timelockdelta=%d", cltvDelta), + // Use a very large CSV, this way to_local outputs are never + // swept so we can focus on testing HTLCs. + fmt.Sprintf("--bitcoin.defaultremotedelay=%v", cltvDelta*10), + } + cfgs := [][]string{cfg, cfg, cfg} + + openChannelParams := lntest.OpenChannelParams{ + Amt: invoiceAmt * 100, + } + + // Create a three hop network: Alice -> Bob -> Carol. + _, nodes := ht.CreateSimpleNetwork(cfgs, openChannelParams) + + // Unwrap the results. + alice, bob, carol := nodes[0], nodes[1], nodes[2] + + // Bob needs two more wallet utxos: + // - when sweeping anchors, he needs one utxo for each sweep. + // - when sweeping HTLCs, he needs one utxo for each sweep. + numUTXOs := 2 + + // Bob should have enough wallet UTXOs here to sweep the HTLC in the + // end of this test. However, due to a known issue, Bob's wallet may + // report there's no UTXO available. For details, + // - https://github.com/lightningnetwork/lnd/issues/8786 + // + // TODO(yy): remove this extra UTXO once the issue is resolved. + numUTXOs++ + + // For neutrino backend, we need two more UTXOs for Bob to create his + // sweeping txns. + if ht.IsNeutrinoBackend() { + numUTXOs += 2 + } + + ht.FundNumCoins(bob, numUTXOs) + + // We also give Carol 2 coins to create her sweeping txns. + ht.FundNumCoins(carol, 2) + + // Create numPayments HTLCs on Bob's incoming and outgoing channels. + preimages := make([][]byte, 0, numPayments) + streams := make([]rpc.SingleInvoiceClient, 0, numPayments) + for i := 0; i < numPayments; i++ { + // Create the preimage. + var preimage lntypes.Preimage + copy(preimage[:], ht.Random32Bytes()) + payHashHold := preimage.Hash() + preimages = append(preimages, preimage[:]) + + // Subscribe the invoices. + stream := carol.RPC.SubscribeSingleInvoice(payHashHold[:]) + streams = append(streams, stream) + + // Carol create the hold invoice. + invoiceReqHold := &invoicesrpc.AddHoldInvoiceRequest{ + Value: int64(invoiceAmt), + CltvExpiry: finalCltvDelta, + Hash: payHashHold[:], + } + invoiceHold := carol.RPC.AddHoldInvoice(invoiceReqHold) + + // Let Alice pay the invoices. + req := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoiceHold.PaymentRequest, + TimeoutSeconds: 60, + FeeLimitMsat: noFeeLimitMsat, + } + + // Assert the payments are inflight. + ht.SendPaymentAndAssertStatus( + alice, req, lnrpc.Payment_IN_FLIGHT, + ) + + // Wait for Carol to mark invoice as accepted. There is a small + // gap to bridge between adding the htlc to the channel and + // executing the exit hop logic. + ht.AssertInvoiceState(stream, lnrpc.Invoice_ACCEPTED) + } + + // At this point, all 3 nodes should now have an active channel with + // the created HTLCs pending on all of them. + // + // Alice should have numPayments outgoing HTLCs on channel Alice -> Bob. + ht.AssertNumActiveHtlcs(alice, numPayments) + + // Bob should have 2 * numPayments HTLCs, + // - numPayments incoming HTLCs on channel Alice -> Bob. + // - numPayments outgoing HTLCs on channel Bob -> Carol. + ht.AssertNumActiveHtlcs(bob, numPayments*2) + + // Carol should have numPayments incoming HTLCs on channel Bob -> Carol. + ht.AssertNumActiveHtlcs(carol, numPayments) + + // Suspend Bob so he won't get the preimage from Carol. + restartBob := ht.SuspendNode(bob) + + // Carol settles the first invoice. + carol.RPC.SettleInvoice(preimages[0]) + ht.AssertInvoiceState(streams[0], lnrpc.Invoice_SETTLED) + + // Carol goes offline so the preimage won't be sent to Bob. + restartCarol := ht.SuspendNode(carol) + + // Bob comes online. + require.NoError(ht, restartBob()) + + // We'll now mine enough blocks to trigger Bob to force close channel + // Bob->Carol due to his outgoing HTLC is about to timeout. With the + // default outgoing broadcast delta of zero, this will be the same + // height as the outgoing htlc's expiry height. + numBlocks := padCLTV(uint32( + finalCltvDelta - lncfg.DefaultOutgoingBroadcastDelta, + )) + ht.MineEmptyBlocks(int(numBlocks)) + + // Assert Bob's force closing tx has been broadcast. We should see two + // txns in the mempool: + // 1. Bob's force closing tx. + // 2. Bob's anchor sweeping tx CPFPing the force close tx. + ht.AssertForceCloseAndAnchorTxnsInMempool() + + // Mine a block to confirm Bob's force close tx and anchor sweeping tx + // so we can focus on testing his outgoing HTLCs. + ht.MineBlocksAndAssertNumTxes(1, 2) + + // Bob should have numPayments pending sweep for the outgoing HTLCs. + ht.AssertNumPendingSweeps(bob, numPayments) + + // Bob should have one sweeping tx in the mempool, which sweeps all his + // outgoing HTLCs. + outgoingSweep0 := ht.GetNumTxsFromMempool(1)[0] + + // We now mine one empty block so Bob will perform one fee bump, after + // which his sweeping tx should be updated with a new fee rate. We do + // this so we can test later when Bob sweeps his remaining HTLC, the new + // sweeping tx will start with the current fee rate. + // + // Calculate Bob's initial sweeping fee rate. + initialFeeRate := ht.CalculateTxFeeRate(outgoingSweep0) + + // Mine one block to trigger Bob's RBF. + ht.MineEmptyBlocks(1) + + // Make sure Bob's old sweeping tx has been removed from the mempool. + ht.AssertTxNotInMempool(outgoingSweep0.TxHash()) + + // Get the feerate of Bob's current sweeping tx. + outgoingSweep1 := ht.GetNumTxsFromMempool(1)[0] + currentFeeRate := ht.CalculateTxFeeRate(outgoingSweep1) + + // Assert the Bob has updated the fee rate. + require.Greater(ht, currentFeeRate, initialFeeRate) + + delta := currentFeeRate - initialFeeRate + + // Check the shape of the sweeping tx - we expect it to be + // 3-input-3-output as a wallet utxo is used and a required output is + // made. + require.Len(ht, outgoingSweep1.TxIn, numPayments+1) + require.Len(ht, outgoingSweep1.TxOut, numPayments+1) + + // Restart Carol, once she is online, she will try to settle the HTLCs + // via the direct preimage spend. + require.NoError(ht, restartCarol()) + + // Carol should have 1 incoming HTLC and 1 anchor output to sweep. + ht.AssertNumPendingSweeps(carol, 2) + + // Assert Bob's sweeping tx has been replaced by Carol's. + ht.AssertTxNotInMempool(outgoingSweep1.TxHash()) + carolSweepTx := ht.GetNumTxsFromMempool(1)[0] + + // Assume the miner is now happy with Carol's fee, and it gets included + // in the next block. + ht.MineBlockWithTx(carolSweepTx) + + // Upon receiving the above block, Bob should immediately create a + // sweeping tx and broadcast it using the remaining outgoing HTLC. + // + // Bob should have numPayments-1 pending sweep for the outgoing HTLCs. + ht.AssertNumPendingSweeps(bob, numPayments-1) + + // Assert Bob immediately sweeps his remaining HTLC with the previous + // fee rate. + outgoingSweep2 := ht.GetNumTxsFromMempool(1)[0] + + // Calculate the fee rate. + feeRate := ht.CalculateTxFeeRate(outgoingSweep2) + + // We expect the current fee rate to be equal to the last fee rate he + // used plus the delta, as we expect the fee rate to stay on the initial + // line given by his fee function. + expectedFeeRate := currentFeeRate + delta + require.InEpsilonf(ht, uint64(expectedFeeRate), + uint64(feeRate), 0.02, "want %d, got %d in tx=%v", + currentFeeRate, feeRate, outgoingSweep2.TxHash()) + + // Finally, clean the mempol. + ht.MineBlocksAndAssertNumTxes(1, 1) +} From 2f1205a394b7105dfb7c94d1aa8bc981302f421d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 05:49:19 +0800 Subject: [PATCH 07/19] sweep: start tracking inputs spent by unknown tx This commit adds a new field `InputsSpent` to the `BumpResult` so they can be used to track inputs spent by txns not recoginized by the fee bumper. --- sweep/fee_bumper.go | 80 ++++++++++++++++++++++++++++++++++++---- sweep/fee_bumper_test.go | 10 +++++ 2 files changed, 83 insertions(+), 7 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 5cd08fc88..93aee912d 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -273,6 +273,10 @@ type BumpResult struct { // Err is the error that occurred during the broadcast. Err error + // SpentInputs are the inputs spent by another tx which caused the + // current tx to be failed. + SpentInputs map[wire.OutPoint]*wire.MsgTx + // requestID is the ID of the request that created this record. requestID uint64 } @@ -812,6 +816,10 @@ type monitorRecord struct { // outpointToTxIndex is a map of outpoint to tx index. outpointToTxIndex map[wire.OutPoint]int + + // spentInputs are the inputs spent by another tx which caused the + // current tx failed. + spentInputs map[wire.OutPoint]*wire.MsgTx } // Start starts the publisher by subscribing to block epoch updates and kicking @@ -910,6 +918,9 @@ func (t *TxPublisher) processRecords() { // If the any of the inputs has been spent, the record will be // marked as failed or confirmed. if len(spends) != 0 { + // Attach the spending txns. + r.spentInputs = spends + // When tx is nil, it means we haven't tried the initial // broadcast yet the input is already spent. This could // happen when the node shuts down, a previous sweeping @@ -1159,16 +1170,71 @@ func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) { "bumper, failing it now:\n%v", r.requestID, inputTypeSummary(r.req.Inputs)) - // Create a result that will be sent to the resultChan which is - // listened by the caller. + // Create a result that will be sent to the resultChan which is listened + // by the caller. result := &BumpResult{ - Event: TxUnknownSpend, - Tx: r.tx, - requestID: r.requestID, - Err: ErrUnknownSpent, + Event: TxUnknownSpend, + Tx: r.tx, + requestID: r.requestID, + Err: ErrUnknownSpent, + SpentInputs: r.spentInputs, } - // Notify that this tx is confirmed and remove the record from the map. + // Get the fee function, which will be used to decided the next fee rate + // to use if the sweeper decides to retry sweeping this input. + feeFunc := r.feeFunction + + // When the record is failed before the initial broadcast is attempted, + // it will have a nil fee func. In this case, we'll create the fee func + // here. + // + // NOTE: Since the current record is failed and will be deleted, we + // don't need to update the record on this fee function. We only need + // the fee rate data so the sweeper can pick up where we left off. + if feeFunc == nil { + f, err := t.initializeFeeFunction(r.req) + // TODO(yy): The only error we would receive here is when the + // pkScript is not recognized by the weightEstimator. What we + // should do instead is to check the pkScript immediately after + // receiving a sweep request so we don't need to check it again, + // which will also save us from error checking from several + // callsites. + if err != nil { + log.Errorf("Failed to create fee func for record %v: "+ + "%v", r.requestID, err) + + // Overwrite the event and error so the sweeper will + // remove this input. + result.Event = TxFatal + result.Err = err + + // Notify the sweeper about this result in the end. + t.handleResult(result) + + return + } + + feeFunc = f + } + + // Since the sweeping tx has been replaced by another party's tx, we + // missed this block window to increase its fee rate. To make sure the + // fee rate stays in the initial line, we now ask the fee function to + // give us the next fee rate as if the sweeping tx were RBFed. This new + // fee rate will be used as the starting fee rate if the upper system + // decides to continue sweeping the rest of the inputs. + _, err := feeFunc.Increment() + if err != nil { + // The fee function has reached its max position - nothing we + // can do here other than letting the user increase the budget. + log.Errorf("Failed to calculate the next fee rate for "+ + "Record(%v): %v", r.requestID, err) + } + + // Attach the new fee rate to be used for the next sweeping attempt. + result.FeeRate = feeFunc.FeeRate() + + // Notify the sweeper about this result in the end. t.handleResult(result) } diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index ea694281b..a11f4b7f5 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -1772,6 +1772,13 @@ func TestProcessRecordsSpent(t *testing.T) { tp.subscriberChans.Store(requestID, subscriber) tp.records.Store(requestID, recordConfirmed) + // Mock the fee function to increase feerate. + m.feeFunc.On("Increment").Return(true, nil).Once() + + // Create a test feerate and return it from the mock fee function. + feerate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feerate) + // Call processRecords and expect the results are notified back. tp.processRecords() @@ -1785,6 +1792,9 @@ func TestProcessRecordsSpent(t *testing.T) { require.Equal(t, TxUnknownSpend, result.Event) require.Equal(t, tx, result.Tx) + // We expect the fee rate to be updated. + require.Equal(t, feerate, result.FeeRate) + // No error should be set. require.ErrorIs(t, result.Err, ErrUnknownSpent) require.Equal(t, requestID, result.requestID) From 42818949dc26b801a23b408889ebb00d976ec1ee Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 06:05:19 +0800 Subject: [PATCH 08/19] sweep: retry sweeping inputs upon `TxUnknownSpend` We now start handling `TxUnknownSpend` in our sweeper to make sure the failed inputs are retried when possible. --- sweep/sweeper.go | 144 +++++++++++++++++++++++++ sweep/sweeper_test.go | 243 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 387 insertions(+) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8f74a6da9..e1e870828 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1847,6 +1847,12 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { case TxReplaced: return s.handleBumpEventTxReplaced(r) + // There are inputs being spent in a tx which the fee bumper doesn't + // understand. We will remove the tx from the sweeper db and mark the + // inputs as swept. + case TxUnknownSpend: + s.handleBumpEventTxUnknownSpend(r) + // There's a fatal error in creating the tx, we will remove the tx from // the sweeper db and mark the inputs as failed. case TxFatal: @@ -1878,3 +1884,141 @@ func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool { return found } + +// markInputSwept marks the given input as swept by the tx. It will also notify +// all the subscribers of this input. +func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) { + log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(), + inp.state) + + inp.state = Swept + + // Signal result channels. + s.signalResult(inp, Result{ + Tx: tx, + }) + + // Remove all other inputs in this exclusive group. + if inp.params.ExclusiveGroup != nil { + s.removeExclusiveGroup(*inp.params.ExclusiveGroup) + } +} + +// handleUnknownSpendTx takes an input and its spending tx. If the spending tx +// cannot be found in the sweeper store, the input will be marked as fatal, +// otherwise it will be marked as swept. +func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) { + op := inp.OutPoint() + txid := tx.TxHash() + + isOurTx, err := s.cfg.Store.IsOurTx(txid) + if err != nil { + log.Errorf("Cannot determine if tx %v is ours: %v", txid, err) + return + } + + // If this is our tx, it means it's a previous sweeping tx that got + // confirmed, which could happen when a restart happens during the + // sweeping process. + if isOurTx { + log.Debugf("Found our sweeping tx %v, marking input %v as "+ + "swept", txid, op) + + // We now use the spending tx to update the state of the inputs. + s.markInputSwept(inp, tx) + + return + } + + // Since the input is spent by others, we now mark it as fatal and won't + // be retried. + s.markInputFatal(inp, ErrRemoteSpend) + + log.Debugf("Removing descendant txns invalidated by (txid=%v): %v", + txid, lnutils.SpewLogClosure(tx)) + + // Construct a map of the inputs this transaction spends. + spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn)) + for _, txIn := range tx.TxIn { + spentInputs[txIn.PreviousOutPoint] = struct{}{} + } + + err = s.removeConflictSweepDescendants(spentInputs) + if err != nil { + log.Warnf("unable to remove descendant transactions "+ + "due to tx %v: ", txid) + } +} + +// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is +// unknown to the fee bumper. In the case when the sweeping tx has been replaced +// by another party with their tx being confirmed. It will retry sweeping the +// "good" inputs once the "bad" ones are kicked out. +func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) { + // Mark the inputs as publish failed, which means they will be retried + // later. + s.markInputsPublishFailed(r.set) + + // Get all the inputs that are not spent in the current sweeping tx. + spentInputs := r.result.SpentInputs + + // Create a slice to track inputs to be retried. + inputsToRetry := make([]input.Input, 0, len(r.set.Inputs())) + + // Iterate all the inputs found in this bump and mark the ones spent by + // the third party as failed. The rest of inputs will then be updated + // with a new fee rate and be retried immediately. + for _, inp := range r.set.Inputs() { + op := inp.OutPoint() + input, ok := s.inputs[op] + + // Wallet inputs are not tracked so we will not find them from + // the inputs map. + if !ok { + log.Debugf("Skipped marking input: %v not found in "+ + "pending inputs", op) + + continue + } + + // Check whether this input has been spent, if so we mark it as + // fatal or swept based on whether this is one of our previous + // sweeping txns, then move to the next. + tx, spent := spentInputs[op] + if spent { + s.handleUnknownSpendTx(input, tx) + + continue + } + + log.Debugf("Input(%v): updating params: starting fee rate "+ + "[%v -> %v], immediate [%v -> true]", op, + input.params.StartingFeeRate, r.result.FeeRate, + input.params.Immediate) + + // Update the input using the fee rate specified from the + // BumpResult, which should be the starting fee rate to use for + // the next sweeping attempt. + input.params.StartingFeeRate = fn.Some(r.result.FeeRate) + input.params.Immediate = true + inputsToRetry = append(inputsToRetry, input) + } + + // Exit early if there are no inputs to be retried. + if len(inputsToRetry) == 0 { + return + } + + log.Debugf("Retry sweeping inputs with updated params: %v", + inputTypeSummary(inputsToRetry)) + + // Get the latest inputs, which should put the PublishFailed inputs back + // to the sweeping queue. + inputs := s.updateSweeperInputs() + + // Immediately sweep the remaining inputs - the previous inputs should + // now be swept with the updated StartingFeeRate immediately. We may + // also include more inputs in the new sweeping tx if new ones with the + // same deadline are offered. + s.sweepPendingInputs(inputs) +} diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 40b25425d..da49c601e 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1199,3 +1199,246 @@ func TestHandleBumpEventTxFatal(t *testing.T) { err = s.handleBumpEventTxFatal(resp) rt.NoError(err) } + +// TestHandleUnknownSpendTxOurs checks that `handleUnknownSpendTx` correctly +// marks an input as swept given the tx is ours. +func TestHandleUnknownSpendTxOurs(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PublishFailed) + op := inp.OutPoint() + + si, ok := s.inputs[op] + require.True(t, ok) + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true, nil).Once() + + // Call the method under test. + s.handleUnknownSpendTx(si, tx) + + // Assert the state of the input is updated. + require.Equal(t, Swept, s.inputs[op].state) +} + +// TestHandleUnknownSpendTxThirdParty checks that `handleUnknownSpendTx` +// correctly marks an input as fatal given the tx is not ours. +func TestHandleInputSpendTxThirdParty(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PublishFailed) + op := inp.OutPoint() + + si, ok := s.inputs[op] + require.True(t, ok) + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Mock the store to return false when calling IsOurTx. + store.On("IsOurTx", txid).Return(false, nil).Once() + + // Mock `ListSweeps` to return an empty slice as we are testing the + // workflow here, not the method `removeConflictSweepDescendants`. + store.On("ListSweeps").Return([]chainhash.Hash{}, nil).Once() + + // Call the method under test. + s.handleUnknownSpendTx(si, tx) + + // Assert the state of the input is updated. + require.Equal(t, Fatal, s.inputs[op].state) +} + +// TestHandleBumpEventTxUnknownSpendNoRetry checks the case when all the inputs +// are failed due to them being spent by another party. +func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp}) + + op := inp.OutPoint() + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Create a testing bump result. + br := &BumpResult{ + Tx: tx, + Event: TxUnknownSpend, + SpentInputs: map[wire.OutPoint]*wire.MsgTx{ + op: tx, + }, + } + + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true, nil).Once() + + // Call the method under test. + s.handleBumpEventTxUnknownSpend(resp) + + // Assert the state of the input is updated. + require.Equal(t, Swept, s.inputs[op].state) +} + +// TestHandleBumpEventTxUnknownSpendWithRetry checks the case when some the +// inputs are retried after the bad inputs are filtered out. +func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock wallet and aggregator. + wallet := &MockWallet{} + defer wallet.AssertExpectations(t) + + aggregator := &mockUtxoAggregator{} + defer aggregator.AssertExpectations(t) + + publisher := &MockBumper{} + defer publisher.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Wallet: wallet, + Aggregator: aggregator, + Publisher: publisher, + GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] { + //nolint:ll + return fn.Ok(lnwallet.AddrWithKey{ + DeliveryAddress: testPubKey.SerializeCompressed(), + }) + }, + NoDeadlineConfTarget: uint32(DefaultDeadlineDelta), + Store: store, + }) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create mock inputs - inp1 will be the bad input, and inp2 will be + // retried. + inp1 := createMockInput(t, s, PendingPublish) + inp2 := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp1, inp2}) + + op1 := inp1.OutPoint() + op2 := inp2.OutPoint() + + inp2.On("RequiredLockTime").Return( + uint32(s.currentHeight), false).Once() + inp2.On("BlocksToMaturity").Return(uint32(0)).Once() + inp2.On("HeightHint").Return(uint32(s.currentHeight)).Once() + + // Create a testing tx that spends inp1. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op1}, + }, + } + txid := tx.TxHash() + + // Create a testing bump result. + br := &BumpResult{ + Tx: tx, + Event: TxUnknownSpend, + SpentInputs: map[wire.OutPoint]*wire.MsgTx{ + op1: tx, + }, + } + + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true, nil).Once() + + // Mock the aggregator to return an empty slice as we are not testing + // the actual sweeping behavior. + aggregator.On("ClusterInputs", mock.Anything).Return([]InputSet{}) + + // Call the method under test. + s.handleBumpEventTxUnknownSpend(resp) + + // Assert the first input is removed. + require.NotContains(t, s.inputs, op1) + + // Assert the state of the input is updated. + require.Equal(t, PublishFailed, s.inputs[op2].state) +} From db8319d70b6f8e10c4559c64ef0df46f16357e27 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 12 Feb 2025 18:22:55 +0800 Subject: [PATCH 09/19] sweep: add method `handleReplacementTxError` This is a minor refactor so the `createAndPublishTx` flow becomes more clear, also prepares for the following commit where we start to handle missing inputs. --- sweep/fee_bumper.go | 88 +++++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 93aee912d..38983f8fe 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -1254,46 +1254,10 @@ func (t *TxPublisher) createAndPublishTx( // directly here. sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction) - // If the error is fee related, we will return no error and let the fee - // bumper retry it at next block. - // - // NOTE: we can check the RBF error here and ask the fee function to - // recalculate the fee rate. However, this would defeat the purpose of - // using a deadline based fee function: - // - if the deadline is far away, there's no rush to RBF the tx. - // - if the deadline is close, we expect the fee function to give us a - // higher fee rate. If the fee rate cannot satisfy the RBF rules, it - // means the budget is not enough. - if errors.Is(err, chain.ErrInsufficientFee) || - errors.Is(err, lnwallet.ErrMempoolFee) { - - log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err) - return fn.None[BumpResult]() - } - - // If the error is not fee related, we will return a `TxFailed` event - // so this input can be retried. + // If there's an error creating the replacement tx, we need to abort the + // flow and handle it. if err != nil { - // If the tx doesn't not have enought budget, we will return a - // result so the sweeper can handle it by re-clustering the - // utxos. - if errors.Is(err, ErrNotEnoughBudget) { - log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), - err) - } else { - // Otherwise, an unexpected error occurred, we will - // fail the tx and let the sweeper retry the whole - // process. - log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), - err) - } - - return fn.Some(BumpResult{ - Event: TxFailed, - Tx: oldTx, - Err: err, - requestID: r.requestID, - }) + return t.handleReplacementTxError(r, oldTx, err) } // The tx has been created without any errors, we now register a new @@ -1786,3 +1750,49 @@ func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey, return txFee, changeOutsOpt, locktimeOpt, nil } + +// handleReplacementTxError handles the error returned from creating the +// replacement tx. It returns a BumpResult that should be notified to the +// sweeper. +func (t *TxPublisher) handleReplacementTxError(r *monitorRecord, + oldTx *wire.MsgTx, err error) fn.Option[BumpResult] { + + // If the error is fee related, we will return no error and let the fee + // bumper retry it at next block. + // + // NOTE: we can check the RBF error here and ask the fee function to + // recalculate the fee rate. However, this would defeat the purpose of + // using a deadline based fee function: + // - if the deadline is far away, there's no rush to RBF the tx. + // - if the deadline is close, we expect the fee function to give us a + // higher fee rate. If the fee rate cannot satisfy the RBF rules, it + // means the budget is not enough. + if errors.Is(err, chain.ErrInsufficientFee) || + errors.Is(err, lnwallet.ErrMempoolFee) { + + log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err) + return fn.None[BumpResult]() + } + + // If the error is not fee related, we will return a `TxFailed` event + // so this input can be retried. + result := fn.Some(BumpResult{ + Event: TxFailed, + Tx: oldTx, + Err: err, + requestID: r.requestID, + }) + + // If the tx doesn't not have enought budget, we will return a result so + // the sweeper can handle it by re-clustering the utxos. + if errors.Is(err, ErrNotEnoughBudget) { + log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err) + return result + } + + // Otherwise, an unexpected error occurred, we will log an error and let + // the sweeper retry the whole process. + log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err) + + return result +} From f614e7aed97d037b440ad40814b80f991180ebcb Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 12 Feb 2025 19:10:25 +0800 Subject: [PATCH 10/19] sweep: add `createUnknownSpentBumpResult` A minor refactor to break the method `handleUnknownSpent` into two steps, which prepares the following commit where we start handling missing inputs. --- sweep/fee_bumper.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 38983f8fe..67a3663df 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -1170,6 +1170,19 @@ func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) { "bumper, failing it now:\n%v", r.requestID, inputTypeSummary(r.req.Inputs)) + // Create a result that will be sent to the resultChan which is listened + // by the caller. + result := t.createUnknownSpentBumpResult(r) + + // Notify the sweeper about this result in the end. + t.handleResult(result) +} + +// createUnknownSpentBumpResult creates and returns a BumpResult given the +// monitored record has unknown spends. +func (t *TxPublisher) createUnknownSpentBumpResult( + r *monitorRecord) *BumpResult { + // Create a result that will be sent to the resultChan which is listened // by the caller. result := &BumpResult{ @@ -1208,10 +1221,7 @@ func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) { result.Event = TxFatal result.Err = err - // Notify the sweeper about this result in the end. - t.handleResult(result) - - return + return result } feeFunc = f @@ -1234,8 +1244,7 @@ func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) { // Attach the new fee rate to be used for the next sweeping attempt. result.FeeRate = feeFunc.FeeRate() - // Notify the sweeper about this result in the end. - t.handleResult(result) + return result } // createAndPublishTx creates a new tx with a higher fee rate and publishes it From 4f469de18e2db23430642003c5846806c4cfdc71 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 12 Feb 2025 19:42:23 +0800 Subject: [PATCH 11/19] sweep: refactor `handleInitialTxError` and `createAndCheckTx` This commit refactors `handleInitialTxError` and `createAndCheckTx` to take a `monitorRecord` param, which prepares for the following commit where we start handling missing inputs. --- sweep/fee_bumper.go | 35 +++++++++++++++++------------------ sweep/fee_bumper_test.go | 7 ++++++- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 67a3663df..e0a67658c 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -544,7 +544,7 @@ func (t *TxPublisher) createRBFCompliantTx( for { // Create a new tx with the given fee rate and check its // mempool acceptance. - sweepCtx, err := t.createAndCheckTx(r.req, f) + sweepCtx, err := t.createAndCheckTx(r) switch { case err == nil: @@ -607,8 +607,9 @@ func (t *TxPublisher) createRBFCompliantTx( // script, and the fee rate. In addition, it validates the tx's mempool // acceptance before returning a tx that can be published directly, along with // its fee. -func (t *TxPublisher) createAndCheckTx(req *BumpRequest, - f FeeFunction) (*sweepTxCtx, error) { +func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) { + req := r.req + f := r.feeFunction // Create the sweep tx with max fee rate of 0 as the fee function // guarantees the fee rate used here won't exceed the max fee rate. @@ -1025,27 +1026,31 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) { // handleInitialTxError takes the error from `initializeTx` and decides the // bump event. It will construct a BumpResult and handles it. -func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) { - // We now decide what type of event to send. - var event BumpEvent +func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) { + // Create a bump result to be sent to the sweeper. + result := &BumpResult{ + Err: err, + requestID: r.requestID, + } + // We now decide what type of event to send. switch { // When the error is due to a dust output, we'll send a TxFailed so // these inputs can be retried with a different group in the next // block. case errors.Is(err, ErrTxNoOutput): - event = TxFailed + result.Event = TxFailed // When the error is due to budget being used up, we'll send a TxFailed // so these inputs can be retried with a different group in the next // block. case errors.Is(err, ErrMaxPosition): - event = TxFailed + result.Event = TxFailed // When the error is due to zero fee rate delta, we'll send a TxFailed // so these inputs can be retried in the next block. case errors.Is(err, ErrZeroFeeRateDelta): - event = TxFailed + result.Event = TxFailed // Otherwise this is not a fee-related error and the tx cannot be // retried. In that case we will fail ALL the inputs in this tx, which @@ -1055,13 +1060,7 @@ func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) { // TODO(yy): Find out which input is causing the failure and fail that // one only. default: - event = TxFatal - } - - result := &BumpResult{ - Event: event, - Err: err, - requestID: requestID, + result.Event = TxFatal } t.handleResult(result) @@ -1089,7 +1088,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) { log.Errorf("Initial broadcast failed: %v", err) // We now handle the initialization error and exit. - t.handleInitialTxError(r.requestID, err) + t.handleInitialTxError(r, err) return } @@ -1261,7 +1260,7 @@ func (t *TxPublisher) createAndPublishTx( // NOTE: The fee function is expected to have increased its returned // fee rate after calling the SkipFeeBump method. So we can use it // directly here. - sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction) + sweepCtx, err := t.createAndCheckTx(r) // If there's an error creating the replacement tx, we need to abort the // flow and handle it. diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index a11f4b7f5..64695dbfe 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -504,9 +504,14 @@ func TestCreateAndCheckTx(t *testing.T) { for _, tc := range testCases { tc := tc + r := &monitorRecord{ + req: tc.req, + feeFunction: m.feeFunc, + } + t.Run(tc.name, func(t *testing.T) { // Call the method under test. - _, err := tp.createAndCheckTx(tc.req, m.feeFunc) + _, err := tp.createAndCheckTx(r) // Check the result is as expected. require.ErrorIs(t, err, tc.expectedErr) From b184afe22723a161e0ae5b04f1ee7849b9b22a6e Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 12 Feb 2025 21:36:25 +0800 Subject: [PATCH 12/19] sweep: handle missing inputs during fee bumping This commit handles the case when the input is missing during the RBF process, which could happen when the bumped tx has inputs being spent by a third party. Normally we should be able to catch the spend early via the spending notification and never attempt to fee bump the record. However, due to the possible race between block notification and spend notification, this cannot be guaranteed. Thus, we need to handle the case during the RBF when seeing a `ErrMissingInputs`, which can only happen when the inputs are spent by others. --- contractcourt/anchor_resolver.go | 2 +- sweep/fee_bumper.go | 77 ++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/contractcourt/anchor_resolver.go b/contractcourt/anchor_resolver.go index f0f6e2f5a..765a0f6ca 100644 --- a/contractcourt/anchor_resolver.go +++ b/contractcourt/anchor_resolver.go @@ -108,7 +108,7 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) { // Anchor was swept by someone else. This is possible after the // 16 block csv lock. - case sweep.ErrRemoteSpend: + case sweep.ErrRemoteSpend, sweep.ErrInputMissing: c.log.Warnf("our anchor spent by someone else") outcome = channeldb.ResolverOutcomeUnclaimed diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index e0a67658c..934d2b653 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -43,6 +43,10 @@ var ( // ErrUnknownSpent is returned when an unknown tx has spent an input in // the sweeping tx. ErrUnknownSpent = errors.New("unknown spend of input") + + // ErrInputMissing is returned when a given input no longer exists, + // e.g., spending from an orphan tx. + ErrInputMissing = errors.New("input no longer exists") ) var ( @@ -653,10 +657,68 @@ func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) { return sweepCtx, nil } + // If the inputs are spent by another tx, we will exit with the latest + // sweepCtx and an error. + if errors.Is(err, chain.ErrMissingInputs) { + log.Debugf("Tx %v missing inputs, it's likely the input has "+ + "been spent by others", sweepCtx.tx.TxHash()) + + // Make sure to update the record with the latest attempt. + t.updateRecord(r, sweepCtx) + + return sweepCtx, ErrInputMissing + } + return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w", sweepCtx.tx.TxHash(), err) } +// handleMissingInputs handles the case when the chain backend reports back a +// missing inputs error, which could happen when one of the input has been spent +// in another tx, or the input is referencing an orphan. When the input is +// spent, it will be handled via the TxUnknownSpend flow by creating a +// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned. +func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult { + // Get the spending txns. + spends := t.getSpentInputs(r) + + // Attach the spending txns. + r.spentInputs = spends + + // If there are no spending txns found and the input is missing, the + // input is referencing an orphan tx that's no longer valid, e.g., the + // spending the anchor output from the remote commitment after the local + // commitment has confirmed. In this case we will mark it as fatal and + // exit. + if len(spends) == 0 { + log.Warnf("Failing record=%v: found orphan inputs: %v\n", + r.requestID, inputTypeSummary(r.req.Inputs)) + + // Create a result that will be sent to the resultChan which is + // listened by the caller. + result := &BumpResult{ + Event: TxFatal, + Tx: r.tx, + requestID: r.requestID, + Err: ErrInputMissing, + } + + return result + } + + // Check that the spending tx matches the sweeping tx - given that the + // current sweeping tx has been failed due to missing inputs, the + // spending tx must be a different tx, thus it should NOT be matched. We + // perform a sanity check here to catch the unexpected state. + if !t.isUnknownSpent(r, spends) { + log.Errorf("Sweeping tx %v has missing inputs, yet the "+ + "spending tx is the sweeping tx itself: %v", + r.tx.TxHash(), r.spentInputs) + } + + return t.createUnknownSpentBumpResult(r) +} + // broadcast takes a monitored tx and publishes it to the network. Prior to the // broadcast, it will subscribe the tx's confirmation notification and attach // the event channel to the record. Any broadcast-related errors will not be @@ -1052,6 +1114,11 @@ func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) { case errors.Is(err, ErrZeroFeeRateDelta): result.Event = TxFailed + // When there are missing inputs, we'll create a TxUnknownSpend bump + // result here so the rest of the inputs can be retried. + case errors.Is(err, ErrInputMissing): + result = t.handleMissingInputs(r) + // Otherwise this is not a fee-related error and the tx cannot be // retried. In that case we will fail ALL the inputs in this tx, which // means they will be removed from the sweeper and never be tried @@ -1782,6 +1849,16 @@ func (t *TxPublisher) handleReplacementTxError(r *monitorRecord, return fn.None[BumpResult]() } + // At least one of the inputs is missing, which means it has already + // been spent by another tx and confirmed. In this case we will handle + // it by returning a TxUnknownSpend bump result. + if errors.Is(err, ErrInputMissing) { + log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err) + bumpResult := t.handleMissingInputs(r) + + return fn.Some(*bumpResult) + } + // If the error is not fee related, we will return a `TxFailed` event // so this input can be retried. result := fn.Some(BumpResult{ From 4bd1a344b95c1c42079cbc1d8223b6cea1d96d68 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 19:28:08 +0800 Subject: [PATCH 13/19] sweep: signal tx in `markInputFatal` This commit adds the failed tx to the result when marking the input as fatal, which is used in the commit resolver when handling revoked outputs. --- sweep/sweeper.go | 15 ++++++++++----- sweep/sweeper_test.go | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index e1e870828..5b0601b14 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1267,7 +1267,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { ) if err != nil { err := fmt.Errorf("wait for spend: %w", err) - s.markInputFatal(pi, err) + s.markInputFatal(pi, nil, err) return err } @@ -1482,12 +1482,17 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) { // markInputFatal marks the given input as fatal and won't be retried. It // will also notify all the subscribers of this input. -func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, err error) { +func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx, + err error) { + log.Errorf("Failed to sweep input: %v, error: %v", pi, err) pi.state = Fatal - s.signalResult(pi, Result{Err: err}) + s.signalResult(pi, Result{ + Tx: tx, + Err: err, + }) } // updateSweeperInputs updates the sweeper's internal state and returns a map @@ -1819,7 +1824,7 @@ func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) { continue } - s.markInputFatal(input, err) + s.markInputFatal(input, nil, err) } } @@ -1932,7 +1937,7 @@ func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) { // Since the input is spent by others, we now mark it as fatal and won't // be retried. - s.markInputFatal(inp, ErrRemoteSpend) + s.markInputFatal(inp, tx, ErrRemoteSpend) log.Debugf("Removing descendant txns invalidated by (txid=%v): %v", txid, lnutils.SpewLogClosure(tx)) diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index da49c601e..450654473 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -596,7 +596,7 @@ func TestMarkInputFailed(t *testing.T) { } // Call the method under test. - s.markInputFatal(pi, errors.New("dummy error")) + s.markInputFatal(pi, nil, errors.New("dummy error")) // Assert the state is updated. require.Equal(t, Fatal, pi.state) From 74161f0d57a1e4b8edf27360b55da24817ebcea5 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 11:14:12 +0800 Subject: [PATCH 14/19] sweep: make sure recovered inputs are retried Previously, when a given input is found spent in the mempool, we'd mark it as Published and never offer it to the fee bumper. This is dangerous as the input will never be fee bumped. We now fix it by always initializing the input with state Init, and only use mempool to check for fee and fee rate. This changes the current restart behavior - as previously when a sweeping tx is broadcast, the node shuts down, when it starts again, the input will be offered to the sweeper again, but not to the fee bumper, which means the sweeping tx will stay in the mempool with the last-tried fee rate. After this change, after a restart, the input will be swept again, and the fee bumper will monitor its status. The restart will also behave like a fee bump if there's already an existing sweeping tx in the mempool. --- sweep/sweeper.go | 37 ++++++++++++++++++++++--------------- sweep/sweeper_test.go | 28 ++++++++++------------------ 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 5b0601b14..730a75586 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1230,21 +1230,26 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { } // This is a new input, and we want to query the mempool to see if this - // input has already been spent. If so, we'll start the input with - // state Published and attach the RBFInfo. - state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint()) + // input has already been spent. If so, we'll start the input with the + // RBFInfo. + rbfInfo := s.decideRBFInfo(input.input.OutPoint()) // Create a new pendingInput and initialize the listeners slice with // the passed in result channel. If this input is offered for sweep // again, the result channel will be appended to this slice. pi = &SweeperInput{ - state: state, + state: Init, listeners: []chan Result{input.resultChan}, Input: input.input, params: input.params, rbf: rbfInfo, } + // Set the starting fee rate if a previous sweeping tx is found. + rbfInfo.WhenSome(func(info RBFInfo) { + pi.params.StartingFeeRate = fn.Some(info.FeeRate) + }) + // Set the acutal deadline height. pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr( s.calculateDefaultDeadline(pi), @@ -1277,13 +1282,12 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { return nil } -// decideStateAndRBFInfo queries the mempool to see whether the given input has -// already been spent. If so, the state Published will be returned, otherwise -// state Init. When spent, it will query the sweeper store to fetch the fee -// info of the spending transction, and construct an RBFInfo based on it. -// Suppose an error occurs, fn.None is returned. -func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( - SweepState, fn.Option[RBFInfo]) { +// decideRBFInfo queries the mempool to see whether the given input has already +// been spent. When spent, it will query the sweeper store to fetch the fee info +// of the spending transction, and construct an RBFInfo based on it. Suppose an +// error occurs, fn.None is returned. +func (s *UtxoSweeper) decideRBFInfo( + op wire.OutPoint) fn.Option[RBFInfo] { // Check if we can find the spending tx of this input in mempool. txOption := s.mempoolLookup(op) @@ -1301,7 +1305,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( // - for neutrino we don't have a mempool. // - for btcd below v0.24.1 we don't have `gettxspendingprevout`. if tx == nil { - return Init, fn.None[RBFInfo]() + return fn.None[RBFInfo]() } // Otherwise the input is already spent in the mempool, so eventually @@ -1313,12 +1317,15 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( txid := tx.TxHash() tr, err := s.cfg.Store.GetTx(txid) + log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(), + op) + // If the tx is not found in the store, it means it's not broadcast by // us, hence we can't find the fee info. This is fine as, later on when // this tx is confirmed, we will remove the input from our inputs. if errors.Is(err, ErrTxNotFound) { log.Warnf("Spending tx %v not found in sweeper store", txid) - return Published, fn.None[RBFInfo]() + return fn.None[RBFInfo]() } // Exit if we get an db error. @@ -1326,7 +1333,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( log.Errorf("Unable to get tx %v from sweeper store: %v", txid, err) - return Published, fn.None[RBFInfo]() + return fn.None[RBFInfo]() } // Prepare the fee info and return it. @@ -1336,7 +1343,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( FeeRate: chainfee.SatPerKWeight(tr.FeeRate), }) - return Published, rbf + return rbf } // handleExistingInput processes an input that is already known to the sweeper. diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 450654473..ff48d9e8b 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -497,10 +497,9 @@ func TestUpdateSweeperInputs(t *testing.T) { require.Equal(expectedInputs, s.inputs) } -// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are -// returned based on whether this input can be found both in mempool and the -// sweeper store. -func TestDecideStateAndRBFInfo(t *testing.T) { +// TestDecideRBFInfo checks that the expected RBFInfo is returned based on +// whether this input can be found both in mempool and the sweeper store. +func TestDecideRBFInfo(t *testing.T) { t.Parallel() require := require.New(t) @@ -524,11 +523,9 @@ func TestDecideStateAndRBFInfo(t *testing.T) { mockMempool.On("LookupInputMempoolSpend", op).Return( fn.None[wire.MsgTx]()).Once() - // Since the mempool lookup failed, we exepect state Init and no - // RBFInfo. - state, rbf := s.decideStateAndRBFInfo(op) + // Since the mempool lookup failed, we expect no RBFInfo. + rbf := s.decideRBFInfo(op) require.True(rbf.IsNone()) - require.Equal(Init, state) // Mock the mempool lookup to return a tx three times as we are calling // attachAvailableRBFInfo three times. @@ -539,19 +536,17 @@ func TestDecideStateAndRBFInfo(t *testing.T) { // Mock the store to return an error saying the tx cannot be found. mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once() - // Although the db lookup failed, we expect the state to be Published. - state, rbf = s.decideStateAndRBFInfo(op) + // The db lookup failed, we expect no RBFInfo. + rbf = s.decideRBFInfo(op) require.True(rbf.IsNone()) - require.Equal(Published, state) // Mock the store to return a db error. dummyErr := errors.New("dummy error") mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once() - // Although the db lookup failed, we expect the state to be Published. - state, rbf = s.decideStateAndRBFInfo(op) + // The db lookup failed, we expect no RBFInfo. + rbf = s.decideRBFInfo(op) require.True(rbf.IsNone()) - require.Equal(Published, state) // Mock the store to return a record. tr := &TxRecord{ @@ -561,7 +556,7 @@ func TestDecideStateAndRBFInfo(t *testing.T) { mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once() // Call the method again. - state, rbf = s.decideStateAndRBFInfo(op) + rbf = s.decideRBFInfo(op) // Assert that the RBF info is returned. rbfInfo := fn.Some(RBFInfo{ @@ -570,9 +565,6 @@ func TestDecideStateAndRBFInfo(t *testing.T) { FeeRate: chainfee.SatPerKWeight(tr.FeeRate), }) require.Equal(rbfInfo, rbf) - - // Assert the state is updated. - require.Equal(Published, state) } // TestMarkInputFatal checks that the input is marked as expected. From c61f781be7ff07a892eeba6460498e21aea61a67 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 17:04:29 +0800 Subject: [PATCH 15/19] itest: split up force close tests So we can focus on testing normal flow vs persistence flow. --- itest/lnd_channel_force_close_test.go | 978 ++++++++++++++++++++------ 1 file changed, 752 insertions(+), 226 deletions(-) diff --git a/itest/lnd_channel_force_close_test.go b/itest/lnd_channel_force_close_test.go index 6fcd8dd86..24868a019 100644 --- a/itest/lnd_channel_force_close_test.go +++ b/itest/lnd_channel_force_close_test.go @@ -6,7 +6,6 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd/lnrpc" @@ -27,6 +26,15 @@ var channelForceCloseTestCases = []*lntest.TestCase{ Name: "simple taproot", TestFunc: testChannelForceClosureSimpleTaproot, }, + { + Name: "anchor restart", + TestFunc: testChannelForceClosureAnchorRestart, + }, + { + Name: "simple taproot restart", + TestFunc: testChannelForceClosureSimpleTaprootRestart, + }, + { Name: "wrong preimage", TestFunc: testFailingChannel, @@ -87,9 +95,7 @@ func testChannelForceClosureSimpleTaproot(ht *lntest.HarnessTest) { // commitment transaction, a transaction sweeping the local CSV delayed output, // a transaction sweeping the CSV delayed 2nd-layer htlcs outputs, and n htlc // timeout transactions, where n is the number of payments Alice attempted -// to send to Carol. This test includes several restarts to ensure that the -// transaction output states are persisted throughout the forced closure -// process. +// to send to Carol. func runChannelForceClosureTest(ht *lntest.HarnessTest, cfgs [][]string, params lntest.OpenChannelParams) { @@ -131,6 +137,597 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, ht.SendPaymentAssertInflight(alice, req) } + // Once the HTLC has cleared, all the nodes in our mini network should + // show that the HTLC has been locked in. + ht.AssertNumActiveHtlcs(alice, numInvoices) + ht.AssertNumActiveHtlcs(carol, numInvoices) + + // Fetch starting height of this test so we can compute the block + // heights we expect certain events to take place. + curHeight := int32(ht.CurrentHeight()) + + // Using the current height of the chain, derive the relevant heights + // for sweeping two-stage htlcs. + var ( + startHeight = uint32(curHeight) + commCsvMaturityHeight = startHeight + 1 + defaultCSV + htlcExpiryHeight = padCLTV(startHeight + finalCltvDelta) + htlcCsvMaturityHeight = padCLTV( + startHeight + finalCltvDelta + 1 + defaultCSV, + ) + ) + + aliceChan := ht.QueryChannelByChanPoint(alice, chanPoint) + require.NotZero(ht, aliceChan.NumUpdates, + "alice should see at least one update to her channel") + + // Now that the channel is open and we have unsettled htlcs, + // immediately execute a force closure of the channel. This will also + // assert that the commitment transaction was immediately broadcast in + // order to fulfill the force closure request. + ht.CloseChannelAssertPending(alice, chanPoint, true) + + // Now that the channel has been force closed, it should show up in the + // PendingChannels RPC under the waiting close section. + waitingClose := ht.AssertChannelWaitingClose(alice, chanPoint) + + // Immediately after force closing, all of the funds should be in + // limbo. + require.NotZero(ht, waitingClose.LimboBalance, + "all funds should still be in limbo") + + // Create a map of outpoints to expected resolutions for alice and + // carol which we will add reports to as we sweep outputs. + var ( + aliceReports = make(map[string]*lnrpc.Resolution) + carolReports = make(map[string]*lnrpc.Resolution) + ) + + // We expect to see Alice's force close tx in the mempool. + ht.AssertNumTxsInMempool(1) + + // Mine a block which should confirm the commitment transaction + // broadcast as a result of the force closure. Once mined, we also + // expect Alice's anchor sweeping tx being published. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Assert Alice's has one pending anchor output - because she doesn't + // have incoming HTLCs, her outgoing HTLC won't have a deadline, thus + // she won't use the anchor to perform CPFP. + aliceAnchor := ht.AssertNumPendingSweeps(alice, 1)[0] + require.Equal(ht, aliceAnchor.Outpoint.TxidStr, + waitingClose.Commitments.LocalTxid) + + // Now that the commitment has been confirmed, the channel should be + // marked as force closed. + forceClose := ht.AssertChannelPendingForceClose(alice, chanPoint) + + // Now that the channel has been force closed, it should now + // have the height and number of blocks to confirm populated. + err := checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, int32(defaultCSV), + ) + require.NoError(ht, err) + + // None of our outputs have been swept, so they should all be in limbo. + require.NotZero(ht, forceClose.LimboBalance) + require.Zero(ht, forceClose.RecoveredBalance) + + // Carol should offer her commit and anchor outputs to the sweeper. + sweepTxns := ht.AssertNumPendingSweeps(carol, 2) + + // Identify Carol's pending sweeps. + var carolAnchor, carolCommit = sweepTxns[0], sweepTxns[1] + if carolAnchor.AmountSat != uint32(anchorSize) { + carolCommit = carolAnchor + } + + // Carol's sweep tx should be in the mempool already, as her output is + // not timelocked. This sweep tx should spend her to_local output as + // the anchor output is not economical to spend. + carolTx := ht.GetNumTxsFromMempool(1)[0] + + // Carol's sweeping tx should have 1-input-1-output shape. + require.Len(ht, carolTx.TxIn, 1) + require.Len(ht, carolTx.TxOut, 1) + + // Calculate the total fee Carol paid. + totalFeeCarol := ht.CalculateTxFee(carolTx) + + // Carol's anchor report won't be created since it's uneconomical to + // sweep. So we expect to see only the commit sweep report. + op := fmt.Sprintf("%v:%v", carolCommit.Outpoint.TxidStr, + carolCommit.Outpoint.OutputIndex) + carolReports[op] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_COMMIT, + Outcome: lnrpc.ResolutionOutcome_CLAIMED, + Outpoint: carolCommit.Outpoint, + AmountSat: uint64(pushAmt), + SweepTxid: carolTx.TxHash().String(), + } + + // We also expect Carol to broadcast her sweeping tx which spends her + // commit and anchor outputs. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Alice should still have the anchor sweeping request. + ht.AssertNumPendingSweeps(alice, 1) + + // Alice should see the channel in her set of pending force closed + // channels with her funds still in limbo. + forceClose = ht.AssertChannelPendingForceClose(alice, chanPoint) + + // Make a record of the balances we expect for alice and carol. + aliceBalance := forceClose.Channel.LocalBalance + + // Get the closing txid. + txid, err := chainhash.NewHashFromStr(forceClose.ClosingTxid) + require.NoError(ht, err) + closingTxID := txid + + // At this point, the nursery should show that the commitment output has + // 3 block left before its CSV delay expires. In total, we have mined + // exactly defaultCSV blocks, so the htlc outputs should also reflect + // that this many blocks have passed. + err = checkCommitmentMaturity(forceClose, commCsvMaturityHeight, 3) + require.NoError(ht, err) + + // All funds should still be shown in limbo. + require.NotZero(ht, forceClose.LimboBalance) + require.Zero(ht, forceClose.RecoveredBalance) + + // Generate two blocks, which should cause the CSV delayed output from + // the commitment txn to expire. + ht.MineEmptyBlocks(2) + + // At this point, the CSV will expire in the next block, meaning that + // the output should be offered to the sweeper. + sweeps := ht.AssertNumPendingSweeps(alice, 2) + commitSweep, anchorSweep := sweeps[0], sweeps[1] + if commitSweep.AmountSat < anchorSweep.AmountSat { + commitSweep = anchorSweep + } + + // Alice's sweeping transaction should now be broadcast. So we fetch the + // node's mempool to ensure it has been properly broadcast. + sweepTx := ht.GetNumTxsFromMempool(1)[0] + sweepTxid := sweepTx.TxHash() + + // The sweep transaction's inputs spending should be from the commitment + // transaction. + for _, txIn := range sweepTx.TxIn { + require.Equal(ht, &txIn.PreviousOutPoint.Hash, closingTxID, + "sweep transaction not spending from commit") + } + + // Alice's anchor report won't be created since it's uneconomical to + // sweep. We expect a resolution which spends our commit output. + op = fmt.Sprintf("%v:%v", commitSweep.Outpoint.TxidStr, + commitSweep.Outpoint.OutputIndex) + aliceReports[op] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_COMMIT, + Outcome: lnrpc.ResolutionOutcome_CLAIMED, + SweepTxid: sweepTxid.String(), + Outpoint: commitSweep.Outpoint, + AmountSat: uint64(aliceBalance), + } + + // Check that we can find the commitment sweep in our set of known + // sweeps, using the simple transaction id ListSweeps output. + ht.AssertSweepFound(alice, sweepTxid.String(), false, 0) + + // Next, we mine an additional block which should include the sweep + // transaction as the input scripts and the sequence locks on the + // inputs should be properly met. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Update current height + curHeight = int32(ht.CurrentHeight()) + + // checkForceClosedChannelNumHtlcs verifies that a force closed channel + // has the proper number of htlcs. + checkPendingChannelNumHtlcs := func( + forceClose lntest.PendingForceClose) error { + + if len(forceClose.PendingHtlcs) != numInvoices { + return fmt.Errorf("expected force closed channel to "+ + "have %d pending htlcs, found %d instead", + numInvoices, len(forceClose.PendingHtlcs)) + } + + return nil + } + + err = wait.NoError(func() error { + // Now that the commit output has been fully swept, check to + // see that the channel remains open for the pending htlc + // outputs. + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + + // The commitment funds will have been recovered after the + // commit txn was included in the last block. The htlc funds + // will be shown in limbo. + err := checkPendingChannelNumHtlcs(forceClose) + if err != nil { + return err + } + + err = checkPendingHtlcStageAndMaturity( + forceClose, 1, htlcExpiryHeight, + int32(htlcExpiryHeight)-curHeight, + ) + if err != nil { + return err + } + + if forceClose.LimboBalance == 0 { + return fmt.Errorf("expected funds in limbo, found 0") + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout checking pending force close channel") + + // Compute the height preceding that which will cause the htlc CLTV + // timeouts will expire. The outputs entered at the same height as the + // output spending from the commitment txn, so we must deduct the + // number of blocks we have generated since adding it to the nursery, + // and take an additional block off so that we end up one block shy of + // the expiry height, and add the block padding. + cltvHeightDelta := int(htlcExpiryHeight - uint32(curHeight) - 1) + + // Advance the blockchain until just before the CLTV expires, nothing + // exciting should have happened during this time. + ht.MineEmptyBlocks(cltvHeightDelta) + + // Alice should now see the channel in her set of pending force closed + // channels with one pending HTLC. + err = wait.NoError(func() error { + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + + // We should now be at the block just before the utxo nursery + // will attempt to broadcast the htlc timeout transactions. + err = checkPendingChannelNumHtlcs(forceClose) + if err != nil { + return err + } + err = checkPendingHtlcStageAndMaturity( + forceClose, 1, htlcExpiryHeight, 1, + ) + if err != nil { + return err + } + + // Now that our commitment confirmation depth has been + // surpassed, we should now see a non-zero recovered balance. + // All htlc outputs are still left in limbo, so it should be + // non-zero as well. + if forceClose.LimboBalance == 0 { + return errors.New("htlc funds should still be in limbo") + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout while checking force closed channel") + + // Now, generate the block which will cause Alice to offer the presigned + // htlc timeout txns to the sweeper. + ht.MineEmptyBlocks(1) + + // Since Alice had numInvoices (6) htlcs extended to Carol before force + // closing, we expect Alice to broadcast an htlc timeout txn for each + // one. We also expect Alice to still have her anchor since it's not + // swept. + ht.AssertNumPendingSweeps(alice, numInvoices+1) + + // Wait for them all to show up in the mempool + htlcTx := ht.GetNumTxsFromMempool(1)[0] + htlcTxid := htlcTx.TxHash() + + // Retrieve each htlc timeout txn from the mempool, and ensure it is + // well-formed. The sweeping tx should spend all the htlc outputs. + // + // NOTE: We also add 1 output as the outgoing HTLC needs a wallet utxo + // to pay for its fee. + numInputs := 6 + 1 + + var htlcLessFees uint64 + + // Ensure the htlc transaction has the expected number of inputs. + require.Len(ht, htlcTx.TxIn, numInputs, "num inputs mismatch") + + // The number of outputs should be the same. + require.Len(ht, htlcTx.TxOut, numInputs, "num outputs mismatch") + + // Ensure all the htlc transaction inputs are spending from the + // commitment transaction, except if this is an extra input added to pay + // for fees for anchor channels. + for _, txIn := range htlcTx.TxIn { + if !closingTxID.IsEqual(&txIn.PreviousOutPoint.Hash) { + // This was an extra input added to pay fees, + // continue to the next one. + continue + } + + // For each htlc timeout transaction, we expect a resolver + // report recording this on chain resolution for both alice and + // carol. + outpoint := txIn.PreviousOutPoint + resolutionOutpoint := &lnrpc.OutPoint{ + TxidBytes: outpoint.Hash[:], + TxidStr: outpoint.Hash.String(), + OutputIndex: outpoint.Index, + } + + // We expect alice to have a timeout tx resolution with an + // amount equal to the payment amount. + aliceReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_FIRST_STAGE, + SweepTxid: htlcTxid.String(), + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + + // We expect carol to have a resolution with an incoming htlc + // timeout which reflects the full amount of the htlc. It has no + // spend tx, because carol stops monitoring the htlc once it has + // timed out. + carolReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_INCOMING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: "", + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + } + + // We record the htlc amount less fees here, so that we know what value + // to expect for the second stage of our htlc resolution. + htlcLessFees = uint64(htlcTx.TxOut[0].Value) + + // Generate a block that mines the htlc timeout txns. Doing so now + // activates the 2nd-stage CSV delayed outputs. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Advance the chain until just before the 2nd-layer CSV delays expire. + // For anchor channels this is one block earlier. + curHeight = int32(ht.CurrentHeight()) + ht.Logf("current height: %v, htlcCsvMaturityHeight=%v", curHeight, + htlcCsvMaturityHeight) + numBlocks := int(htlcCsvMaturityHeight - uint32(curHeight) - 1) + ht.MineEmptyBlocks(numBlocks) + + ht.AssertNumPendingSweeps(alice, numInvoices+1) + + // Now that the channel has been fully swept, it should no longer show + // incubated, check to see that Alice's node still reports the channel + // as pending force closed. + err = wait.NoError(func() error { + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + + if forceClose.LimboBalance == 0 { + return fmt.Errorf("htlc funds should still be in limbo") + } + + return checkPendingChannelNumHtlcs(forceClose) + }, defaultTimeout) + require.NoError(ht, err, "timeout while checking force closed channel") + + ht.AssertNumPendingSweeps(alice, numInvoices+1) + + // Wait for the single sweep txn to appear in the mempool. + htlcSweepTx := ht.GetNumTxsFromMempool(1)[0] + htlcSweepTxid := htlcSweepTx.TxHash() + + // Ensure the htlc sweep transaction only has one input for each htlc + // Alice extended before force closing. + require.Len(ht, htlcSweepTx.TxIn, numInvoices, + "htlc transaction has wrong num of inputs") + require.Len(ht, htlcSweepTx.TxOut, 1, + "htlc sweep transaction should have one output") + + // Ensure that each output spends from exactly one htlc timeout output. + for _, txIn := range htlcSweepTx.TxIn { + op := txIn.PreviousOutPoint + + // Since we have now swept our htlc timeout tx, we expect to + // have timeout resolutions for each of our htlcs. + aliceReports[op.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: htlcSweepTxid.String(), + Outpoint: &lnrpc.OutPoint{ + TxidBytes: op.Hash[:], + TxidStr: op.Hash.String(), + OutputIndex: op.Index, + }, + AmountSat: htlcLessFees, + } + } + + // Check that we can find the htlc sweep in our set of sweeps using + // the verbose output of the listsweeps output. + ht.AssertSweepFound(alice, htlcSweepTxid.String(), true, 0) + + // Now that the channel has been fully swept, it should no longer show + // incubated, check to see that Alice's node still reports the channel + // as pending force closed. + err = wait.NoError(func() error { + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + err := checkPendingChannelNumHtlcs(forceClose) + if err != nil { + return err + } + + err = checkPendingHtlcStageAndMaturity( + forceClose, 2, htlcCsvMaturityHeight-1, 0, + ) + if err != nil { + return err + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout while checking force closed channel") + + // Generate the final block that sweeps all htlc funds into the user's + // wallet, and make sure the sweep is in this block. + block := ht.MineBlocksAndAssertNumTxes(1, 1)[0] + ht.AssertTxInBlock(block, htlcSweepTxid) + + // Now that the channel has been fully swept, it should no longer show + // up within the pending channels RPC. + err = wait.NoError(func() error { + ht.AssertNumPendingForceClose(alice, 0) + + // In addition to there being no pending channels, we verify + // that pending channels does not report any money still in + // limbo. + pendingChanResp := alice.RPC.PendingChannels() + if pendingChanResp.TotalLimboBalance != 0 { + return errors.New("no user funds should be left " + + "in limbo after incubation") + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout checking limbo balance") + + // At this point, Carol should now be aware of her new immediately + // spendable on-chain balance, as it was Alice who broadcast the + // commitment transaction. + carolBalResp = carol.RPC.WalletBalance() + + // Carol's expected balance should be its starting balance plus the + // push amount sent by Alice and minus the miner fee paid. + carolExpectedBalance := btcutil.Amount(carolStartingBalance) + + pushAmt - totalFeeCarol + + require.Equal(ht, carolExpectedBalance, + btcutil.Amount(carolBalResp.ConfirmedBalance), + "carol's balance is incorrect") + + // Finally, we check that alice and carol have the set of resolutions + // we expect. + assertReports(ht, alice, chanPoint, aliceReports) + assertReports(ht, carol, chanPoint, carolReports) +} + +// testChannelForceClosureAnchor runs `runChannelForceClosureTestRestart` with +// anchor channels. +func testChannelForceClosureAnchorRestart(ht *lntest.HarnessTest) { + // Create a simple network: Alice -> Carol, using anchor channels. + // + // Prepare params. + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + PushAmt: pushAmt, + CommitmentType: lnrpc.CommitmentType_ANCHORS, + } + + cfg := node.CfgAnchor + cfgCarol := append([]string{"--hodl.exit-settle"}, cfg...) + cfgs := [][]string{cfg, cfgCarol} + + runChannelForceClosureTestRestart(ht, cfgs, openChannelParams) +} + +// testChannelForceClosureSimpleTaprootRestart runs +// `runChannelForceClosureTestRestart` with simple taproot channels. +func testChannelForceClosureSimpleTaprootRestart(ht *lntest.HarnessTest) { + // Create a simple network: Alice -> Carol, using simple taproot + // channels. + // + // Prepare params. + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + PushAmt: pushAmt, + // If the channel is a taproot channel, then we'll need to + // create a private channel. + // + // TODO(roasbeef): lift after G175 + CommitmentType: lnrpc.CommitmentType_SIMPLE_TAPROOT, + Private: true, + } + + cfg := node.CfgSimpleTaproot + cfgCarol := append([]string{"--hodl.exit-settle"}, cfg...) + cfgs := [][]string{cfg, cfgCarol} + + runChannelForceClosureTestRestart(ht, cfgs, openChannelParams) +} + +// runChannelForceClosureTestRestart performs a test to exercise the behavior of +// "force" closing a channel or unilaterally broadcasting the latest local +// commitment state on-chain. The test creates a new channel between Alice and +// Carol, then force closes the channel after some cursory assertions. Within +// the test, a total of 3 + n transactions will be broadcast, representing the +// commitment transaction, a transaction sweeping the local CSV delayed output, +// a transaction sweeping the CSV delayed 2nd-layer htlcs outputs, and n htlc +// timeout transactions, where n is the number of payments Alice attempted +// to send to Carol. This test includes several restarts to ensure that the +// transaction output states are persisted throughout the forced closure +// process. +func runChannelForceClosureTestRestart(ht *lntest.HarnessTest, + cfgs [][]string, params lntest.OpenChannelParams) { + + // Skip this test for neutrino, as it cannot create RBF-compliant + // sweeping txns due to no access to `testmempoolaccept`. For neutrino + // nodes connected to a btcd node, they can rely on a reject msg being + // sent back to decide how do perform the fee replacement. + // + // TODO(yy): refactor `createRBFCompliantTx` to also work for neutrino. + if ht.IsNeutrinoBackend() { + ht.Skipf("skipping persistence test for neutrino backend") + } + + const ( + numInvoices = 6 + commitFeeRate = 20000 + ) + + ht.SetFeeEstimate(commitFeeRate) + + // Create a three hop network: Alice -> Carol. + chanPoints, nodes := ht.CreateSimpleNetwork(cfgs, params) + alice, carol := nodes[0], nodes[1] + chanPoint := chanPoints[0] + + // We need one additional UTXO for sweeping the remote anchor. + if ht.IsNeutrinoBackend() { + ht.FundCoins(btcutil.SatoshiPerBitcoin, alice) + } + + // Before we start, obtain Carol's current wallet balance, we'll check + // to ensure that at the end of the force closure by Alice, Carol + // recognizes his new on-chain output. + carolBalResp := carol.RPC.WalletBalance() + carolStartingBalance := carolBalResp.ConfirmedBalance + + // Send payments from Alice to Carol, since Carol is htlchodl mode, the + // htlc outputs should be left unsettled, and should be swept by the + // utxo nursery. + carolPubKey := carol.PubKey[:] + for i := 0; i < numInvoices; i++ { + req := &routerrpc.SendPaymentRequest{ + Dest: carolPubKey, + Amt: int64(paymentAmt), + PaymentHash: ht.Random32Bytes(), + FinalCltvDelta: finalCltvDelta, + FeeLimitMsat: noFeeLimitMsat, + } + ht.SendPaymentAssertInflight(alice, req) + } + // Once the HTLC has cleared, all the nodes n our mini network should // show that the HTLC has been locked in. ht.AssertNumActiveHtlcs(alice, numInvoices) @@ -202,32 +799,18 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Now that the commitment has been confirmed, the channel should be // marked as force closed. - err := wait.NoError(func() error { - forceClose := ht.AssertChannelPendingForceClose( - alice, chanPoint, - ) + forceClose := ht.AssertChannelPendingForceClose(alice, chanPoint) - // Now that the channel has been force closed, it should now - // have the height and number of blocks to confirm populated. - err := checkCommitmentMaturity( - forceClose, commCsvMaturityHeight, int32(defaultCSV), - ) - if err != nil { - return err - } + // Now that the channel has been force closed, it should now + // have the height and number of blocks to confirm populated. + err := checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, int32(defaultCSV), + ) + require.NoError(ht, err) - // None of our outputs have been swept, so they should all be - // in limbo. - if forceClose.LimboBalance == 0 { - return errors.New("all funds should still be in limbo") - } - if forceClose.RecoveredBalance != 0 { - return errors.New("no funds should be recovered") - } - - return nil - }, defaultTimeout) - require.NoError(ht, err, "timeout while checking force closed channel") + // None of our outputs have been swept, so they should all be in limbo. + require.NotZero(ht, forceClose.LimboBalance) + require.Zero(ht, forceClose.RecoveredBalance) // The following restart is intended to ensure that outputs from the // force close commitment transaction have been persisted once the @@ -241,7 +824,7 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Identify Carol's pending sweeps. var carolAnchor, carolCommit = sweepTxns[0], sweepTxns[1] if carolAnchor.AmountSat != uint32(anchorSize) { - carolAnchor, carolCommit = carolCommit, carolAnchor + carolCommit = carolAnchor } // Carol's sweep tx should be in the mempool already, as her output is @@ -331,7 +914,7 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Generate two blocks, which should cause the CSV delayed output from // the commitment txn to expire. - ht.MineBlocks(2) + ht.MineEmptyBlocks(2) // At this point, the CSV will expire in the next block, meaning that // the output should be offered to the sweeper. @@ -341,54 +924,55 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, commitSweep, anchorSweep = anchorSweep, commitSweep } - // Mine one block and the sweeping transaction should now be broadcast. - // So we fetch the node's mempool to ensure it has been properly - // broadcast. - sweepingTXID := ht.AssertNumTxsInMempool(1)[0] + // Alice's sweeping transaction should now be broadcast. So we fetch the + // node's mempool to ensure it has been properly broadcast. + sweepTx := ht.GetNumTxsFromMempool(1)[0] + sweepTxid := sweepTx.TxHash() // Fetch the sweep transaction, all input it's spending should be from // the commitment transaction which was broadcast on-chain. - sweepTx := ht.GetRawTransaction(sweepingTXID) - for _, txIn := range sweepTx.MsgTx().TxIn { + for _, txIn := range sweepTx.TxIn { require.Equal(ht, &txIn.PreviousOutPoint.Hash, closingTxID, "sweep transaction not spending from commit") } - // For neutrino backend, due to it has no mempool, we need to check the - // sweep tx has already been saved to db before restarting. This is due - // to the possible race, - // - the fee bumper returns a TxPublished event, which is received by - // the sweeper and the sweep tx is saved to db. - // - the sweeper receives a shutdown signal before it receives the - // above event. - // - // TODO(yy): fix the above race. - if ht.IsNeutrinoBackend() { - // Check that we can find the commitment sweep in our set of - // known sweeps, using the simple transaction id ListSweeps - // output. - ht.AssertSweepFound(alice, sweepingTXID.String(), false, 0) - } - // Restart Alice to ensure that she resumes watching the finalized // commitment sweep txid. ht.RestartNode(alice) - // Alice's anchor report won't be created since it's uneconomical to - // sweep. We expect a resolution which spends our commit output. + // Once restarted, Alice will offer her anchor and to_local outputs to + // the sweeper again. This time the two inputs will be swept using the + // same tx as they both have None deadline height, the sweeper will + // group these two inputs together. + // + // The new sweeping tx will replace the old one. We check it by + // asserting the old one no longer exists in the mempool. + ht.AssertTxNotInMempool(sweepTxid) + sweepTxid = ht.AssertNumTxsInMempool(1)[0] + + // We expect two resolutions - anchor and commit outputs. op = fmt.Sprintf("%v:%v", commitSweep.Outpoint.TxidStr, commitSweep.Outpoint.OutputIndex) aliceReports[op] = &lnrpc.Resolution{ ResolutionType: lnrpc.ResolutionType_COMMIT, Outcome: lnrpc.ResolutionOutcome_CLAIMED, - SweepTxid: sweepingTXID.String(), + SweepTxid: sweepTxid.String(), Outpoint: commitSweep.Outpoint, AmountSat: uint64(aliceBalance), } + op = fmt.Sprintf("%v:%v", anchorSweep.Outpoint.TxidStr, + anchorSweep.Outpoint.OutputIndex) + aliceReports[op] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_ANCHOR, + Outcome: lnrpc.ResolutionOutcome_CLAIMED, + SweepTxid: sweepTxid.String(), + Outpoint: anchorSweep.Outpoint, + AmountSat: uint64(anchorSweep.AmountSat), + } // Check that we can find the commitment sweep in our set of known // sweeps, using the simple transaction id ListSweeps output. - ht.AssertSweepFound(alice, sweepingTXID.String(), false, 0) + ht.AssertSweepFound(alice, sweepTxid.String(), false, 0) // Next, we mine an additional block which should include the sweep // transaction as the input scripts and the sequence locks on the @@ -450,12 +1034,11 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // number of blocks we have generated since adding it to the nursery, // and take an additional block off so that we end up one block shy of // the expiry height, and add the block padding. - currentHeight := int32(ht.CurrentHeight()) - cltvHeightDelta := int(htlcExpiryHeight - uint32(currentHeight) - 1) + cltvHeightDelta := int(htlcExpiryHeight - uint32(curHeight) - 1) // Advance the blockchain until just before the CLTV expires, nothing // exciting should have happened during this time. - ht.MineBlocks(cltvHeightDelta) + ht.MineEmptyBlocks(cltvHeightDelta) // We now restart Alice, to ensure that she will broadcast the // presigned htlc timeout txns after the delay expires after @@ -496,127 +1079,94 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Now, generate the block which will cause Alice to offer the // presigned htlc timeout txns to the sweeper. - ht.MineBlocks(1) + ht.MineEmptyBlocks(1) // Since Alice had numInvoices (6) htlcs extended to Carol before force // closing, we expect Alice to broadcast an htlc timeout txn for each - // one. We also expect Alice to still have her anchor since it's not - // swept. - ht.AssertNumPendingSweeps(alice, numInvoices+1) + // one. + ht.AssertNumPendingSweeps(alice, numInvoices) // Wait for them all to show up in the mempool - htlcTxIDs := ht.AssertNumTxsInMempool(1) - - // Retrieve each htlc timeout txn from the mempool, and ensure it is - // well-formed. The sweeping tx should spend all the htlc outputs. - // - // NOTE: We also add 1 output as the outgoing HTLC is swept using twice - // its value as its budget, so a wallet utxo is used. - numInputs := 6 + 1 - - // Construct a map of the already confirmed htlc timeout outpoints, - // that will count the number of times each is spent by the sweep txn. - // We prepopulate it in this way so that we can later detect if we are - // spending from an output that was not a confirmed htlc timeout txn. - var htlcTxOutpointSet = make(map[wire.OutPoint]int) - - var htlcLessFees uint64 - - //nolint:ll - for _, htlcTxID := range htlcTxIDs { - // Fetch the sweep transaction, all input it's spending should - // be from the commitment transaction which was broadcast - // on-chain. In case of an anchor type channel, we expect one - // extra input that is not spending from the commitment, that - // is added for fees. - htlcTx := ht.GetRawTransaction(htlcTxID) - - // Ensure the htlc transaction has the expected number of - // inputs. - inputs := htlcTx.MsgTx().TxIn - require.Len(ht, inputs, numInputs, "num inputs mismatch") - - // The number of outputs should be the same. - outputs := htlcTx.MsgTx().TxOut - require.Len(ht, outputs, numInputs, "num outputs mismatch") - - // Ensure all the htlc transaction inputs are spending from the - // commitment transaction, except if this is an extra input - // added to pay for fees for anchor channels. - nonCommitmentInputs := 0 - for i, txIn := range inputs { - if !closingTxID.IsEqual(&txIn.PreviousOutPoint.Hash) { - nonCommitmentInputs++ - - require.Lessf(ht, nonCommitmentInputs, 2, - "htlc transaction not "+ - "spending from commit "+ - "tx %v, instead spending %v", - closingTxID, txIn.PreviousOutPoint) - - // This was an extra input added to pay fees, - // continue to the next one. - continue - } - - // For each htlc timeout transaction, we expect a - // resolver report recording this on chain resolution - // for both alice and carol. - outpoint := txIn.PreviousOutPoint - resolutionOutpoint := &lnrpc.OutPoint{ - TxidBytes: outpoint.Hash[:], - TxidStr: outpoint.Hash.String(), - OutputIndex: outpoint.Index, - } - - // We expect alice to have a timeout tx resolution with - // an amount equal to the payment amount. - //nolint:ll - aliceReports[outpoint.String()] = &lnrpc.Resolution{ - ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, - Outcome: lnrpc.ResolutionOutcome_FIRST_STAGE, - SweepTxid: htlcTx.Hash().String(), - Outpoint: resolutionOutpoint, - AmountSat: uint64(paymentAmt), - } - - // We expect carol to have a resolution with an - // incoming htlc timeout which reflects the full amount - // of the htlc. It has no spend tx, because carol stops - // monitoring the htlc once it has timed out. - //nolint:ll - carolReports[outpoint.String()] = &lnrpc.Resolution{ - ResolutionType: lnrpc.ResolutionType_INCOMING_HTLC, - Outcome: lnrpc.ResolutionOutcome_TIMEOUT, - SweepTxid: "", - Outpoint: resolutionOutpoint, - AmountSat: uint64(paymentAmt), - } - - // Recorf the HTLC outpoint, such that we can later - // check whether it gets swept - op := wire.OutPoint{ - Hash: htlcTxID, - Index: uint32(i), - } - htlcTxOutpointSet[op] = 0 - } - - // We record the htlc amount less fees here, so that we know - // what value to expect for the second stage of our htlc - // resolution. - htlcLessFees = uint64(outputs[0].Value) - } + htlcTxid := ht.AssertNumTxsInMempool(1)[0] // With the htlc timeout txns still in the mempool, we restart Alice to // verify that she can resume watching the htlc txns she broadcasted // before crashing. ht.RestartNode(alice) + // Once restarted, Alice will offer her HTLC outputs to the sweeper + // again. The new sweeping tx will replace the old one. We check it by + // asserting the old one no longer exists in the mempool. + ht.AssertTxNotInMempool(htlcTxid) + htlcTx := ht.GetNumTxsFromMempool(1)[0] + htlcTxid = htlcTx.TxHash() + // Generate a block that mines the htlc timeout txns. Doing so now // activates the 2nd-stage CSV delayed outputs. ht.MineBlocksAndAssertNumTxes(1, 1) + // Retrieve each htlc timeout txn from the mempool, and ensure it is + // well-formed. The sweeping tx should spend all the htlc outputs. + // + // NOTE: We also add 1 output as the outgoing HTLC needs a wallet utxo + // to pay for its fee. + numInputs := 6 + 1 + + var htlcLessFees uint64 + + // Ensure the htlc transaction has the expected number of inputs. + require.Len(ht, htlcTx.TxIn, numInputs, "num inputs mismatch") + + // The number of outputs should be the same. + require.Len(ht, htlcTx.TxOut, numInputs, "num outputs mismatch") + + // Ensure all the htlc transaction inputs are spending from the + // commitment transaction, except if this is an extra input added to pay + // for fees for anchor channels. + for _, txIn := range htlcTx.TxIn { + if !closingTxID.IsEqual(&txIn.PreviousOutPoint.Hash) { + // This was an extra input added to pay fees, + // continue to the next one. + continue + } + + // For each htlc timeout transaction, we expect a resolver + // report recording this on chain resolution for both alice and + // carol. + outpoint := txIn.PreviousOutPoint + resolutionOutpoint := &lnrpc.OutPoint{ + TxidBytes: outpoint.Hash[:], + TxidStr: outpoint.Hash.String(), + OutputIndex: outpoint.Index, + } + + // We expect alice to have a timeout tx resolution with an + // amount equal to the payment amount. + aliceReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_FIRST_STAGE, + SweepTxid: htlcTxid.String(), + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + + // We expect carol to have a resolution with an incoming htlc + // timeout which reflects the full amount of the htlc. It has no + // spend tx, because carol stops monitoring the htlc once it has + // timed out. + carolReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_INCOMING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: "", + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + } + + // We record the htlc amount less fees here, so that we know what value + // to expect for the second stage of our htlc resolution. + htlcLessFees = uint64(htlcTx.TxOut[0].Value) + // Alice is restarted here to ensure that her contract court properly // handles the 2nd-stage sweeps after the htlc timeout txns were // confirmed. @@ -624,22 +1174,17 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Advance the chain until just before the 2nd-layer CSV delays expire. // For anchor channels this is one block earlier. - currentHeight = int32(ht.CurrentHeight()) - ht.Logf("current height: %v, htlcCsvMaturityHeight=%v", currentHeight, + curHeight = int32(ht.CurrentHeight()) + ht.Logf("current height: %v, htlcCsvMaturityHeight=%v", curHeight, htlcCsvMaturityHeight) - numBlocks := int(htlcCsvMaturityHeight - uint32(currentHeight) - 1) - ht.MineBlocks(numBlocks) + numBlocks := int(htlcCsvMaturityHeight - uint32(curHeight) - 1) + ht.MineEmptyBlocks(numBlocks) - ht.AssertNumPendingSweeps(alice, numInvoices+1) + ht.AssertNumPendingSweeps(alice, numInvoices) - // Restart Alice to ensure that she can recover from a failure. - // - // TODO(yy): Skip this step for neutrino as it cannot recover the - // sweeping txns from the mempool. We need to also store the txns in - // the sweeper store to make it work for the neutrino case. - if !ht.IsNeutrinoBackend() { - ht.RestartNode(alice) - } + // Fetch the htlc sweep transaction from the mempool. + htlcSweepTx := ht.GetNumTxsFromMempool(1)[0] + htlcSweepTxid := htlcSweepTx.TxHash() // Now that the channel has been fully swept, it should no longer show // incubated, check to see that Alice's node still reports the channel @@ -657,60 +1202,15 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, }, defaultTimeout) require.NoError(ht, err, "timeout while checking force closed channel") - ht.AssertNumPendingSweeps(alice, numInvoices+1) - - // Wait for the single sweep txn to appear in the mempool. - htlcSweepTxid := ht.AssertNumTxsInMempool(1)[0] - - // Fetch the htlc sweep transaction from the mempool. - htlcSweepTx := ht.GetRawTransaction(htlcSweepTxid) + ht.AssertNumPendingSweeps(alice, numInvoices) // Ensure the htlc sweep transaction only has one input for each htlc // Alice extended before force closing. - require.Len(ht, htlcSweepTx.MsgTx().TxIn, numInvoices, + require.Len(ht, htlcSweepTx.TxIn, numInvoices, "htlc transaction has wrong num of inputs") - require.Len(ht, htlcSweepTx.MsgTx().TxOut, 1, + require.Len(ht, htlcSweepTx.TxOut, 1, "htlc sweep transaction should have one output") - // Ensure that each output spends from exactly one htlc timeout output. - for _, txIn := range htlcSweepTx.MsgTx().TxIn { - outpoint := txIn.PreviousOutPoint - - // Check that the input is a confirmed htlc timeout txn. - _, ok := htlcTxOutpointSet[outpoint] - require.Truef(ht, ok, "htlc sweep output not spending from "+ - "htlc tx, instead spending output %v", outpoint) - - // Increment our count for how many times this output was spent. - htlcTxOutpointSet[outpoint]++ - - // Check that each is only spent once. - require.Lessf(ht, htlcTxOutpointSet[outpoint], 2, - "htlc sweep tx has multiple spends from "+ - "outpoint %v", outpoint) - - // Since we have now swept our htlc timeout tx, we expect to - // have timeout resolutions for each of our htlcs. - output := txIn.PreviousOutPoint - aliceReports[output.String()] = &lnrpc.Resolution{ - ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, - Outcome: lnrpc.ResolutionOutcome_TIMEOUT, - SweepTxid: htlcSweepTx.Hash().String(), - Outpoint: &lnrpc.OutPoint{ - TxidBytes: output.Hash[:], - TxidStr: output.Hash.String(), - OutputIndex: output.Index, - }, - AmountSat: htlcLessFees, - } - } - - // Check that each HTLC output was spent exactly once. - for op, num := range htlcTxOutpointSet { - require.Equalf(ht, 1, num, - "HTLC outpoint:%s was spent times", op) - } - // Check that we can find the htlc sweep in our set of sweeps using // the verbose output of the listsweeps output. ht.AssertSweepFound(alice, htlcSweepTxid.String(), true, 0) @@ -720,6 +1220,31 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // begins watching that txid after restarting. ht.RestartNode(alice) + // Once restarted, Alice will offer her HTLC outputs to the sweeper + // again. The new sweeping tx will replace the old one. We check it by + // asserting the old one no longer exists in the mempool. + ht.AssertTxNotInMempool(htlcSweepTxid) + htlcSweepTxid = ht.AssertNumTxsInMempool(1)[0] + + // Ensure that each output spends from exactly one htlc timeout output. + for _, txIn := range htlcSweepTx.TxIn { + op := txIn.PreviousOutPoint + + // Since we have now swept our htlc timeout tx, we expect to + // have timeout resolutions for each of our htlcs. + aliceReports[op.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: htlcSweepTxid.String(), + Outpoint: &lnrpc.OutPoint{ + TxidBytes: op.Hash[:], + TxidStr: op.Hash.String(), + OutputIndex: op.Index, + }, + AmountSat: htlcLessFees, + } + } + // Now that the channel has been fully swept, it should no longer show // incubated, check to see that Alice's node still reports the channel // as pending force closed. @@ -752,6 +1277,7 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // up within the pending channels RPC. err = wait.NoError(func() error { ht.AssertNumPendingForceClose(alice, 0) + // In addition to there being no pending channels, we verify // that pending channels does not report any money still in // limbo. From 8d49246a54b3403d5400590a946b3bb4bf6b4cb9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 26 Jan 2025 10:55:15 +0800 Subject: [PATCH 16/19] docs: add release notes --- docs/release-notes/release-notes-0.19.0.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index b664800fd..6c736a9d2 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -344,6 +344,11 @@ The underlying functionality between those two options remain the same. * A code refactor that [replaces min/max helpers with built-in min/max functions](https://github.com/lightningnetwork/lnd/pull/9451). +* [Unified](https://github.com/lightningnetwork/lnd/pull/9447) the monitoring + inputs spending logic in the sweeper so it can properly handle missing inputs + and recover from restart. + + ## Tooling and Documentation * [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077) From 353f208031e5047207f101ba2e956f504df3f657 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 12 Feb 2025 20:04:40 +0800 Subject: [PATCH 17/19] sweep: refactor `IsOurTx` to not return an error Before this commit, the only error returned from `IsOurTx` is when the root bucket was not created. In that case, we should consider the tx to be not found in our db, since technically our db is empty. A future PR may consider treating our wallet as the single source of truth and query the wallet instead to check for past sweeping txns. --- sweep/mock_test.go | 4 ++-- sweep/store.go | 16 +++++++++------- sweep/store_test.go | 9 +++------ sweep/sweeper.go | 32 ++++---------------------------- sweep/sweeper_test.go | 8 ++++---- 5 files changed, 22 insertions(+), 47 deletions(-) diff --git a/sweep/mock_test.go b/sweep/mock_test.go index f9471f22a..9312b7e28 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -24,10 +24,10 @@ func NewMockSweeperStore() *MockSweeperStore { } // IsOurTx determines whether a tx is published by us, based on its hash. -func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { +func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) bool { args := s.Called(hash) - return args.Bool(0), args.Error(1) + return args.Bool(0) } // StoreTx stores a tx we are about to publish. diff --git a/sweep/store.go b/sweep/store.go index cfab66381..e3e97908b 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -121,7 +121,7 @@ func deserializeTxRecord(r io.Reader) (*TxRecord, error) { type SweeperStore interface { // IsOurTx determines whether a tx is published by us, based on its // hash. - IsOurTx(hash chainhash.Hash) (bool, error) + IsOurTx(hash chainhash.Hash) bool // StoreTx stores a tx hash we are about to publish. StoreTx(*TxRecord) error @@ -276,15 +276,17 @@ func (s *sweeperStore) StoreTx(tr *TxRecord) error { }, func() {}) } -// IsOurTx determines whether a tx is published by us, based on its -// hash. -func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { +// IsOurTx determines whether a tx is published by us, based on its hash. +func (s *sweeperStore) IsOurTx(hash chainhash.Hash) bool { var ours bool err := kvdb.View(s.db, func(tx kvdb.RTx) error { txHashesBucket := tx.ReadBucket(txHashesBucketKey) + // If the root bucket cannot be found, we consider the tx to be + // not found in our db. if txHashesBucket == nil { - return errNoTxHashesBucket + log.Error("Tx hashes bucket not found in sweeper store") + return nil } ours = txHashesBucket.Get(hash[:]) != nil @@ -294,10 +296,10 @@ func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { ours = false }) if err != nil { - return false, err + return false } - return ours, nil + return ours } // ListSweeps lists all the sweep transactions we have in the sweeper store. diff --git a/sweep/store_test.go b/sweep/store_test.go index ea65b0177..e9d4db125 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -57,18 +57,15 @@ func TestStore(t *testing.T) { require.NoError(t, err) // Assert that both txes are recognized as our own. - ours, err := store.IsOurTx(tx1.TxHash()) - require.NoError(t, err) + ours := store.IsOurTx(tx1.TxHash()) require.True(t, ours, "expected tx to be ours") - ours, err = store.IsOurTx(tx2.TxHash()) - require.NoError(t, err) + ours = store.IsOurTx(tx2.TxHash()) require.True(t, ours, "expected tx to be ours") // An different hash should be reported as not being ours. var unknownHash chainhash.Hash - ours, err = store.IsOurTx(unknownHash) - require.NoError(t, err) + ours = store.IsOurTx(unknownHash) require.False(t, ours, "expected tx to not be ours") txns, err := store.ListSweeps() diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 730a75586..9684039e4 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1394,12 +1394,7 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) { // Query store to find out if we ever published this tx. spendHash := *spend.SpenderTxHash - isOurTx, err := s.cfg.Store.IsOurTx(spendHash) - if err != nil { - log.Errorf("cannot determine if tx %v is ours: %v", - spendHash, err) - return - } + isOurTx := s.cfg.Store.IsOurTx(spendHash) // If this isn't our transaction, it means someone else swept outputs // that we were attempting to sweep. This can happen for anchor outputs @@ -1879,22 +1874,7 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { // NOTE: It is enough to check the txid because the sweeper will create // outpoints which solely belong to the internal LND wallet. func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool { - found, err := s.cfg.Store.IsOurTx(op.Hash) - // In case there is an error fetching the transaction details from the - // sweeper store we assume the outpoint is still used by the sweeper - // (worst case scenario). - // - // TODO(ziggie): Ensure that confirmed outpoints are deleted from the - // bucket. - if err != nil && !errors.Is(err, errNoTxHashesBucket) { - log.Errorf("failed to fetch info for outpoint(%v:%d) "+ - "with: %v, we assume it is still in use by the sweeper", - op.Hash, op.Index, err) - - return true - } - - return found + return s.cfg.Store.IsOurTx(op.Hash) } // markInputSwept marks the given input as swept by the tx. It will also notify @@ -1923,11 +1903,7 @@ func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) { op := inp.OutPoint() txid := tx.TxHash() - isOurTx, err := s.cfg.Store.IsOurTx(txid) - if err != nil { - log.Errorf("Cannot determine if tx %v is ours: %v", txid, err) - return - } + isOurTx := s.cfg.Store.IsOurTx(txid) // If this is our tx, it means it's a previous sweeping tx that got // confirmed, which could happen when a restart happens during the @@ -1955,7 +1931,7 @@ func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) { spentInputs[txIn.PreviousOutPoint] = struct{}{} } - err = s.removeConflictSweepDescendants(spentInputs) + err := s.removeConflictSweepDescendants(spentInputs) if err != nil { log.Warnf("unable to remove descendant transactions "+ "due to tx %v: ", txid) diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index ff48d9e8b..d1fbee1f6 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1227,7 +1227,7 @@ func TestHandleUnknownSpendTxOurs(t *testing.T) { txid := tx.TxHash() // Mock the store to return true when calling IsOurTx. - store.On("IsOurTx", txid).Return(true, nil).Once() + store.On("IsOurTx", txid).Return(true).Once() // Call the method under test. s.handleUnknownSpendTx(si, tx) @@ -1271,7 +1271,7 @@ func TestHandleInputSpendTxThirdParty(t *testing.T) { txid := tx.TxHash() // Mock the store to return false when calling IsOurTx. - store.On("IsOurTx", txid).Return(false, nil).Once() + store.On("IsOurTx", txid).Return(false).Once() // Mock `ListSweeps` to return an empty slice as we are testing the // workflow here, not the method `removeConflictSweepDescendants`. @@ -1333,7 +1333,7 @@ func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) { } // Mock the store to return true when calling IsOurTx. - store.On("IsOurTx", txid).Return(true, nil).Once() + store.On("IsOurTx", txid).Return(true).Once() // Call the method under test. s.handleBumpEventTxUnknownSpend(resp) @@ -1419,7 +1419,7 @@ func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) { } // Mock the store to return true when calling IsOurTx. - store.On("IsOurTx", txid).Return(true, nil).Once() + store.On("IsOurTx", txid).Return(true).Once() // Mock the aggregator to return an empty slice as we are not testing // the actual sweeping behavior. From 7ab0e15937025defff3e1e1005f157bc4ce6e4ac Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 13 Feb 2025 12:36:57 +0800 Subject: [PATCH 18/19] sweep: fix error logging --- sweep/fee_bumper.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 934d2b653..6a796e22d 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -1356,7 +1356,9 @@ func (t *TxPublisher) createAndPublishTx( if errors.Is(result.Err, chain.ErrInsufficientFee) || errors.Is(result.Err, lnwallet.ErrMempoolFee) { - log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err) + log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), + result.Err) + return fn.None[BumpResult]() } From 9f7e2bfd9653d4954ee3284d6b101186d51a6d2f Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 20 Feb 2025 14:53:11 +0800 Subject: [PATCH 19/19] contractcourt: fix `errorlint` --- contractcourt/anchor_resolver.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/contractcourt/anchor_resolver.go b/contractcourt/anchor_resolver.go index 765a0f6ca..c59e5d063 100644 --- a/contractcourt/anchor_resolver.go +++ b/contractcourt/anchor_resolver.go @@ -98,9 +98,11 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) { select { case sweepRes := <-c.sweepResultChan: - switch sweepRes.Err { + err := sweepRes.Err + + switch { // Anchor was swept successfully. - case nil: + case err == nil: sweepTxID := sweepRes.Tx.TxHash() spendTx = &sweepTxID @@ -108,7 +110,9 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) { // Anchor was swept by someone else. This is possible after the // 16 block csv lock. - case sweep.ErrRemoteSpend, sweep.ErrInputMissing: + case errors.Is(err, sweep.ErrRemoteSpend), + errors.Is(err, sweep.ErrInputMissing): + c.log.Warnf("our anchor spent by someone else") outcome = channeldb.ResolverOutcomeUnclaimed