mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-04-14 15:09:06 +02:00
Merge pull request #3951 from joostjager/exclusive-group-sweeper
sweep: add exclusive groups
This commit is contained in:
commit
b75259a3e2
45
sweep/bucket_list.go
Normal file
45
sweep/bucket_list.go
Normal file
@ -0,0 +1,45 @@
|
||||
package sweep
|
||||
|
||||
// bucket contains a set of inputs that are not mutually exclusive.
|
||||
type bucket pendingInputs
|
||||
|
||||
// tryAdd tries to add a new input to this bucket.
|
||||
func (b bucket) tryAdd(input *pendingInput) bool {
|
||||
exclusiveGroup := input.params.ExclusiveGroup
|
||||
if exclusiveGroup != nil {
|
||||
for _, input := range b {
|
||||
existingGroup := input.params.ExclusiveGroup
|
||||
if existingGroup != nil &&
|
||||
*existingGroup == *exclusiveGroup {
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
b[*input.OutPoint()] = input
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// bucketList is a list of buckets that contain non-mutually exclusive inputs.
|
||||
type bucketList struct {
|
||||
buckets []bucket
|
||||
}
|
||||
|
||||
// add adds a new input. If the input is not accepted by any of the existing
|
||||
// buckets, a new bucket will be created.
|
||||
func (b *bucketList) add(input *pendingInput) {
|
||||
for _, existingBucket := range b.buckets {
|
||||
if existingBucket.tryAdd(input) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new bucket and add the input. It is not necessary to check
|
||||
// the return value of tryAdd because it will always succeed on an empty
|
||||
// bucket.
|
||||
newBucket := make(bucket)
|
||||
newBucket.tryAdd(input)
|
||||
b.buckets = append(b.buckets, newBucket)
|
||||
}
|
@ -50,6 +50,11 @@ var (
|
||||
// request from a client whom did not specify a fee preference.
|
||||
ErrNoFeePreference = errors.New("no fee preference specified")
|
||||
|
||||
// ErrExclusiveGroupSpend is returned in case a different input of the
|
||||
// same exclusive group was spent.
|
||||
ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
|
||||
"was spent")
|
||||
|
||||
// ErrSweeperShuttingDown is an error returned when a client attempts to
|
||||
// make a request to the UtxoSweeper, but it is unable to handle it as
|
||||
// it is/has already been stoppepd.
|
||||
@ -71,11 +76,16 @@ type Params struct {
|
||||
// Force indicates whether the input should be swept regardless of
|
||||
// whether it is economical to do so.
|
||||
Force bool
|
||||
|
||||
// ExclusiveGroup is an identifier that, if set, prevents other inputs
|
||||
// with the same identifier from being batched together.
|
||||
ExclusiveGroup *uint64
|
||||
}
|
||||
|
||||
// String returns a human readable interpretation of the sweep parameters.
|
||||
func (p Params) String() string {
|
||||
return fmt.Sprintf("fee=%v, force=%v", p.Fee, p.Force)
|
||||
return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v",
|
||||
p.Fee, p.Force, p.ExclusiveGroup)
|
||||
}
|
||||
|
||||
// pendingInput is created when an input reaches the main loop for the first
|
||||
@ -402,10 +412,9 @@ func (s *UtxoSweeper) SweepInput(input input.Input,
|
||||
}
|
||||
|
||||
log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
|
||||
"time_lock=%v, amount=%v, fee_preference=%v, force=%v",
|
||||
"time_lock=%v, amount=%v, params=(%v)",
|
||||
input.OutPoint(), input.WitnessType(), input.BlocksToMaturity(),
|
||||
btcutil.Amount(input.SignDesc().Output.Value),
|
||||
params.Fee, params.Force)
|
||||
btcutil.Amount(input.SignDesc().Output.Value), params)
|
||||
|
||||
sweeperInput := &sweepInputMessage{
|
||||
input: input,
|
||||
@ -553,7 +562,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
|
||||
// registration, deleted from pendingInputs but
|
||||
// the ntfn was in-flight already. Or this could
|
||||
// be not one of our inputs.
|
||||
_, ok := s.pendingInputs[outpoint]
|
||||
input, ok := s.pendingInputs[outpoint]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@ -569,6 +578,14 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
|
||||
Tx: spend.SpendingTx,
|
||||
Err: err,
|
||||
})
|
||||
|
||||
// Remove all other inputs in this exclusive
|
||||
// group.
|
||||
if input.params.ExclusiveGroup != nil {
|
||||
s.removeExclusiveGroup(
|
||||
*input.params.ExclusiveGroup,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Now that an input of ours is spent, we can try to
|
||||
@ -640,6 +657,31 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
|
||||
}
|
||||
}
|
||||
|
||||
// removeExclusiveGroup removes all inputs in the given exclusive group. This
|
||||
// function is called when one of the exclusive group inputs has been spent. The
|
||||
// other inputs won't ever be spendable and can be removed. This also prevents
|
||||
// them from being part of future sweep transactions that would fail.
|
||||
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
|
||||
for outpoint, input := range s.pendingInputs {
|
||||
outpoint := outpoint
|
||||
|
||||
// Skip inputs that aren't exclusive.
|
||||
if input.params.ExclusiveGroup == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip inputs from other exclusive groups.
|
||||
if *input.params.ExclusiveGroup != group {
|
||||
continue
|
||||
}
|
||||
|
||||
// Signal result channels.
|
||||
s.signalAndRemove(&outpoint, Result{
|
||||
Err: ErrExclusiveGroupSpend,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// sweepCluster tries to sweep the given input cluster.
|
||||
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
|
||||
currentHeight int32) error {
|
||||
@ -680,7 +722,7 @@ func (s *UtxoSweeper) bucketForFeeRate(
|
||||
// sweep fee rate, which is determined by calculating the average fee rate of
|
||||
// all inputs within that cluster.
|
||||
func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
|
||||
bucketInputs := make(map[int]pendingInputs)
|
||||
bucketInputs := make(map[int]*bucketList)
|
||||
inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight)
|
||||
|
||||
// First, we'll group together all inputs with similar fee rates. This
|
||||
@ -693,30 +735,37 @@ func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
|
||||
}
|
||||
feeGroup := s.bucketForFeeRate(feeRate)
|
||||
|
||||
inputs, ok := bucketInputs[feeGroup]
|
||||
// Create a bucket list for this fee rate if there isn't one
|
||||
// yet.
|
||||
buckets, ok := bucketInputs[feeGroup]
|
||||
if !ok {
|
||||
inputs = make(pendingInputs)
|
||||
bucketInputs[feeGroup] = inputs
|
||||
buckets = &bucketList{}
|
||||
bucketInputs[feeGroup] = buckets
|
||||
}
|
||||
|
||||
// Request the bucket list to add this input. The bucket list
|
||||
// will take into account exclusive group constraints.
|
||||
buckets.add(input)
|
||||
|
||||
input.lastFeeRate = feeRate
|
||||
inputs[op] = input
|
||||
inputFeeRates[op] = feeRate
|
||||
}
|
||||
|
||||
// We'll then determine the sweep fee rate for each set of inputs by
|
||||
// calculating the average fee rate of the inputs within each set.
|
||||
inputClusters := make([]inputCluster, 0, len(bucketInputs))
|
||||
for _, inputs := range bucketInputs {
|
||||
var sweepFeeRate chainfee.SatPerKWeight
|
||||
for op := range inputs {
|
||||
sweepFeeRate += inputFeeRates[op]
|
||||
for _, buckets := range bucketInputs {
|
||||
for _, inputs := range buckets.buckets {
|
||||
var sweepFeeRate chainfee.SatPerKWeight
|
||||
for op := range inputs {
|
||||
sweepFeeRate += inputFeeRates[op]
|
||||
}
|
||||
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
|
||||
inputClusters = append(inputClusters, inputCluster{
|
||||
sweepFeeRate: sweepFeeRate,
|
||||
inputs: inputs,
|
||||
})
|
||||
}
|
||||
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
|
||||
inputClusters = append(inputClusters, inputCluster{
|
||||
sweepFeeRate: sweepFeeRate,
|
||||
inputs: inputs,
|
||||
})
|
||||
}
|
||||
|
||||
return inputClusters
|
||||
|
@ -1232,3 +1232,63 @@ func TestBumpFeeRBF(t *testing.T) {
|
||||
|
||||
ctx.finish(1)
|
||||
}
|
||||
|
||||
// TestExclusiveGroup tests the sweeper exclusive group functionality.
|
||||
func TestExclusiveGroup(t *testing.T) {
|
||||
ctx := createSweeperTestContext(t)
|
||||
|
||||
// Sweep three inputs in the same exclusive group.
|
||||
var results []chan Result
|
||||
for i := 0; i < 3; i++ {
|
||||
exclusiveGroup := uint64(1)
|
||||
result, err := ctx.sweeper.SweepInput(
|
||||
spendableInputs[i], Params{
|
||||
Fee: FeePreference{ConfTarget: 6},
|
||||
ExclusiveGroup: &exclusiveGroup,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
// We expect all inputs to be published in separate transactions, even
|
||||
// though they share the same fee preference.
|
||||
ctx.tick()
|
||||
for i := 0; i < 3; i++ {
|
||||
sweepTx := ctx.receiveTx()
|
||||
if len(sweepTx.TxOut) != 1 {
|
||||
t.Fatal("expected a single tx out in the sweep tx")
|
||||
}
|
||||
|
||||
// Remove all txes except for the one that sweeps the first
|
||||
// input. This simulates the sweeps being conflicting.
|
||||
if sweepTx.TxIn[0].PreviousOutPoint !=
|
||||
*spendableInputs[0].OutPoint() {
|
||||
|
||||
ctx.backend.deleteUnconfirmed(sweepTx.TxHash())
|
||||
}
|
||||
}
|
||||
|
||||
// Mine the first sweep tx.
|
||||
ctx.backend.mine()
|
||||
|
||||
// Expect the first input to be swept by the confirmed sweep tx.
|
||||
result0 := <-results[0]
|
||||
if result0.Err != nil {
|
||||
t.Fatal("expected first input to be swept")
|
||||
}
|
||||
|
||||
// Expect the other two inputs to return an error. They have no chance
|
||||
// of confirming.
|
||||
result1 := <-results[1]
|
||||
if result1.Err != ErrExclusiveGroupSpend {
|
||||
t.Fatal("expected second input to be canceled")
|
||||
}
|
||||
|
||||
result2 := <-results[2]
|
||||
if result2.Err != ErrExclusiveGroupSpend {
|
||||
t.Fatal("expected third input to be canceled")
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user