diff --git a/chainntnfs/mempool.go b/chainntnfs/mempool.go index 4cd181df8..2e31751fc 100644 --- a/chainntnfs/mempool.go +++ b/chainntnfs/mempool.go @@ -211,7 +211,7 @@ func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) (inputsWithTx, // If found, save it to watchedInputs to notify the // subscriber later. - Log.Infof("Found input %s, spent in %s", op, txid) + Log.Debugf("Found input %s, spent in %s", op, txid) // Construct the spend details. details := &SpendDetail{ diff --git a/contractcourt/anchor_resolver.go b/contractcourt/anchor_resolver.go index f0f6e2f5a..c59e5d063 100644 --- a/contractcourt/anchor_resolver.go +++ b/contractcourt/anchor_resolver.go @@ -98,9 +98,11 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) { select { case sweepRes := <-c.sweepResultChan: - switch sweepRes.Err { + err := sweepRes.Err + + switch { // Anchor was swept successfully. - case nil: + case err == nil: sweepTxID := sweepRes.Tx.TxHash() spendTx = &sweepTxID @@ -108,7 +110,9 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) { // Anchor was swept by someone else. This is possible after the // 16 block csv lock. - case sweep.ErrRemoteSpend: + case errors.Is(err, sweep.ErrRemoteSpend), + errors.Is(err, sweep.ErrInputMissing): + c.log.Warnf("our anchor spent by someone else") outcome = channeldb.ResolverOutcomeUnclaimed diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index b664800fd..6c736a9d2 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -344,6 +344,11 @@ The underlying functionality between those two options remain the same. * A code refactor that [replaces min/max helpers with built-in min/max functions](https://github.com/lightningnetwork/lnd/pull/9451). 
+* [Unified](https://github.com/lightningnetwork/lnd/pull/9447) the monitoring + inputs spending logic in the sweeper so it can properly handle missing inputs + and recover from restart. + + ## Tooling and Documentation * [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 2cf71feec..9bb611935 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -662,6 +662,10 @@ var allTestCases = []*lntest.TestCase{ Name: "invoice migration", TestFunc: testInvoiceMigration, }, + { + Name: "fee replacement", + TestFunc: testFeeReplacement, + }, } // appendPrefixed is used to add a prefix to each test name in the subtests diff --git a/itest/lnd_channel_force_close_test.go b/itest/lnd_channel_force_close_test.go index 6fcd8dd86..24868a019 100644 --- a/itest/lnd_channel_force_close_test.go +++ b/itest/lnd_channel_force_close_test.go @@ -6,7 +6,6 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd/lnrpc" @@ -27,6 +26,15 @@ var channelForceCloseTestCases = []*lntest.TestCase{ Name: "simple taproot", TestFunc: testChannelForceClosureSimpleTaproot, }, + { + Name: "anchor restart", + TestFunc: testChannelForceClosureAnchorRestart, + }, + { + Name: "simple taproot restart", + TestFunc: testChannelForceClosureSimpleTaprootRestart, + }, + { Name: "wrong preimage", TestFunc: testFailingChannel, @@ -87,9 +95,7 @@ func testChannelForceClosureSimpleTaproot(ht *lntest.HarnessTest) { // commitment transaction, a transaction sweeping the local CSV delayed output, // a transaction sweeping the CSV delayed 2nd-layer htlcs outputs, and n htlc // timeout transactions, where n is the number of payments Alice attempted -// to send to Carol. 
This test includes several restarts to ensure that the -// transaction output states are persisted throughout the forced closure -// process. +// to send to Carol. func runChannelForceClosureTest(ht *lntest.HarnessTest, cfgs [][]string, params lntest.OpenChannelParams) { @@ -131,6 +137,597 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, ht.SendPaymentAssertInflight(alice, req) } + // Once the HTLC has cleared, all the nodes in our mini network should + // show that the HTLC has been locked in. + ht.AssertNumActiveHtlcs(alice, numInvoices) + ht.AssertNumActiveHtlcs(carol, numInvoices) + + // Fetch starting height of this test so we can compute the block + // heights we expect certain events to take place. + curHeight := int32(ht.CurrentHeight()) + + // Using the current height of the chain, derive the relevant heights + // for sweeping two-stage htlcs. + var ( + startHeight = uint32(curHeight) + commCsvMaturityHeight = startHeight + 1 + defaultCSV + htlcExpiryHeight = padCLTV(startHeight + finalCltvDelta) + htlcCsvMaturityHeight = padCLTV( + startHeight + finalCltvDelta + 1 + defaultCSV, + ) + ) + + aliceChan := ht.QueryChannelByChanPoint(alice, chanPoint) + require.NotZero(ht, aliceChan.NumUpdates, + "alice should see at least one update to her channel") + + // Now that the channel is open and we have unsettled htlcs, + // immediately execute a force closure of the channel. This will also + // assert that the commitment transaction was immediately broadcast in + // order to fulfill the force closure request. + ht.CloseChannelAssertPending(alice, chanPoint, true) + + // Now that the channel has been force closed, it should show up in the + // PendingChannels RPC under the waiting close section. + waitingClose := ht.AssertChannelWaitingClose(alice, chanPoint) + + // Immediately after force closing, all of the funds should be in + // limbo. 
+ require.NotZero(ht, waitingClose.LimboBalance, + "all funds should still be in limbo") + + // Create a map of outpoints to expected resolutions for alice and + // carol which we will add reports to as we sweep outputs. + var ( + aliceReports = make(map[string]*lnrpc.Resolution) + carolReports = make(map[string]*lnrpc.Resolution) + ) + + // We expect to see Alice's force close tx in the mempool. + ht.AssertNumTxsInMempool(1) + + // Mine a block which should confirm the commitment transaction + // broadcast as a result of the force closure. Once mined, we also + // expect Alice's anchor sweeping tx being published. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Assert Alice's has one pending anchor output - because she doesn't + // have incoming HTLCs, her outgoing HTLC won't have a deadline, thus + // she won't use the anchor to perform CPFP. + aliceAnchor := ht.AssertNumPendingSweeps(alice, 1)[0] + require.Equal(ht, aliceAnchor.Outpoint.TxidStr, + waitingClose.Commitments.LocalTxid) + + // Now that the commitment has been confirmed, the channel should be + // marked as force closed. + forceClose := ht.AssertChannelPendingForceClose(alice, chanPoint) + + // Now that the channel has been force closed, it should now + // have the height and number of blocks to confirm populated. + err := checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, int32(defaultCSV), + ) + require.NoError(ht, err) + + // None of our outputs have been swept, so they should all be in limbo. + require.NotZero(ht, forceClose.LimboBalance) + require.Zero(ht, forceClose.RecoveredBalance) + + // Carol should offer her commit and anchor outputs to the sweeper. + sweepTxns := ht.AssertNumPendingSweeps(carol, 2) + + // Identify Carol's pending sweeps. + var carolAnchor, carolCommit = sweepTxns[0], sweepTxns[1] + if carolAnchor.AmountSat != uint32(anchorSize) { + carolCommit = carolAnchor + } + + // Carol's sweep tx should be in the mempool already, as her output is + // not timelocked. 
This sweep tx should spend her to_local output as + // the anchor output is not economical to spend. + carolTx := ht.GetNumTxsFromMempool(1)[0] + + // Carol's sweeping tx should have 1-input-1-output shape. + require.Len(ht, carolTx.TxIn, 1) + require.Len(ht, carolTx.TxOut, 1) + + // Calculate the total fee Carol paid. + totalFeeCarol := ht.CalculateTxFee(carolTx) + + // Carol's anchor report won't be created since it's uneconomical to + // sweep. So we expect to see only the commit sweep report. + op := fmt.Sprintf("%v:%v", carolCommit.Outpoint.TxidStr, + carolCommit.Outpoint.OutputIndex) + carolReports[op] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_COMMIT, + Outcome: lnrpc.ResolutionOutcome_CLAIMED, + Outpoint: carolCommit.Outpoint, + AmountSat: uint64(pushAmt), + SweepTxid: carolTx.TxHash().String(), + } + + // We also expect Carol to broadcast her sweeping tx which spends her + // commit and anchor outputs. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Alice should still have the anchor sweeping request. + ht.AssertNumPendingSweeps(alice, 1) + + // Alice should see the channel in her set of pending force closed + // channels with her funds still in limbo. + forceClose = ht.AssertChannelPendingForceClose(alice, chanPoint) + + // Make a record of the balances we expect for alice and carol. + aliceBalance := forceClose.Channel.LocalBalance + + // Get the closing txid. + txid, err := chainhash.NewHashFromStr(forceClose.ClosingTxid) + require.NoError(ht, err) + closingTxID := txid + + // At this point, the nursery should show that the commitment output has + // 3 block left before its CSV delay expires. In total, we have mined + // exactly defaultCSV blocks, so the htlc outputs should also reflect + // that this many blocks have passed. + err = checkCommitmentMaturity(forceClose, commCsvMaturityHeight, 3) + require.NoError(ht, err) + + // All funds should still be shown in limbo. 
+ require.NotZero(ht, forceClose.LimboBalance) + require.Zero(ht, forceClose.RecoveredBalance) + + // Generate two blocks, which should cause the CSV delayed output from + // the commitment txn to expire. + ht.MineEmptyBlocks(2) + + // At this point, the CSV will expire in the next block, meaning that + // the output should be offered to the sweeper. + sweeps := ht.AssertNumPendingSweeps(alice, 2) + commitSweep, anchorSweep := sweeps[0], sweeps[1] + if commitSweep.AmountSat < anchorSweep.AmountSat { + commitSweep = anchorSweep + } + + // Alice's sweeping transaction should now be broadcast. So we fetch the + // node's mempool to ensure it has been properly broadcast. + sweepTx := ht.GetNumTxsFromMempool(1)[0] + sweepTxid := sweepTx.TxHash() + + // The sweep transaction's inputs spending should be from the commitment + // transaction. + for _, txIn := range sweepTx.TxIn { + require.Equal(ht, &txIn.PreviousOutPoint.Hash, closingTxID, + "sweep transaction not spending from commit") + } + + // Alice's anchor report won't be created since it's uneconomical to + // sweep. We expect a resolution which spends our commit output. + op = fmt.Sprintf("%v:%v", commitSweep.Outpoint.TxidStr, + commitSweep.Outpoint.OutputIndex) + aliceReports[op] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_COMMIT, + Outcome: lnrpc.ResolutionOutcome_CLAIMED, + SweepTxid: sweepTxid.String(), + Outpoint: commitSweep.Outpoint, + AmountSat: uint64(aliceBalance), + } + + // Check that we can find the commitment sweep in our set of known + // sweeps, using the simple transaction id ListSweeps output. + ht.AssertSweepFound(alice, sweepTxid.String(), false, 0) + + // Next, we mine an additional block which should include the sweep + // transaction as the input scripts and the sequence locks on the + // inputs should be properly met. 
+ ht.MineBlocksAndAssertNumTxes(1, 1) + + // Update current height + curHeight = int32(ht.CurrentHeight()) + + // checkPendingChannelNumHtlcs verifies that a force closed channel + // has the proper number of htlcs. + checkPendingChannelNumHtlcs := func( + forceClose lntest.PendingForceClose) error { + + if len(forceClose.PendingHtlcs) != numInvoices { + return fmt.Errorf("expected force closed channel to "+ + "have %d pending htlcs, found %d instead", + numInvoices, len(forceClose.PendingHtlcs)) + } + + return nil + } + + err = wait.NoError(func() error { + // Now that the commit output has been fully swept, check to + // see that the channel remains open for the pending htlc + // outputs. + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + + // The commitment funds will have been recovered after the + // commit txn was included in the last block. The htlc funds + // will be shown in limbo. + err := checkPendingChannelNumHtlcs(forceClose) + if err != nil { + return err + } + + err = checkPendingHtlcStageAndMaturity( + forceClose, 1, htlcExpiryHeight, + int32(htlcExpiryHeight)-curHeight, + ) + if err != nil { + return err + } + + if forceClose.LimboBalance == 0 { + return fmt.Errorf("expected funds in limbo, found 0") + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout checking pending force close channel") + + // Compute the height preceding that which will cause the htlc CLTV + // timeouts to expire. The outputs entered at the same height as the + // output spending from the commitment txn, so we must deduct the + // number of blocks we have generated since adding it to the nursery, + // and take an additional block off so that we end up one block shy of + // the expiry height, and add the block padding. + cltvHeightDelta := int(htlcExpiryHeight - uint32(curHeight) - 1) + + // Advance the blockchain until just before the CLTV expires, nothing + // exciting should have happened during this time. 
+ ht.MineEmptyBlocks(cltvHeightDelta) + + // Alice should now see the channel in her set of pending force closed + // channels with one pending HTLC. + err = wait.NoError(func() error { + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + + // We should now be at the block just before the utxo nursery + // will attempt to broadcast the htlc timeout transactions. + err = checkPendingChannelNumHtlcs(forceClose) + if err != nil { + return err + } + err = checkPendingHtlcStageAndMaturity( + forceClose, 1, htlcExpiryHeight, 1, + ) + if err != nil { + return err + } + + // Now that our commitment confirmation depth has been + // surpassed, we should now see a non-zero recovered balance. + // All htlc outputs are still left in limbo, so it should be + // non-zero as well. + if forceClose.LimboBalance == 0 { + return errors.New("htlc funds should still be in limbo") + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout while checking force closed channel") + + // Now, generate the block which will cause Alice to offer the presigned + // htlc timeout txns to the sweeper. + ht.MineEmptyBlocks(1) + + // Since Alice had numInvoices (6) htlcs extended to Carol before force + // closing, we expect Alice to broadcast an htlc timeout txn for each + // one. We also expect Alice to still have her anchor since it's not + // swept. + ht.AssertNumPendingSweeps(alice, numInvoices+1) + + // Wait for them all to show up in the mempool + htlcTx := ht.GetNumTxsFromMempool(1)[0] + htlcTxid := htlcTx.TxHash() + + // Retrieve each htlc timeout txn from the mempool, and ensure it is + // well-formed. The sweeping tx should spend all the htlc outputs. + // + // NOTE: We also add 1 output as the outgoing HTLC needs a wallet utxo + // to pay for its fee. + numInputs := 6 + 1 + + var htlcLessFees uint64 + + // Ensure the htlc transaction has the expected number of inputs. 
+ require.Len(ht, htlcTx.TxIn, numInputs, "num inputs mismatch") + + // The number of outputs should be the same. + require.Len(ht, htlcTx.TxOut, numInputs, "num outputs mismatch") + + // Ensure all the htlc transaction inputs are spending from the + // commitment transaction, except if this is an extra input added to pay + // for fees for anchor channels. + for _, txIn := range htlcTx.TxIn { + if !closingTxID.IsEqual(&txIn.PreviousOutPoint.Hash) { + // This was an extra input added to pay fees, + // continue to the next one. + continue + } + + // For each htlc timeout transaction, we expect a resolver + // report recording this on chain resolution for both alice and + // carol. + outpoint := txIn.PreviousOutPoint + resolutionOutpoint := &lnrpc.OutPoint{ + TxidBytes: outpoint.Hash[:], + TxidStr: outpoint.Hash.String(), + OutputIndex: outpoint.Index, + } + + // We expect alice to have a timeout tx resolution with an + // amount equal to the payment amount. + aliceReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_FIRST_STAGE, + SweepTxid: htlcTxid.String(), + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + + // We expect carol to have a resolution with an incoming htlc + // timeout which reflects the full amount of the htlc. It has no + // spend tx, because carol stops monitoring the htlc once it has + // timed out. + carolReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_INCOMING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: "", + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + } + + // We record the htlc amount less fees here, so that we know what value + // to expect for the second stage of our htlc resolution. + htlcLessFees = uint64(htlcTx.TxOut[0].Value) + + // Generate a block that mines the htlc timeout txns. Doing so now + // activates the 2nd-stage CSV delayed outputs. 
+ ht.MineBlocksAndAssertNumTxes(1, 1) + + // Advance the chain until just before the 2nd-layer CSV delays expire. + // For anchor channels this is one block earlier. + curHeight = int32(ht.CurrentHeight()) + ht.Logf("current height: %v, htlcCsvMaturityHeight=%v", curHeight, + htlcCsvMaturityHeight) + numBlocks := int(htlcCsvMaturityHeight - uint32(curHeight) - 1) + ht.MineEmptyBlocks(numBlocks) + + ht.AssertNumPendingSweeps(alice, numInvoices+1) + + // Now that the channel has been fully swept, it should no longer show + // incubated, check to see that Alice's node still reports the channel + // as pending force closed. + err = wait.NoError(func() error { + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + + if forceClose.LimboBalance == 0 { + return fmt.Errorf("htlc funds should still be in limbo") + } + + return checkPendingChannelNumHtlcs(forceClose) + }, defaultTimeout) + require.NoError(ht, err, "timeout while checking force closed channel") + + ht.AssertNumPendingSweeps(alice, numInvoices+1) + + // Wait for the single sweep txn to appear in the mempool. + htlcSweepTx := ht.GetNumTxsFromMempool(1)[0] + htlcSweepTxid := htlcSweepTx.TxHash() + + // Ensure the htlc sweep transaction only has one input for each htlc + // Alice extended before force closing. + require.Len(ht, htlcSweepTx.TxIn, numInvoices, + "htlc transaction has wrong num of inputs") + require.Len(ht, htlcSweepTx.TxOut, 1, + "htlc sweep transaction should have one output") + + // Ensure that each output spends from exactly one htlc timeout output. + for _, txIn := range htlcSweepTx.TxIn { + op := txIn.PreviousOutPoint + + // Since we have now swept our htlc timeout tx, we expect to + // have timeout resolutions for each of our htlcs. 
+ aliceReports[op.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: htlcSweepTxid.String(), + Outpoint: &lnrpc.OutPoint{ + TxidBytes: op.Hash[:], + TxidStr: op.Hash.String(), + OutputIndex: op.Index, + }, + AmountSat: htlcLessFees, + } + } + + // Check that we can find the htlc sweep in our set of sweeps using + // the verbose output of the listsweeps output. + ht.AssertSweepFound(alice, htlcSweepTxid.String(), true, 0) + + // Now that the channel has been fully swept, it should no longer show + // incubated, check to see that Alice's node still reports the channel + // as pending force closed. + err = wait.NoError(func() error { + forceClose := ht.AssertChannelPendingForceClose( + alice, chanPoint, + ) + err := checkPendingChannelNumHtlcs(forceClose) + if err != nil { + return err + } + + err = checkPendingHtlcStageAndMaturity( + forceClose, 2, htlcCsvMaturityHeight-1, 0, + ) + if err != nil { + return err + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout while checking force closed channel") + + // Generate the final block that sweeps all htlc funds into the user's + // wallet, and make sure the sweep is in this block. + block := ht.MineBlocksAndAssertNumTxes(1, 1)[0] + ht.AssertTxInBlock(block, htlcSweepTxid) + + // Now that the channel has been fully swept, it should no longer show + // up within the pending channels RPC. + err = wait.NoError(func() error { + ht.AssertNumPendingForceClose(alice, 0) + + // In addition to there being no pending channels, we verify + // that pending channels does not report any money still in + // limbo. 
+ pendingChanResp := alice.RPC.PendingChannels() + if pendingChanResp.TotalLimboBalance != 0 { + return errors.New("no user funds should be left " + + "in limbo after incubation") + } + + return nil + }, defaultTimeout) + require.NoError(ht, err, "timeout checking limbo balance") + + // At this point, Carol should now be aware of her new immediately + // spendable on-chain balance, as it was Alice who broadcast the + // commitment transaction. + carolBalResp = carol.RPC.WalletBalance() + + // Carol's expected balance should be its starting balance plus the + // push amount sent by Alice and minus the miner fee paid. + carolExpectedBalance := btcutil.Amount(carolStartingBalance) + + pushAmt - totalFeeCarol + + require.Equal(ht, carolExpectedBalance, + btcutil.Amount(carolBalResp.ConfirmedBalance), + "carol's balance is incorrect") + + // Finally, we check that alice and carol have the set of resolutions + // we expect. + assertReports(ht, alice, chanPoint, aliceReports) + assertReports(ht, carol, chanPoint, carolReports) +} + +// testChannelForceClosureAnchorRestart runs +// `runChannelForceClosureTestRestart` with anchor channels. +func testChannelForceClosureAnchorRestart(ht *lntest.HarnessTest) { + // Create a simple network: Alice -> Carol, using anchor channels. + // + // Prepare params. + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + PushAmt: pushAmt, + CommitmentType: lnrpc.CommitmentType_ANCHORS, + } + + cfg := node.CfgAnchor + cfgCarol := append([]string{"--hodl.exit-settle"}, cfg...) + cfgs := [][]string{cfg, cfgCarol} + + runChannelForceClosureTestRestart(ht, cfgs, openChannelParams) +} + +// testChannelForceClosureSimpleTaprootRestart runs +// `runChannelForceClosureTestRestart` with simple taproot channels. +func testChannelForceClosureSimpleTaprootRestart(ht *lntest.HarnessTest) { + // Create a simple network: Alice -> Carol, using simple taproot + // channels. + // + // Prepare params. 
+ openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + PushAmt: pushAmt, + // If the channel is a taproot channel, then we'll need to + // create a private channel. + // + // TODO(roasbeef): lift after G175 + CommitmentType: lnrpc.CommitmentType_SIMPLE_TAPROOT, + Private: true, + } + + cfg := node.CfgSimpleTaproot + cfgCarol := append([]string{"--hodl.exit-settle"}, cfg...) + cfgs := [][]string{cfg, cfgCarol} + + runChannelForceClosureTestRestart(ht, cfgs, openChannelParams) +} + +// runChannelForceClosureTestRestart performs a test to exercise the behavior of +// "force" closing a channel or unilaterally broadcasting the latest local +// commitment state on-chain. The test creates a new channel between Alice and +// Carol, then force closes the channel after some cursory assertions. Within +// the test, a total of 3 + n transactions will be broadcast, representing the +// commitment transaction, a transaction sweeping the local CSV delayed output, +// a transaction sweeping the CSV delayed 2nd-layer htlcs outputs, and n htlc +// timeout transactions, where n is the number of payments Alice attempted +// to send to Carol. This test includes several restarts to ensure that the +// transaction output states are persisted throughout the forced closure +// process. +func runChannelForceClosureTestRestart(ht *lntest.HarnessTest, + cfgs [][]string, params lntest.OpenChannelParams) { + + // Skip this test for neutrino, as it cannot create RBF-compliant + // sweeping txns due to no access to `testmempoolaccept`. For neutrino + // nodes connected to a btcd node, they can rely on a reject msg being + // sent back to decide how to perform the fee replacement. + // + // TODO(yy): refactor `createRBFCompliantTx` to also work for neutrino. 
+ if ht.IsNeutrinoBackend() { + ht.Skipf("skipping persistence test for neutrino backend") + } + + const ( + numInvoices = 6 + commitFeeRate = 20000 + ) + + ht.SetFeeEstimate(commitFeeRate) + + // Create a simple network: Alice -> Carol. + chanPoints, nodes := ht.CreateSimpleNetwork(cfgs, params) + alice, carol := nodes[0], nodes[1] + chanPoint := chanPoints[0] + + // We need one additional UTXO for sweeping the remote anchor. + if ht.IsNeutrinoBackend() { + ht.FundCoins(btcutil.SatoshiPerBitcoin, alice) + } + + // Before we start, obtain Carol's current wallet balance, we'll check + // to ensure that at the end of the force closure by Alice, Carol + // recognizes her new on-chain output. + carolBalResp := carol.RPC.WalletBalance() + carolStartingBalance := carolBalResp.ConfirmedBalance + + // Send payments from Alice to Carol, since Carol is htlchodl mode, the + // htlc outputs should be left unsettled, and should be swept by the + // utxo nursery. + carolPubKey := carol.PubKey[:] + for i := 0; i < numInvoices; i++ { + req := &routerrpc.SendPaymentRequest{ + Dest: carolPubKey, + Amt: int64(paymentAmt), + PaymentHash: ht.Random32Bytes(), + FinalCltvDelta: finalCltvDelta, + FeeLimitMsat: noFeeLimitMsat, + } + ht.SendPaymentAssertInflight(alice, req) + } + // Once the HTLC has cleared, all the nodes n our mini network should // show that the HTLC has been locked in. ht.AssertNumActiveHtlcs(alice, numInvoices) @@ -202,32 +799,18 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Now that the commitment has been confirmed, the channel should be // marked as force closed. - err := wait.NoError(func() error { - forceClose := ht.AssertChannelPendingForceClose( - alice, chanPoint, - ) + forceClose := ht.AssertChannelPendingForceClose(alice, chanPoint) - // Now that the channel has been force closed, it should now - // have the height and number of blocks to confirm populated. 
- err := checkCommitmentMaturity( - forceClose, commCsvMaturityHeight, int32(defaultCSV), - ) - if err != nil { - return err - } + // Now that the channel has been force closed, it should now + // have the height and number of blocks to confirm populated. + err := checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, int32(defaultCSV), + ) + require.NoError(ht, err) - // None of our outputs have been swept, so they should all be - // in limbo. - if forceClose.LimboBalance == 0 { - return errors.New("all funds should still be in limbo") - } - if forceClose.RecoveredBalance != 0 { - return errors.New("no funds should be recovered") - } - - return nil - }, defaultTimeout) - require.NoError(ht, err, "timeout while checking force closed channel") + // None of our outputs have been swept, so they should all be in limbo. + require.NotZero(ht, forceClose.LimboBalance) + require.Zero(ht, forceClose.RecoveredBalance) // The following restart is intended to ensure that outputs from the // force close commitment transaction have been persisted once the @@ -241,7 +824,7 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Identify Carol's pending sweeps. var carolAnchor, carolCommit = sweepTxns[0], sweepTxns[1] if carolAnchor.AmountSat != uint32(anchorSize) { - carolAnchor, carolCommit = carolCommit, carolAnchor + carolCommit = carolAnchor } // Carol's sweep tx should be in the mempool already, as her output is @@ -331,7 +914,7 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Generate two blocks, which should cause the CSV delayed output from // the commitment txn to expire. - ht.MineBlocks(2) + ht.MineEmptyBlocks(2) // At this point, the CSV will expire in the next block, meaning that // the output should be offered to the sweeper. @@ -341,54 +924,55 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, commitSweep, anchorSweep = anchorSweep, commitSweep } - // Mine one block and the sweeping transaction should now be broadcast. 
- // So we fetch the node's mempool to ensure it has been properly - // broadcast. - sweepingTXID := ht.AssertNumTxsInMempool(1)[0] + // Alice's sweeping transaction should now be broadcast. So we fetch the + // node's mempool to ensure it has been properly broadcast. + sweepTx := ht.GetNumTxsFromMempool(1)[0] + sweepTxid := sweepTx.TxHash() // Fetch the sweep transaction, all input it's spending should be from // the commitment transaction which was broadcast on-chain. - sweepTx := ht.GetRawTransaction(sweepingTXID) - for _, txIn := range sweepTx.MsgTx().TxIn { + for _, txIn := range sweepTx.TxIn { require.Equal(ht, &txIn.PreviousOutPoint.Hash, closingTxID, "sweep transaction not spending from commit") } - // For neutrino backend, due to it has no mempool, we need to check the - // sweep tx has already been saved to db before restarting. This is due - // to the possible race, - // - the fee bumper returns a TxPublished event, which is received by - // the sweeper and the sweep tx is saved to db. - // - the sweeper receives a shutdown signal before it receives the - // above event. - // - // TODO(yy): fix the above race. - if ht.IsNeutrinoBackend() { - // Check that we can find the commitment sweep in our set of - // known sweeps, using the simple transaction id ListSweeps - // output. - ht.AssertSweepFound(alice, sweepingTXID.String(), false, 0) - } - // Restart Alice to ensure that she resumes watching the finalized // commitment sweep txid. ht.RestartNode(alice) - // Alice's anchor report won't be created since it's uneconomical to - // sweep. We expect a resolution which spends our commit output. + // Once restarted, Alice will offer her anchor and to_local outputs to + // the sweeper again. This time the two inputs will be swept using the + // same tx as they both have None deadline height, the sweeper will + // group these two inputs together. + // + // The new sweeping tx will replace the old one. 
We check it by + // asserting the old one no longer exists in the mempool. + ht.AssertTxNotInMempool(sweepTxid) + sweepTxid = ht.AssertNumTxsInMempool(1)[0] + + // We expect two resolutions - anchor and commit outputs. op = fmt.Sprintf("%v:%v", commitSweep.Outpoint.TxidStr, commitSweep.Outpoint.OutputIndex) aliceReports[op] = &lnrpc.Resolution{ ResolutionType: lnrpc.ResolutionType_COMMIT, Outcome: lnrpc.ResolutionOutcome_CLAIMED, - SweepTxid: sweepingTXID.String(), + SweepTxid: sweepTxid.String(), Outpoint: commitSweep.Outpoint, AmountSat: uint64(aliceBalance), } + op = fmt.Sprintf("%v:%v", anchorSweep.Outpoint.TxidStr, + anchorSweep.Outpoint.OutputIndex) + aliceReports[op] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_ANCHOR, + Outcome: lnrpc.ResolutionOutcome_CLAIMED, + SweepTxid: sweepTxid.String(), + Outpoint: anchorSweep.Outpoint, + AmountSat: uint64(anchorSweep.AmountSat), + } // Check that we can find the commitment sweep in our set of known // sweeps, using the simple transaction id ListSweeps output. - ht.AssertSweepFound(alice, sweepingTXID.String(), false, 0) + ht.AssertSweepFound(alice, sweepTxid.String(), false, 0) // Next, we mine an additional block which should include the sweep // transaction as the input scripts and the sequence locks on the @@ -450,12 +1034,11 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // number of blocks we have generated since adding it to the nursery, // and take an additional block off so that we end up one block shy of // the expiry height, and add the block padding. - currentHeight := int32(ht.CurrentHeight()) - cltvHeightDelta := int(htlcExpiryHeight - uint32(currentHeight) - 1) + cltvHeightDelta := int(htlcExpiryHeight - uint32(curHeight) - 1) // Advance the blockchain until just before the CLTV expires, nothing // exciting should have happened during this time. 
- ht.MineBlocks(cltvHeightDelta) + ht.MineEmptyBlocks(cltvHeightDelta) // We now restart Alice, to ensure that she will broadcast the // presigned htlc timeout txns after the delay expires after @@ -496,127 +1079,94 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Now, generate the block which will cause Alice to offer the // presigned htlc timeout txns to the sweeper. - ht.MineBlocks(1) + ht.MineEmptyBlocks(1) // Since Alice had numInvoices (6) htlcs extended to Carol before force // closing, we expect Alice to broadcast an htlc timeout txn for each - // one. We also expect Alice to still have her anchor since it's not - // swept. - ht.AssertNumPendingSweeps(alice, numInvoices+1) + // one. + ht.AssertNumPendingSweeps(alice, numInvoices) // Wait for them all to show up in the mempool - htlcTxIDs := ht.AssertNumTxsInMempool(1) - - // Retrieve each htlc timeout txn from the mempool, and ensure it is - // well-formed. The sweeping tx should spend all the htlc outputs. - // - // NOTE: We also add 1 output as the outgoing HTLC is swept using twice - // its value as its budget, so a wallet utxo is used. - numInputs := 6 + 1 - - // Construct a map of the already confirmed htlc timeout outpoints, - // that will count the number of times each is spent by the sweep txn. - // We prepopulate it in this way so that we can later detect if we are - // spending from an output that was not a confirmed htlc timeout txn. - var htlcTxOutpointSet = make(map[wire.OutPoint]int) - - var htlcLessFees uint64 - - //nolint:ll - for _, htlcTxID := range htlcTxIDs { - // Fetch the sweep transaction, all input it's spending should - // be from the commitment transaction which was broadcast - // on-chain. In case of an anchor type channel, we expect one - // extra input that is not spending from the commitment, that - // is added for fees. - htlcTx := ht.GetRawTransaction(htlcTxID) - - // Ensure the htlc transaction has the expected number of - // inputs. 
- inputs := htlcTx.MsgTx().TxIn - require.Len(ht, inputs, numInputs, "num inputs mismatch") - - // The number of outputs should be the same. - outputs := htlcTx.MsgTx().TxOut - require.Len(ht, outputs, numInputs, "num outputs mismatch") - - // Ensure all the htlc transaction inputs are spending from the - // commitment transaction, except if this is an extra input - // added to pay for fees for anchor channels. - nonCommitmentInputs := 0 - for i, txIn := range inputs { - if !closingTxID.IsEqual(&txIn.PreviousOutPoint.Hash) { - nonCommitmentInputs++ - - require.Lessf(ht, nonCommitmentInputs, 2, - "htlc transaction not "+ - "spending from commit "+ - "tx %v, instead spending %v", - closingTxID, txIn.PreviousOutPoint) - - // This was an extra input added to pay fees, - // continue to the next one. - continue - } - - // For each htlc timeout transaction, we expect a - // resolver report recording this on chain resolution - // for both alice and carol. - outpoint := txIn.PreviousOutPoint - resolutionOutpoint := &lnrpc.OutPoint{ - TxidBytes: outpoint.Hash[:], - TxidStr: outpoint.Hash.String(), - OutputIndex: outpoint.Index, - } - - // We expect alice to have a timeout tx resolution with - // an amount equal to the payment amount. - //nolint:ll - aliceReports[outpoint.String()] = &lnrpc.Resolution{ - ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, - Outcome: lnrpc.ResolutionOutcome_FIRST_STAGE, - SweepTxid: htlcTx.Hash().String(), - Outpoint: resolutionOutpoint, - AmountSat: uint64(paymentAmt), - } - - // We expect carol to have a resolution with an - // incoming htlc timeout which reflects the full amount - // of the htlc. It has no spend tx, because carol stops - // monitoring the htlc once it has timed out. 
- //nolint:ll - carolReports[outpoint.String()] = &lnrpc.Resolution{ - ResolutionType: lnrpc.ResolutionType_INCOMING_HTLC, - Outcome: lnrpc.ResolutionOutcome_TIMEOUT, - SweepTxid: "", - Outpoint: resolutionOutpoint, - AmountSat: uint64(paymentAmt), - } - - // Recorf the HTLC outpoint, such that we can later - // check whether it gets swept - op := wire.OutPoint{ - Hash: htlcTxID, - Index: uint32(i), - } - htlcTxOutpointSet[op] = 0 - } - - // We record the htlc amount less fees here, so that we know - // what value to expect for the second stage of our htlc - // resolution. - htlcLessFees = uint64(outputs[0].Value) - } + htlcTxid := ht.AssertNumTxsInMempool(1)[0] // With the htlc timeout txns still in the mempool, we restart Alice to // verify that she can resume watching the htlc txns she broadcasted // before crashing. ht.RestartNode(alice) + // Once restarted, Alice will offer her HTLC outputs to the sweeper + // again. The new sweeping tx will replace the old one. We check it by + // asserting the old one no longer exists in the mempool. + ht.AssertTxNotInMempool(htlcTxid) + htlcTx := ht.GetNumTxsFromMempool(1)[0] + htlcTxid = htlcTx.TxHash() + // Generate a block that mines the htlc timeout txns. Doing so now // activates the 2nd-stage CSV delayed outputs. ht.MineBlocksAndAssertNumTxes(1, 1) + // Retrieve each htlc timeout txn from the mempool, and ensure it is + // well-formed. The sweeping tx should spend all the htlc outputs. + // + // NOTE: We also add 1 output as the outgoing HTLC needs a wallet utxo + // to pay for its fee. + numInputs := 6 + 1 + + var htlcLessFees uint64 + + // Ensure the htlc transaction has the expected number of inputs. + require.Len(ht, htlcTx.TxIn, numInputs, "num inputs mismatch") + + // The number of outputs should be the same. 
+ require.Len(ht, htlcTx.TxOut, numInputs, "num outputs mismatch") + + // Ensure all the htlc transaction inputs are spending from the + // commitment transaction, except if this is an extra input added to pay + // for fees for anchor channels. + for _, txIn := range htlcTx.TxIn { + if !closingTxID.IsEqual(&txIn.PreviousOutPoint.Hash) { + // This was an extra input added to pay fees, + // continue to the next one. + continue + } + + // For each htlc timeout transaction, we expect a resolver + // report recording this on chain resolution for both alice and + // carol. + outpoint := txIn.PreviousOutPoint + resolutionOutpoint := &lnrpc.OutPoint{ + TxidBytes: outpoint.Hash[:], + TxidStr: outpoint.Hash.String(), + OutputIndex: outpoint.Index, + } + + // We expect alice to have a timeout tx resolution with an + // amount equal to the payment amount. + aliceReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_FIRST_STAGE, + SweepTxid: htlcTxid.String(), + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + + // We expect carol to have a resolution with an incoming htlc + // timeout which reflects the full amount of the htlc. It has no + // spend tx, because carol stops monitoring the htlc once it has + // timed out. + carolReports[outpoint.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_INCOMING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: "", + Outpoint: resolutionOutpoint, + AmountSat: uint64(paymentAmt), + } + } + + // We record the htlc amount less fees here, so that we know what value + // to expect for the second stage of our htlc resolution. + htlcLessFees = uint64(htlcTx.TxOut[0].Value) + // Alice is restarted here to ensure that her contract court properly // handles the 2nd-stage sweeps after the htlc timeout txns were // confirmed. 
@@ -624,22 +1174,17 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // Advance the chain until just before the 2nd-layer CSV delays expire. // For anchor channels this is one block earlier. - currentHeight = int32(ht.CurrentHeight()) - ht.Logf("current height: %v, htlcCsvMaturityHeight=%v", currentHeight, + curHeight = int32(ht.CurrentHeight()) + ht.Logf("current height: %v, htlcCsvMaturityHeight=%v", curHeight, htlcCsvMaturityHeight) - numBlocks := int(htlcCsvMaturityHeight - uint32(currentHeight) - 1) - ht.MineBlocks(numBlocks) + numBlocks := int(htlcCsvMaturityHeight - uint32(curHeight) - 1) + ht.MineEmptyBlocks(numBlocks) - ht.AssertNumPendingSweeps(alice, numInvoices+1) + ht.AssertNumPendingSweeps(alice, numInvoices) - // Restart Alice to ensure that she can recover from a failure. - // - // TODO(yy): Skip this step for neutrino as it cannot recover the - // sweeping txns from the mempool. We need to also store the txns in - // the sweeper store to make it work for the neutrino case. - if !ht.IsNeutrinoBackend() { - ht.RestartNode(alice) - } + // Fetch the htlc sweep transaction from the mempool. + htlcSweepTx := ht.GetNumTxsFromMempool(1)[0] + htlcSweepTxid := htlcSweepTx.TxHash() // Now that the channel has been fully swept, it should no longer show // incubated, check to see that Alice's node still reports the channel @@ -657,60 +1202,15 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, }, defaultTimeout) require.NoError(ht, err, "timeout while checking force closed channel") - ht.AssertNumPendingSweeps(alice, numInvoices+1) - - // Wait for the single sweep txn to appear in the mempool. - htlcSweepTxid := ht.AssertNumTxsInMempool(1)[0] - - // Fetch the htlc sweep transaction from the mempool. - htlcSweepTx := ht.GetRawTransaction(htlcSweepTxid) + ht.AssertNumPendingSweeps(alice, numInvoices) // Ensure the htlc sweep transaction only has one input for each htlc // Alice extended before force closing. 
- require.Len(ht, htlcSweepTx.MsgTx().TxIn, numInvoices, + require.Len(ht, htlcSweepTx.TxIn, numInvoices, "htlc transaction has wrong num of inputs") - require.Len(ht, htlcSweepTx.MsgTx().TxOut, 1, + require.Len(ht, htlcSweepTx.TxOut, 1, "htlc sweep transaction should have one output") - // Ensure that each output spends from exactly one htlc timeout output. - for _, txIn := range htlcSweepTx.MsgTx().TxIn { - outpoint := txIn.PreviousOutPoint - - // Check that the input is a confirmed htlc timeout txn. - _, ok := htlcTxOutpointSet[outpoint] - require.Truef(ht, ok, "htlc sweep output not spending from "+ - "htlc tx, instead spending output %v", outpoint) - - // Increment our count for how many times this output was spent. - htlcTxOutpointSet[outpoint]++ - - // Check that each is only spent once. - require.Lessf(ht, htlcTxOutpointSet[outpoint], 2, - "htlc sweep tx has multiple spends from "+ - "outpoint %v", outpoint) - - // Since we have now swept our htlc timeout tx, we expect to - // have timeout resolutions for each of our htlcs. - output := txIn.PreviousOutPoint - aliceReports[output.String()] = &lnrpc.Resolution{ - ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, - Outcome: lnrpc.ResolutionOutcome_TIMEOUT, - SweepTxid: htlcSweepTx.Hash().String(), - Outpoint: &lnrpc.OutPoint{ - TxidBytes: output.Hash[:], - TxidStr: output.Hash.String(), - OutputIndex: output.Index, - }, - AmountSat: htlcLessFees, - } - } - - // Check that each HTLC output was spent exactly once. - for op, num := range htlcTxOutpointSet { - require.Equalf(ht, 1, num, - "HTLC outpoint:%s was spent times", op) - } - // Check that we can find the htlc sweep in our set of sweeps using // the verbose output of the listsweeps output. ht.AssertSweepFound(alice, htlcSweepTxid.String(), true, 0) @@ -720,6 +1220,31 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // begins watching that txid after restarting. 
ht.RestartNode(alice) + // Once restarted, Alice will offer her HTLC outputs to the sweeper + // again. The new sweeping tx will replace the old one. We check it by + // asserting the old one no longer exists in the mempool. + ht.AssertTxNotInMempool(htlcSweepTxid) + htlcSweepTxid = ht.AssertNumTxsInMempool(1)[0] + + // Ensure that each output spends from exactly one htlc timeout output. + for _, txIn := range htlcSweepTx.TxIn { + op := txIn.PreviousOutPoint + + // Since we have now swept our htlc timeout tx, we expect to + // have timeout resolutions for each of our htlcs. + aliceReports[op.String()] = &lnrpc.Resolution{ + ResolutionType: lnrpc.ResolutionType_OUTGOING_HTLC, + Outcome: lnrpc.ResolutionOutcome_TIMEOUT, + SweepTxid: htlcSweepTxid.String(), + Outpoint: &lnrpc.OutPoint{ + TxidBytes: op.Hash[:], + TxidStr: op.Hash.String(), + OutputIndex: op.Index, + }, + AmountSat: htlcLessFees, + } + } + // Now that the channel has been fully swept, it should no longer show // incubated, check to see that Alice's node still reports the channel // as pending force closed. @@ -752,6 +1277,7 @@ func runChannelForceClosureTest(ht *lntest.HarnessTest, // up within the pending channels RPC. err = wait.NoError(func() error { ht.AssertNumPendingForceClose(alice, 0) + // In addition to there being no pending channels, we verify // that pending channels does not report any money still in // limbo. 
diff --git a/itest/lnd_sweep_test.go b/itest/lnd_sweep_test.go index b911b4a27..377915412 100644 --- a/itest/lnd_sweep_test.go +++ b/itest/lnd_sweep_test.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/node" + "github.com/lightningnetwork/lnd/lntest/rpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" @@ -2081,3 +2082,255 @@ func testBumpForceCloseFee(ht *lntest.HarnessTest) { // This is needed to clean up the mempool. ht.MineBlocksAndAssertNumTxes(1, 2) } + +// testFeeReplacement tests that when a sweeping txns aggregates multiple +// outgoing HTLCs, and one of the outgoing HTLCs has been spent via the direct +// preimage path by the remote peer, the remaining HTLCs will be grouped again +// and swept immediately. +// +// Setup: +// 1. Fund Alice with 1 UTXOs - she only needs one for the funding process, +// 2. Fund Bob with 3 UTXOs - he needs one for the funding process, one for +// his CPFP anchor sweeping, and one for sweeping his outgoing HTLCs. +// 3. Create a linear network from Alice -> Bob -> Carol. +// 4. Alice pays two invoices to Carol, with Carol holding the settlement. +// 5. Bob goes offline. +// 6. Carol settles one of the invoices, so she can later spend Bob's outgoing +// HTLC via the direct preimage path. +// 7. Carol goes offline and Bob comes online. +// 8. Mine enough blocks so Bob will force close Bob=>Carol to claim his +// outgoing HTLCs. +// 9. Carol comes online, sweeps one of Bob's outgoing HTLCs and it confirms. +// 10. Bob creates a new sweeping tx to sweep his remaining HTLC with a +// previous fee rate. +// +// Test: +// 1. Bob will immediately sweeps his remaining outgoing HTLC given that the +// other one has been spent by Carol. +// 2. 
Bob's new sweeping tx will use the previous fee rate instead of +// initializing a new starting fee rate. +func testFeeReplacement(ht *lntest.HarnessTest) { + // Set the min relay feerate to be 10 sat/vbyte so the non-CPFP anchor + // is never swept. + // + // TODO(yy): delete this line once the normal anchor sweeping is + // removed. + ht.SetMinRelayFeerate(10_000) + + // Setup testing params. + // + // Invoice is 100k sats. + invoiceAmt := btcutil.Amount(100_000) + + // Alice will send two payments. + numPayments := 2 + + // Use the smallest CLTV so we can mine fewer blocks. + cltvDelta := routing.MinCLTVDelta + + // Prepare params. + cfg := []string{ + "--protocol.anchors", + // Use a small CLTV to mine less blocks. + fmt.Sprintf("--bitcoin.timelockdelta=%d", cltvDelta), + // Use a very large CSV, this way to_local outputs are never + // swept so we can focus on testing HTLCs. + fmt.Sprintf("--bitcoin.defaultremotedelay=%v", cltvDelta*10), + } + cfgs := [][]string{cfg, cfg, cfg} + + openChannelParams := lntest.OpenChannelParams{ + Amt: invoiceAmt * 100, + } + + // Create a three hop network: Alice -> Bob -> Carol. + _, nodes := ht.CreateSimpleNetwork(cfgs, openChannelParams) + + // Unwrap the results. + alice, bob, carol := nodes[0], nodes[1], nodes[2] + + // Bob needs two more wallet utxos: + // - when sweeping anchors, he needs one utxo for each sweep. + // - when sweeping HTLCs, he needs one utxo for each sweep. + numUTXOs := 2 + + // Bob should have enough wallet UTXOs here to sweep the HTLC in the + // end of this test. However, due to a known issue, Bob's wallet may + // report there's no UTXO available. For details, + // - https://github.com/lightningnetwork/lnd/issues/8786 + // + // TODO(yy): remove this extra UTXO once the issue is resolved. + numUTXOs++ + + // For neutrino backend, we need two more UTXOs for Bob to create his + // sweeping txns. 
+ if ht.IsNeutrinoBackend() { + numUTXOs += 2 + } + + ht.FundNumCoins(bob, numUTXOs) + + // We also give Carol 2 coins to create her sweeping txns. + ht.FundNumCoins(carol, 2) + + // Create numPayments HTLCs on Bob's incoming and outgoing channels. + preimages := make([][]byte, 0, numPayments) + streams := make([]rpc.SingleInvoiceClient, 0, numPayments) + for i := 0; i < numPayments; i++ { + // Create the preimage. + var preimage lntypes.Preimage + copy(preimage[:], ht.Random32Bytes()) + payHashHold := preimage.Hash() + preimages = append(preimages, preimage[:]) + + // Subscribe the invoices. + stream := carol.RPC.SubscribeSingleInvoice(payHashHold[:]) + streams = append(streams, stream) + + // Carol create the hold invoice. + invoiceReqHold := &invoicesrpc.AddHoldInvoiceRequest{ + Value: int64(invoiceAmt), + CltvExpiry: finalCltvDelta, + Hash: payHashHold[:], + } + invoiceHold := carol.RPC.AddHoldInvoice(invoiceReqHold) + + // Let Alice pay the invoices. + req := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoiceHold.PaymentRequest, + TimeoutSeconds: 60, + FeeLimitMsat: noFeeLimitMsat, + } + + // Assert the payments are inflight. + ht.SendPaymentAndAssertStatus( + alice, req, lnrpc.Payment_IN_FLIGHT, + ) + + // Wait for Carol to mark invoice as accepted. There is a small + // gap to bridge between adding the htlc to the channel and + // executing the exit hop logic. + ht.AssertInvoiceState(stream, lnrpc.Invoice_ACCEPTED) + } + + // At this point, all 3 nodes should now have an active channel with + // the created HTLCs pending on all of them. + // + // Alice should have numPayments outgoing HTLCs on channel Alice -> Bob. + ht.AssertNumActiveHtlcs(alice, numPayments) + + // Bob should have 2 * numPayments HTLCs, + // - numPayments incoming HTLCs on channel Alice -> Bob. + // - numPayments outgoing HTLCs on channel Bob -> Carol. + ht.AssertNumActiveHtlcs(bob, numPayments*2) + + // Carol should have numPayments incoming HTLCs on channel Bob -> Carol. 
+ ht.AssertNumActiveHtlcs(carol, numPayments) + + // Suspend Bob so he won't get the preimage from Carol. + restartBob := ht.SuspendNode(bob) + + // Carol settles the first invoice. + carol.RPC.SettleInvoice(preimages[0]) + ht.AssertInvoiceState(streams[0], lnrpc.Invoice_SETTLED) + + // Carol goes offline so the preimage won't be sent to Bob. + restartCarol := ht.SuspendNode(carol) + + // Bob comes online. + require.NoError(ht, restartBob()) + + // We'll now mine enough blocks to trigger Bob to force close channel + // Bob->Carol due to his outgoing HTLC is about to timeout. With the + // default outgoing broadcast delta of zero, this will be the same + // height as the outgoing htlc's expiry height. + numBlocks := padCLTV(uint32( + finalCltvDelta - lncfg.DefaultOutgoingBroadcastDelta, + )) + ht.MineEmptyBlocks(int(numBlocks)) + + // Assert Bob's force closing tx has been broadcast. We should see two + // txns in the mempool: + // 1. Bob's force closing tx. + // 2. Bob's anchor sweeping tx CPFPing the force close tx. + ht.AssertForceCloseAndAnchorTxnsInMempool() + + // Mine a block to confirm Bob's force close tx and anchor sweeping tx + // so we can focus on testing his outgoing HTLCs. + ht.MineBlocksAndAssertNumTxes(1, 2) + + // Bob should have numPayments pending sweep for the outgoing HTLCs. + ht.AssertNumPendingSweeps(bob, numPayments) + + // Bob should have one sweeping tx in the mempool, which sweeps all his + // outgoing HTLCs. + outgoingSweep0 := ht.GetNumTxsFromMempool(1)[0] + + // We now mine one empty block so Bob will perform one fee bump, after + // which his sweeping tx should be updated with a new fee rate. We do + // this so we can test later when Bob sweeps his remaining HTLC, the new + // sweeping tx will start with the current fee rate. + // + // Calculate Bob's initial sweeping fee rate. + initialFeeRate := ht.CalculateTxFeeRate(outgoingSweep0) + + // Mine one block to trigger Bob's RBF. 
+ ht.MineEmptyBlocks(1) + + // Make sure Bob's old sweeping tx has been removed from the mempool. + ht.AssertTxNotInMempool(outgoingSweep0.TxHash()) + + // Get the feerate of Bob's current sweeping tx. + outgoingSweep1 := ht.GetNumTxsFromMempool(1)[0] + currentFeeRate := ht.CalculateTxFeeRate(outgoingSweep1) + + // Assert the Bob has updated the fee rate. + require.Greater(ht, currentFeeRate, initialFeeRate) + + delta := currentFeeRate - initialFeeRate + + // Check the shape of the sweeping tx - we expect it to be + // 3-input-3-output as a wallet utxo is used and a required output is + // made. + require.Len(ht, outgoingSweep1.TxIn, numPayments+1) + require.Len(ht, outgoingSweep1.TxOut, numPayments+1) + + // Restart Carol, once she is online, she will try to settle the HTLCs + // via the direct preimage spend. + require.NoError(ht, restartCarol()) + + // Carol should have 1 incoming HTLC and 1 anchor output to sweep. + ht.AssertNumPendingSweeps(carol, 2) + + // Assert Bob's sweeping tx has been replaced by Carol's. + ht.AssertTxNotInMempool(outgoingSweep1.TxHash()) + carolSweepTx := ht.GetNumTxsFromMempool(1)[0] + + // Assume the miner is now happy with Carol's fee, and it gets included + // in the next block. + ht.MineBlockWithTx(carolSweepTx) + + // Upon receiving the above block, Bob should immediately create a + // sweeping tx and broadcast it using the remaining outgoing HTLC. + // + // Bob should have numPayments-1 pending sweep for the outgoing HTLCs. + ht.AssertNumPendingSweeps(bob, numPayments-1) + + // Assert Bob immediately sweeps his remaining HTLC with the previous + // fee rate. + outgoingSweep2 := ht.GetNumTxsFromMempool(1)[0] + + // Calculate the fee rate. + feeRate := ht.CalculateTxFeeRate(outgoingSweep2) + + // We expect the current fee rate to be equal to the last fee rate he + // used plus the delta, as we expect the fee rate to stay on the initial + // line given by his fee function. 
+	expectedFeeRate := currentFeeRate + delta
+	require.InEpsilonf(ht, uint64(expectedFeeRate),
+		uint64(feeRate), 0.02, "want %d, got %d in tx=%v",
+		currentFeeRate, feeRate, outgoingSweep2.TxHash())
+
+	// Finally, clean the mempool.
+	ht.MineBlocksAndAssertNumTxes(1, 1)
+}
diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go
index 620ad7554..6a796e22d 100644
--- a/sweep/fee_bumper.go
+++ b/sweep/fee_bumper.go
@@ -40,9 +40,13 @@ var (
 	// preparation, usually due to the output being dust.
 	ErrTxNoOutput = errors.New("tx has no output")
 
-	// ErrThirdPartySpent is returned when a third party has spent the
-	// input in the sweeping tx.
-	ErrThirdPartySpent = errors.New("third party spent the output")
+	// ErrUnknownSpent is returned when an unknown tx has spent an input in
+	// the sweeping tx.
+	ErrUnknownSpent = errors.New("unknown spend of input")
+
+	// ErrInputMissing is returned when a given input no longer exists,
+	// e.g., spending from an orphan tx.
+	ErrInputMissing = errors.New("input no longer exists")
 )
 
 var (
@@ -81,10 +85,6 @@ const (
 	// bumper. In either case the inputs in this tx should be retried with
 	// either a different grouping strategy or an increased budget.
 	//
-	// NOTE: We also send this event when there's a third party spend
-	// event, and the sweeper will handle cleaning this up once it's
-	// confirmed.
-	//
 	// TODO(yy): Remove the above usage once we remove sweeping non-CPFP
 	// anchors.
 	TxFailed
@@ -95,6 +95,17 @@ const (
 	// TxConfirmed is sent when the tx is confirmed.
 	TxConfirmed
 
+	// TxUnknownSpend is sent when at least one of the inputs is spent but
+	// not by the current sweeping tx, this can happen when,
+	// - a remote party has replaced our sweeping tx by spending the
+	// input(s), e.g., via the direct preimage spend on our outgoing HTLC.
+	// - a third party has replaced our sweeping tx, e.g., the anchor output
+	// after 16 blocks.
+ // - A previous sweeping tx has confirmed but the fee bumper is not + // aware of it, e.g., a restart happens right after the sweeping tx is + // broadcast and confirmed. + TxUnknownSpend + // TxFatal is sent when the inputs in this tx cannot be retried. Txns // will end up in this state if they have encountered a non-fee related // error, which means they cannot be retried with increased budget. @@ -115,6 +126,8 @@ func (e BumpEvent) String() string { return "Replaced" case TxConfirmed: return "Confirmed" + case TxUnknownSpend: + return "UnknownSpend" case TxFatal: return "Fatal" default: @@ -264,6 +277,10 @@ type BumpResult struct { // Err is the error that occurred during the broadcast. Err error + // SpentInputs are the inputs spent by another tx which caused the + // current tx to be failed. + SpentInputs map[wire.OutPoint]*wire.MsgTx + // requestID is the ID of the request that created this record. requestID uint64 } @@ -280,7 +297,8 @@ func (b *BumpResult) String() string { // Validate validates the BumpResult so it's safe to use. func (b *BumpResult) Validate() error { - isFailureEvent := b.Event == TxFailed || b.Event == TxFatal + isFailureEvent := b.Event == TxFailed || b.Event == TxFatal || + b.Event == TxUnknownSpend // Every result must have a tx except the fatal or failed case. if b.Tx == nil && !isFailureEvent { @@ -530,7 +548,7 @@ func (t *TxPublisher) createRBFCompliantTx( for { // Create a new tx with the given fee rate and check its // mempool acceptance. - sweepCtx, err := t.createAndCheckTx(r.req, f) + sweepCtx, err := t.createAndCheckTx(r) switch { case err == nil: @@ -593,8 +611,9 @@ func (t *TxPublisher) createRBFCompliantTx( // script, and the fee rate. In addition, it validates the tx's mempool // acceptance before returning a tx that can be published directly, along with // its fee. 
-func (t *TxPublisher) createAndCheckTx(req *BumpRequest, - f FeeFunction) (*sweepTxCtx, error) { +func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) { + req := r.req + f := r.feeFunction // Create the sweep tx with max fee rate of 0 as the fee function // guarantees the fee rate used here won't exceed the max fee rate. @@ -638,10 +657,68 @@ func (t *TxPublisher) createAndCheckTx(req *BumpRequest, return sweepCtx, nil } + // If the inputs are spent by another tx, we will exit with the latest + // sweepCtx and an error. + if errors.Is(err, chain.ErrMissingInputs) { + log.Debugf("Tx %v missing inputs, it's likely the input has "+ + "been spent by others", sweepCtx.tx.TxHash()) + + // Make sure to update the record with the latest attempt. + t.updateRecord(r, sweepCtx) + + return sweepCtx, ErrInputMissing + } + return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w", sweepCtx.tx.TxHash(), err) } +// handleMissingInputs handles the case when the chain backend reports back a +// missing inputs error, which could happen when one of the input has been spent +// in another tx, or the input is referencing an orphan. When the input is +// spent, it will be handled via the TxUnknownSpend flow by creating a +// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned. +func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult { + // Get the spending txns. + spends := t.getSpentInputs(r) + + // Attach the spending txns. + r.spentInputs = spends + + // If there are no spending txns found and the input is missing, the + // input is referencing an orphan tx that's no longer valid, e.g., the + // spending the anchor output from the remote commitment after the local + // commitment has confirmed. In this case we will mark it as fatal and + // exit. 
+ if len(spends) == 0 { + log.Warnf("Failing record=%v: found orphan inputs: %v\n", + r.requestID, inputTypeSummary(r.req.Inputs)) + + // Create a result that will be sent to the resultChan which is + // listened by the caller. + result := &BumpResult{ + Event: TxFatal, + Tx: r.tx, + requestID: r.requestID, + Err: ErrInputMissing, + } + + return result + } + + // Check that the spending tx matches the sweeping tx - given that the + // current sweeping tx has been failed due to missing inputs, the + // spending tx must be a different tx, thus it should NOT be matched. We + // perform a sanity check here to catch the unexpected state. + if !t.isUnknownSpent(r, spends) { + log.Errorf("Sweeping tx %v has missing inputs, yet the "+ + "spending tx is the sweeping tx itself: %v", + r.tx.TxHash(), r.spentInputs) + } + + return t.createUnknownSpentBumpResult(r) +} + // broadcast takes a monitored tx and publishes it to the network. Prior to the // broadcast, it will subscribe the tx's confirmation notification and attach // the event channel to the record. Any broadcast-related errors will not be @@ -754,6 +831,11 @@ func (t *TxPublisher) removeResult(result *BumpResult) { log.Debugf("Removing monitor record=%v due to fatal err: %v", id, result.Err) + case TxUnknownSpend: + // Remove the record if there's an unknown spend. + log.Debugf("Removing monitor record=%v due unknown spent: "+ + "%v", id, result.Err) + // Do nothing if it's neither failed or confirmed. default: log.Tracef("Skipping record removal for id=%v, event=%v", id, @@ -797,6 +879,10 @@ type monitorRecord struct { // outpointToTxIndex is a map of outpoint to tx index. outpointToTxIndex map[wire.OutPoint]int + + // spentInputs are the inputs spent by another tx which caused the + // current tx failed. 
+	spentInputs map[wire.OutPoint]*wire.MsgTx
 }
 
 // Start starts the publisher by subscribing to block epoch updates and kicking
@@ -878,8 +964,6 @@ func (t *TxPublisher) processRecords() {
 
 	// failedRecords stores a map of records which has inputs being spent
 	// by a third party.
-	//
-	// NOTE: this is only used for neutrino backend.
 	failedRecords := make(map[uint64]*monitorRecord)
 
 	// initialRecords stores a map of records which are being created and
@@ -889,32 +973,58 @@ func (t *TxPublisher) processRecords() {
 	// visitor is a helper closure that visits each record and divides them
 	// into two groups.
 	visitor := func(requestID uint64, r *monitorRecord) error {
-		if r.tx == nil {
-			initialRecords[requestID] = r
-			return nil
-		}
+		log.Tracef("Checking monitor recordID=%v", requestID)
 
-		log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
-			r.tx.TxHash())
+		// Check whether the inputs have already been spent.
+		spends := t.getSpentInputs(r)
 
-		// If the tx is already confirmed, we can stop monitoring it.
-		if t.isConfirmed(r.tx.TxHash()) {
+		// If any of the inputs has been spent, the record will be
+		// marked as failed or confirmed.
+		if len(spends) != 0 {
+			// Attach the spending txns.
+			r.spentInputs = spends
+
+			// When tx is nil, it means we haven't tried the initial
+			// broadcast yet the input is already spent. This could
+			// happen when the node shuts down, a previous sweeping
+			// tx confirmed, then the node comes back online and
+			// reoffers the inputs. Another case is the remote node
+			// spends the input quickly before we even attempt the
+			// sweep. In either case we will fail the record and let
+			// the sweeper handle it.
+			if r.tx == nil {
+				failedRecords[requestID] = r
+				return nil
+			}
+
+			// Check whether the inputs have been spent by an
+			// unknown tx.
+			if t.isUnknownSpent(r, spends) {
+				failedRecords[requestID] = r
+
+				// Move to the next record.
+ return nil + } + + // The tx is ours, we can move it to the confirmed queue + // and stop monitoring it. confirmedRecords[requestID] = r // Move to the next record. return nil } - // Check whether the inputs has been spent by a third party. - // - // NOTE: this check is only done for neutrino backend. - if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) { - failedRecords[requestID] = r + // This is the first time we see this record, so we put it in + // the initial queue. + if r.tx == nil { + initialRecords[requestID] = r - // Move to the next record. return nil } + // We can only get here when the inputs are not spent and a + // previous sweeping tx has been attempted. In this case we will + // perform an RBF on it in the current block. feeBumpRecords[requestID] = r // Return nil to move to the next record. @@ -932,7 +1042,6 @@ func (t *TxPublisher) processRecords() { // For records that are confirmed, we'll notify the caller about this // result. for _, r := range confirmedRecords { - log.Debugf("Tx=%v is confirmed", r.tx.TxHash()) t.wg.Add(1) go t.handleTxConfirmed(r) } @@ -942,7 +1051,6 @@ func (t *TxPublisher) processRecords() { // For records that are not confirmed, we perform a fee bump if needed. for _, r := range feeBumpRecords { - log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash()) t.wg.Add(1) go t.handleFeeBumpTx(r, currentHeight) } @@ -950,10 +1058,8 @@ func (t *TxPublisher) processRecords() { // For records that are failed, we'll notify the caller about this // result. 
for _, r := range failedRecords { - log.Debugf("Tx=%v has inputs been spent by a third party, "+ - "failing it now", r.tx.TxHash()) t.wg.Add(1) - go t.handleThirdPartySpent(r) + go t.handleUnknownSpent(r) } } @@ -964,6 +1070,8 @@ func (t *TxPublisher) processRecords() { func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) { defer t.wg.Done() + log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash()) + // Create a result that will be sent to the resultChan which is // listened by the caller. result := &BumpResult{ @@ -980,27 +1088,36 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) { // handleInitialTxError takes the error from `initializeTx` and decides the // bump event. It will construct a BumpResult and handles it. -func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) { - // We now decide what type of event to send. - var event BumpEvent +func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) { + // Create a bump result to be sent to the sweeper. + result := &BumpResult{ + Err: err, + requestID: r.requestID, + } + // We now decide what type of event to send. switch { // When the error is due to a dust output, we'll send a TxFailed so // these inputs can be retried with a different group in the next // block. case errors.Is(err, ErrTxNoOutput): - event = TxFailed + result.Event = TxFailed // When the error is due to budget being used up, we'll send a TxFailed // so these inputs can be retried with a different group in the next // block. case errors.Is(err, ErrMaxPosition): - event = TxFailed + result.Event = TxFailed // When the error is due to zero fee rate delta, we'll send a TxFailed // so these inputs can be retried in the next block. case errors.Is(err, ErrZeroFeeRateDelta): - event = TxFailed + result.Event = TxFailed + + // When there are missing inputs, we'll create a TxUnknownSpend bump + // result here so the rest of the inputs can be retried. 
+	case errors.Is(err, ErrInputMissing):
+		result = t.handleMissingInputs(r)
 
 	// Otherwise this is not a fee-related error and the tx cannot be
 	// retried. In that case we will fail ALL the inputs in this tx, which
@@ -1010,13 +1127,7 @@ func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
 	// TODO(yy): Find out which input is causing the failure and fail that
 	// one only.
 	default:
-		event = TxFatal
-	}
-
-	result := &BumpResult{
-		Event:     event,
-		Err:       err,
-		requestID: requestID,
+		result.Event = TxFatal
 	}
 
 	t.handleResult(result)
@@ -1044,7 +1155,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
 		log.Errorf("Initial broadcast failed: %v", err)
 
 		// We now handle the initialization error and exit.
-		t.handleInitialTxError(r.requestID, err)
+		t.handleInitialTxError(r, err)
 
 		return
 	}
@@ -1073,6 +1184,9 @@ func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
 	defer t.wg.Done()
 
+	log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
+		r.requestID)
+
 	oldTxid := r.tx.TxHash()
 
 	// Get the current conf target for this record.
@@ -1110,29 +1224,93 @@ func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
 	})
 }
 
-// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
-// spent. It will notify the subscriber then remove the record from the maps
-// and send a TxFailed event to the subscriber.
+// handleUnknownSpent is called when the inputs are spent by an unknown tx. It
+// will notify the subscriber then remove the record from the maps and send a
+// TxUnknownSpend event to the subscriber.
 //
 // NOTE: Must be run as a goroutine to avoid blocking on sending the result.
-func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) {
+func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
 	defer t.wg.Done()
 
-	// Create a result that will be sent to the resultChan which is
-	// listened by the caller.
-	//
-	// TODO(yy): create a new state `TxThirdPartySpent` to notify the
-	// sweeper to remove the input, hence moving the monitoring of inputs
-	// spent inside the fee bumper.
+	log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
+		"bumper, failing it now:\n%v", r.requestID,
+		inputTypeSummary(r.req.Inputs))
+
+	// Create a result that will be sent to the resultChan which is listened
+	// by the caller.
+	result := t.createUnknownSpentBumpResult(r)
+
+	// Notify the sweeper about this result in the end.
+	t.handleResult(result)
+}
+
+// createUnknownSpentBumpResult creates and returns a BumpResult given the
+// monitored record has unknown spends.
+func (t *TxPublisher) createUnknownSpentBumpResult(
+	r *monitorRecord) *BumpResult {
+
+	// Create a result that will be sent to the resultChan which is listened
+	// by the caller.
 	result := &BumpResult{
-		Event:     TxFailed,
-		Tx:        r.tx,
-		requestID: r.requestID,
-		Err:       ErrThirdPartySpent,
+		Event:       TxUnknownSpend,
+		Tx:          r.tx,
+		requestID:   r.requestID,
+		Err:         ErrUnknownSpent,
+		SpentInputs: r.spentInputs,
 	}
 
-	// Notify that this tx is confirmed and remove the record from the map.
-	t.handleResult(result)
+	// Get the fee function, which will be used to decide the next fee rate
+	// to use if the sweeper decides to retry sweeping this input.
+	feeFunc := r.feeFunction
+
+	// When the record is failed before the initial broadcast is attempted,
+	// it will have a nil fee func. In this case, we'll create the fee func
+	// here.
+	//
+	// NOTE: Since the current record is failed and will be deleted, we
+	// don't need to update the record on this fee function. We only need
+	// the fee rate data so the sweeper can pick up where we left off.
+ if feeFunc == nil { + f, err := t.initializeFeeFunction(r.req) + // TODO(yy): The only error we would receive here is when the + // pkScript is not recognized by the weightEstimator. What we + // should do instead is to check the pkScript immediately after + // receiving a sweep request so we don't need to check it again, + // which will also save us from error checking from several + // callsites. + if err != nil { + log.Errorf("Failed to create fee func for record %v: "+ + "%v", r.requestID, err) + + // Overwrite the event and error so the sweeper will + // remove this input. + result.Event = TxFatal + result.Err = err + + return result + } + + feeFunc = f + } + + // Since the sweeping tx has been replaced by another party's tx, we + // missed this block window to increase its fee rate. To make sure the + // fee rate stays in the initial line, we now ask the fee function to + // give us the next fee rate as if the sweeping tx were RBFed. This new + // fee rate will be used as the starting fee rate if the upper system + // decides to continue sweeping the rest of the inputs. + _, err := feeFunc.Increment() + if err != nil { + // The fee function has reached its max position - nothing we + // can do here other than letting the user increase the budget. + log.Errorf("Failed to calculate the next fee rate for "+ + "Record(%v): %v", r.requestID, err) + } + + // Attach the new fee rate to be used for the next sweeping attempt. + result.FeeRate = feeFunc.FeeRate() + + return result } // createAndPublishTx creates a new tx with a higher fee rate and publishes it @@ -1149,48 +1327,12 @@ func (t *TxPublisher) createAndPublishTx( // NOTE: The fee function is expected to have increased its returned // fee rate after calling the SkipFeeBump method. So we can use it // directly here. 
- sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction) + sweepCtx, err := t.createAndCheckTx(r) - // If the error is fee related, we will return no error and let the fee - // bumper retry it at next block. - // - // NOTE: we can check the RBF error here and ask the fee function to - // recalculate the fee rate. However, this would defeat the purpose of - // using a deadline based fee function: - // - if the deadline is far away, there's no rush to RBF the tx. - // - if the deadline is close, we expect the fee function to give us a - // higher fee rate. If the fee rate cannot satisfy the RBF rules, it - // means the budget is not enough. - if errors.Is(err, chain.ErrInsufficientFee) || - errors.Is(err, lnwallet.ErrMempoolFee) { - - log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err) - return fn.None[BumpResult]() - } - - // If the error is not fee related, we will return a `TxFailed` event - // so this input can be retried. + // If there's an error creating the replacement tx, we need to abort the + // flow and handle it. if err != nil { - // If the tx doesn't not have enought budget, we will return a - // result so the sweeper can handle it by re-clustering the - // utxos. - if errors.Is(err, ErrNotEnoughBudget) { - log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), - err) - } else { - // Otherwise, an unexpected error occurred, we will - // fail the tx and let the sweeper retry the whole - // process. 
- log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), - err) - } - - return fn.Some(BumpResult{ - Event: TxFailed, - Tx: oldTx, - Err: err, - requestID: r.requestID, - }) + return t.handleReplacementTxError(r, oldTx, err) } // The tx has been created without any errors, we now register a new @@ -1214,7 +1356,9 @@ func (t *TxPublisher) createAndPublishTx( if errors.Is(result.Err, chain.ErrInsufficientFee) || errors.Is(result.Err, lnwallet.ErrMempoolFee) { - log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err) + log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), + result.Err) + return fn.None[BumpResult]() } @@ -1236,43 +1380,59 @@ func (t *TxPublisher) createAndPublishTx( return fn.Some(*result) } -// isConfirmed checks the btcwallet to see whether the tx is confirmed. -func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool { - details, err := t.cfg.Wallet.GetTransactionDetails(&txid) - if err != nil { - log.Warnf("Failed to get tx details for %v: %v", txid, err) - return false +// isUnknownSpent checks whether the inputs of the tx has already been spent by +// a tx not known to us. When a tx is not confirmed, yet its inputs has been +// spent, then it must be spent by a different tx other than the sweeping tx +// here. +func (t *TxPublisher) isUnknownSpent(r *monitorRecord, + spends map[wire.OutPoint]*wire.MsgTx) bool { + + txid := r.tx.TxHash() + + // Iterate all the spending txns and check if they match the sweeping + // tx. + for op, spendingTx := range spends { + spendingTxID := spendingTx.TxHash() + + // If the spending tx is the same as the sweeping tx then we are + // good. + if spendingTxID == txid { + continue + } + + log.Warnf("Detected unknown spend of input=%v in tx=%v", op, + spendingTx.TxHash()) + + return true } - return details.NumConfirmations > 0 + return false } -// isThirdPartySpent checks whether the inputs of the tx has already been spent -// by a third party. 
When a tx is not confirmed, yet its inputs has been spent,
-// then it must be spent by a different tx other than the sweeping tx here.
-//
-// NOTE: this check is only performed for neutrino backend as it has no
-// reliable way to tell a tx has been replaced.
-func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
-	inputs []input.Input) bool {
+// getSpentInputs performs a non-blocking read on the spending subscriptions to
+// see whether any of the monitored inputs has been spent. A map of inputs with
+// their spending txns is returned if found.
+func (t *TxPublisher) getSpentInputs(
+	r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
 
-	// Skip this check for if this is not neutrino backend.
-	if !t.isNeutrinoBackend() {
-		return false
-	}
+	// Create a map to record the inputs spent.
+	spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
 
 	// Iterate all the inputs and check if they have been spent already.
-	for _, inp := range inputs {
+	for _, inp := range r.req.Inputs {
 		op := inp.OutPoint()
 
 		// For wallet utxos, the height hint is not set - we don't need
 		// to monitor them for third party spend.
+		//
+		// TODO(yy): We need to properly lock wallet utxos before
+		// skipping this check as the same wallet utxo can be used by
+		// different sweeping txns.
 		heightHint := inp.HeightHint()
 		if heightHint == 0 {
-			log.Debugf("Skipped third party check for wallet "+
-				"input %v", op)
-
-			continue
+			heightHint = uint32(t.currentHeight.Load())
+			log.Debugf("Checking wallet input %v using heightHint "+
+				"%v", op, heightHint)
 		}
 
 		// If the input has already been spent after the height hint, a
@@ -1283,7 +1443,8 @@ func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
 		if err != nil {
 			log.Criticalf("Failed to register spend ntfn for "+
				"input=%v: %v", op, err)
-			return false
+
+			return nil
 		}
 
 		// Remove the subscription when exit.
@@ -1294,28 +1455,24 @@ func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash, case spend, ok := <-spendEvent.Spend: if !ok { log.Debugf("Spend ntfn for %v canceled", op) - return false - } - spendingTxID := spend.SpendingTx.TxHash() - - // If the spending tx is the same as the sweeping tx - // then we are good. - if spendingTxID == txid { continue } - log.Warnf("Detected third party spent of output=%v "+ - "in tx=%v", op, spend.SpendingTx.TxHash()) + spendingTx := spend.SpendingTx - return true + log.Debugf("Detected spent of input=%v in tx=%v", op, + spendingTx.TxHash()) + + spentInputs[op] = spendingTx // Move to the next input. default: + log.Tracef("Input %v not spent yet", op) } } - return false + return spentInputs } // calcCurrentConfTarget calculates the current confirmation target based on @@ -1670,3 +1827,59 @@ func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey, return txFee, changeOutsOpt, locktimeOpt, nil } + +// handleReplacementTxError handles the error returned from creating the +// replacement tx. It returns a BumpResult that should be notified to the +// sweeper. +func (t *TxPublisher) handleReplacementTxError(r *monitorRecord, + oldTx *wire.MsgTx, err error) fn.Option[BumpResult] { + + // If the error is fee related, we will return no error and let the fee + // bumper retry it at next block. + // + // NOTE: we can check the RBF error here and ask the fee function to + // recalculate the fee rate. However, this would defeat the purpose of + // using a deadline based fee function: + // - if the deadline is far away, there's no rush to RBF the tx. + // - if the deadline is close, we expect the fee function to give us a + // higher fee rate. If the fee rate cannot satisfy the RBF rules, it + // means the budget is not enough. 
+	if errors.Is(err, chain.ErrInsufficientFee) ||
+		errors.Is(err, lnwallet.ErrMempoolFee) {
+
+		log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
+		return fn.None[BumpResult]()
+	}
+
+	// At least one of the inputs is missing, which means it has already
+	// been spent by another tx and confirmed. In this case we will handle
+	// it by returning a TxUnknownSpend bump result.
+	if errors.Is(err, ErrInputMissing) {
+		log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
+		bumpResult := t.handleMissingInputs(r)
+
+		return fn.Some(*bumpResult)
+	}
+
+	// If the error is not fee related, we will return a `TxFailed` event
+	// so this input can be retried.
+	result := fn.Some(BumpResult{
+		Event:     TxFailed,
+		Tx:        oldTx,
+		Err:       err,
+		requestID: r.requestID,
+	})
+
+	// If the tx does not have enough budget, we will return a result so
+	// the sweeper can handle it by re-clustering the utxos.
+	if errors.Is(err, ErrNotEnoughBudget) {
+		log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
+		return result
+	}
+
+	// Otherwise, an unexpected error occurred, we will log an error and let
+	// the sweeper retry the whole process.
+	log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
+
+	return result
+}
diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go
index 0531dec8d..64695dbfe 100644
--- a/sweep/fee_bumper_test.go
+++ b/sweep/fee_bumper_test.go
@@ -55,7 +55,7 @@ func createTestInput(value int64,
 			PubKey: testPubKey,
 		},
 	},
-		0,
+		1,
 		nil,
 	)
 
@@ -504,9 +504,14 @@ func TestCreateAndCheckTx(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 
+		r := &monitorRecord{
+			req:         tc.req,
+			feeFunction: m.feeFunc,
+		}
+
 		t.Run(tc.name, func(t *testing.T) {
 			// Call the method under test.
-			_, err := tp.createAndCheckTx(tc.req, m.feeFunc)
+			_, err := tp.createAndCheckTx(r)
 
 			// Check the result is as expected.
require.ErrorIs(t, err, tc.expectedErr) @@ -1481,60 +1486,163 @@ func TestHandleFeeBumpTx(t *testing.T) { require.True(t, found) } -// TestProcessRecords validates processRecords behaves as expected. -func TestProcessRecords(t *testing.T) { +// TestProcessRecordsInitial validates processRecords behaves as expected when +// processing the initial broadcast. +func TestProcessRecordsInitial(t *testing.T) { t.Parallel() // Create a publisher using the mocks. tp, m := createTestPublisher(t) // Create testing objects. - requestID1 := uint64(1) - req1 := createTestBumpRequest() - tx1 := &wire.MsgTx{LockTime: 1} - txid1 := tx1.TxHash() + requestID := uint64(1) + req := createTestBumpRequest() + op := req.Inputs[0].OutPoint() - requestID2 := uint64(2) - req2 := createTestBumpRequest() - tx2 := &wire.MsgTx{LockTime: 2} - txid2 := tx2.TxHash() - - // Create a monitor record that's confirmed. - recordConfirmed := &monitorRecord{ - requestID: requestID1, - req: req1, - feeFunction: m.feeFunc, - tx: tx1, + // Mock RegisterSpendNtfn. + // + // Create the spending event that doesn't send an event. + se := &chainntnfs.SpendEvent{ + Cancel: func() {}, } - m.wallet.On("GetTransactionDetails", &txid1).Return( - &lnwallet.TransactionDetail{ - NumConfirmations: 1, - }, nil, - ).Once() + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() - // Create a monitor record that's not confirmed. We know it's not - // confirmed because the num of confirms is zero. - recordFeeBump := &monitorRecord{ - requestID: requestID2, - req: req2, - feeFunction: m.feeFunc, - tx: tx2, + // Create a monitor record that's broadcast the first time. + record := &monitorRecord{ + requestID: requestID, + req: req, } - m.wallet.On("GetTransactionDetails", &txid2).Return( - &lnwallet.TransactionDetail{ - NumConfirmations: 0, - }, nil, - ).Once() - m.wallet.On("BackEnd").Return("test-backend").Once() // Setup the initial publisher state by adding the records to the maps. 
- subscriberConfirmed := make(chan *BumpResult, 1) - tp.subscriberChans.Store(requestID1, subscriberConfirmed) - tp.records.Store(requestID1, recordConfirmed) + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) - subscriberReplaced := make(chan *BumpResult, 1) - tp.subscriberChans.Store(requestID2, subscriberReplaced) - tp.records.Store(requestID2, recordFeeBump) + // The following methods should only be called once when creating the + // initial broadcast tx. + // + // Mock the signer to always return a valid script. + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Once() + + // Mock the testmempoolaccept to return nil. + m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once() + + // Mock the wallet to publish successfully. + m.wallet.On("PublishTransaction", + mock.Anything, mock.Anything).Return(nil).Once() + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // We expect the published tx to be notified back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxPublished. + require.Equal(t, TxPublished, result.Event) + + // Expect the tx to be set but not the replaced tx. + require.NotNil(t, result.Tx) + require.Nil(t, result.ReplacedTx) + + // No error should be set. + require.Nil(t, result.Err) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsInitialSpent validates processRecords behaves as expected +// when processing the initial broadcast when the input is spent. +func TestProcessRecordsInitialSpent(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. 
+ requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Mock RegisterSpendNtfn. + se := createTestSpendEvent(tx) + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's broadcast the first time. + record := &monitorRecord{ + requestID: requestID, + req: req, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // We expect the published tx to be notified back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxUnknownSpend. + require.Equal(t, TxUnknownSpend, result.Event) + + // Expect the tx and the replaced tx to be nil. + require.Nil(t, result.Tx) + require.Nil(t, result.ReplacedTx) + + // The error should be set. + require.ErrorIs(t, result.Err, ErrUnknownSpent) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsFeeBump validates processRecords behaves as expected when +// processing fee bump records. +func TestProcessRecordsFeeBump(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Mock RegisterSpendNtfn. + // + // Create the spending event that doesn't send an event. + se := &chainntnfs.SpendEvent{ + Cancel: func() {}, + } + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's not confirmed. 
We know it's not + // confirmed because the `SpendEvent` is empty. + record := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + tx: tx, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) // Create a test feerate and return it from the mock fee function. feerate := chainfee.SatPerKWeight(1000) @@ -1560,40 +1668,141 @@ func TestProcessRecords(t *testing.T) { // Call processRecords and expect the results are notified back. tp.processRecords() - // We expect two results to be received. One for the confirmed tx and - // one for the replaced tx. - // - // Check the confirmed tx result. - select { - case <-time.After(time.Second): - t.Fatal("timeout waiting for subscriberConfirmed") - - case result := <-subscriberConfirmed: - // We expect the result to be TxConfirmed. - require.Equal(t, TxConfirmed, result.Event) - require.Equal(t, tx1, result.Tx) - - // No error should be set. - require.Nil(t, result.Err) - require.Equal(t, requestID1, result.requestID) - } - - // Now check the replaced tx result. + // We expect the replaced tx to be notified back. select { case <-time.After(time.Second): t.Fatal("timeout waiting for subscriberReplaced") - case result := <-subscriberReplaced: + case result := <-subscriber: // We expect the result to be TxReplaced. require.Equal(t, TxReplaced, result.Event) // The new tx and old tx should be properly set. - require.NotEqual(t, tx2, result.Tx) - require.Equal(t, tx2, result.ReplacedTx) + require.NotEqual(t, tx, result.Tx) + require.Equal(t, tx, result.ReplacedTx) // No error should be set. require.Nil(t, result.Err) - require.Equal(t, requestID2, result.requestID) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsConfirmed validates processRecords behaves as expected when +// processing confirmed records. 
+func TestProcessRecordsConfirmed(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Mock RegisterSpendNtfn. + se := createTestSpendEvent(tx) + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's confirmed. + recordConfirmed := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + tx: tx, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, recordConfirmed) + + // Create a test feerate and return it from the mock fee function. + feerate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feerate) + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // Check the confirmed tx result. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxConfirmed. + require.Equal(t, TxConfirmed, result.Event) + require.Equal(t, tx, result.Tx) + + // No error should be set. + require.Nil(t, result.Err) + require.Equal(t, requestID, result.requestID) + } +} + +// TestProcessRecordsSpent validates processRecords behaves as expected when +// processing unknown spent records. +func TestProcessRecordsSpent(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create testing objects. + requestID := uint64(1) + req := createTestBumpRequest() + tx := &wire.MsgTx{LockTime: 1} + op := req.Inputs[0].OutPoint() + + // Create a unknown tx. + txUnknown := &wire.MsgTx{LockTime: 2} + + // Mock RegisterSpendNtfn. 
+ se := createTestSpendEvent(txUnknown) + m.notifier.On("RegisterSpendNtfn", + &op, mock.Anything, mock.Anything).Return(se, nil).Once() + + // Create a monitor record that's spent by txUnknown. + recordConfirmed := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + tx: tx, + } + + // Setup the initial publisher state by adding the records to the maps. + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, recordConfirmed) + + // Mock the fee function to increase feerate. + m.feeFunc.On("Increment").Return(true, nil).Once() + + // Create a test feerate and return it from the mock fee function. + feerate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feerate) + + // Call processRecords and expect the results are notified back. + tp.processRecords() + + // Check the unknown tx result. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber") + + case result := <-subscriber: + // We expect the result to be TxUnknownSpend. + require.Equal(t, TxUnknownSpend, result.Event) + require.Equal(t, tx, result.Tx) + + // We expect the fee rate to be updated. + require.Equal(t, feerate, result.FeeRate) + + // No error should be set. + require.ErrorIs(t, result.Err, ErrUnknownSpent) + require.Equal(t, requestID, result.requestID) } } @@ -1776,3 +1985,126 @@ func TestHandleInitialBroadcastFail(t *testing.T) { require.Equal(t, 0, tp.records.Len()) require.Equal(t, 0, tp.subscriberChans.Len()) } + +// TestHasInputsSpent checks the expected outpoint:tx map is returned. +func TestHasInputsSpent(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create mock inputs. 
+ op1 := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 1, + } + inp1 := &input.MockInput{} + heightHint1 := uint32(1) + defer inp1.AssertExpectations(t) + + op2 := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 2, + } + inp2 := &input.MockInput{} + heightHint2 := uint32(2) + defer inp2.AssertExpectations(t) + + op3 := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 3, + } + walletInp := &input.MockInput{} + heightHint3 := uint32(0) + defer walletInp.AssertExpectations(t) + + // We expect all the inputs to call OutPoint and HeightHint. + inp1.On("OutPoint").Return(op1).Once() + inp2.On("OutPoint").Return(op2).Once() + walletInp.On("OutPoint").Return(op3).Once() + inp1.On("HeightHint").Return(heightHint1).Once() + inp2.On("HeightHint").Return(heightHint2).Once() + walletInp.On("HeightHint").Return(heightHint3).Once() + + // We expect the normal inputs to call SignDesc. + pkScript1 := []byte{1} + sd1 := &input.SignDescriptor{ + Output: &wire.TxOut{ + PkScript: pkScript1, + }, + } + inp1.On("SignDesc").Return(sd1).Once() + + pkScript2 := []byte{1} + sd2 := &input.SignDescriptor{ + Output: &wire.TxOut{ + PkScript: pkScript2, + }, + } + inp2.On("SignDesc").Return(sd2).Once() + + pkScript3 := []byte{3} + sd3 := &input.SignDescriptor{ + Output: &wire.TxOut{ + PkScript: pkScript3, + }, + } + walletInp.On("SignDesc").Return(sd3).Once() + + // Mock RegisterSpendNtfn. + // + // spendingTx1 is the tx spending op1. + spendingTx1 := &wire.MsgTx{} + se1 := createTestSpendEvent(spendingTx1) + m.notifier.On("RegisterSpendNtfn", + &op1, pkScript1, heightHint1).Return(se1, nil).Once() + + // Create the spending event that doesn't send an event. 
+ se2 := &chainntnfs.SpendEvent{ + Cancel: func() {}, + } + m.notifier.On("RegisterSpendNtfn", + &op2, pkScript2, heightHint2).Return(se2, nil).Once() + + se3 := &chainntnfs.SpendEvent{ + Cancel: func() {}, + } + m.notifier.On("RegisterSpendNtfn", + &op3, pkScript3, heightHint3).Return(se3, nil).Once() + + // Prepare the test inputs. + inputs := []input.Input{inp1, inp2, walletInp} + + // Prepare the test record. + record := &monitorRecord{ + req: &BumpRequest{ + Inputs: inputs, + }, + } + + // Call the method under test. + result := tp.getSpentInputs(record) + + // Assert the expected map is created. + expected := map[wire.OutPoint]*wire.MsgTx{ + op1: spendingTx1, + } + require.Equal(t, expected, result) +} + +// createTestSpendEvent creates a SpendEvent which places the specified tx in +// the channel, which can be read by a spending subscriber. +func createTestSpendEvent(tx *wire.MsgTx) *chainntnfs.SpendEvent { + // Create a monitor record that's confirmed. + spendDetails := chainntnfs.SpendDetail{ + SpendingTx: tx, + } + spendChan1 := make(chan *chainntnfs.SpendDetail, 1) + spendChan1 <- &spendDetails + + // Create the spend events. + return &chainntnfs.SpendEvent{ + Spend: spendChan1, + Cancel: func() {}, + } +} diff --git a/sweep/mock_test.go b/sweep/mock_test.go index f9471f22a..9312b7e28 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -24,10 +24,10 @@ func NewMockSweeperStore() *MockSweeperStore { } // IsOurTx determines whether a tx is published by us, based on its hash. -func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { +func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) bool { args := s.Called(hash) - return args.Bool(0), args.Error(1) + return args.Bool(0) } // StoreTx stores a tx we are about to publish. 
diff --git a/sweep/store.go b/sweep/store.go index cfab66381..e3e97908b 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -121,7 +121,7 @@ func deserializeTxRecord(r io.Reader) (*TxRecord, error) { type SweeperStore interface { // IsOurTx determines whether a tx is published by us, based on its // hash. - IsOurTx(hash chainhash.Hash) (bool, error) + IsOurTx(hash chainhash.Hash) bool // StoreTx stores a tx hash we are about to publish. StoreTx(*TxRecord) error @@ -276,15 +276,17 @@ func (s *sweeperStore) StoreTx(tr *TxRecord) error { }, func() {}) } -// IsOurTx determines whether a tx is published by us, based on its -// hash. -func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { +// IsOurTx determines whether a tx is published by us, based on its hash. +func (s *sweeperStore) IsOurTx(hash chainhash.Hash) bool { var ours bool err := kvdb.View(s.db, func(tx kvdb.RTx) error { txHashesBucket := tx.ReadBucket(txHashesBucketKey) + // If the root bucket cannot be found, we consider the tx to be + // not found in our db. if txHashesBucket == nil { - return errNoTxHashesBucket + log.Error("Tx hashes bucket not found in sweeper store") + return nil } ours = txHashesBucket.Get(hash[:]) != nil @@ -294,10 +296,10 @@ func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { ours = false }) if err != nil { - return false, err + return false } - return ours, nil + return ours } // ListSweeps lists all the sweep transactions we have in the sweeper store. diff --git a/sweep/store_test.go b/sweep/store_test.go index ea65b0177..e9d4db125 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -57,18 +57,15 @@ func TestStore(t *testing.T) { require.NoError(t, err) // Assert that both txes are recognized as our own. 
- ours, err := store.IsOurTx(tx1.TxHash()) - require.NoError(t, err) + ours := store.IsOurTx(tx1.TxHash()) require.True(t, ours, "expected tx to be ours") - ours, err = store.IsOurTx(tx2.TxHash()) - require.NoError(t, err) + ours = store.IsOurTx(tx2.TxHash()) require.True(t, ours, "expected tx to be ours") // An different hash should be reported as not being ours. var unknownHash chainhash.Hash - ours, err = store.IsOurTx(unknownHash) - require.NoError(t, err) + ours = store.IsOurTx(unknownHash) require.False(t, ours, "expected tx to not be ours") txns, err := store.ListSweeps() diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8f74a6da9..9684039e4 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1230,21 +1230,26 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { } // This is a new input, and we want to query the mempool to see if this - // input has already been spent. If so, we'll start the input with - // state Published and attach the RBFInfo. - state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint()) + // input has already been spent. If so, we'll start the input with the + // RBFInfo. + rbfInfo := s.decideRBFInfo(input.input.OutPoint()) // Create a new pendingInput and initialize the listeners slice with // the passed in result channel. If this input is offered for sweep // again, the result channel will be appended to this slice. pi = &SweeperInput{ - state: state, + state: Init, listeners: []chan Result{input.resultChan}, Input: input.input, params: input.params, rbf: rbfInfo, } + // Set the starting fee rate if a previous sweeping tx is found. + rbfInfo.WhenSome(func(info RBFInfo) { + pi.params.StartingFeeRate = fn.Some(info.FeeRate) + }) + + // Set the actual deadline height.
 pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr( s.calculateDefaultDeadline(pi), @@ -1267,7 +1272,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { ) if err != nil { err := fmt.Errorf("wait for spend: %w", err) - s.markInputFatal(pi, err) + s.markInputFatal(pi, nil, err) return err } @@ -1277,13 +1282,12 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { return nil } -// decideStateAndRBFInfo queries the mempool to see whether the given input has -// already been spent. If so, the state Published will be returned, otherwise -// state Init. When spent, it will query the sweeper store to fetch the fee -// info of the spending transction, and construct an RBFInfo based on it. -// Suppose an error occurs, fn.None is returned. -func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( - SweepState, fn.Option[RBFInfo]) { +// decideRBFInfo queries the mempool to see whether the given input has already +// been spent. When spent, it will query the sweeper store to fetch the fee info +// of the spending transaction, and construct an RBFInfo based on it. Suppose an +// error occurs, fn.None is returned. +func (s *UtxoSweeper) decideRBFInfo( + op wire.OutPoint) fn.Option[RBFInfo] { // Check if we can find the spending tx of this input in mempool. txOption := s.mempoolLookup(op) @@ -1301,7 +1305,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( // - for neutrino we don't have a mempool. // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
if tx == nil { - return Init, fn.None[RBFInfo]() + return fn.None[RBFInfo]() } // Otherwise the input is already spent in the mempool, so eventually @@ -1313,12 +1317,15 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( txid := tx.TxHash() tr, err := s.cfg.Store.GetTx(txid) + log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(), + op) + // If the tx is not found in the store, it means it's not broadcast by // us, hence we can't find the fee info. This is fine as, later on when // this tx is confirmed, we will remove the input from our inputs. if errors.Is(err, ErrTxNotFound) { log.Warnf("Spending tx %v not found in sweeper store", txid) - return Published, fn.None[RBFInfo]() + return fn.None[RBFInfo]() } // Exit if we get an db error. @@ -1326,7 +1333,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( log.Errorf("Unable to get tx %v from sweeper store: %v", txid, err) - return Published, fn.None[RBFInfo]() + return fn.None[RBFInfo]() } // Prepare the fee info and return it. @@ -1336,7 +1343,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( FeeRate: chainfee.SatPerKWeight(tr.FeeRate), }) - return Published, rbf + return rbf } // handleExistingInput processes an input that is already known to the sweeper. @@ -1387,12 +1394,7 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) { // Query store to find out if we ever published this tx. spendHash := *spend.SpenderTxHash - isOurTx, err := s.cfg.Store.IsOurTx(spendHash) - if err != nil { - log.Errorf("cannot determine if tx %v is ours: %v", - spendHash, err) - return - } + isOurTx := s.cfg.Store.IsOurTx(spendHash) // If this isn't our transaction, it means someone else swept outputs // that we were attempting to sweep. 
This can happen for anchor outputs @@ -1482,12 +1484,17 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) { // markInputFatal marks the given input as fatal and won't be retried. It // will also notify all the subscribers of this input. -func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, err error) { +func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx, + err error) { + log.Errorf("Failed to sweep input: %v, error: %v", pi, err) pi.state = Fatal - s.signalResult(pi, Result{Err: err}) + s.signalResult(pi, Result{ + Tx: tx, + Err: err, + }) } // updateSweeperInputs updates the sweeper's internal state and returns a map @@ -1819,7 +1826,7 @@ func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) { continue } - s.markInputFatal(input, err) + s.markInputFatal(input, nil, err) } } @@ -1847,6 +1854,12 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { case TxReplaced: return s.handleBumpEventTxReplaced(r) + // There are inputs being spent in a tx which the fee bumper doesn't + // understand. We will remove the tx from the sweeper db and mark the + // inputs as swept. + case TxUnknownSpend: + s.handleBumpEventTxUnknownSpend(r) + // There's a fatal error in creating the tx, we will remove the tx from // the sweeper db and mark the inputs as failed. case TxFatal: @@ -1861,20 +1874,139 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { // NOTE: It is enough to check the txid because the sweeper will create // outpoints which solely belong to the internal LND wallet. func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool { - found, err := s.cfg.Store.IsOurTx(op.Hash) - // In case there is an error fetching the transaction details from the - // sweeper store we assume the outpoint is still used by the sweeper - // (worst case scenario). - // - // TODO(ziggie): Ensure that confirmed outpoints are deleted from the - // bucket. 
- if err != nil && !errors.Is(err, errNoTxHashesBucket) { - log.Errorf("failed to fetch info for outpoint(%v:%d) "+ - "with: %v, we assume it is still in use by the sweeper", - op.Hash, op.Index, err) + return s.cfg.Store.IsOurTx(op.Hash) +} - return true +// markInputSwept marks the given input as swept by the tx. It will also notify +// all the subscribers of this input. +func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) { + log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(), + inp.state) + + inp.state = Swept + + // Signal result channels. + s.signalResult(inp, Result{ + Tx: tx, + }) + + // Remove all other inputs in this exclusive group. + if inp.params.ExclusiveGroup != nil { + s.removeExclusiveGroup(*inp.params.ExclusiveGroup) + } +} + +// handleUnknownSpendTx takes an input and its spending tx. If the spending tx +// cannot be found in the sweeper store, the input will be marked as fatal, +// otherwise it will be marked as swept. +func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) { + op := inp.OutPoint() + txid := tx.TxHash() + + isOurTx := s.cfg.Store.IsOurTx(txid) + + // If this is our tx, it means it's a previous sweeping tx that got + // confirmed, which could happen when a restart happens during the + // sweeping process. + if isOurTx { + log.Debugf("Found our sweeping tx %v, marking input %v as "+ + "swept", txid, op) + + // We now use the spending tx to update the state of the inputs. + s.markInputSwept(inp, tx) + + return } - return found + // Since the input is spent by others, we now mark it as fatal and won't + // be retried. + s.markInputFatal(inp, tx, ErrRemoteSpend) + + log.Debugf("Removing descendant txns invalidated by (txid=%v): %v", + txid, lnutils.SpewLogClosure(tx)) + + // Construct a map of the inputs this transaction spends. 
+ spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn)) + for _, txIn := range tx.TxIn { + spentInputs[txIn.PreviousOutPoint] = struct{}{} + } + + err := s.removeConflictSweepDescendants(spentInputs) + if err != nil { + log.Warnf("unable to remove descendant transactions "+ + "due to tx %v: ", txid) + } +} + +// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is +// unknown to the fee bumper. In the case when the sweeping tx has been replaced +// by another party with their tx being confirmed. It will retry sweeping the +// "good" inputs once the "bad" ones are kicked out. +func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) { + // Mark the inputs as publish failed, which means they will be retried + // later. + s.markInputsPublishFailed(r.set) + + // Get all the inputs that are not spent in the current sweeping tx. + spentInputs := r.result.SpentInputs + + // Create a slice to track inputs to be retried. + inputsToRetry := make([]input.Input, 0, len(r.set.Inputs())) + + // Iterate all the inputs found in this bump and mark the ones spent by + // the third party as failed. The rest of inputs will then be updated + // with a new fee rate and be retried immediately. + for _, inp := range r.set.Inputs() { + op := inp.OutPoint() + input, ok := s.inputs[op] + + // Wallet inputs are not tracked so we will not find them from + // the inputs map. + if !ok { + log.Debugf("Skipped marking input: %v not found in "+ + "pending inputs", op) + + continue + } + + // Check whether this input has been spent, if so we mark it as + // fatal or swept based on whether this is one of our previous + // sweeping txns, then move to the next. 
+ tx, spent := spentInputs[op] + if spent { + s.handleUnknownSpendTx(input, tx) + + continue + } + + log.Debugf("Input(%v): updating params: starting fee rate "+ + "[%v -> %v], immediate [%v -> true]", op, + input.params.StartingFeeRate, r.result.FeeRate, + input.params.Immediate) + + // Update the input using the fee rate specified from the + // BumpResult, which should be the starting fee rate to use for + // the next sweeping attempt. + input.params.StartingFeeRate = fn.Some(r.result.FeeRate) + input.params.Immediate = true + inputsToRetry = append(inputsToRetry, input) + } + + // Exit early if there are no inputs to be retried. + if len(inputsToRetry) == 0 { + return + } + + log.Debugf("Retry sweeping inputs with updated params: %v", + inputTypeSummary(inputsToRetry)) + + // Get the latest inputs, which should put the PublishFailed inputs back + // to the sweeping queue. + inputs := s.updateSweeperInputs() + + // Immediately sweep the remaining inputs - the previous inputs should + // now be swept with the updated StartingFeeRate immediately. We may + // also include more inputs in the new sweeping tx if new ones with the + // same deadline are offered. + s.sweepPendingInputs(inputs) } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 40b25425d..d1fbee1f6 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -497,10 +497,9 @@ func TestUpdateSweeperInputs(t *testing.T) { require.Equal(expectedInputs, s.inputs) } -// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are -// returned based on whether this input can be found both in mempool and the -// sweeper store. -func TestDecideStateAndRBFInfo(t *testing.T) { +// TestDecideRBFInfo checks that the expected RBFInfo is returned based on +// whether this input can be found both in mempool and the sweeper store. 
+func TestDecideRBFInfo(t *testing.T) { t.Parallel() require := require.New(t) @@ -524,11 +523,9 @@ func TestDecideStateAndRBFInfo(t *testing.T) { mockMempool.On("LookupInputMempoolSpend", op).Return( fn.None[wire.MsgTx]()).Once() - // Since the mempool lookup failed, we exepect state Init and no - // RBFInfo. - state, rbf := s.decideStateAndRBFInfo(op) + // Since the mempool lookup failed, we expect no RBFInfo. + rbf := s.decideRBFInfo(op) require.True(rbf.IsNone()) - require.Equal(Init, state) // Mock the mempool lookup to return a tx three times as we are calling // attachAvailableRBFInfo three times. @@ -539,19 +536,17 @@ func TestDecideStateAndRBFInfo(t *testing.T) { // Mock the store to return an error saying the tx cannot be found. mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once() - // Although the db lookup failed, we expect the state to be Published. - state, rbf = s.decideStateAndRBFInfo(op) + // The db lookup failed, we expect no RBFInfo. + rbf = s.decideRBFInfo(op) require.True(rbf.IsNone()) - require.Equal(Published, state) // Mock the store to return a db error. dummyErr := errors.New("dummy error") mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once() - // Although the db lookup failed, we expect the state to be Published. - state, rbf = s.decideStateAndRBFInfo(op) + // The db lookup failed, we expect no RBFInfo. + rbf = s.decideRBFInfo(op) require.True(rbf.IsNone()) - require.Equal(Published, state) // Mock the store to return a record. tr := &TxRecord{ @@ -561,7 +556,7 @@ func TestDecideStateAndRBFInfo(t *testing.T) { mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once() // Call the method again. - state, rbf = s.decideStateAndRBFInfo(op) + rbf = s.decideRBFInfo(op) // Assert that the RBF info is returned. rbfInfo := fn.Some(RBFInfo{ @@ -570,9 +565,6 @@ func TestDecideStateAndRBFInfo(t *testing.T) { FeeRate: chainfee.SatPerKWeight(tr.FeeRate), }) require.Equal(rbfInfo, rbf) - - // Assert the state is updated. 
- require.Equal(Published, state) } // TestMarkInputFatal checks that the input is marked as expected. @@ -596,7 +588,7 @@ func TestMarkInputFailed(t *testing.T) { } // Call the method under test. - s.markInputFatal(pi, errors.New("dummy error")) + s.markInputFatal(pi, nil, errors.New("dummy error")) // Assert the state is updated. require.Equal(t, Fatal, pi.state) @@ -1199,3 +1191,246 @@ func TestHandleBumpEventTxFatal(t *testing.T) { err = s.handleBumpEventTxFatal(resp) rt.NoError(err) } + +// TestHandleUnknownSpendTxOurs checks that `handleUnknownSpendTx` correctly +// marks an input as swept given the tx is ours. +func TestHandleUnknownSpendTxOurs(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PublishFailed) + op := inp.OutPoint() + + si, ok := s.inputs[op] + require.True(t, ok) + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true).Once() + + // Call the method under test. + s.handleUnknownSpendTx(si, tx) + + // Assert the state of the input is updated. + require.Equal(t, Swept, s.inputs[op].state) +} + +// TestHandleUnknownSpendTxThirdParty checks that `handleUnknownSpendTx` +// correctly marks an input as fatal given the tx is not ours. +func TestHandleInputSpendTxThirdParty(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. 
+ s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PublishFailed) + op := inp.OutPoint() + + si, ok := s.inputs[op] + require.True(t, ok) + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Mock the store to return false when calling IsOurTx. + store.On("IsOurTx", txid).Return(false).Once() + + // Mock `ListSweeps` to return an empty slice as we are testing the + // workflow here, not the method `removeConflictSweepDescendants`. + store.On("ListSweeps").Return([]chainhash.Hash{}, nil).Once() + + // Call the method under test. + s.handleUnknownSpendTx(si, tx) + + // Assert the state of the input is updated. + require.Equal(t, Fatal, s.inputs[op].state) +} + +// TestHandleBumpEventTxUnknownSpendNoRetry checks the case when all the inputs +// are failed due to them being spent by another party. +func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp}) + + op := inp.OutPoint() + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Create a testing bump result. + br := &BumpResult{ + Tx: tx, + Event: TxUnknownSpend, + SpentInputs: map[wire.OutPoint]*wire.MsgTx{ + op: tx, + }, + } + + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + + // Mock the store to return true when calling IsOurTx. 
+ store.On("IsOurTx", txid).Return(true).Once() + + // Call the method under test. + s.handleBumpEventTxUnknownSpend(resp) + + // Assert the state of the input is updated. + require.Equal(t, Swept, s.inputs[op].state) +} + +// TestHandleBumpEventTxUnknownSpendWithRetry checks the case when some the +// inputs are retried after the bad inputs are filtered out. +func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock wallet and aggregator. + wallet := &MockWallet{} + defer wallet.AssertExpectations(t) + + aggregator := &mockUtxoAggregator{} + defer aggregator.AssertExpectations(t) + + publisher := &MockBumper{} + defer publisher.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Wallet: wallet, + Aggregator: aggregator, + Publisher: publisher, + GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] { + //nolint:ll + return fn.Ok(lnwallet.AddrWithKey{ + DeliveryAddress: testPubKey.SerializeCompressed(), + }) + }, + NoDeadlineConfTarget: uint32(DefaultDeadlineDelta), + Store: store, + }) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create mock inputs - inp1 will be the bad input, and inp2 will be + // retried. + inp1 := createMockInput(t, s, PendingPublish) + inp2 := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp1, inp2}) + + op1 := inp1.OutPoint() + op2 := inp2.OutPoint() + + inp2.On("RequiredLockTime").Return( + uint32(s.currentHeight), false).Once() + inp2.On("BlocksToMaturity").Return(uint32(0)).Once() + inp2.On("HeightHint").Return(uint32(s.currentHeight)).Once() + + // Create a testing tx that spends inp1. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op1}, + }, + } + txid := tx.TxHash() + + // Create a testing bump result. 
+ br := &BumpResult{ + Tx: tx, + Event: TxUnknownSpend, + SpentInputs: map[wire.OutPoint]*wire.MsgTx{ + op1: tx, + }, + } + + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true).Once() + + // Mock the aggregator to return an empty slice as we are not testing + // the actual sweeping behavior. + aggregator.On("ClusterInputs", mock.Anything).Return([]InputSet{}) + + // Call the method under test. + s.handleBumpEventTxUnknownSpend(resp) + + // Assert the first input is removed. + require.NotContains(t, s.inputs, op1) + + // Assert the state of the input is updated. + require.Equal(t, PublishFailed, s.inputs[op2].state) +}