sweep: don't give up an input based on number of attempts

This commit removes the logic where we remove an input when it's been
published more than 10 times. This is needed as in our future fee
bumper, we might start with a low fee and rebroadcast the same input for
hundred of blocks.
This commit is contained in:
yyforyongyu 2023-10-26 15:13:23 +08:00
parent 8b876be3b6
commit a8f5a09dea
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
5 changed files with 33 additions and 146 deletions

View File

@ -5,15 +5,16 @@ import "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
// PendingSweep is a CLI-friendly type of the walletrpc.PendingSweep proto. We
// use this to show more useful string versions of byte slices and enums.
type PendingSweep struct {
OutPoint OutPoint `json:"outpoint"`
WitnessType string `json:"witness_type"`
AmountSat uint32 `json:"amount_sat"`
SatPerVByte uint32 `json:"sat_per_vbyte"`
BroadcastAttempts uint32 `json:"broadcast_attempts"`
NextBroadcastHeight uint32 `json:"next_broadcast_height"`
RequestedSatPerVByte uint32 `json:"requested_sat_per_vbyte"`
RequestedConfTarget uint32 `json:"requested_conf_target"`
Force bool `json:"force"`
OutPoint OutPoint `json:"outpoint"`
WitnessType string `json:"witness_type"`
AmountSat uint32 `json:"amount_sat"`
SatPerVByte uint32 `json:"sat_per_vbyte"`
BroadcastAttempts uint32 `json:"broadcast_attempts"`
// TODO(yy): deprecate.
NextBroadcastHeight uint32 `json:"next_broadcast_height"`
RequestedSatPerVByte uint32 `json:"requested_sat_per_vbyte"`
RequestedConfTarget uint32 `json:"requested_conf_target"`
Force bool `json:"force"`
}
// NewPendingSweepFromProto converts the walletrpc.PendingSweep proto type into

View File

@ -145,17 +145,6 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) {
c.log.Warnf("our anchor spent by someone else")
outcome = channeldb.ResolverOutcomeUnclaimed
// The sweeper gave up on sweeping the anchor. This happens
// after the maximum number of sweep attempts has been reached.
// See sweep.DefaultMaxSweepAttempts. Sweep attempts are
// interspaced with random delays picked from a range that
// increases exponentially.
//
// We consider the anchor as being lost.
case sweep.ErrTooManyAttempts:
c.log.Warnf("anchor sweep abandoned")
outcome = channeldb.ResolverOutcomeUnclaimed
// An unexpected error occurred.
default:
c.log.Errorf("unable to sweep anchor: %v", sweepRes.Err)

View File

@ -882,7 +882,6 @@ func (w *WalletKit) PendingSweeps(ctx context.Context,
amountSat := uint32(pendingInput.Amount)
satPerVbyte := uint64(pendingInput.LastFeeRate.FeePerVByte())
broadcastAttempts := uint32(pendingInput.BroadcastAttempts)
nextBroadcastHeight := uint32(pendingInput.NextBroadcastHeight)
feePref := pendingInput.Params.Fee
requestedFee, ok := feePref.(sweep.FeeEstimateInfo)
@ -899,7 +898,6 @@ func (w *WalletKit) PendingSweeps(ctx context.Context,
AmountSat: amountSat,
SatPerVbyte: satPerVbyte,
BroadcastAttempts: broadcastAttempts,
NextBroadcastHeight: nextBroadcastHeight,
RequestedSatPerVbyte: requestedFeeRate,
RequestedConfTarget: requestedFee.ConfTarget,
Force: pendingInput.Params.Force,

View File

@ -26,10 +26,6 @@ var (
// confirmed in a tx of the remote party.
ErrRemoteSpend = errors.New("remote party swept utxo")
// ErrTooManyAttempts is returned in case sweeping an output has failed
// for the configured max number of attempts.
ErrTooManyAttempts = errors.New("sweep failed after max attempts")
// ErrFeePreferenceTooLow is returned when the fee preference gives a
// fee rate that's below the relay fee rate.
ErrFeePreferenceTooLow = errors.New("fee preference too low")
@ -188,10 +184,6 @@ type pendingInput struct {
// notifier spend registration.
ntfnRegCancel func()
// minPublishHeight indicates the minimum block height at which this
// input may be (re)published.
minPublishHeight int32
// publishAttempts records the number of attempts that have already been
// made to sweep this tx.
publishAttempts int
@ -268,10 +260,6 @@ type PendingInput struct {
// input.
BroadcastAttempts int
// NextBroadcastHeight is the next height of the chain at which we'll
// attempt to broadcast a transaction sweeping the input.
NextBroadcastHeight uint32
// Params contains the sweep parameters for this pending request.
Params Params
}
@ -698,6 +686,10 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
s.sweepPendingInputs(inputs)
// A new block comes in, update the bestHeight.
//
// TODO(yy): this is where we check our published transactions
// and perform RBF if needed. We'd also like to consult our fee
// bumper to get an updated fee rate.
case epoch, ok := <-blockEpochs:
if !ok {
return
@ -884,12 +876,6 @@ func (s *UtxoSweeper) getInputLists(
// sweeper to avoid this.
var newInputs, retryInputs []txInput
for _, input := range cluster.inputs {
// Skip inputs that have a minimum publish height that is not
// yet reached.
if input.minPublishHeight > s.currentHeight {
continue
}
// Add input to the either one of the lists.
if input.publishAttempts == 0 {
newInputs = append(newInputs, input)
@ -998,11 +984,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet,
}
// markInputsPendingPublish saves the sweeping tx to db and updates the pending
// inputs with the given tx inputs. It increments the `publishAttempts` and
// calculates the next broadcast height for each input. When the
// publishAttempts exceeds MaxSweepAttemps(10), this input will be removed.
//
// TODO(yy): add unit test once done refactoring.
// inputs with the given tx inputs. It also increments the `publishAttempts`.
func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord,
tx *wire.MsgTx) error {
@ -1041,36 +1023,6 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord,
// Record another publish attempt.
pi.publishAttempts++
// We don't care what the result of the publish call was. Even
// if it is published successfully, it can still be that it
// needs to be retried. Call NextAttemptDeltaFunc to calculate
// when to resweep this input.
nextAttemptDelta := s.cfg.NextAttemptDeltaFunc(
pi.publishAttempts,
)
pi.minPublishHeight = s.currentHeight + nextAttemptDelta
log.Debugf("Rescheduling input %v after %v attempts at "+
"height %v (delta %v)", input.PreviousOutPoint,
pi.publishAttempts, pi.minPublishHeight,
nextAttemptDelta)
if pi.publishAttempts >= s.cfg.MaxSweepAttempts {
log.Warnf("input %v: publishAttempts(%v) exceeds "+
"MaxSweepAttempts(%v), removed",
input.PreviousOutPoint, pi.publishAttempts,
s.cfg.MaxSweepAttempts)
// Signal result channels sweep result.
s.signalResult(pi, Result{
Err: ErrTooManyAttempts,
})
// Mark the input as failed.
pi.state = StateFailed
}
}
return nil
@ -1240,10 +1192,9 @@ func (s *UtxoSweeper) handlePendingSweepsReq(
Amount: btcutil.Amount(
pendingInput.SignDesc().Output.Value,
),
LastFeeRate: pendingInput.lastFeeRate,
BroadcastAttempts: pendingInput.publishAttempts,
NextBroadcastHeight: uint32(pendingInput.minPublishHeight),
Params: pendingInput.params,
LastFeeRate: pendingInput.lastFeeRate,
BroadcastAttempts: pendingInput.publishAttempts,
Params: pendingInput.params,
}
}
@ -1319,22 +1270,16 @@ func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
newParams.Fee = req.params.Fee
newParams.Force = req.params.Force
log.Debugf("Updating sweep parameters for %v from %v to %v", req.input,
pendingInput.params, newParams)
log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
req.input, pendingInput.state, pendingInput.params, newParams)
pendingInput.params = newParams
// We'll reset the input's publish height to the current so that a new
// transaction can be created that replaces the transaction currently
// spending the input. We only do this for inputs that have been
// broadcast at least once to ensure we don't spend an input before its
// maturity height.
// We need to reset the state so this input will be attempted again by
// our sweeper.
//
// NOTE: The UtxoSweeper is not yet offered time-locked inputs, so the
// check for broadcast attempts is redundant at the moment.
if pendingInput.publishAttempts > 0 {
pendingInput.minPublishHeight = s.currentHeight
}
// TODO(yy): a dedicated state?
pendingInput.state = StateInit
resultChan := make(chan Result, 1)
pendingInput.listeners = append(pendingInput.listeners, resultChan)
@ -1455,11 +1400,10 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pi = &pendingInput{
state: StateInit,
listeners: []chan Result{input.resultChan},
Input: input.input,
minPublishHeight: s.currentHeight,
params: input.params,
state: StateInit,
listeners: []chan Result{input.resultChan},
Input: input.input,
params: input.params,
}
// Try to find fee info for possible RBF if this input has already been

View File

@ -881,41 +881,6 @@ func TestRetry(t *testing.T) {
ctx.finish(1)
}
// TestGiveUp asserts that the sweeper gives up on an input if it can't be swept
// after a configured number of attempts.a
func TestGiveUp(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan0, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
// We expect a sweep to be published at height 100 (mockChainIOHeight).
ctx.receiveTx()
// Because of MaxSweepAttemps, two more sweeps will be attempted. We
// configured exponential back-off without randomness for the test. The
// second attempt, we expect to happen at 101. The third attempt at 103.
// At that point, the input is expected to be failed.
// Second attempt
ctx.notifier.NotifyEpoch(101)
ctx.receiveTx()
// Third attempt
ctx.notifier.NotifyEpoch(103)
ctx.receiveTx()
ctx.expectResult(resultChan0, ErrTooManyAttempts)
ctx.backend.mine()
ctx.finish(1)
}
// TestDifferentFeePreferences ensures that the sweeper can have different
// transactions for different fee preferences. These transactions should be
// broadcast from highest to lowest fee rate.
@ -1030,24 +995,14 @@ func TestPendingInputs(t *testing.T) {
// We should expect to see all inputs pending.
ctx.assertPendingInputs(input1, input2, input3)
// We should expect to see both sweep transactions broadcast. The higher
// fee rate sweep should be broadcast first. We'll remove the lower fee
// rate sweep to ensure we can detect pending inputs after a sweep.
// Once the higher fee rate sweep confirms, we should no longer see
// those inputs pending.
// We should expect to see both sweep transactions broadcast - one for
// the higher feerate, the other for the lower.
ctx.receiveTx()
lowFeeRateTx := ctx.receiveTx()
ctx.backend.deleteUnconfirmed(lowFeeRateTx.TxHash())
ctx.receiveTx()
// Mine these txns, and we should expect to see the results delivered.
ctx.backend.mine()
ctx.expectResult(resultChan1, nil)
ctx.assertPendingInputs(input3)
// We'll then trigger a new block to rebroadcast the lower fee rate
// sweep. Once again we'll ensure those inputs are no longer pending
// once the sweep transaction confirms.
ctx.backend.notifier.NotifyEpoch(101)
ctx.receiveTx()
ctx.backend.mine()
ctx.expectResult(resultChan3, nil)
ctx.assertPendingInputs()