mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-29 03:01:52 +01:00
sweep: deepen the interface Aggregator
This commit makes the `ClusterInputs` directly returning the `InputSet` so the sweeper doesn't know about the existence of `Cluster` interface. This way we can have a deeper interface as the sweeper only needs to interact with `Aggregator` only to get the final input sets, leaving the implementation details being managed by `SimpleAggregator` and future aggregators.
This commit is contained in:
parent
1530fee9b3
commit
b536e9bd3f
@ -1065,6 +1065,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
|
|||||||
|
|
||||||
aggregator := sweep.NewSimpleUtxoAggregator(
|
aggregator := sweep.NewSimpleUtxoAggregator(
|
||||||
cc.FeeEstimator, cfg.Sweeper.MaxFeeRate.FeePerKWeight(),
|
cc.FeeEstimator, cfg.Sweeper.MaxFeeRate.FeePerKWeight(),
|
||||||
|
sweep.DefaultMaxInputsPerTx,
|
||||||
)
|
)
|
||||||
|
|
||||||
s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
|
s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
|
||||||
|
@ -23,22 +23,6 @@ const (
|
|||||||
DefaultFeeRateBucketSize = 10
|
DefaultFeeRateBucketSize = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
// inputSet is a set of inputs that can be used as the basis to generate a tx
|
|
||||||
// on.
|
|
||||||
type inputSet []input.Input
|
|
||||||
|
|
||||||
// Cluster defines an interface that prepares inputs of a cluster to be grouped
|
|
||||||
// into a list of sets that can be used to create sweep transactions.
|
|
||||||
type Cluster interface {
|
|
||||||
// CreateInputSets goes through the cluster's inputs and constructs
|
|
||||||
// sets of inputs that can be used to generate a sensible transaction.
|
|
||||||
CreateInputSets(wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
|
|
||||||
maxInputs int) ([]InputSet, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compile-time constraint to ensure inputCluster implements Cluster.
|
|
||||||
var _ Cluster = (*inputCluster)(nil)
|
|
||||||
|
|
||||||
// inputCluster is a helper struct to gather a set of pending inputs that
|
// inputCluster is a helper struct to gather a set of pending inputs that
|
||||||
// should be swept with the specified fee rate.
|
// should be swept with the specified fee rate.
|
||||||
type inputCluster struct {
|
type inputCluster struct {
|
||||||
@ -52,7 +36,7 @@ type inputCluster struct {
|
|||||||
// the configured maximum number of inputs. Negative yield inputs are skipped.
|
// the configured maximum number of inputs. Negative yield inputs are skipped.
|
||||||
// No input sets with a total value after fees below the dust limit are
|
// No input sets with a total value after fees below the dust limit are
|
||||||
// returned.
|
// returned.
|
||||||
func (c *inputCluster) CreateInputSets(
|
func (c *inputCluster) createInputSets(
|
||||||
wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
|
wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
|
||||||
maxInputs int) ([]InputSet, error) {
|
maxInputs int) ([]InputSet, error) {
|
||||||
|
|
||||||
@ -160,7 +144,7 @@ func (c *inputCluster) CreateInputSets(
|
|||||||
// sweeping transaction.
|
// sweeping transaction.
|
||||||
type UtxoAggregator interface {
|
type UtxoAggregator interface {
|
||||||
// ClusterInputs takes a list of inputs and groups them into clusters.
|
// ClusterInputs takes a list of inputs and groups them into clusters.
|
||||||
ClusterInputs(pendingInputs) []Cluster
|
ClusterInputs(Wallet, pendingInputs) []InputSet
|
||||||
}
|
}
|
||||||
|
|
||||||
// SimpleAggregator aggregates inputs known by the Sweeper based on each
|
// SimpleAggregator aggregates inputs known by the Sweeper based on each
|
||||||
@ -175,6 +159,11 @@ type SimpleAggregator struct {
|
|||||||
// SimpleAggregator.
|
// SimpleAggregator.
|
||||||
MaxFeeRate chainfee.SatPerKWeight
|
MaxFeeRate chainfee.SatPerKWeight
|
||||||
|
|
||||||
|
// MaxInputsPerTx specifies the default maximum number of inputs allowed
|
||||||
|
// in a single sweep tx. If more need to be swept, multiple txes are
|
||||||
|
// created and published.
|
||||||
|
MaxInputsPerTx int
|
||||||
|
|
||||||
// FeeRateBucketSize is the default size of fee rate buckets we'll use
|
// FeeRateBucketSize is the default size of fee rate buckets we'll use
|
||||||
// when clustering inputs into buckets with similar fee rates within
|
// when clustering inputs into buckets with similar fee rates within
|
||||||
// the SimpleAggregator.
|
// the SimpleAggregator.
|
||||||
@ -193,11 +182,12 @@ var _ UtxoAggregator = (*SimpleAggregator)(nil)
|
|||||||
|
|
||||||
// NewSimpleUtxoAggregator creates a new instance of a SimpleAggregator.
|
// NewSimpleUtxoAggregator creates a new instance of a SimpleAggregator.
|
||||||
func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
|
func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
|
||||||
max chainfee.SatPerKWeight) *SimpleAggregator {
|
max chainfee.SatPerKWeight, maxTx int) *SimpleAggregator {
|
||||||
|
|
||||||
return &SimpleAggregator{
|
return &SimpleAggregator{
|
||||||
FeeEstimator: estimator,
|
FeeEstimator: estimator,
|
||||||
MaxFeeRate: max,
|
MaxFeeRate: max,
|
||||||
|
MaxInputsPerTx: maxTx,
|
||||||
FeeRateBucketSize: DefaultFeeRateBucketSize,
|
FeeRateBucketSize: DefaultFeeRateBucketSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -206,7 +196,9 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
|
|||||||
// inputs known by the UtxoSweeper. It clusters inputs by
|
// inputs known by the UtxoSweeper. It clusters inputs by
|
||||||
// 1) Required tx locktime
|
// 1) Required tx locktime
|
||||||
// 2) Similar fee rates.
|
// 2) Similar fee rates.
|
||||||
func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []Cluster {
|
func (s *SimpleAggregator) ClusterInputs(
|
||||||
|
wallet Wallet, inputs pendingInputs) []InputSet {
|
||||||
|
|
||||||
// We start by getting the inputs clusters by locktime. Since the
|
// We start by getting the inputs clusters by locktime. Since the
|
||||||
// inputs commit to the locktime, they can only be clustered together
|
// inputs commit to the locktime, they can only be clustered together
|
||||||
// if the locktime is equal.
|
// if the locktime is equal.
|
||||||
@ -225,12 +217,21 @@ func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []Cluster {
|
|||||||
clusters[j].sweepFeeRate
|
clusters[j].sweepFeeRate
|
||||||
})
|
})
|
||||||
|
|
||||||
result := make([]Cluster, 0, len(clusters))
|
// Now that we have the clusters, we can create the input sets.
|
||||||
for _, c := range clusters {
|
var inputSets []InputSet
|
||||||
result = append(result, &c)
|
for _, cluster := range clusters {
|
||||||
|
sets, err := cluster.createInputSets(
|
||||||
|
wallet, s.MaxFeeRate, s.MaxInputsPerTx,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Unable to create input sets: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
inputSets = append(inputSets, sets...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return inputSets
|
||||||
}
|
}
|
||||||
|
|
||||||
// clusterByLockTime takes the given set of pending inputs and clusters those
|
// clusterByLockTime takes the given set of pending inputs and clusters those
|
||||||
|
@ -320,7 +320,7 @@ func TestClusterByLockTime(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Create a test aggregator.
|
// Create a test aggregator.
|
||||||
s := NewSimpleUtxoAggregator(nil, maxFeeRate)
|
s := NewSimpleUtxoAggregator(nil, maxFeeRate, 100)
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -37,8 +37,10 @@ type mockUtxoAggregator struct {
|
|||||||
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
|
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
|
||||||
|
|
||||||
// ClusterInputs takes a list of inputs and groups them into clusters.
|
// ClusterInputs takes a list of inputs and groups them into clusters.
|
||||||
func (m *mockUtxoAggregator) ClusterInputs(pendingInputs) []Cluster {
|
func (m *mockUtxoAggregator) ClusterInputs(wallet Wallet,
|
||||||
args := m.Called(pendingInputs{})
|
inputs pendingInputs) []InputSet {
|
||||||
|
|
||||||
return args.Get(0).([]Cluster)
|
args := m.Called(wallet, inputs)
|
||||||
|
|
||||||
|
return args.Get(0).([]InputSet)
|
||||||
}
|
}
|
||||||
|
@ -740,36 +740,6 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sweepCluster tries to sweep the given input cluster.
|
|
||||||
func (s *UtxoSweeper) sweepCluster(cluster Cluster) error {
|
|
||||||
// Execute the sweep within a coin select lock. Otherwise the coins
|
|
||||||
// that we are going to spend may be selected for other transactions
|
|
||||||
// like funding of a channel.
|
|
||||||
//
|
|
||||||
// TODO(yy): decrease the lock scope.
|
|
||||||
return s.cfg.Wallet.WithCoinSelectLock(func() error {
|
|
||||||
// Examine pending inputs and try to construct lists of inputs.
|
|
||||||
sets, err := cluster.CreateInputSets(
|
|
||||||
s.cfg.Wallet,
|
|
||||||
s.cfg.MaxFeeRate.FeePerKWeight(),
|
|
||||||
s.cfg.MaxInputsPerTx,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("examine pending inputs: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create sweeping transaction for each set.
|
|
||||||
for _, inputs := range sets {
|
|
||||||
err := s.sweep(inputs)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("sweep new inputs: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// signalResult notifies the listeners of the final result of the input sweep.
|
// signalResult notifies the listeners of the final result of the input sweep.
|
||||||
// It also cancels any pending spend notification.
|
// It also cancels any pending spend notification.
|
||||||
func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) {
|
func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) {
|
||||||
@ -1561,17 +1531,33 @@ func (s *UtxoSweeper) updateSweeperInputs() pendingInputs {
|
|||||||
// sweepPendingInputs is called when the ticker fires. It will create clusters
|
// sweepPendingInputs is called when the ticker fires. It will create clusters
|
||||||
// and attempt to create and publish the sweeping transactions.
|
// and attempt to create and publish the sweeping transactions.
|
||||||
func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
|
func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
|
||||||
// We'll attempt to cluster all of our inputs with similar fee rates.
|
// Execute the sweep within a coin select lock. Otherwise the coins
|
||||||
// Before attempting to sweep them, we'll sort them in descending fee
|
// that we are going to spend may be selected for other transactions
|
||||||
// rate order. We do this to ensure any inputs which have had their fee
|
// like funding of a channel.
|
||||||
// rate bumped are broadcast first in order enforce the RBF policy.
|
//
|
||||||
inputClusters := s.cfg.Aggregator.ClusterInputs(inputs)
|
// TODO(yy): decrease the lock scope - we need to remove the wallet
|
||||||
|
// used here, which means we need to ask the aggregator to return input
|
||||||
|
// sets and specifying whether wallet utoxs are needed or not. Then, by
|
||||||
|
// calling `TxInput.NeedWalletInput`, we can then lock and add the
|
||||||
|
// wallet input, creating a much smaller lock scope.
|
||||||
|
err := s.cfg.Wallet.WithCoinSelectLock(func() error {
|
||||||
|
// Cluster all of our inputs based on the specific Aggregator.
|
||||||
|
inputSets := s.cfg.Aggregator.ClusterInputs(
|
||||||
|
s.cfg.Wallet, inputs,
|
||||||
|
)
|
||||||
|
|
||||||
for _, cluster := range inputClusters {
|
// Create sweeping transaction for each set.
|
||||||
err := s.sweepCluster(cluster)
|
for _, inputs := range inputSets {
|
||||||
if err != nil {
|
err := s.sweep(inputs)
|
||||||
log.Errorf("input cluster sweep: %v", err)
|
if err != nil {
|
||||||
|
log.Errorf("sweep new inputs: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("input cluster sweep: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +123,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
|
|||||||
|
|
||||||
aggregator := NewSimpleUtxoAggregator(
|
aggregator := NewSimpleUtxoAggregator(
|
||||||
estimator, DefaultMaxFeeRate.FeePerKWeight(),
|
estimator, DefaultMaxFeeRate.FeePerKWeight(),
|
||||||
|
testMaxInputsPerTx,
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx := &sweeperTestContext{
|
ctx := &sweeperTestContext{
|
||||||
@ -1287,6 +1288,11 @@ func TestLockTimes(t *testing.T) {
|
|||||||
// impact our test.
|
// impact our test.
|
||||||
ctx.sweeper.cfg.MaxInputsPerTx = 100
|
ctx.sweeper.cfg.MaxInputsPerTx = 100
|
||||||
|
|
||||||
|
// We also need to update the aggregator about this new config.
|
||||||
|
ctx.sweeper.cfg.Aggregator = NewSimpleUtxoAggregator(
|
||||||
|
ctx.estimator, DefaultMaxFeeRate.FeePerKWeight(), 100,
|
||||||
|
)
|
||||||
|
|
||||||
// We will set up the lock times in such a way that we expect the
|
// We will set up the lock times in such a way that we expect the
|
||||||
// sweeper to divide the inputs into 4 diffeerent transactions.
|
// sweeper to divide the inputs into 4 diffeerent transactions.
|
||||||
const numSweeps = 4
|
const numSweeps = 4
|
||||||
@ -1369,7 +1375,7 @@ func TestLockTimes(t *testing.T) {
|
|||||||
|
|
||||||
// The should be no inputs not foud in any of the sweeps.
|
// The should be no inputs not foud in any of the sweeps.
|
||||||
if len(inputs) != 0 {
|
if len(inputs) != 0 {
|
||||||
t.Fatalf("had unsweeped inputs")
|
t.Fatalf("had unsweeped inputs: %v", inputs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mine the first sweeps
|
// Mine the first sweeps
|
||||||
@ -1377,9 +1383,11 @@ func TestLockTimes(t *testing.T) {
|
|||||||
|
|
||||||
// Results should all come back.
|
// Results should all come back.
|
||||||
for i := range results {
|
for i := range results {
|
||||||
result := <-results[i]
|
select {
|
||||||
if result.Err != nil {
|
case result := <-results[i]:
|
||||||
t.Fatal("expected input to be swept")
|
require.NoError(t, result.Err)
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("result %v did not come back", i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user