From 39973687aa47c324ef42033a2d7e94b7a0e0d647 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 8 Aug 2023 14:04:22 +0800 Subject: [PATCH 01/11] sweep: add new method `handleNewInput` --- sweep/sweeper.go | 165 ++++++++++++++++++++++++++--------------------- 1 file changed, 92 insertions(+), 73 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index cda02db9f..7347c37a8 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -607,80 +607,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { for { select { - // A new inputs is offered to the sweeper. We check to see if we - // are already trying to sweep this input and if not, set up a - // listener to spend and schedule a sweep. + // A new inputs is offered to the sweeper. We check to see if + // we are already trying to sweep this input and if not, set up + // a listener to spend and schedule a sweep. case input := <-s.newInputs: - outpoint := *input.input.OutPoint() - pendInput, pending := s.pendingInputs[outpoint] - if pending { - log.Debugf("Already pending input %v received", - outpoint) - - // Before updating the input details, check if - // an exclusive group was set, and if so, assume - // this input as finalized and remove all other - // inputs belonging to the same exclusive group. - var prevExclGroup *uint64 - if pendInput.params.ExclusiveGroup != nil && - input.params.ExclusiveGroup == nil { - - prevExclGroup = new(uint64) - *prevExclGroup = *pendInput.params.ExclusiveGroup - } - - // Update input details and sweep parameters. - // The re-offered input details may contain a - // change to the unconfirmed parent tx info. - pendInput.params = input.params - pendInput.Input = input.input - - // Add additional result channel to signal - // spend of this input. 
- pendInput.listeners = append( - pendInput.listeners, input.resultChan, - ) - - if prevExclGroup != nil { - s.removeExclusiveGroup(*prevExclGroup) - } - - continue - } - - // Create a new pendingInput and initialize the - // listeners slice with the passed in result channel. If - // this input is offered for sweep again, the result - // channel will be appended to this slice. - pendInput = &pendingInput{ - listeners: []chan Result{input.resultChan}, - Input: input.input, - minPublishHeight: bestHeight, - params: input.params, - } - s.pendingInputs[outpoint] = pendInput - log.Tracef("input %v added to pendingInputs", outpoint) - - // Start watching for spend of this input, either by us - // or the remote party. - cancel, err := s.waitForSpend( - outpoint, - input.input.SignDesc().Output.PkScript, - input.input.HeightHint(), - ) - if err != nil { - err := fmt.Errorf("wait for spend: %v", err) - s.signalAndRemove(&outpoint, Result{Err: err}) - continue - } - pendInput.ntfnRegCancel = cancel - - // Check to see if with this new input a sweep tx can be - // formed. - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("schedule sweep: %v", err) - } - log.Tracef("input %v scheduled", outpoint) + s.handleNewInput(input, bestHeight) // A spend of one of our inputs is detected. Signal sweep // results to the caller(s). @@ -1651,6 +1582,94 @@ func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) { return s.cfg.Store.ListSweeps() } +// handleNewInput processes a new input by registering spend notification and +// scheduling sweeping for it. +func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage, + bestHeight int32) { + + outpoint := *input.input.OutPoint() + pendInput, pending := s.pendingInputs[outpoint] + if pending { + log.Debugf("Already pending input %v received", outpoint) + + s.handleExistingInput(input, pendInput) + + return + } + + // Create a new pendingInput and initialize the listeners slice with + // the passed in result channel. 
If this input is offered for sweep + // again, the result channel will be appended to this slice. + pendInput = &pendingInput{ + listeners: []chan Result{input.resultChan}, + Input: input.input, + minPublishHeight: bestHeight, + params: input.params, + } + s.pendingInputs[outpoint] = pendInput + log.Tracef("input %v added to pendingInputs", outpoint) + + // Start watching for spend of this input, either by us or the remote + // party. + cancel, err := s.waitForSpend( + outpoint, input.input.SignDesc().Output.PkScript, + input.input.HeightHint(), + ) + if err != nil { + err := fmt.Errorf("wait for spend: %w", err) + s.signalAndRemove(&outpoint, Result{Err: err}) + + return + } + + pendInput.ntfnRegCancel = cancel + + // Check to see if with this new input a sweep tx can be formed. + if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("schedule sweep: %v", err) + } + + log.Tracef("input %v scheduled", outpoint) +} + +// handleExistingInput processes an input that is already known to the sweeper. +// It will overwrite the params of the old input with the new ones. +func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, + oldInput *pendingInput) { + + // Before updating the input details, check if an exclusive group was + // set. In case the same input is registered again without an exclusive + // group set, the previous input and its sweep parameters are outdated + // hence need to be replaced. This scenario currently only happens for + // anchor outputs. When a channel is force closed, in the worst case 3 + // different sweeps with the same exclusive group are registered with + // the sweeper to bump the closing transaction (cpfp) when its time + // critical. 
Receiving an input which was already registered with the + // sweeper but now without an exclusive group means non of the previous + // inputs were used as CPFP, so we need to make sure we update the + // sweep parameters but also remove all inputs with the same exclusive + // group because the are outdated too. + var prevExclGroup *uint64 + if oldInput.params.ExclusiveGroup != nil && + input.params.ExclusiveGroup == nil { + + prevExclGroup = new(uint64) + *prevExclGroup = *oldInput.params.ExclusiveGroup + } + + // Update input details and sweep parameters. The re-offered input + // details may contain a change to the unconfirmed parent tx info. + oldInput.params = input.params + oldInput.Input = input.input + + // Add additional result channel to signal spend of this input. + oldInput.listeners = append(oldInput.listeners, input.resultChan) + + if prevExclGroup != nil { + s.removeExclusiveGroup(*prevExclGroup) + } +} + // init initializes the random generator for random input rescheduling. func init() { rand.Seed(time.Now().Unix()) From 39a7cc64346b26082d308dbfbf5c0f4457cc7efb Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 8 Aug 2023 14:09:57 +0800 Subject: [PATCH 02/11] sweep: rename `waitForSpend` to `monitorSpend` to avoid confusion The method `waitForSpend` makes it sounding like it's blocking while it's not, thus it's renamed to avoid confusion. --- sweep/sweeper.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 7347c37a8..9d3731dee 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1348,9 +1348,9 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, return nil } -// waitForSpend registers a spend notification with the chain notifier. It +// monitorSpend registers a spend notification with the chain notifier. It // returns a cancel function that can be used to cancel the registration. 
-func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint, +func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint, script []byte, heightHint uint32) (func(), error) { log.Tracef("Wait for spend of %v at heightHint=%v", @@ -1611,7 +1611,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage, // Start watching for spend of this input, either by us or the remote // party. - cancel, err := s.waitForSpend( + cancel, err := s.monitorSpend( outpoint, input.input.SignDesc().Output.PkScript, input.input.HeightHint(), ) From f1d0f9f74e0967401bc55e3239755d0e895b5c48 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 12 Oct 2023 16:54:52 +0800 Subject: [PATCH 03/11] sweep: add new method `handleInputSpent` --- sweep/sweeper.go | 158 +++++++++++++++++++++++------------------------ 1 file changed, 77 insertions(+), 81 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 9d3731dee..70ca61fef 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -616,87 +616,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // A spend of one of our inputs is detected. Signal sweep // results to the caller(s). case spend := <-s.spendChan: - // For testing purposes. - if s.testSpendChan != nil { - s.testSpendChan <- *spend.SpentOutPoint - } - - // Query store to find out if we ever published this - // tx. - spendHash := *spend.SpenderTxHash - isOurTx, err := s.cfg.Store.IsOurTx(spendHash) - if err != nil { - log.Errorf("cannot determine if tx %v "+ - "is ours: %v", spendHash, err, - ) - continue - } - - // If this isn't our transaction, it means someone else - // swept outputs that we were attempting to sweep. This - // can happen for anchor outputs as well as justice - // transactions. In this case, we'll notify the wallet - // to remove any spends that a descent from this - // output. 
- if !isOurTx { - err := s.removeLastSweepDescendants( - spend.SpendingTx, - ) - if err != nil { - log.Warnf("unable to remove descendant "+ - "transactions due to tx %v: ", - spendHash) - } - - log.Debugf("Detected spend related to in flight inputs "+ - "(is_ours=%v): %v", - newLogClosure(func() string { - return spew.Sdump(spend.SpendingTx) - }), isOurTx, - ) - } - - // Signal sweep results for inputs in this confirmed - // tx. - for _, txIn := range spend.SpendingTx.TxIn { - outpoint := txIn.PreviousOutPoint - - // Check if this input is known to us. It could - // probably be unknown if we canceled the - // registration, deleted from pendingInputs but - // the ntfn was in-flight already. Or this could - // be not one of our inputs. - input, ok := s.pendingInputs[outpoint] - if !ok { - continue - } - - // Return either a nil or a remote spend result. - var err error - if !isOurTx { - err = ErrRemoteSpend - } - - // Signal result channels. - s.signalAndRemove(&outpoint, Result{ - Tx: spend.SpendingTx, - Err: err, - }) - - // Remove all other inputs in this exclusive - // group. - if input.params.ExclusiveGroup != nil { - s.removeExclusiveGroup( - *input.params.ExclusiveGroup, - ) - } - } - - // Now that an input of ours is spent, we can try to - // resweep the remaining inputs. - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("schedule sweep: %v", err) - } + s.handleInputSpent(spend, bestHeight) // A new external request has been received to retrieve all of // the inputs we're currently attempting to sweep. @@ -1670,6 +1590,82 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, } } +// handleInputSpent takes a spend event of our input and updates the sweeper's +// internal state to remove the input. +func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail, + bestHeight int32) { + + // For testing purposes. 
+ if s.testSpendChan != nil { + s.testSpendChan <- *spend.SpentOutPoint + } + + // Query store to find out if we ever published this tx. + spendHash := *spend.SpenderTxHash + isOurTx, err := s.cfg.Store.IsOurTx(spendHash) + if err != nil { + log.Errorf("cannot determine if tx %v is ours: %v", + spendHash, err) + return + } + + // If this isn't our transaction, it means someone else swept outputs + // that we were attempting to sweep. This can happen for anchor outputs + // as well as justice transactions. In this case, we'll notify the + // wallet to remove any spends that descent from this output. + if !isOurTx { + err := s.removeLastSweepDescendants(spend.SpendingTx) + if err != nil { + log.Warnf("unable to remove descendant transactions "+ + "due to tx %v: ", spendHash) + } + + log.Debugf("Detected third party spend related to in flight "+ + "inputs (is_ours=%v): %v", + newLogClosure(func() string { + return spew.Sdump(spend.SpendingTx) + }), isOurTx, + ) + } + + // Signal sweep results for inputs in this confirmed tx. + for _, txIn := range spend.SpendingTx.TxIn { + outpoint := txIn.PreviousOutPoint + + // Check if this input is known to us. It could probably be + // unknown if we canceled the registration, deleted from + // pendingInputs but the ntfn was in-flight already. Or this + // could be not one of our inputs. + input, ok := s.pendingInputs[outpoint] + if !ok { + continue + } + + // Return either a nil or a remote spend result. + var err error + if !isOurTx { + err = ErrRemoteSpend + } + + // Signal result channels. + s.signalAndRemove(&outpoint, Result{ + Tx: spend.SpendingTx, + Err: err, + }) + + // Remove all other inputs in this exclusive group. + if input.params.ExclusiveGroup != nil { + s.removeExclusiveGroup(*input.params.ExclusiveGroup) + } + } + + // Now that an input of ours is spent, we can try to resweep the + // remaining inputs. 
+ if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("schedule sweep: %v", err) + } +} + // init initializes the random generator for random input rescheduling. func init() { rand.Seed(time.Now().Unix()) From 62b5869f87b35cd5dec863bd0e225b3bcd77cf08 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 12 Oct 2023 17:02:40 +0800 Subject: [PATCH 04/11] sweep: add new method `handleSweep` --- sweep/sweeper.go | 49 +++++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 70ca61fef..6a99c9ac5 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -635,29 +635,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // The timer expires and we are going to (re)sweep. case <-s.timer: log.Debugf("Sweep timer expired") - - // Set timer to nil so we know that a new timer needs to - // be started when new inputs arrive. - s.timer = nil - - // We'll attempt to cluster all of our inputs with - // similar fee rates. Before attempting to sweep them, - // we'll sort them in descending fee rate order. We do - // this to ensure any inputs which have had their fee - // rate bumped are broadcast first in order enforce the - // RBF policy. - inputClusters := s.createInputClusters() - sort.Slice(inputClusters, func(i, j int) bool { - return inputClusters[i].sweepFeeRate > - inputClusters[j].sweepFeeRate - }) - for _, cluster := range inputClusters { - err := s.sweepCluster(cluster, bestHeight) - if err != nil { - log.Errorf("input cluster sweep: %v", - err) - } - } + s.handleSweep(bestHeight) // A new block comes in. Things may have changed, so we retry a // sweep. @@ -1666,6 +1644,31 @@ func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail, } } +// handleSweep is called when the ticker fires. It will create clusters and +// attempt to create and publish the sweeping transactions. 
+func (s *UtxoSweeper) handleSweep(bestHeight int32) { + // Set timer to nil so we know that a new timer needs to be started + // when new inputs arrive. + s.timer = nil + + // We'll attempt to cluster all of our inputs with similar fee rates. + // Before attempting to sweep them, we'll sort them in descending fee + // rate order. We do this to ensure any inputs which have had their fee + // rate bumped are broadcast first in order enforce the RBF policy. + inputClusters := s.createInputClusters() + sort.Slice(inputClusters, func(i, j int) bool { + return inputClusters[i].sweepFeeRate > + inputClusters[j].sweepFeeRate + }) + + for _, cluster := range inputClusters { + err := s.sweepCluster(cluster, bestHeight) + if err != nil { + log.Errorf("input cluster sweep: %v", err) + } + } +} + // init initializes the random generator for random input rescheduling. func init() { rand.Seed(time.Now().Unix()) From 4ba09098d1f9f18330875e537c0447552ab20f34 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 12 Oct 2023 18:03:34 +0800 Subject: [PATCH 05/11] sweep+lnd: move ticker creation into sweeper --- server.go | 14 ++++++-------- sweep/sweeper.go | 11 ++++++----- sweep/sweeper_test.go | 14 +++++--------- 3 files changed, 17 insertions(+), 22 deletions(-) diff --git a/server.go b/server.go index c47f79757..f3419a446 100644 --- a/server.go +++ b/server.go @@ -1059,14 +1059,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{ - FeeEstimator: cc.FeeEstimator, - DetermineFeePerKw: sweep.DetermineFeePerKw, - GenSweepScript: newSweepPkScriptGen(cc.Wallet), - Signer: cc.Wallet.Cfg.Signer, - Wallet: newSweeperWallet(cc.Wallet), - NewBatchTimer: func() <-chan time.Time { - return time.NewTimer(cfg.Sweeper.BatchWindowDuration).C - }, + FeeEstimator: cc.FeeEstimator, + DetermineFeePerKw: sweep.DetermineFeePerKw, + GenSweepScript: newSweepPkScriptGen(cc.Wallet), + Signer: cc.Wallet.Cfg.Signer, + Wallet: newSweeperWallet(cc.Wallet), 
+ TickerDuration: cfg.Sweeper.BatchWindowDuration, Notifier: cc.ChainNotifier, Store: sweeperStore, MaxInputsPerTx: sweep.DefaultMaxInputsPerTx, diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 6a99c9ac5..406cf725f 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -264,10 +264,11 @@ type UtxoSweeperConfig struct { // Wallet contains the wallet functions that sweeper requires. Wallet Wallet - // NewBatchTimer creates a channel that will be sent on when a certain - // time window has passed. During this time window, new inputs can still - // be added to the sweep tx that is about to be generated. - NewBatchTimer func() <-chan time.Time + // TickerDuration is used to create a channel that will be sent on when + // a certain time window has passed. During this time window, new + // inputs can still be added to the sweep tx that is about to be + // generated. + TickerDuration time.Duration // Notifier is an instance of a chain notifier we'll use to watch for // certain on-chain events. @@ -1019,7 +1020,7 @@ func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error { // Start sweep timer to create opportunity for more inputs to be added // before a tx is constructed. 
- s.timer = s.cfg.NewBatchTimer() + s.timer = time.NewTicker(s.cfg.TickerDuration).C log.Debugf("Sweep timer started") diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index df935aeff..81171f230 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -127,15 +127,11 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { } ctx.sweeper = New(&UtxoSweeperConfig{ - Notifier: notifier, - Wallet: backend, - NewBatchTimer: func() <-chan time.Time { - c := make(chan time.Time, 1) - ctx.timeoutChan <- c - return c - }, - Store: store, - Signer: &mock.DummySigner{}, + Notifier: notifier, + Wallet: backend, + TickerDuration: 100 * time.Millisecond, + Store: store, + Signer: &mock.DummySigner{}, GenSweepScript: func() ([]byte, error) { script := make([]byte, input.P2WPKHSize) script[0] = 0 From 7de418676641f44c71cb2f1aae064dfde1bd69b0 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 12 Oct 2023 18:04:14 +0800 Subject: [PATCH 06/11] sweep: simplify polling logic in sweeper This commit attempts to make the polling logic in sweeper more linear. Previously, the sweep's timer is reset/restarted in multiple places, such as when a new input comes in, or a new block comes in, or a previous input being spent, making it difficult to follow. We now remove the old timer and replaces it with a simple polling logic - we will schedule sweeps every 5s(default), and if there's no input to be swept, we'd skip, just like the previous `scheduleSweep` does. It's also worthy noting that, although `scheduleSweep` triggers the timer to tick, by the time we do the actual sweep in `sweepCluster`, conditions may have changed. This is now also fixed because we only have one place to create the clusters and sweeps. 
--- sweep/sweeper.go | 92 +++++----------------------------------- sweep/sweeper_test.go | 99 ------------------------------------------- 2 files changed, 11 insertions(+), 180 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 406cf725f..a745f999c 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -228,9 +228,6 @@ type UtxoSweeper struct { // requested to sweep. pendingInputs pendingInputs - // timer is the channel that signals expiry of the sweep batch timer. - timer <-chan time.Time - testSpendChan chan wire.OutPoint currentOutputScript []byte @@ -606,6 +603,12 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { return } + // Create a ticker based on the config duration. + ticker := time.NewTicker(s.cfg.TickerDuration) + defer ticker.Stop() + + log.Debugf("Sweep ticker started") + for { select { // A new inputs is offered to the sweeper. We check to see if @@ -617,7 +620,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // A spend of one of our inputs is detected. Signal sweep // results to the caller(s). case spend := <-s.spendChan: - s.handleInputSpent(spend, bestHeight) + s.handleInputSpent(spend) // A new external request has been received to retrieve all of // the inputs we're currently attempting to sweep. @@ -634,12 +637,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { } // The timer expires and we are going to (re)sweep. - case <-s.timer: - log.Debugf("Sweep timer expired") + case <-ticker.C: + log.Debugf("Sweep ticker ticks, attempt sweeping...") s.handleSweep(bestHeight) - // A new block comes in. Things may have changed, so we retry a - // sweep. + // A new block comes in, update the bestHeight. 
case epoch, ok := <-blockEpochs: if !ok { return @@ -650,10 +652,6 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { log.Debugf("New block: height=%v, sha=%v", epoch.Height, epoch.Hash) - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("schedule sweep: %v", err) - } - case <-s.quit: return } @@ -982,51 +980,6 @@ func mergeClusters(a, b inputCluster) []inputCluster { return []inputCluster{newCluster} } -// scheduleSweep starts the sweep timer to create an opportunity for more inputs -// to be added. -func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error { - // The timer is already ticking, no action needed for the sweep to - // happen. - if s.timer != nil { - log.Debugf("Timer still ticking at height=%v", currentHeight) - return nil - } - - // We'll only start our timer once we have inputs we're able to sweep. - startTimer := false - for _, cluster := range s.createInputClusters() { - // Examine pending inputs and try to construct lists of inputs. - // We don't need to obtain the coin selection lock, because we - // just need an indication as to whether we can sweep. More - // inputs may be added until we publish the transaction and - // coins that we select now may be used in other transactions. - inputLists, err := s.getInputLists(cluster, currentHeight) - if err != nil { - return fmt.Errorf("get input lists: %v", err) - } - - log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+ - "yield %v distinct txns", currentHeight, - cluster.sweepFeeRate, len(inputLists)) - - if len(inputLists) != 0 { - startTimer = true - break - } - } - if !startTimer { - return nil - } - - // Start sweep timer to create opportunity for more inputs to be added - // before a tx is constructed. - s.timer = time.NewTicker(s.cfg.TickerDuration).C - - log.Debugf("Sweep timer started") - - return nil -} - // signalAndRemove notifies the listeners of the final result of the input // sweep. 
It cancels any pending spend notification and removes the input from // the list of pending inputs. When this function returns, the sweeper has @@ -1423,10 +1376,6 @@ func (s *UtxoSweeper) handleUpdateReq(req *updateReq, bestHeight int32) ( pendingInput.minPublishHeight = bestHeight } - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("Unable to schedule sweep: %v", err) - } - resultChan := make(chan Result, 1) pendingInput.listeners = append(pendingInput.listeners, resultChan) @@ -1522,13 +1471,6 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage, } pendInput.ntfnRegCancel = cancel - - // Check to see if with this new input a sweep tx can be formed. - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("schedule sweep: %v", err) - } - - log.Tracef("input %v scheduled", outpoint) } // handleExistingInput processes an input that is already known to the sweeper. @@ -1571,9 +1513,7 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, // handleInputSpent takes a spend event of our input and updates the sweeper's // internal state to remove the input. -func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail, - bestHeight int32) { - +func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) { // For testing purposes. if s.testSpendChan != nil { s.testSpendChan <- *spend.SpentOutPoint @@ -1637,21 +1577,11 @@ func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail, s.removeExclusiveGroup(*input.params.ExclusiveGroup) } } - - // Now that an input of ours is spent, we can try to resweep the - // remaining inputs. - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("schedule sweep: %v", err) - } } // handleSweep is called when the ticker fires. It will create clusters and // attempt to create and publish the sweeping transactions. 
func (s *UtxoSweeper) handleSweep(bestHeight int32) { - // Set timer to nil so we know that a new timer needs to be started - // when new inputs arrive. - s.timer = nil - // We'll attempt to cluster all of our inputs with similar fee rates. // Before attempting to sweep them, we'll sort them in descending fee // rate order. We do this to ensure any inputs which have had their fee diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 81171f230..b10b22b8c 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -4,7 +4,6 @@ import ( "errors" "os" "reflect" - "runtime/debug" "runtime/pprof" "sort" "testing" @@ -44,7 +43,6 @@ type sweeperTestContext struct { backend *mockBackend store *MockSweeperStore - timeoutChan chan chan time.Time publishChan chan wire.MsgTx } @@ -123,7 +121,6 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { estimator: estimator, backend: backend, store: store, - timeoutChan: make(chan chan time.Time, 1), } ctx.sweeper = New(&UtxoSweeperConfig{ @@ -163,43 +160,6 @@ func (ctx *sweeperTestContext) restartSweeper() { ctx.sweeper.Start() } -func (ctx *sweeperTestContext) tick() { - testLog.Trace("Waiting for tick to be consumed") - select { - case c := <-ctx.timeoutChan: - select { - case c <- time.Time{}: - testLog.Trace("Tick") - case <-time.After(defaultTestTimeout): - debug.PrintStack() - ctx.t.Fatal("tick timeout - tick not consumed") - } - case <-time.After(defaultTestTimeout): - debug.PrintStack() - ctx.t.Fatal("tick timeout - no new timer created") - } -} - -// assertNoTick asserts that the sweeper does not wait for a tick. 
-func (ctx *sweeperTestContext) assertNoTick() { - ctx.t.Helper() - - select { - case <-ctx.timeoutChan: - ctx.t.Fatal("unexpected tick") - - case <-time.After(processingDelay): - } -} - -func (ctx *sweeperTestContext) assertNoNewTimer() { - select { - case <-ctx.timeoutChan: - ctx.t.Fatal("no new timer expected") - default: - } -} - func (ctx *sweeperTestContext) finish(expectedGoroutineCount int) { // We assume that when finish is called, sweeper has finished all its // goroutines. This implies that the waitgroup is empty. @@ -233,7 +193,6 @@ func (ctx *sweeperTestContext) finish(expectedGoroutineCount int) { // We should have consumed and asserted all published transactions in // our unit tests. ctx.assertNoTx() - ctx.assertNoNewTimer() if !ctx.backend.isDone() { ctx.t.Fatal("unconfirmed txes remaining") } @@ -383,8 +342,6 @@ func TestSuccess(t *testing.T) { t.Fatal(err) } - ctx.tick() - sweepTx := ctx.receiveTx() ctx.backend.mine() @@ -437,8 +394,6 @@ func TestDust(t *testing.T) { t.Fatal(err) } - ctx.tick() - // The second input brings the sweep output above the dust limit. We // expect a sweep tx now. @@ -478,8 +433,6 @@ func TestWalletUtxo(t *testing.T) { t.Fatal(err) } - ctx.tick() - sweepTx := ctx.receiveTx() if len(sweepTx.TxIn) != 2 { t.Fatalf("Expected tx to sweep 2 inputs, but contains %v "+ @@ -532,8 +485,6 @@ func TestNegativeInput(t *testing.T) { t.Fatal(err) } - ctx.tick() - // We expect that a sweep tx is published now, but it should only // contain the large input. The negative input should stay out of sweeps // until fees come down to get a positive net yield. @@ -557,8 +508,6 @@ func TestNegativeInput(t *testing.T) { t.Fatal(err) } - ctx.tick() - sweepTx2 := ctx.receiveTx() assertTxSweepsInputs(t, &sweepTx2, &secondLargeInput, &negInput) @@ -582,8 +531,6 @@ func TestChunks(t *testing.T) { } } - ctx.tick() - // We expect two txes to be published because of the max input count of // three. 
sweepTx1 := ctx.receiveTx() @@ -645,7 +592,6 @@ func testRemoteSpend(t *testing.T, postSweep bool) { } if postSweep { - ctx.tick() // Tx publication by sweeper returns ErrDoubleSpend. Sweeper // will retry the inputs without reporting a result. It could be @@ -669,7 +615,6 @@ func testRemoteSpend(t *testing.T, postSweep bool) { if !postSweep { // Assert that the sweeper sweeps the remaining input. - ctx.tick() sweepTx := ctx.receiveTx() if len(sweepTx.TxIn) != 1 { @@ -710,8 +655,6 @@ func TestIdempotency(t *testing.T) { t.Fatal(err) } - ctx.tick() - ctx.receiveTx() resultChan3, err := ctx.sweeper.SweepInput(input, defaultFeePref) @@ -739,7 +682,6 @@ func TestIdempotency(t *testing.T) { // Timer is still running, but spend notification was delivered before // it expired. - ctx.tick() ctx.finish(1) } @@ -762,7 +704,6 @@ func TestRestart(t *testing.T) { if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { t.Fatal(err) } - ctx.tick() ctx.receiveTx() @@ -798,8 +739,6 @@ func TestRestart(t *testing.T) { // Timer tick should trigger republishing a sweep for the remaining // input. - ctx.tick() - ctx.receiveTx() ctx.backend.mine() @@ -837,8 +776,6 @@ func TestRestartRemoteSpend(t *testing.T) { t.Fatal(err) } - ctx.tick() - sweepTx := ctx.receiveTx() // Restart sweeper. @@ -869,8 +806,6 @@ func TestRestartRemoteSpend(t *testing.T) { // Expect sweeper to construct a new tx, because input 1 was spend // remotely. - ctx.tick() - ctx.receiveTx() ctx.backend.mine() @@ -891,8 +826,6 @@ func TestRestartConfirmed(t *testing.T) { t.Fatal(err) } - ctx.tick() - ctx.receiveTx() // Restart sweeper. @@ -910,9 +843,6 @@ func TestRestartConfirmed(t *testing.T) { // Here we expect again a successful sweep. ctx.expectResult(spendChan, nil) - // Timer started but not needed because spend ntfn was sent. - ctx.tick() - ctx.finish(1) } @@ -927,8 +857,6 @@ func TestRetry(t *testing.T) { t.Fatal(err) } - ctx.tick() - // We expect a sweep to be published. 
ctx.receiveTx() @@ -944,8 +872,6 @@ func TestRetry(t *testing.T) { t.Fatal(err) } - ctx.tick() - // Two txes are expected to be published, because new and retry inputs // are separated. ctx.receiveTx() @@ -971,8 +897,6 @@ func TestGiveUp(t *testing.T) { t.Fatal(err) } - ctx.tick() - // We expect a sweep to be published at height 100 (mockChainIOHeight). ctx.receiveTx() @@ -983,12 +907,10 @@ func TestGiveUp(t *testing.T) { // Second attempt ctx.notifier.NotifyEpoch(101) - ctx.tick() ctx.receiveTx() // Third attempt ctx.notifier.NotifyEpoch(103) - ctx.tick() ctx.receiveTx() ctx.expectResult(resultChan0, ErrTooManyAttempts) @@ -1038,10 +960,6 @@ func TestDifferentFeePreferences(t *testing.T) { t.Fatal(err) } - // Start the sweeper's batch ticker, which should cause the sweep - // transactions to be broadcast in order of high to low fee preference. - ctx.tick() - // Generate the same type of sweep script that was used for weight // estimation. changePk, err := ctx.sweeper.cfg.GenSweepScript() @@ -1121,7 +1039,6 @@ func TestPendingInputs(t *testing.T) { // rate sweep to ensure we can detect pending inputs after a sweep. // Once the higher fee rate sweep confirms, we should no longer see // those inputs pending. - ctx.tick() ctx.receiveTx() lowFeeRateTx := ctx.receiveTx() ctx.backend.deleteUnconfirmed(lowFeeRateTx.TxHash()) @@ -1133,7 +1050,6 @@ func TestPendingInputs(t *testing.T) { // sweep. Once again we'll ensure those inputs are no longer pending // once the sweep transaction confirms. ctx.backend.notifier.NotifyEpoch(101) - ctx.tick() ctx.receiveTx() ctx.backend.mine() ctx.expectResult(resultChan3, nil) @@ -1179,7 +1095,6 @@ func TestBumpFeeRBF(t *testing.T) { require.NoError(t, err) // Ensure that a transaction is broadcast with the lower fee preference. 
- ctx.tick() lowFeeTx := ctx.receiveTx() assertTxFeeRate(t, &lowFeeTx, lowFeeRate, changePk, &input) @@ -1200,7 +1115,6 @@ func TestBumpFeeRBF(t *testing.T) { require.NoError(t, err, "unable to bump input's fee") // A higher fee rate transaction should be immediately broadcast. - ctx.tick() highFeeTx := ctx.receiveTx() assertTxFeeRate(t, &highFeeTx, highFeeRate, changePk, &input) @@ -1234,7 +1148,6 @@ func TestExclusiveGroup(t *testing.T) { // We expect all inputs to be published in separate transactions, even // though they share the same fee preference. - ctx.tick() for i := 0; i < 3; i++ { sweepTx := ctx.receiveTx() if len(sweepTx.TxOut) != 1 { @@ -1306,10 +1219,6 @@ func TestCpfp(t *testing.T) { ) require.NoError(t, err) - // Because we sweep at 1000 sat/kw, the parent cannot be paid for. We - // expect the sweeper to remain idle. - ctx.assertNoTick() - // Increase the fee estimate to above the parent tx fee rate. ctx.estimator.updateFees(5000, chainfee.FeePerKwFloor) @@ -1319,7 +1228,6 @@ func TestCpfp(t *testing.T) { // Now we do expect a sweep transaction to be published with our input // and an attached wallet utxo. - ctx.tick() tx := ctx.receiveTx() require.Len(t, tx.TxIn, 2) require.Len(t, tx.TxOut, 1) @@ -1691,10 +1599,6 @@ func TestLockTimes(t *testing.T) { inputs[*op] = inp } - // We expect all inputs to be published in separate transactions, even - // though they share the same fee preference. - ctx.tick() - // Check the sweeps transactions, ensuring all inputs are there, and // all the locktimes are satisfied. for i := 0; i < numSweeps; i++ { @@ -2139,9 +2043,6 @@ func TestRequiredTxOuts(t *testing.T) { inputs[*op] = inp } - // Tick, which should trigger a sweep of all inputs. - ctx.tick() - // Check the sweeps transactions, ensuring all inputs // are there, and all the locktimes are satisfied. 
var sweeps []*wire.MsgTx From acc15d8113ce2413f0d7069d0155b68a36b96ff4 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 23 Oct 2023 10:44:46 +0800 Subject: [PATCH 07/11] sweep: add new method `rescheduleInputs` and refactor `sweep` --- sweep/sweeper.go | 47 +++++++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index a745f999c..70482f1af 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1124,38 +1124,39 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, return fmt.Errorf("notify publish tx: %v", err) } - // Publish sweep tx. + // Reschedule the inputs that we just tried to sweep. This is done in + // case the following publish fails, we'd like to update the inputs' + // publish attempts and rescue them in the next sweep. + s.rescheduleInputs(tx.TxIn, currentHeight) + log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v", tx.TxHash(), len(tx.TxIn), currentHeight) - log.Tracef("Sweep tx at height=%v: %v", currentHeight, - newLogClosure(func() string { - return spew.Sdump(tx) - }), - ) - + // Publish the sweeping tx with customized label. err = s.cfg.Wallet.PublishTransaction( tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil), ) - - // In case of an unexpected error, don't try to recover. - if err != nil && err != lnwallet.ErrDoubleSpend { - return fmt.Errorf("publish tx: %v", err) - } - - // Otherwise log the error. if err != nil { - log.Errorf("Publish sweep tx %v got error: %v", tx.TxHash(), - err) - } else { - // If there's no error, remove the output script. Otherwise - // keep it so that it can be reused for the next transaction - // and causes no address inflation. - s.currentOutputScript = nil + return err } + // If there's no error, remove the output script. Otherwise keep it so + // that it can be reused for the next transaction and causes no address + // inflation. 
+	s.currentOutputScript = nil
+
+	return nil
+}
+
+// rescheduleInputs updates the pending inputs with the given tx inputs. It
+// increments the `publishAttempts` and calculates the next broadcast height
+// for each input. When the publishAttempts exceeds MaxSweepAttempts(10),
+// this input will be removed.
+func (s *UtxoSweeper) rescheduleInputs(inputs []*wire.TxIn,
+	currentHeight int32) {
+
 	// Reschedule sweep.
-	for _, input := range tx.TxIn {
+	for _, input := range inputs {
 		pi, ok := s.pendingInputs[input.PreviousOutPoint]
 		if !ok {
 			// It can be that the input has been removed because it
@@ -1196,8 +1197,6 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
 		})
 	}
 }
-
-	return nil
 }
 
 // monitorSpend registers a spend notification with the chain notifier. It

From 92837621ecce89b64cd3873f5668b55fa2378f27 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Thu, 12 Oct 2023 20:35:46 +0800
Subject: [PATCH 08/11] sweep: remove possible RBF when creating sweeping tx
 for new inputs

This commit changes how we create the input sets which are used to
construct the sweeping transactions. Assume the sweeper has two inputs,
one is new and one is retried, we'd end up having two transactions,
- tx1: which spends both the new and old inputs.
- tx2: which spends the new inputs only.

When publishing these txes, depending on which one gets into the
mempool first, the other one will be viewed as an RBF for the first one
since they are both spending the same input (the new input). This is
now fixed by only attempting to publish the second tx when there isn't
a first tx - when there is a tx1, it means the new inputs are already
used in this tx along with retried inputs, hence there's no need to
publish tx2 which spends the new inputs only.
--- sweep/sweeper.go | 97 ++++++++++++++++++++++++---------- sweep/sweeper_test.go | 117 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 180 insertions(+), 34 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 70482f1af..4d920846d 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -687,22 +687,64 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { func (s *UtxoSweeper) sweepCluster(cluster inputCluster, currentHeight int32) error { - // Execute the sweep within a coin select lock. Otherwise the coins that - // we are going to spend may be selected for other transactions like - // funding of a channel. + // Execute the sweep within a coin select lock. Otherwise the coins + // that we are going to spend may be selected for other transactions + // like funding of a channel. return s.cfg.Wallet.WithCoinSelectLock(func() error { - // Examine pending inputs and try to construct - // lists of inputs. - inputLists, err := s.getInputLists(cluster, currentHeight) + // Examine pending inputs and try to construct lists of inputs. + allSets, newSets, err := s.getInputLists(cluster, currentHeight) if err != nil { - return fmt.Errorf("unable to examine pending inputs: %v", err) + return fmt.Errorf("examine pending inputs: %w", err) } - // Sweep selected inputs. - for _, inputs := range inputLists { - err := s.sweep(inputs, cluster.sweepFeeRate, currentHeight) + // errAllSets records the error from broadcasting the sweeping + // transactions for all input sets. + var errAllSets error + + // allSets contains retried inputs and new inputs. To avoid + // creating an RBF for the new inputs, we'd sweep this set + // first. + for _, inputs := range allSets { + errAllSets = s.sweep( + inputs, cluster.sweepFeeRate, currentHeight, + ) + // TODO(yy): we should also find out which set created + // this error. If there are new inputs in this set, we + // should give it a second chance by sweeping them + // below. 
To enable this, we need to provide richer + // state for each input other than just recording the + // publishAttempts. We'd also need to refactor how we + // create the input sets. Atm, the steps are, + // 1. create a list of input sets. + // 2. sweep each set by creating and publishing the tx. + // We should change the flow as, + // 1. create a list of input sets, and for each set, + // 2. when created, we create and publish the tx. + // 3. if the publish fails, find out which input is + // causing the failure and retry the rest of the + // inputs. + if errAllSets != nil { + log.Errorf("sweep all inputs: %w", err) + break + } + } + + // If we have successfully swept all inputs, there's no need to + // sweep the new inputs as it'd create an RBF case. + if allSets != nil && errAllSets == nil { + return nil + } + + // We'd end up there if there's no retried inputs. In this + // case, we'd sweep the new input sets. If there's an error + // when sweeping a given set, we'd log the error and sweep the + // next set. + for _, inputs := range newSets { + err := s.sweep( + inputs, cluster.sweepFeeRate, currentHeight, + ) if err != nil { - return fmt.Errorf("unable to sweep inputs: %v", err) + log.Errorf("sweep new inputs: %w", err) } } @@ -1017,23 +1059,25 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) { } // getInputLists goes through the given inputs and constructs multiple distinct -// sweep lists with the given fee rate, each up to the configured maximum number -// of inputs. Negative yield inputs are skipped. Transactions with an output -// below the dust limit are not published. Those inputs remain pending and will -// be bundled with future inputs if possible. +// sweep lists with the given fee rate, each up to the configured maximum +// number of inputs. Negative yield inputs are skipped. Transactions with an +// output below the dust limit are not published. 
Those inputs remain pending
+// and will be bundled with future inputs if possible. It returns two lists -
+// one containing all inputs and the other containing only the new inputs. If
+// there are no retried inputs, the first set returned will be empty.
 func (s *UtxoSweeper) getInputLists(cluster inputCluster,
-	currentHeight int32) ([]inputSet, error) {
+	currentHeight int32) ([]inputSet, []inputSet, error) {
 
 	// Filter for inputs that need to be swept. Create two lists: all
 	// sweepable inputs and a list containing only the new, never tried
 	// inputs.
 	//
-	// We want to create as large a tx as possible, so we return a final set
-	// list that starts with sets created from all inputs. However, there is
-	// a chance that those txes will not publish, because they already
-	// contain inputs that failed before. Therefore we also add sets
-	// consisting of only new inputs to the list, to make sure that new
-	// inputs are given a good, isolated chance of being published.
+	// We want to create as large a tx as possible, so we return a final
+	// set list that starts with sets created from all inputs. However,
+	// there is a chance that those txes will not publish, because they
+	// already contain inputs that failed before. Therefore we also add
+	// sets consisting of only new inputs to the list, to make sure that
+	// new inputs are given a good, isolated chance of being published.
// // TODO(yy): this would lead to conflict transactions as the same input // can be used in two sweeping transactions, and our rebroadcaster will @@ -1070,7 +1114,8 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster, s.cfg.MaxInputsPerTx, s.cfg.Wallet, ) if err != nil { - return nil, fmt.Errorf("input partitionings: %v", err) + return nil, nil, fmt.Errorf("input partitionings: %w", + err) } } @@ -1080,15 +1125,13 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster, s.cfg.MaxInputsPerTx, s.cfg.Wallet, ) if err != nil { - return nil, fmt.Errorf("input partitionings: %v", err) + return nil, nil, fmt.Errorf("input partitionings: %w", err) } log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+ "total_num_new=%v", currentHeight, len(allSets), len(newSets)) - // Append the new sets at the end of the list, because those tx likely - // have a higher fee per input. - return append(allSets, newSets...), nil + return allSets, newSets, nil } // sweep takes a set of preselected inputs, creates a sweep tx and publishes the diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index b10b22b8c..14150032c 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -860,10 +860,6 @@ func TestRetry(t *testing.T) { // We expect a sweep to be published. ctx.receiveTx() - // New block arrives. This should trigger a new sweep attempt timer - // start. - ctx.notifier.NotifyEpoch(1000) - // Offer a fresh input. resultChan1, err := ctx.sweeper.SweepInput( spendableInputs[1], defaultFeePref, @@ -872,9 +868,7 @@ func TestRetry(t *testing.T) { t.Fatal(err) } - // Two txes are expected to be published, because new and retry inputs - // are separated. - ctx.receiveTx() + // A single tx is expected to be published. ctx.receiveTx() ctx.backend.mine() @@ -2436,3 +2430,112 @@ func TestClusterByLockTime(t *testing.T) { }) } } + +// TestGetInputLists checks that the expected input sets are returned based on +// whether there are retried inputs or not. 
+func TestGetInputLists(t *testing.T) {
+	t.Parallel()
+
+	// Create a test param with a dummy fee preference. This is needed so
+	// `feeRateForPreference` won't throw an error.
+	param := Params{Fee: FeePreference{ConfTarget: 1}}
+
+	// Create a mock input and mock all the methods used in this test.
+	testInput := &input.MockInput{}
+	testInput.On("RequiredLockTime").Return(0, false)
+	testInput.On("WitnessType").Return(input.CommitmentAnchor)
+	testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1})
+	testInput.On("RequiredTxOut").Return(nil)
+	testInput.On("UnconfParent").Return(nil)
+	testInput.On("SignDesc").Return(&input.SignDescriptor{
+		Output: &wire.TxOut{Value: 100_000},
+	})
+
+	// Create a new and a retried input.
+	//
+	// NOTE: we use the same input.Input for both pending inputs as we only
+	// test the logic of returning the correct non-nil input sets, and not
+	// the content of the sets. To validate the content of the sets, we
+	// should test `generateInputPartitionings` instead.
+	newInput := &pendingInput{
+		Input:  testInput,
+		params: param,
+	}
+	oldInput := &pendingInput{
+		Input:           testInput,
+		params:          param,
+		publishAttempts: 1,
+	}
+
+	// clusterNew contains only new inputs.
+	clusterNew := pendingInputs{
+		wire.OutPoint{Index: 1}: newInput,
+	}
+
+	// clusterMixed contains a mix of new and retried inputs.
+	clusterMixed := pendingInputs{
+		wire.OutPoint{Index: 1}: newInput,
+		wire.OutPoint{Index: 2}: oldInput,
+	}
+
+	// clusterOld contains only retried inputs.
+	clusterOld := pendingInputs{
+		wire.OutPoint{Index: 2}: oldInput,
+	}
+
+	// Create a test sweeper.
+	s := New(&UtxoSweeperConfig{
+		MaxInputsPerTx: DefaultMaxInputsPerTx,
+	})
+
+	testCases := []struct {
+		name              string
+		cluster           inputCluster
+		expectedNilAllSet bool
+		expectNilNewSet   bool
+	}{
+		{
+			// When there are only new inputs, we'd expect the
+			// first returned set(allSets) to be empty.
+ name: "new inputs only", + cluster: inputCluster{inputs: clusterNew}, + expectedNilAllSet: true, + expectNilNewSet: false, + }, + { + // When there are only retried inputs, we'd expect the + // second returned set(newSet) to be empty. + name: "retried inputs only", + cluster: inputCluster{inputs: clusterOld}, + expectedNilAllSet: false, + expectNilNewSet: true, + }, + { + // When there are mixed inputs, we'd expect two sets + // are returned. + name: "mixed inputs", + cluster: inputCluster{inputs: clusterMixed}, + expectedNilAllSet: false, + expectNilNewSet: false, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + allSets, newSets, err := s.getInputLists(tc.cluster, 0) + require.NoError(t, err) + + if tc.expectNilNewSet { + require.Nil(t, newSets) + } + + if tc.expectedNilAllSet { + require.Nil(t, allSets) + } + }) + } +} From 0816f9124cda608577522a444235bbe1a629f477 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 13 Oct 2023 02:38:56 +0800 Subject: [PATCH 09/11] itest: update itest to reflect new sweeper behavior --- itest/lnd_channel_backup_test.go | 31 +++++----------- itest/lnd_channel_force_close_test.go | 32 +++++++++++++---- itest/lnd_multi-hop_test.go | 52 +++++++++++++-------------- itest/lnd_routing_test.go | 2 +- 4 files changed, 60 insertions(+), 57 deletions(-) diff --git a/itest/lnd_channel_backup_test.go b/itest/lnd_channel_backup_test.go index e4c046b3d..ea04a139a 100644 --- a/itest/lnd_channel_backup_test.go +++ b/itest/lnd_channel_backup_test.go @@ -1524,18 +1524,12 @@ func assertDLPExecuted(ht *lntest.HarnessTest, if commitType == lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE { // Dave should sweep his anchor only, since he still has the - // lease CLTV constraint on his commitment output. - ht.Miner.AssertNumTxsInMempool(1) + // lease CLTV constraint on his commitment output. We'd also + // see Carol's anchor sweep here. 
+ ht.Miner.AssertNumTxsInMempool(2) - // Mine Dave's anchor sweep tx. - ht.MineBlocksAndAssertNumTxes(1, 1) - blocksMined++ - - // The above block will trigger Carol's sweeper to reconsider - // the anchor sweeping. Because we are now sweeping at the fee - // rate floor, the sweeper will consider this input has - // positive yield thus attempts the sweeping. - ht.MineBlocksAndAssertNumTxes(1, 1) + // Mine anchor sweep txes for Carol and Dave. + ht.MineBlocksAndAssertNumTxes(1, 2) blocksMined++ // After Carol's output matures, she should also reclaim her @@ -1564,10 +1558,10 @@ func assertDLPExecuted(ht *lntest.HarnessTest, ht.AssertNumPendingForceClose(dave, 0) } else { // Dave should sweep his funds immediately, as they are not - // timelocked. We also expect Dave to sweep his anchor, if - // present. + // timelocked. We also expect Carol and Dave sweep their + // anchors. if lntest.CommitTypeHasAnchors(commitType) { - ht.MineBlocksAndAssertNumTxes(1, 2) + ht.MineBlocksAndAssertNumTxes(1, 3) } else { ht.MineBlocksAndAssertNumTxes(1, 1) } @@ -1577,15 +1571,6 @@ func assertDLPExecuted(ht *lntest.HarnessTest, // Now Dave should consider the channel fully closed. ht.AssertNumPendingForceClose(dave, 0) - // The above block will trigger Carol's sweeper to reconsider - // the anchor sweeping. Because we are now sweeping at the fee - // rate floor, the sweeper will consider this input has - // positive yield thus attempts the sweeping. - if lntest.CommitTypeHasAnchors(commitType) { - ht.MineBlocksAndAssertNumTxes(1, 1) - blocksMined++ - } - // After Carol's output matures, she should also reclaim her // funds. 
// diff --git a/itest/lnd_channel_force_close_test.go b/itest/lnd_channel_force_close_test.go index e640ea583..71950f81f 100644 --- a/itest/lnd_channel_force_close_test.go +++ b/itest/lnd_channel_force_close_test.go @@ -94,7 +94,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) { // calculateSweepFeeRate runs multiple steps to calculate the fee rate // used in sweeping the transactions. - calculateSweepFeeRate := func(expectedSweepTxNum, deadline int) int64 { + calculateSweepFeeRate := func(expectAnchor bool, deadline int) int64 { // Create two nodes, Alice and Bob. alice := setupNode("Alice") defer ht.Shutdown(alice) @@ -143,12 +143,32 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) { // section. ht.AssertChannelWaitingClose(alice, chanPoint) + // We should see Alice's force closing tx in the mempool. + expectedNumTxes := 1 + + // If anchor is expected, we should see the anchor sweep tx in + // the mempool too. + if expectAnchor { + expectedNumTxes = 2 + } + // Check our sweep transactions can be found in mempool. - sweepTxns := ht.Miner.GetNumTxsFromMempool(expectedSweepTxNum) + sweepTxns := ht.Miner.GetNumTxsFromMempool(expectedNumTxes) // Mine a block to confirm these transactions such that they // don't remain in the mempool for any subsequent tests. - ht.MineBlocks(1) + ht.MineBlocksAndAssertNumTxes(1, expectedNumTxes) + + // Bob should now sweep his to_local output and anchor output. + expectedNumTxes = 2 + + // If Alice's anchor is not swept above, we should see it here. + if !expectAnchor { + expectedNumTxes = 3 + } + + // Mine one more block to assert the sweep transactions. + ht.MineBlocksAndAssertNumTxes(1, expectedNumTxes) // Calculate the fee rate used. feeRate := ht.CalculateTxesFeeRate(sweepTxns) @@ -163,7 +183,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) { // Calculate fee rate used and assert only the force close tx is // broadcast. 
- feeRate := calculateSweepFeeRate(1, deadline) + feeRate := calculateSweepFeeRate(false, deadline) // We expect the default max fee rate is used. Allow some deviation // because weight estimates during tx generation are estimates. @@ -181,7 +201,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) { // Calculate fee rate used and assert only the force close tx is // broadcast. - feeRate = calculateSweepFeeRate(1, defaultDeadline) + feeRate = calculateSweepFeeRate(false, defaultDeadline) // We expect the default max fee rate is used. Allow some deviation // because weight estimates during tx generation are estimates. @@ -198,7 +218,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) { // Calculate fee rate used and assert both the force close tx and the // anchor sweeping tx are broadcast. - feeRate = calculateSweepFeeRate(2, deadline) + feeRate = calculateSweepFeeRate(true, deadline) // We expect the anchor to be swept with the deadline, which has the // fee rate of feeRateLarge. diff --git a/itest/lnd_multi-hop_test.go b/itest/lnd_multi-hop_test.go index 2630bbde6..1841d77ca 100644 --- a/itest/lnd_multi-hop_test.go +++ b/itest/lnd_multi-hop_test.go @@ -319,15 +319,17 @@ func runMultiHopHtlcLocalTimeout(ht *lntest.HarnessTest, // was in fact mined. ht.MineBlocksAndAssertNumTxes(1, 1) - // Mine an additional block to prompt Bob to broadcast their - // second layer sweep due to the CSV on the HTLC timeout output. - ht.MineBlocksAndAssertNumTxes(1, 0) + // Mine one more block to trigger the timeout path. + ht.MineEmptyBlocks(1) + + // Bob's sweeper should now broadcast his second layer sweep + // due to the CSV on the HTLC timeout output. ht.Miner.AssertOutpointInMempool(htlcTimeoutOutpoint) } // Next, we'll mine a final block that should confirm the sweeping // transactions left. 
- ht.MineBlocks(1) + ht.MineBlocksAndAssertNumTxes(1, 1) // Once this transaction has been confirmed, Bob should detect that he // no longer has any pending channels. @@ -482,7 +484,7 @@ func runMultiHopReceiverChainClaim(ht *lntest.HarnessTest, // We'll now mine an additional block which should confirm both the // second layer transactions. - ht.MineBlocks(1) + ht.MineBlocksAndAssertNumTxes(1, expectedTxes) // Carol's pending channel report should now show two outputs under // limbo: her commitment output, as well as the second-layer claim @@ -494,16 +496,16 @@ func runMultiHopReceiverChainClaim(ht *lntest.HarnessTest, // clearing the HTLC off-chain. ht.AssertNumActiveHtlcs(alice, 0) - // If we mine 4 additional blocks, then Carol can sweep the second level - // HTLC output. - ht.MineBlocks(defaultCSV) + // If we mine 4 additional blocks, then Carol can sweep the second + // level HTLC output once the CSV expires. + ht.MineEmptyBlocks(defaultCSV) // We should have a new transaction in the mempool. ht.Miner.AssertNumTxsInMempool(1) - // Finally, if we mine an additional block to confirm these two sweep - // transactions, Carol should not show a pending channel in her report - // afterwards. + // Finally, if we mine an additional block to confirm Carol's second + // level success transaction. Carol should not show a pending channel + // in her report afterwards. ht.MineBlocks(1) ht.AssertNumPendingForceClose(carol, 0) @@ -815,15 +817,16 @@ func runMultiHopRemoteForceCloseOnChainHtlcTimeout(ht *lntest.HarnessTest, case lnrpc.CommitmentType_LEGACY: expectedTxes = 1 - // Bob can sweep his commit and anchor outputs immediately. + // Bob can sweep his commit and anchor outputs immediately. Carol will + // also sweep her anchor. 
case lnrpc.CommitmentType_ANCHORS, lnrpc.CommitmentType_SIMPLE_TAPROOT: - expectedTxes = 2 + expectedTxes = 3 // Bob can't sweep his commit output yet as he was the initiator of a // script-enforced leased channel, so he'll always incur the additional // CLTV. He can still sweep his anchor output however. case lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE: - expectedTxes = 1 + expectedTxes = 2 default: ht.Fatalf("unhandled commitment type %v", c) @@ -833,15 +836,6 @@ func runMultiHopRemoteForceCloseOnChainHtlcTimeout(ht *lntest.HarnessTest, ht.MineBlocksAndAssertNumTxes(1, expectedTxes) blocksMined++ - // The above block will trigger Carol's sweeper to reconsider the - // anchor sweeping. Because we are now sweeping at the fee rate floor, - // the sweeper will consider this input has positive yield thus - // attempts the sweeping. - if lntest.CommitTypeHasAnchors(c) { - ht.MineBlocksAndAssertNumTxes(1, 1) - blocksMined++ - } - // Next, we'll mine enough blocks for the HTLC to expire. At this // point, Bob should hand off the output to his internal utxo nursery, // which will broadcast a sweep transaction. @@ -1000,15 +994,16 @@ func runMultiHopHtlcLocalChainClaim(ht *lntest.HarnessTest, case lnrpc.CommitmentType_LEGACY: expectedTxes = 1 - // Alice will sweep her commitment and anchor output immediately. + // Alice will sweep her commitment and anchor output immediately. Bob + // will also sweep his anchor. case lnrpc.CommitmentType_ANCHORS, lnrpc.CommitmentType_SIMPLE_TAPROOT: - expectedTxes = 2 + expectedTxes = 3 // Alice will sweep her anchor output immediately. Her commitment // output cannot be swept yet as it has incurred an additional CLTV due // to being the initiator of a script-enforced leased channel. 
case lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE: - expectedTxes = 1 + expectedTxes = 2 default: ht.Fatalf("unhandled commitment type %v", c) @@ -2162,8 +2157,9 @@ func runExtraPreimageFromRemoteCommit(ht *lntest.HarnessTest, numBlocks = htlc.ExpirationHeight - uint32(height) - lncfg.DefaultOutgoingBroadcastDelta - // We should now have Carol's htlc suucess tx in the mempool. + // We should now have Carol's htlc success tx in the mempool. numTxesMempool := 1 + ht.Miner.AssertNumTxsInMempool(numTxesMempool) // For neutrino backend, the timeout resolver needs to extract the // preimage from the blocks. @@ -2171,7 +2167,9 @@ func runExtraPreimageFromRemoteCommit(ht *lntest.HarnessTest, // Mine a block to confirm Carol's 2nd level success tx. ht.MineBlocksAndAssertNumTxes(1, 1) numTxesMempool-- + numBlocks-- } + // Mine empty blocks so Carol's htlc success tx stays in mempool. Once // the height is reached, Bob's timeout resolver will resolve the htlc // by extracing the preimage from the mempool. diff --git a/itest/lnd_routing_test.go b/itest/lnd_routing_test.go index fa898835a..55d5e7d32 100644 --- a/itest/lnd_routing_test.go +++ b/itest/lnd_routing_test.go @@ -528,7 +528,6 @@ func testPrivateChannels(ht *lntest.HarnessTest) { Private: true, }, ) - defer ht.CloseChannel(carol, chanPointPrivate) // The channel should be available for payments between Carol and Alice. 
// We check this by sending payments from Carol to Bob, that @@ -602,6 +601,7 @@ func testPrivateChannels(ht *lntest.HarnessTest) { ht.CloseChannel(alice, chanPointAlice) ht.CloseChannel(dave, chanPointDave) ht.CloseChannel(carol, chanPointCarol) + ht.CloseChannel(carol, chanPointPrivate) } // testInvoiceRoutingHints tests that the routing hints for an invoice are From 05fbe8b1160996709f10b76dc13c68327e5e88ed Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 13 Oct 2023 16:53:25 +0800 Subject: [PATCH 10/11] docs: update release note re RBF fix --- docs/release-notes/release-notes-0.18.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/release-notes/release-notes-0.18.0.md b/docs/release-notes/release-notes-0.18.0.md index e80c4f422..dd24c8509 100644 --- a/docs/release-notes/release-notes-0.18.0.md +++ b/docs/release-notes/release-notes-0.18.0.md @@ -28,6 +28,10 @@ * LND will now [enforce pong responses ](https://github.com/lightningnetwork/lnd/pull/7828) from its peers +* [Fixed a possible unintended RBF + attempt](https://github.com/lightningnetwork/lnd/pull/8091) when sweeping new + inputs with retried ones. + # New Features ## Functional Enhancements From dba4c8e5adeed1d08fced0239b92d44cfdb0ef51 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 17 Oct 2023 18:24:24 +0800 Subject: [PATCH 11/11] trivial: fix typo and nits --- sweep/sweeper_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 14150032c..b50ce929f 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -2150,7 +2150,7 @@ func TestFeeRateForPreference(t *testing.T) { return 0, dummyErr } - // Set the relay fee rate to be 1. + // Set the relay fee rate to be 1 sat/kw. 
s.relayFeeRate = 1 // smallFeeFunc is a mock over DetermineFeePerKw that always return a @@ -2198,7 +2198,7 @@ func TestFeeRateForPreference(t *testing.T) { expectedErr: ErrNoFeePreference, }, { - // When an error is returned from the fee determinor, + // When an error is returned from the fee determiner, // we should return it. name: "error from DetermineFeePerKw", feePref: FeePreference{FeeRate: 1},