sweep: pass default deadline height when clustering inputs

This commit changes the method `ClusterInputs` to also take a default
deadline height. Previously, when calculating the default deadline
height for a non-time sensitive input, we would first cluster it with
other non-time sensitive inputs, then give it a deadline before we are
about to `sweep`. This is now moved to the step where we decide to
cluster inputs, allowing time-sensitive and non-sensitive inputs to be
grouped together, if they happen to share the same deadline heights.
This commit is contained in:
yyforyongyu 2024-03-27 19:22:02 +08:00
parent 15588355b3
commit 4c13ea1747
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
7 changed files with 80 additions and 69 deletions

View File

@ -5,7 +5,6 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
@ -123,7 +122,7 @@ func (c *inputCluster) createInputSets(maxFeeRate chainfee.SatPerKWeight,
type UtxoAggregator interface {
// ClusterInputs takes a list of inputs and groups them into input
// sets. Each input set will be used to create a sweeping transaction.
ClusterInputs(InputsMap) []InputSet
ClusterInputs(inputs InputsMap, defaultDeadline int32) []InputSet
}
// SimpleAggregator aggregates inputs known by the Sweeper based on each
@ -175,7 +174,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
// inputs known by the UtxoSweeper. It clusters inputs by
// 1) Required tx locktime
// 2) Similar fee rates.
func (s *SimpleAggregator) ClusterInputs(inputs InputsMap) []InputSet {
func (s *SimpleAggregator) ClusterInputs(inputs InputsMap, _ int32) []InputSet {
// We start by getting the inputs clusters by locktime. Since the
// inputs commit to the locktime, they can only be clustered together
// if the locktime is equal.
@ -492,7 +491,7 @@ func NewBudgetAggregator(estimator chainfee.Estimator,
}
// clusterGroup defines an alias for a set of inputs that are to be grouped.
type clusterGroup map[fn.Option[int32]][]SweeperInput
type clusterGroup map[int32][]SweeperInput
// ClusterInputs creates a list of input sets from pending inputs.
// 1. filter out inputs whose budget cannot cover min relay fee.
@ -502,7 +501,9 @@ type clusterGroup map[fn.Option[int32]][]SweeperInput
// 5. optionally split a cluster if it exceeds the max input limit.
// 6. create input sets from each of the clusters.
// 7. create input sets for each of the exclusive inputs.
func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet {
func (b *BudgetAggregator) ClusterInputs(inputs InputsMap,
defaultDeadline int32) []InputSet {
// Filter out inputs that have a budget below min relay fee.
filteredInputs := b.filterInputs(inputs)
@ -513,19 +514,25 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet {
// any cluster. These inputs can only be swept independently as there's
// no guarantee which input will be confirmed first, which means
// grouping exclusive inputs may jeopardize non-exclusive inputs.
exclusiveInputs := make(InputsMap)
exclusiveInputs := make(map[wire.OutPoint]clusterGroup)
// Iterate all the inputs and group them based on their specified
// deadline heights.
for _, input := range filteredInputs {
// Get deadline height, and use the specified default deadline
// height if it's not set.
height := input.params.DeadlineHeight.UnwrapOr(defaultDeadline)
// Put exclusive inputs in their own set.
if input.params.ExclusiveGroup != nil {
log.Tracef("Input %v is exclusive", input.OutPoint())
exclusiveInputs[input.OutPoint()] = input
exclusiveInputs[input.OutPoint()] = clusterGroup{
height: []SweeperInput{*input},
}
continue
}
height := input.params.DeadlineHeight
cluster, ok := clusters[height]
if !ok {
cluster = make([]SweeperInput, 0)
@ -540,19 +547,21 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet {
// NOTE: cannot pre-allocate the slice since we don't know the number
// of input sets in advance.
inputSets := make([]InputSet, 0)
for _, cluster := range clusters {
for height, cluster := range clusters {
// Sort the inputs by their economical value.
sortedInputs := b.sortInputs(cluster)
// Create input sets from the cluster.
sets := b.createInputSets(sortedInputs)
sets := b.createInputSets(sortedInputs, height)
inputSets = append(inputSets, sets...)
}
// Create input sets from the exclusive inputs.
for _, input := range exclusiveInputs {
sets := b.createInputSets([]SweeperInput{*input})
inputSets = append(inputSets, sets...)
for _, cluster := range exclusiveInputs {
for height, input := range cluster {
sets := b.createInputSets(input, height)
inputSets = append(inputSets, sets...)
}
}
return inputSets
@ -561,7 +570,9 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet {
// createInputSet takes a set of inputs which share the same deadline height
// and turns them into a list of `InputSet`, each set is then used to create a
// sweep transaction.
func (b *BudgetAggregator) createInputSets(inputs []SweeperInput) []InputSet {
func (b *BudgetAggregator) createInputSets(inputs []SweeperInput,
deadlineHeight int32) []InputSet {
// sets holds the InputSets that we will return.
sets := make([]InputSet, 0)
@ -582,7 +593,9 @@ func (b *BudgetAggregator) createInputSets(inputs []SweeperInput) []InputSet {
remainingInputs = remainingInputs[b.maxInputs:]
// Create an InputSet using the max allowed number of inputs.
set, err := NewBudgetInputSet(currentInputs)
set, err := NewBudgetInputSet(
currentInputs, deadlineHeight,
)
if err != nil {
log.Errorf("unable to create input set: %v", err)
@ -594,7 +607,9 @@ func (b *BudgetAggregator) createInputSets(inputs []SweeperInput) []InputSet {
// Create an InputSet from the remaining inputs.
if len(remainingInputs) > 0 {
set, err := NewBudgetInputSet(remainingInputs)
set, err := NewBudgetInputSet(
remainingInputs, deadlineHeight,
)
if err != nil {
log.Errorf("unable to create input set: %v", err)
return nil

View File

@ -39,6 +39,8 @@ var (
wire.OutPoint{Hash: chainhash.Hash{}, Index: 11}: &SweeperInput{},
wire.OutPoint{Hash: chainhash.Hash{}, Index: 12}: &SweeperInput{},
}
testHeight = int32(800000)
)
// TestMergeClusters check that we properly can merge clusters together,
@ -779,7 +781,7 @@ func TestBudgetAggregatorCreateInputSets(t *testing.T) {
tc.setupMock()
// Call the method under test.
result := b.createInputSets(tc.inputs)
result := b.createInputSets(tc.inputs, testHeight)
// Validate the expected number of input sets are
// returned.
@ -938,7 +940,8 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
b := NewBudgetAggregator(estimator, DefaultMaxInputsPerTx)
// Call the method under test.
result := b.ClusterInputs(inputs)
defaultDeadline := testHeight + DefaultDeadlineDelta
result := b.ClusterInputs(inputs, defaultDeadline)
// We expect four input sets to be returned, one for each deadline and
// extra one for the exclusive input.
@ -949,7 +952,7 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
require.Len(t, setExclusive.Inputs(), 1)
// Check the each of rest has exactly two inputs.
deadlines := make(map[fn.Option[int32]]struct{})
deadlines := make(map[int32]struct{})
for _, set := range result[:3] {
// We expect two inputs in each set.
require.Len(t, set.Inputs(), 2)
@ -962,7 +965,7 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
}
// We expect to see all three deadlines.
require.Contains(t, deadlines, deadlineNone)
require.Contains(t, deadlines, deadline1)
require.Contains(t, deadlines, deadline2)
require.Contains(t, deadlines, defaultDeadline)
require.Contains(t, deadlines, deadline1.UnwrapOrFail(t))
require.Contains(t, deadlines, deadline2.UnwrapOrFail(t))
}

View File

@ -8,7 +8,6 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
@ -342,8 +341,10 @@ type mockUtxoAggregator struct {
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
// ClusterInputs takes a list of inputs and groups them into clusters.
func (m *mockUtxoAggregator) ClusterInputs(inputs InputsMap) []InputSet {
args := m.Called(inputs)
func (m *mockUtxoAggregator) ClusterInputs(inputs InputsMap,
defaultDeadline int32) []InputSet {
args := m.Called(inputs, defaultDeadline)
return args.Get(0).([]InputSet)
}
@ -484,10 +485,10 @@ func (m *MockInputSet) NeedWalletInput() bool {
}
// DeadlineHeight returns the deadline height for the set.
func (m *MockInputSet) DeadlineHeight() fn.Option[int32] {
func (m *MockInputSet) DeadlineHeight() int32 {
args := m.Called()
return args.Get(0).(fn.Option[int32])
return args.Get(0).(int32)
}
// Budget givens the total amount that can be used as fees by this input set.

View File

@ -817,18 +817,13 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
s.currentOutputScript = pkScript
}
// Create a default deadline height, and replace it with set's
// DeadlineHeight if it's set.
deadlineHeight := s.currentHeight + DefaultDeadlineDelta
deadlineHeight = set.DeadlineHeight().UnwrapOr(deadlineHeight)
// Create a fee bump request and ask the publisher to broadcast it. The
// publisher will then take over and start monitoring the tx for
// potential fee bump.
req := &BumpRequest{
Inputs: set.Inputs(),
Budget: set.Budget(),
DeadlineHeight: deadlineHeight,
DeadlineHeight: set.DeadlineHeight(),
DeliveryAddress: s.currentOutputScript,
MaxFeeRate: s.cfg.MaxFeeRate.FeePerKWeight(),
// TODO(yy): pass the strategy here.
@ -1554,8 +1549,12 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
// sweepPendingInputs is called when the ticker fires. It will create clusters
// and attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
// Create a default deadline height, which will be used when there's no
// DeadlineHeight specified for a given input.
defaultDeadline := s.currentHeight + DefaultDeadlineDelta
// Cluster all of our inputs based on the specific Aggregator.
sets := s.cfg.Aggregator.ClusterInputs(inputs)
sets := s.cfg.Aggregator.ClusterInputs(inputs, defaultDeadline)
// sweepWithLock is a helper closure that executes the sweep within a
// coin select lock to prevent the coins being selected for other

View File

@ -2679,6 +2679,9 @@ func TestSweepPendingInputs(t *testing.T) {
},
})
// Set a current height to test the deadline override.
s.currentHeight = testHeight
// Create an input set that needs wallet inputs.
setNeedWallet := &MockInputSet{}
defer setNeedWallet.AssertExpectations(t)
@ -2699,10 +2702,10 @@ func TestSweepPendingInputs(t *testing.T) {
// Mock the methods used in `sweep`. This is not important for this
// unit test.
setNeedWallet.On("Inputs").Return(nil).Times(4)
setNeedWallet.On("DeadlineHeight").Return(fn.None[int32]()).Once()
setNeedWallet.On("DeadlineHeight").Return(testHeight).Once()
setNeedWallet.On("Budget").Return(btcutil.Amount(1)).Once()
normalSet.On("Inputs").Return(nil).Times(4)
normalSet.On("DeadlineHeight").Return(fn.None[int32]()).Once()
normalSet.On("DeadlineHeight").Return(testHeight).Once()
normalSet.On("Budget").Return(btcutil.Amount(1)).Once()
// Make pending inputs for testing. We don't need real values here as
@ -2710,7 +2713,9 @@ func TestSweepPendingInputs(t *testing.T) {
pis := make(InputsMap)
// Mock the aggregator to return the mocked input sets.
aggregator.On("ClusterInputs", pis).Return([]InputSet{
expectedDeadlineUsed := testHeight + DefaultDeadlineDelta
aggregator.On("ClusterInputs", pis,
expectedDeadlineUsed).Return([]InputSet{
setNeedWallet, normalSet,
})

View File

@ -60,9 +60,9 @@ type InputSet interface {
// inputs.
NeedWalletInput() bool
// DeadlineHeight returns an optional absolute block height to express
// the time-sensitivity of the input set. The outputs from a force
// close tx have different time preferences:
// DeadlineHeight returns an absolute block height to express the
// time-sensitivity of the input set. The outputs from a force close tx
// have different time preferences:
// - to_local: no time pressure as it can only be swept by us.
// - first level outgoing HTLC: must be swept before its corresponding
// incoming HTLC's CLTV is reached.
@ -72,7 +72,7 @@ type InputSet interface {
// - anchor: for CPFP-purpose anchor, it must be swept before any of
// the above CLTVs is reached. For non-CPFP purpose anchor, there's
// no time pressure.
DeadlineHeight() fn.Option[int32]
DeadlineHeight() int32
// Budget givens the total amount that can be used as fees by this
// input set.
@ -201,8 +201,8 @@ func (t *txInputSet) Budget() btcutil.Amount {
// DeadlineHeight gives the block height that this set must be confirmed by.
//
// NOTE: this field is only used for `BudgetInputSet`.
func (t *txInputSet) DeadlineHeight() fn.Option[int32] {
return fn.None[int32]()
func (t *txInputSet) DeadlineHeight() int32 {
return 0
}
// NeedWalletInput returns true if the input set needs more wallet inputs.
@ -553,7 +553,7 @@ type BudgetInputSet struct {
// deadlineHeight is the height which the inputs in this set must be
// confirmed by.
deadlineHeight fn.Option[int32]
deadlineHeight int32
}
// Compile-time constraint to ensure budgetInputSet implements InputSet.
@ -597,18 +597,14 @@ func validateInputs(inputs []SweeperInput) error {
}
// NewBudgetInputSet creates a new BudgetInputSet.
func NewBudgetInputSet(inputs []SweeperInput) (*BudgetInputSet, error) {
func NewBudgetInputSet(inputs []SweeperInput,
deadlineHeight int32) (*BudgetInputSet, error) {
// Validate the supplied inputs.
if err := validateInputs(inputs); err != nil {
return nil, err
}
// TODO(yy): all the inputs share the same deadline height, which means
// there exists an opportunity to refactor the deadline height to be
// tracked on the set-level, not per input. This would allow us to
// avoid the overhead of tracking the same height for each input in the
// set.
deadlineHeight := inputs[0].params.DeadlineHeight
bi := &BudgetInputSet{
deadlineHeight: deadlineHeight,
inputs: make([]*SweeperInput, 0, len(inputs)),
@ -625,18 +621,13 @@ func NewBudgetInputSet(inputs []SweeperInput) (*BudgetInputSet, error) {
// String returns a human-readable description of the input set.
func (b *BudgetInputSet) String() string {
deadlineDesc := "none"
b.deadlineHeight.WhenSome(func(h int32) {
deadlineDesc = fmt.Sprintf("%d", h)
})
inputsDesc := ""
for _, input := range b.inputs {
inputsDesc += fmt.Sprintf("\n%v", input)
}
return fmt.Sprintf("BudgetInputSet(budget=%v, deadline=%v, "+
"inputs=[%v])", b.Budget(), deadlineDesc, inputsDesc)
"inputs=[%v])", b.Budget(), b.DeadlineHeight(), inputsDesc)
}
// addInput adds an input to the input set.
@ -748,12 +739,9 @@ func (b *BudgetInputSet) AddWalletInputs(wallet Wallet) error {
pi := SweeperInput{
Input: input,
params: Params{
// Inherit the deadline height from the input
// set.
DeadlineHeight: b.deadlineHeight,
DeadlineHeight: fn.Some(b.deadlineHeight),
},
}
b.addInput(pi)
// Return if we've reached the minimum output amount.
@ -784,7 +772,7 @@ func (b *BudgetInputSet) Budget() btcutil.Amount {
// DeadlineHeight returns the deadline height of the set.
//
// NOTE: part of the InputSet interface.
func (b *BudgetInputSet) DeadlineHeight() fn.Option[int32] {
func (b *BudgetInputSet) DeadlineHeight() int32 {
return b.deadlineHeight
}

View File

@ -253,7 +253,7 @@ func TestNewBudgetInputSet(t *testing.T) {
rt := require.New(t)
// Pass an empty slice and expect an error.
set, err := NewBudgetInputSet([]SweeperInput{})
set, err := NewBudgetInputSet([]SweeperInput{}, testHeight)
rt.ErrorContains(err, "inputs slice is empty")
rt.Nil(set)
@ -284,17 +284,17 @@ func TestNewBudgetInputSet(t *testing.T) {
}
// Pass a slice of inputs with different deadline heights.
set, err = NewBudgetInputSet([]SweeperInput{input1, input2})
set, err = NewBudgetInputSet([]SweeperInput{input1, input2}, testHeight)
rt.ErrorContains(err, "inputs have different deadline heights")
rt.Nil(set)
// Pass a slice of inputs that only one input has the deadline height.
set, err = NewBudgetInputSet([]SweeperInput{input0, input2})
set, err = NewBudgetInputSet([]SweeperInput{input0, input2}, testHeight)
rt.NoError(err)
rt.NotNil(set)
// Pass a slice of inputs that are duplicates.
set, err = NewBudgetInputSet([]SweeperInput{input1, input1})
set, err = NewBudgetInputSet([]SweeperInput{input1, input1}, testHeight)
rt.ErrorContains(err, "duplicate inputs")
rt.Nil(set)
}
@ -314,7 +314,7 @@ func TestBudgetInputSetAddInput(t *testing.T) {
}
// Initialize an input set, which adds the above input.
set, err := NewBudgetInputSet([]SweeperInput{*pi})
set, err := NewBudgetInputSet([]SweeperInput{*pi}, testHeight)
require.NoError(t, err)
// Add the input to the set again.
@ -646,7 +646,7 @@ func TestAddWalletInputSuccess(t *testing.T) {
min, max).Return([]*lnwallet.Utxo{utxo, utxo}, nil).Once()
// Initialize an input set with the pending input.
set, err := NewBudgetInputSet([]SweeperInput{*pi})
set, err := NewBudgetInputSet([]SweeperInput{*pi}, deadline)
require.NoError(t, err)
// Add wallet inputs to the input set, which should give us an error as
@ -669,7 +669,7 @@ func TestAddWalletInputSuccess(t *testing.T) {
// Finally, check the interface methods.
require.EqualValues(t, budget, set.Budget())
require.Equal(t, deadline, set.DeadlineHeight().UnsafeFromSome())
require.Equal(t, deadline, set.DeadlineHeight())
// Weak check, a strong check is to open the slice and check each item.
require.Len(t, set.inputs, 3)
}