sweep: decrease coin selection lock scope

This commit changes how `WithCoinSelectLock` is used - previously the
lock is held when creating the input sets, now it's only be held after
the input sets have been created and explicitly signal they need wallet
inputs.
This commit is contained in:
yyforyongyu 2023-11-01 17:52:53 +08:00
parent b536e9bd3f
commit 0110a09595
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
5 changed files with 100 additions and 80 deletions

View File

@ -4,8 +4,6 @@ import (
"sort"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
@ -36,9 +34,8 @@ type inputCluster struct {
// the configured maximum number of inputs. Negative yield inputs are skipped.
// No input sets with a total value after fees below the dust limit are
// returned.
func (c *inputCluster) createInputSets(
wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
maxInputs int) ([]InputSet, error) {
func (c *inputCluster) createInputSets(maxFeeRate chainfee.SatPerKWeight,
maxInputs int) []InputSet {
// Turn the inputs into a slice so we can sort them.
inputList := make([]*pendingInput, 0, len(c.inputs))
@ -91,9 +88,7 @@ func (c *inputCluster) createInputSets(
// Start building a set of positive-yield tx inputs under the
// condition that the tx will be published with the specified
// fee rate.
txInputs := newTxInputSet(
wallet, c.sweepFeeRate, maxFeeRate, maxInputs,
)
txInputs := newTxInputSet(c.sweepFeeRate, maxFeeRate, maxInputs)
// From the set of sweepable inputs, keep adding inputs to the
// input set until the tx output value no longer goes up or the
@ -103,27 +98,7 @@ func (c *inputCluster) createInputSets(
// If there are no positive yield inputs, we can stop here.
inputCount := len(txInputs.inputs)
if inputCount == 0 {
return sets, nil
}
// Check the current output value and add wallet utxos if
// needed to push the output value to the lower limit.
if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil {
return nil, err
}
// If the output value of this block of inputs does not reach
// the dust limit, stop sweeping. Because of the sorting,
// continuing with the remaining inputs will only lead to sets
// with an even lower output value.
if !txInputs.enoughInput() {
// The change output is always a p2tr here.
dl := lnwallet.DustLimitForSize(input.P2TRSize)
log.Debugf("Input set value %v (required=%v, "+
"change=%v) below dust limit of %v",
txInputs.totalOutput(), txInputs.requiredOutput,
txInputs.changeOutput, dl)
return sets, nil
return sets
}
log.Infof("Candidate sweep set of size=%v (+%v wallet inputs),"+
@ -136,15 +111,16 @@ func (c *inputCluster) createInputSets(
inputList = inputList[inputCount:]
}
return sets, nil
return sets
}
// UtxoAggregator defines an interface that takes a list of inputs and
// aggregate them into groups. Each group is used as the inputs to create a
// sweeping transaction.
type UtxoAggregator interface {
// ClusterInputs takes a list of inputs and groups them into clusters.
ClusterInputs(Wallet, pendingInputs) []InputSet
// ClusterInputs takes a list of inputs and groups them into input
// sets. Each input set will be used to create a sweeping transaction.
ClusterInputs(pendingInputs) []InputSet
}
// SimpleAggregator aggregates inputs known by the Sweeper based on each
@ -196,9 +172,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
// inputs known by the UtxoSweeper. It clusters inputs by
// 1) Required tx locktime
// 2) Similar fee rates.
func (s *SimpleAggregator) ClusterInputs(
wallet Wallet, inputs pendingInputs) []InputSet {
func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []InputSet {
// We start by getting the inputs clusters by locktime. Since the
// inputs commit to the locktime, they can only be clustered together
// if the locktime is equal.
@ -220,14 +194,9 @@ func (s *SimpleAggregator) ClusterInputs(
// Now that we have the clusters, we can create the input sets.
var inputSets []InputSet
for _, cluster := range clusters {
sets, err := cluster.createInputSets(
wallet, s.MaxFeeRate, s.MaxInputsPerTx,
sets := cluster.createInputSets(
s.MaxFeeRate, s.MaxInputsPerTx,
)
if err != nil {
log.Errorf("Unable to create input sets: %v", err)
continue
}
inputSets = append(inputSets, sets...)
}

View File

@ -37,10 +37,8 @@ type mockUtxoAggregator struct {
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
// ClusterInputs takes a list of inputs and groups them into clusters.
func (m *mockUtxoAggregator) ClusterInputs(wallet Wallet,
inputs pendingInputs) []InputSet {
args := m.Called(wallet, inputs)
func (m *mockUtxoAggregator) ClusterInputs(inputs pendingInputs) []InputSet {
args := m.Called(inputs)
return args.Get(0).([]InputSet)
}

View File

@ -1531,33 +1531,44 @@ func (s *UtxoSweeper) updateSweeperInputs() pendingInputs {
// sweepPendingInputs is called when the ticker fires. It will create clusters
// and attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
// Execute the sweep within a coin select lock. Otherwise the coins
// that we are going to spend may be selected for other transactions
// like funding of a channel.
//
// TODO(yy): decrease the lock scope - we need to remove the wallet
// used here, which means we need to ask the aggregator to return input
// sets and specifying whether wallet utoxs are needed or not. Then, by
// calling `TxInput.NeedWalletInput`, we can then lock and add the
// wallet input, creating a much smaller lock scope.
err := s.cfg.Wallet.WithCoinSelectLock(func() error {
// Cluster all of our inputs based on the specific Aggregator.
inputSets := s.cfg.Aggregator.ClusterInputs(
s.cfg.Wallet, inputs,
)
// Cluster all of our inputs based on the specific Aggregator.
sets := s.cfg.Aggregator.ClusterInputs(inputs)
// Create sweeping transaction for each set.
for _, inputs := range inputSets {
err := s.sweep(inputs)
// sweepWithLock is a helper closure that executes the sweep within a
// coin select lock to prevent the coins being selected for other
// transactions like funding of a channel.
sweepWithLock := func(set InputSet) error {
return s.cfg.Wallet.WithCoinSelectLock(func() error {
// Try to add inputs from our wallet.
err := set.AddWalletInputs(s.cfg.Wallet)
if err != nil {
log.Errorf("sweep new inputs: %v", err)
return err
}
// Create sweeping transaction for each set.
err = s.sweep(set)
if err != nil {
return err
}
return nil
})
}
for _, set := range sets {
var err error
if set.NeedWalletInput() {
// Sweep the set of inputs that need the wallet inputs.
err = sweepWithLock(set)
} else {
// Sweep the set of inputs that don't need the wallet
// inputs.
err = s.sweep(set)
}
return nil
})
if err != nil {
log.Errorf("input cluster sweep: %v", err)
if err != nil {
log.Errorf("Sweep new inputs: %v", err)
}
}
}

View File

@ -30,6 +30,12 @@ const (
constraintsForce
)
var (
// ErrNotEnoughInputs is returned when there are not enough wallet
// inputs to construct a non-dust change output for an input set.
ErrNotEnoughInputs = fmt.Errorf("not enough inputs")
)
// InputSet defines an interface that's responsible for filtering a set of
// inputs that can be swept economically.
type InputSet interface {
@ -38,6 +44,15 @@ type InputSet interface {
// FeeRate returns the fee rate that should be used for the tx.
FeeRate() chainfee.SatPerKWeight
// AddWalletInputs adds wallet inputs to the set until a non-dust
// change output can be made. Return an error if there are not enough
// wallet inputs.
AddWalletInputs(wallet Wallet) error
// NeedWalletInput returns true if the input set needs more wallet
// inputs.
NeedWalletInput() bool
}
type txInputSetState struct {
@ -125,17 +140,13 @@ type txInputSet struct {
// maxInputs is the maximum number of inputs that will be accepted in
// the set.
maxInputs int
// wallet contains wallet functionality required by the input set to
// retrieve utxos.
wallet Wallet
}
// Compile-time constraint to ensure txInputSet implements InputSet.
var _ InputSet = (*txInputSet)(nil)
// newTxInputSet constructs a new, empty input set.
func newTxInputSet(wallet Wallet, feePerKW, maxFeeRate chainfee.SatPerKWeight,
func newTxInputSet(feePerKW, maxFeeRate chainfee.SatPerKWeight,
maxInputs int) *txInputSet {
state := txInputSetState{
@ -145,7 +156,6 @@ func newTxInputSet(wallet Wallet, feePerKW, maxFeeRate chainfee.SatPerKWeight,
b := txInputSet{
maxInputs: maxInputs,
wallet: wallet,
txInputSetState: state,
}
@ -162,6 +172,11 @@ func (t *txInputSet) FeeRate() chainfee.SatPerKWeight {
return t.feeRate
}
// NeedWalletInput returns true if the input set needs more wallet inputs.
func (t *txInputSet) NeedWalletInput() bool {
return !t.enoughInput()
}
// enoughInput returns true if we've accumulated enough inputs to pay the fees
// and have at least one output that meets the dust limit.
func (t *txInputSet) enoughInput() bool {
@ -384,9 +399,36 @@ func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []*pendingInput) {
// We managed to add all inputs to the set.
}
// AddWalletInputs adds wallet inputs to the set until a non-dust output can be
// made. This non-dust output is either a change output or a required output.
// Return an error if there are not enough wallet inputs.
func (t *txInputSet) AddWalletInputs(wallet Wallet) error {
// Check the current output value and add wallet utxos if needed to
// push the output value to the lower limit.
if err := t.tryAddWalletInputsIfNeeded(wallet); err != nil {
return err
}
// If the output value of this block of inputs does not reach the dust
// limit, stop sweeping. Because of the sorting, continuing with the
// remaining inputs will only lead to sets with an even lower output
// value.
if !t.enoughInput() {
// The change output is always a p2tr here.
dl := lnwallet.DustLimitForSize(input.P2TRSize)
log.Debugf("Input set value %v (required=%v, change=%v) "+
"below dust limit of %v", t.totalOutput(),
t.requiredOutput, t.changeOutput, dl)
return ErrNotEnoughInputs
}
return nil
}
// tryAddWalletInputsIfNeeded retrieves utxos from the wallet and tries adding
// as many as required to bring the tx output value above the given minimum.
func (t *txInputSet) tryAddWalletInputsIfNeeded() error {
func (t *txInputSet) tryAddWalletInputsIfNeeded(wallet Wallet) error {
// If we've already have enough to pay the transaction fees and have at
// least one output materialize, no action is needed.
if t.enoughInput() {
@ -396,7 +438,7 @@ func (t *txInputSet) tryAddWalletInputsIfNeeded() error {
// Retrieve wallet utxos. Only consider confirmed utxos to prevent
// problems around RBF rules for unconfirmed inputs. This currently
// ignores the configured coin selection strategy.
utxos, err := t.wallet.ListUnspentWitnessFromDefaultAccount(
utxos, err := wallet.ListUnspentWitnessFromDefaultAccount(
1, math.MaxInt32,
)
if err != nil {

View File

@ -16,7 +16,7 @@ func TestTxInputSet(t *testing.T) {
feeRate = 1000
maxInputs = 10
)
set := newTxInputSet(nil, feeRate, 0, maxInputs)
set := newTxInputSet(feeRate, 0, maxInputs)
// Create a 300 sat input. The fee to sweep this input to a P2WKH output
// is 439 sats. That means that this input yields -139 sats and we
@ -65,7 +65,7 @@ func TestTxInputSetFromWallet(t *testing.T) {
)
wallet := &mockWallet{}
set := newTxInputSet(wallet, feeRate, 0, maxInputs)
set := newTxInputSet(feeRate, 0, maxInputs)
// Add a 500 sat input to the set. It yields positively, but doesn't
// reach the output dust limit.
@ -86,7 +86,7 @@ func TestTxInputSetFromWallet(t *testing.T) {
t.Fatal("expected forced add to succeed")
}
err := set.tryAddWalletInputsIfNeeded()
err := set.AddWalletInputs(wallet)
if err != nil {
t.Fatal(err)
}
@ -134,7 +134,7 @@ func TestTxInputSetRequiredOutput(t *testing.T) {
feeRate = 1000
maxInputs = 10
)
set := newTxInputSet(nil, feeRate, 0, maxInputs)
set := newTxInputSet(feeRate, 0, maxInputs)
// Attempt to add an input with a required txout below the dust limit.
// This should fail since we cannot trim such outputs.