diff --git a/server.go b/server.go index 7c707cead..b64378376 100644 --- a/server.go +++ b/server.go @@ -1065,6 +1065,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, aggregator := sweep.NewSimpleUtxoAggregator( cc.FeeEstimator, cfg.Sweeper.MaxFeeRate.FeePerKWeight(), + sweep.DefaultMaxInputsPerTx, ) s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{ diff --git a/sweep/aggregator.go b/sweep/aggregator.go index 331993679..60d835abe 100644 --- a/sweep/aggregator.go +++ b/sweep/aggregator.go @@ -23,22 +23,6 @@ const ( DefaultFeeRateBucketSize = 10 ) -// inputSet is a set of inputs that can be used as the basis to generate a tx -// on. -type inputSet []input.Input - -// Cluster defines an interface that prepares inputs of a cluster to be grouped -// into a list of sets that can be used to create sweep transactions. -type Cluster interface { - // CreateInputSets goes through the cluster's inputs and constructs - // sets of inputs that can be used to generate a sensible transaction. - CreateInputSets(wallet Wallet, maxFeeRate chainfee.SatPerKWeight, - maxInputs int) ([]InputSet, error) -} - -// Compile-time constraint to ensure inputCluster implements Cluster. -var _ Cluster = (*inputCluster)(nil) - // inputCluster is a helper struct to gather a set of pending inputs that // should be swept with the specified fee rate. type inputCluster struct { @@ -52,7 +36,7 @@ type inputCluster struct { // the configured maximum number of inputs. Negative yield inputs are skipped. // No input sets with a total value after fees below the dust limit are // returned. -func (c *inputCluster) CreateInputSets( +func (c *inputCluster) createInputSets( wallet Wallet, maxFeeRate chainfee.SatPerKWeight, maxInputs int) ([]InputSet, error) { @@ -160,7 +144,7 @@ func (c *inputCluster) CreateInputSets( // sweeping transaction. type UtxoAggregator interface { // ClusterInputs takes a list of inputs and groups them into clusters. - ClusterInputs(pendingInputs) []Cluster + ClusterInputs(Wallet, pendingInputs) []InputSet } // SimpleAggregator aggregates inputs known by the Sweeper based on each @@ -175,6 +159,11 @@ type SimpleAggregator struct { // SimpleAggregator. MaxFeeRate chainfee.SatPerKWeight + // MaxInputsPerTx specifies the default maximum number of inputs allowed + // in a single sweep tx. If more need to be swept, multiple txes are + // created and published. + MaxInputsPerTx int + // FeeRateBucketSize is the default size of fee rate buckets we'll use // when clustering inputs into buckets with similar fee rates within // the SimpleAggregator. @@ -193,11 +182,12 @@ var _ UtxoAggregator = (*SimpleAggregator)(nil) // NewSimpleUtxoAggregator creates a new instance of a SimpleAggregator. func NewSimpleUtxoAggregator(estimator chainfee.Estimator, - max chainfee.SatPerKWeight) *SimpleAggregator { + max chainfee.SatPerKWeight, maxTx int) *SimpleAggregator { return &SimpleAggregator{ FeeEstimator: estimator, MaxFeeRate: max, + MaxInputsPerTx: maxTx, FeeRateBucketSize: DefaultFeeRateBucketSize, } } @@ -206,7 +196,9 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator, // inputs known by the UtxoSweeper. It clusters inputs by // 1) Required tx locktime // 2) Similar fee rates. -func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []Cluster { +func (s *SimpleAggregator) ClusterInputs( + wallet Wallet, inputs pendingInputs) []InputSet { + // We start by getting the inputs clusters by locktime. Since the // inputs commit to the locktime, they can only be clustered together // if the locktime is equal. @@ -225,12 +217,21 @@ func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []Cluster { clusters[j].sweepFeeRate }) - result := make([]Cluster, 0, len(clusters)) - for _, c := range clusters { - result = append(result, &c) + // Now that we have the clusters, we can create the input sets. + var inputSets []InputSet + for _, cluster := range clusters { + sets, err := cluster.createInputSets( + wallet, s.MaxFeeRate, s.MaxInputsPerTx, + ) + if err != nil { + log.Errorf("Unable to create input sets: %v", err) + continue + } + + inputSets = append(inputSets, sets...) } - return result + return inputSets } // clusterByLockTime takes the given set of pending inputs and clusters those diff --git a/sweep/aggregator_test.go b/sweep/aggregator_test.go index f3bf2cd28..2058464ad 100644 --- a/sweep/aggregator_test.go +++ b/sweep/aggregator_test.go @@ -320,7 +320,7 @@ func TestClusterByLockTime(t *testing.T) { ) // Create a test aggregator. - s := NewSimpleUtxoAggregator(nil, maxFeeRate) + s := NewSimpleUtxoAggregator(nil, maxFeeRate, 100) testCases := []struct { name string diff --git a/sweep/mocks.go b/sweep/mocks.go index 0e144ab4f..665a16d93 100644 --- a/sweep/mocks.go +++ b/sweep/mocks.go @@ -37,8 +37,10 @@ type mockUtxoAggregator struct { var _ UtxoAggregator = (*mockUtxoAggregator)(nil) // ClusterInputs takes a list of inputs and groups them into clusters. -func (m *mockUtxoAggregator) ClusterInputs(pendingInputs) []Cluster { - args := m.Called(pendingInputs{}) +func (m *mockUtxoAggregator) ClusterInputs(wallet Wallet, + inputs pendingInputs) []InputSet { - return args.Get(0).([]Cluster) + args := m.Called(wallet, inputs) + + return args.Get(0).([]InputSet) } diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8bfcd7075..328d0b1c0 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -740,36 +740,6 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { } } -// sweepCluster tries to sweep the given input cluster. -func (s *UtxoSweeper) sweepCluster(cluster Cluster) error { - // Execute the sweep within a coin select lock. Otherwise the coins - // that we are going to spend may be selected for other transactions - // like funding of a channel. - // - // TODO(yy): decrease the lock scope. - return s.cfg.Wallet.WithCoinSelectLock(func() error { - // Examine pending inputs and try to construct lists of inputs. - sets, err := cluster.CreateInputSets( - s.cfg.Wallet, - s.cfg.MaxFeeRate.FeePerKWeight(), - s.cfg.MaxInputsPerTx, - ) - if err != nil { - return fmt.Errorf("examine pending inputs: %w", err) - } - - // Create sweeping transaction for each set. - for _, inputs := range sets { - err := s.sweep(inputs) - if err != nil { - log.Errorf("sweep new inputs: %w", err) - } - } - - return nil - }) -} - // signalResult notifies the listeners of the final result of the input sweep. // It also cancels any pending spend notification. func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) { @@ -1561,17 +1531,33 @@ func (s *UtxoSweeper) updateSweeperInputs() pendingInputs { // sweepPendingInputs is called when the ticker fires. It will create clusters // and attempt to create and publish the sweeping transactions. func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) { - // We'll attempt to cluster all of our inputs with similar fee rates. - // Before attempting to sweep them, we'll sort them in descending fee - // rate order. We do this to ensure any inputs which have had their fee - // rate bumped are broadcast first in order enforce the RBF policy. - inputClusters := s.cfg.Aggregator.ClusterInputs(inputs) + // Execute the sweep within a coin select lock. Otherwise the coins + // that we are going to spend may be selected for other transactions + // like funding of a channel. + // + // TODO(yy): decrease the lock scope - we need to remove the wallet + // used here, which means we need to ask the aggregator to return input + // sets and specifying whether wallet utoxs are needed or not. Then, by + // calling `TxInput.NeedWalletInput`, we can then lock and add the + // wallet input, creating a much smaller lock scope. + err := s.cfg.Wallet.WithCoinSelectLock(func() error { + // Cluster all of our inputs based on the specific Aggregator. + inputSets := s.cfg.Aggregator.ClusterInputs( + s.cfg.Wallet, inputs, + ) - for _, cluster := range inputClusters { - err := s.sweepCluster(cluster) - if err != nil { - log.Errorf("input cluster sweep: %v", err) + // Create sweeping transaction for each set. + for _, inputs := range inputSets { + err := s.sweep(inputs) + if err != nil { + log.Errorf("sweep new inputs: %v", err) + } } + + return nil + }) + if err != nil { + log.Errorf("input cluster sweep: %v", err) } } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index a13b41fc8..e7cd27827 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -123,6 +123,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { aggregator := NewSimpleUtxoAggregator( estimator, DefaultMaxFeeRate.FeePerKWeight(), + testMaxInputsPerTx, ) ctx := &sweeperTestContext{ @@ -1287,6 +1288,11 @@ func TestLockTimes(t *testing.T) { // impact our test. ctx.sweeper.cfg.MaxInputsPerTx = 100 + // We also need to update the aggregator about this new config. + ctx.sweeper.cfg.Aggregator = NewSimpleUtxoAggregator( + ctx.estimator, DefaultMaxFeeRate.FeePerKWeight(), 100, + ) + // We will set up the lock times in such a way that we expect the // sweeper to divide the inputs into 4 diffeerent transactions. const numSweeps = 4 @@ -1369,7 +1375,7 @@ func TestLockTimes(t *testing.T) { // The should be no inputs not foud in any of the sweeps. if len(inputs) != 0 { - t.Fatalf("had unsweeped inputs") + t.Fatalf("had unsweeped inputs: %v", inputs) } // Mine the first sweeps @@ -1377,9 +1383,11 @@ func TestLockTimes(t *testing.T) { // Results should all come back. for i := range results { - result := <-results[i] - if result.Err != nil { - t.Fatal("expected input to be swept") + select { + case result := <-results[i]: + require.NoError(t, result.Err) + case <-time.After(1 * time.Second): + t.Fatalf("result %v did not come back", i) } } }