From 4c13ea174764ed29678f36a88a99f8485aabf2c6 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 27 Mar 2024 19:22:02 +0800 Subject: [PATCH] sweep: pass default deadline height when clustering inputs This commit changes the method `ClusterInputs` to also take a default deadline height. Previously, when calculating the default deadline height for a non-time sensitive input, we would first cluster it with other non-time sensitive inputs, then give it a deadline before we are about to `sweep`. This is now moved to the step where we decide to cluster inputs, allowing time-sensitive and non-sensitive inputs to be grouped together, if they happen to share the same deadline heights. --- sweep/aggregator.go | 47 +++++++++++++++++++++++++------------- sweep/aggregator_test.go | 15 +++++++----- sweep/mock_test.go | 11 +++++---- sweep/sweeper.go | 13 +++++------ sweep/sweeper_test.go | 11 ++++++--- sweep/tx_input_set.go | 38 +++++++++++------------------- sweep/tx_input_set_test.go | 14 ++++++------ 7 files changed, 80 insertions(+), 69 deletions(-) diff --git a/sweep/aggregator.go b/sweep/aggregator.go index 09a691fb0..4138afc90 100644 --- a/sweep/aggregator.go +++ b/sweep/aggregator.go @@ -5,7 +5,6 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/wire" - "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" ) @@ -123,7 +122,7 @@ func (c *inputCluster) createInputSets(maxFeeRate chainfee.SatPerKWeight, type UtxoAggregator interface { // ClusterInputs takes a list of inputs and groups them into input // sets. Each input set will be used to create a sweeping transaction. - ClusterInputs(InputsMap) []InputSet + ClusterInputs(inputs InputsMap, defaultDeadline int32) []InputSet } // SimpleAggregator aggregates inputs known by the Sweeper based on each @@ -175,7 +174,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator, // inputs known by the UtxoSweeper. It clusters inputs by // 1) Required tx locktime // 2) Similar fee rates. -func (s *SimpleAggregator) ClusterInputs(inputs InputsMap) []InputSet { +func (s *SimpleAggregator) ClusterInputs(inputs InputsMap, _ int32) []InputSet { // We start by getting the inputs clusters by locktime. Since the // inputs commit to the locktime, they can only be clustered together // if the locktime is equal. @@ -492,7 +491,7 @@ func NewBudgetAggregator(estimator chainfee.Estimator, } // clusterGroup defines an alias for a set of inputs that are to be grouped. -type clusterGroup map[fn.Option[int32]][]SweeperInput +type clusterGroup map[int32][]SweeperInput // ClusterInputs creates a list of input sets from pending inputs. // 1. filter out inputs whose budget cannot cover min relay fee. @@ -502,7 +501,9 @@ type clusterGroup map[fn.Option[int32]][]SweeperInput // 5. optionally split a cluster if it exceeds the max input limit. // 6. create input sets from each of the clusters. // 7. create input sets for each of the exclusive inputs. -func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet { +func (b *BudgetAggregator) ClusterInputs(inputs InputsMap, + defaultDeadline int32) []InputSet { + // Filter out inputs that have a budget below min relay fee. filteredInputs := b.filterInputs(inputs) @@ -513,19 +514,25 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet { // any cluster. These inputs can only be swept independently as there's // no guarantee which input will be confirmed first, which means // grouping exclusive inputs may jeopardize non-exclusive inputs. - exclusiveInputs := make(InputsMap) + exclusiveInputs := make(map[wire.OutPoint]clusterGroup) // Iterate all the inputs and group them based on their specified // deadline heights. for _, input := range filteredInputs { + // Get deadline height, and use the specified default deadline + // height if it's not set. + height := input.params.DeadlineHeight.UnwrapOr(defaultDeadline) + // Put exclusive inputs in their own set. if input.params.ExclusiveGroup != nil { log.Tracef("Input %v is exclusive", input.OutPoint()) - exclusiveInputs[input.OutPoint()] = input + exclusiveInputs[input.OutPoint()] = clusterGroup{ + height: []SweeperInput{*input}, + } + continue } - height := input.params.DeadlineHeight cluster, ok := clusters[height] if !ok { cluster = make([]SweeperInput, 0) @@ -540,19 +547,21 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet { // NOTE: cannot pre-allocate the slice since we don't know the number // of input sets in advance. inputSets := make([]InputSet, 0) - for _, cluster := range clusters { + for height, cluster := range clusters { // Sort the inputs by their economical value. sortedInputs := b.sortInputs(cluster) // Create input sets from the cluster. - sets := b.createInputSets(sortedInputs) + sets := b.createInputSets(sortedInputs, height) inputSets = append(inputSets, sets...) } // Create input sets from the exclusive inputs. - for _, input := range exclusiveInputs { - sets := b.createInputSets([]SweeperInput{*input}) - inputSets = append(inputSets, sets...) + for _, cluster := range exclusiveInputs { + for height, input := range cluster { + sets := b.createInputSets(input, height) + inputSets = append(inputSets, sets...) + } } return inputSets @@ -561,7 +570,9 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet { // createInputSet takes a set of inputs which share the same deadline height // and turns them into a list of `InputSet`, each set is then used to create a // sweep transaction. -func (b *BudgetAggregator) createInputSets(inputs []SweeperInput) []InputSet { +func (b *BudgetAggregator) createInputSets(inputs []SweeperInput, + deadlineHeight int32) []InputSet { + // sets holds the InputSets that we will return. sets := make([]InputSet, 0) @@ -582,7 +593,9 @@ func (b *BudgetAggregator) createInputSets(inputs []SweeperInput) []InputSet { remainingInputs = remainingInputs[b.maxInputs:] // Create an InputSet using the max allowed number of inputs. - set, err := NewBudgetInputSet(currentInputs) + set, err := NewBudgetInputSet( + currentInputs, deadlineHeight, + ) if err != nil { log.Errorf("unable to create input set: %v", err) @@ -594,7 +607,9 @@ func (b *BudgetAggregator) createInputSets(inputs []SweeperInput) []InputSet { // Create an InputSet from the remaining inputs. if len(remainingInputs) > 0 { - set, err := NewBudgetInputSet(remainingInputs) + set, err := NewBudgetInputSet( + remainingInputs, deadlineHeight, + ) if err != nil { log.Errorf("unable to create input set: %v", err) return nil diff --git a/sweep/aggregator_test.go b/sweep/aggregator_test.go index fce02de32..e1a5e093d 100644 --- a/sweep/aggregator_test.go +++ b/sweep/aggregator_test.go @@ -39,6 +39,8 @@ var ( wire.OutPoint{Hash: chainhash.Hash{}, Index: 11}: &SweeperInput{}, wire.OutPoint{Hash: chainhash.Hash{}, Index: 12}: &SweeperInput{}, } + + testHeight = int32(800000) ) // TestMergeClusters check that we properly can merge clusters together, @@ -779,7 +781,7 @@ func TestBudgetAggregatorCreateInputSets(t *testing.T) { tc.setupMock() // Call the method under test. - result := b.createInputSets(tc.inputs) + result := b.createInputSets(tc.inputs, testHeight) // Validate the expected number of input sets are // returned. @@ -938,7 +940,8 @@ func TestBudgetInputSetClusterInputs(t *testing.T) { b := NewBudgetAggregator(estimator, DefaultMaxInputsPerTx) // Call the method under test. - result := b.ClusterInputs(inputs) + defaultDeadline := testHeight + DefaultDeadlineDelta + result := b.ClusterInputs(inputs, defaultDeadline) // We expect four input sets to be returned, one for each deadline and // extra one for the exclusive input. @@ -949,7 +952,7 @@ func TestBudgetInputSetClusterInputs(t *testing.T) { require.Len(t, setExclusive.Inputs(), 1) // Check the each of rest has exactly two inputs. - deadlines := make(map[fn.Option[int32]]struct{}) + deadlines := make(map[int32]struct{}) for _, set := range result[:3] { // We expect two inputs in each set. require.Len(t, set.Inputs(), 2) @@ -962,7 +965,7 @@ func TestBudgetInputSetClusterInputs(t *testing.T) { } // We expect to see all three deadlines. - require.Contains(t, deadlines, deadlineNone) - require.Contains(t, deadlines, deadline1) - require.Contains(t, deadlines, deadline2) + require.Contains(t, deadlines, defaultDeadline) + require.Contains(t, deadlines, deadline1.UnwrapOrFail(t)) + require.Contains(t, deadlines, deadline2.UnwrapOrFail(t)) } diff --git a/sweep/mock_test.go b/sweep/mock_test.go index c04807d6b..146f0fb95 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -8,7 +8,6 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" @@ -342,8 +341,10 @@ type mockUtxoAggregator struct { var _ UtxoAggregator = (*mockUtxoAggregator)(nil) // ClusterInputs takes a list of inputs and groups them into clusters. -func (m *mockUtxoAggregator) ClusterInputs(inputs InputsMap) []InputSet { - args := m.Called(inputs) +func (m *mockUtxoAggregator) ClusterInputs(inputs InputsMap, + defaultDeadline int32) []InputSet { + + args := m.Called(inputs, defaultDeadline) return args.Get(0).([]InputSet) } @@ -484,10 +485,10 @@ func (m *MockInputSet) NeedWalletInput() bool { } // DeadlineHeight returns the deadline height for the set. -func (m *MockInputSet) DeadlineHeight() fn.Option[int32] { +func (m *MockInputSet) DeadlineHeight() int32 { args := m.Called() - return args.Get(0).(fn.Option[int32]) + return args.Get(0).(int32) } // Budget givens the total amount that can be used as fees by this input set. diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 1bb4afb2b..6b8abd6fa 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -817,18 +817,13 @@ func (s *UtxoSweeper) sweep(set InputSet) error { s.currentOutputScript = pkScript } - // Create a default deadline height, and replace it with set's - // DeadlineHeight if it's set. - deadlineHeight := s.currentHeight + DefaultDeadlineDelta - deadlineHeight = set.DeadlineHeight().UnwrapOr(deadlineHeight) - // Create a fee bump request and ask the publisher to broadcast it. The // publisher will then take over and start monitoring the tx for // potential fee bump. req := &BumpRequest{ Inputs: set.Inputs(), Budget: set.Budget(), - DeadlineHeight: deadlineHeight, + DeadlineHeight: set.DeadlineHeight(), DeliveryAddress: s.currentOutputScript, MaxFeeRate: s.cfg.MaxFeeRate.FeePerKWeight(), // TODO(yy): pass the strategy here. @@ -1554,8 +1549,12 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap { // sweepPendingInputs is called when the ticker fires. It will create clusters // and attempt to create and publish the sweeping transactions. func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) { + // Create a default deadline height, which will be used when there's no + // DeadlineHeight specified for a given input. + defaultDeadline := s.currentHeight + DefaultDeadlineDelta + // Cluster all of our inputs based on the specific Aggregator. - sets := s.cfg.Aggregator.ClusterInputs(inputs) + sets := s.cfg.Aggregator.ClusterInputs(inputs, defaultDeadline) // sweepWithLock is a helper closure that executes the sweep within a // coin select lock to prevent the coins being selected for other diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 57fa74116..fecdf2105 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -2679,6 +2679,9 @@ func TestSweepPendingInputs(t *testing.T) { }, }) + // Set a current height to test the deadline override. + s.currentHeight = testHeight + // Create an input set that needs wallet inputs. setNeedWallet := &MockInputSet{} defer setNeedWallet.AssertExpectations(t) @@ -2699,10 +2702,10 @@ func TestSweepPendingInputs(t *testing.T) { // Mock the methods used in `sweep`. This is not important for this // unit test. setNeedWallet.On("Inputs").Return(nil).Times(4) - setNeedWallet.On("DeadlineHeight").Return(fn.None[int32]()).Once() + setNeedWallet.On("DeadlineHeight").Return(testHeight).Once() setNeedWallet.On("Budget").Return(btcutil.Amount(1)).Once() normalSet.On("Inputs").Return(nil).Times(4) - normalSet.On("DeadlineHeight").Return(fn.None[int32]()).Once() + normalSet.On("DeadlineHeight").Return(testHeight).Once() normalSet.On("Budget").Return(btcutil.Amount(1)).Once() // Make pending inputs for testing. We don't need real values here as @@ -2710,7 +2713,9 @@ func TestSweepPendingInputs(t *testing.T) { pis := make(InputsMap) // Mock the aggregator to return the mocked input sets. - aggregator.On("ClusterInputs", pis).Return([]InputSet{ + expectedDeadlineUsed := testHeight + DefaultDeadlineDelta + aggregator.On("ClusterInputs", pis, + expectedDeadlineUsed).Return([]InputSet{ setNeedWallet, normalSet, }) diff --git a/sweep/tx_input_set.go b/sweep/tx_input_set.go index 132f66fbe..5ec29f860 100644 --- a/sweep/tx_input_set.go +++ b/sweep/tx_input_set.go @@ -60,9 +60,9 @@ type InputSet interface { // inputs. NeedWalletInput() bool - // DeadlineHeight returns an optional absolute block height to express - // the time-sensitivity of the input set. The outputs from a force - // close tx have different time preferences: + // DeadlineHeight returns an absolute block height to express the + // time-sensitivity of the input set. The outputs from a force close tx + // have different time preferences: // - to_local: no time pressure as it can only be swept by us. // - first level outgoing HTLC: must be swept before its corresponding // incoming HTLC's CLTV is reached. @@ -72,7 +72,7 @@ type InputSet interface { // - anchor: for CPFP-purpose anchor, it must be swept before any of // the above CLTVs is reached. For non-CPFP purpose anchor, there's // no time pressure. - DeadlineHeight() fn.Option[int32] + DeadlineHeight() int32 // Budget givens the total amount that can be used as fees by this // input set. @@ -201,8 +201,8 @@ func (t *txInputSet) Budget() btcutil.Amount { // DeadlineHeight gives the block height that this set must be confirmed by. // // NOTE: this field is only used for `BudgetInputSet`. -func (t *txInputSet) DeadlineHeight() fn.Option[int32] { - return fn.None[int32]() +func (t *txInputSet) DeadlineHeight() int32 { + return 0 } // NeedWalletInput returns true if the input set needs more wallet inputs. @@ -553,7 +553,7 @@ type BudgetInputSet struct { // deadlineHeight is the height which the inputs in this set must be // confirmed by. - deadlineHeight fn.Option[int32] + deadlineHeight int32 } // Compile-time constraint to ensure budgetInputSet implements InputSet. @@ -597,18 +597,14 @@ func validateInputs(inputs []SweeperInput) error { } // NewBudgetInputSet creates a new BudgetInputSet. -func NewBudgetInputSet(inputs []SweeperInput) (*BudgetInputSet, error) { +func NewBudgetInputSet(inputs []SweeperInput, + deadlineHeight int32) (*BudgetInputSet, error) { + // Validate the supplied inputs. if err := validateInputs(inputs); err != nil { return nil, err } - // TODO(yy): all the inputs share the same deadline height, which means - // there exists an opportunity to refactor the deadline height to be - // tracked on the set-level, not per input. This would allow us to - // avoid the overhead of tracking the same height for each input in the - // set. - deadlineHeight := inputs[0].params.DeadlineHeight bi := &BudgetInputSet{ deadlineHeight: deadlineHeight, inputs: make([]*SweeperInput, 0, len(inputs)), @@ -625,18 +621,13 @@ func NewBudgetInputSet(inputs []SweeperInput) (*BudgetInputSet, error) { // String returns a human-readable description of the input set. func (b *BudgetInputSet) String() string { - deadlineDesc := "none" - b.deadlineHeight.WhenSome(func(h int32) { - deadlineDesc = fmt.Sprintf("%d", h) - }) - inputsDesc := "" for _, input := range b.inputs { inputsDesc += fmt.Sprintf("\n%v", input) } return fmt.Sprintf("BudgetInputSet(budget=%v, deadline=%v, "+ - "inputs=[%v])", b.Budget(), deadlineDesc, inputsDesc) + "inputs=[%v])", b.Budget(), b.DeadlineHeight(), inputsDesc) } // addInput adds an input to the input set. @@ -748,12 +739,9 @@ func (b *BudgetInputSet) AddWalletInputs(wallet Wallet) error { pi := SweeperInput{ Input: input, params: Params{ - // Inherit the deadline height from the input - // set. - DeadlineHeight: b.deadlineHeight, + DeadlineHeight: fn.Some(b.deadlineHeight), }, } - b.addInput(pi) // Return if we've reached the minimum output amount. @@ -784,7 +772,7 @@ func (b *BudgetInputSet) Budget() btcutil.Amount { // DeadlineHeight returns the deadline height of the set. // // NOTE: part of the InputSet interface. -func (b *BudgetInputSet) DeadlineHeight() fn.Option[int32] { +func (b *BudgetInputSet) DeadlineHeight() int32 { return b.deadlineHeight } diff --git a/sweep/tx_input_set_test.go b/sweep/tx_input_set_test.go index cbe1abb8e..6699886ba 100644 --- a/sweep/tx_input_set_test.go +++ b/sweep/tx_input_set_test.go @@ -253,7 +253,7 @@ func TestNewBudgetInputSet(t *testing.T) { rt := require.New(t) // Pass an empty slice and expect an error. - set, err := NewBudgetInputSet([]SweeperInput{}) + set, err := NewBudgetInputSet([]SweeperInput{}, testHeight) rt.ErrorContains(err, "inputs slice is empty") rt.Nil(set) @@ -284,17 +284,17 @@ func TestNewBudgetInputSet(t *testing.T) { } // Pass a slice of inputs with different deadline heights. - set, err = NewBudgetInputSet([]SweeperInput{input1, input2}) + set, err = NewBudgetInputSet([]SweeperInput{input1, input2}, testHeight) rt.ErrorContains(err, "inputs have different deadline heights") rt.Nil(set) // Pass a slice of inputs that only one input has the deadline height. - set, err = NewBudgetInputSet([]SweeperInput{input0, input2}) + set, err = NewBudgetInputSet([]SweeperInput{input0, input2}, testHeight) rt.NoError(err) rt.NotNil(set) // Pass a slice of inputs that are duplicates. - set, err = NewBudgetInputSet([]SweeperInput{input1, input1}) + set, err = NewBudgetInputSet([]SweeperInput{input1, input1}, testHeight) rt.ErrorContains(err, "duplicate inputs") rt.Nil(set) } @@ -314,7 +314,7 @@ func TestBudgetInputSetAddInput(t *testing.T) { } // Initialize an input set, which adds the above input. - set, err := NewBudgetInputSet([]SweeperInput{*pi}) + set, err := NewBudgetInputSet([]SweeperInput{*pi}, testHeight) require.NoError(t, err) // Add the input to the set again. @@ -646,7 +646,7 @@ func TestAddWalletInputSuccess(t *testing.T) { min, max).Return([]*lnwallet.Utxo{utxo, utxo}, nil).Once() // Initialize an input set with the pending input. - set, err := NewBudgetInputSet([]SweeperInput{*pi}) + set, err := NewBudgetInputSet([]SweeperInput{*pi}, deadline) require.NoError(t, err) // Add wallet inputs to the input set, which should give us an error as @@ -669,7 +669,7 @@ func TestAddWalletInputSuccess(t *testing.T) { // Finally, check the interface methods. require.EqualValues(t, budget, set.Budget()) - require.Equal(t, deadline, set.DeadlineHeight().UnsafeFromSome()) + require.Equal(t, deadline, set.DeadlineHeight()) // Weak check, a strong check is to open the slice and check each item. require.Len(t, set.inputs, 3) }