sweep+itest: return next retry fee rate in TxFailed event

We now return the next retry fee rate in `TxFailed` event in
`TxPublisher`. When handling the event, `UtxoSweeper` will update the
inputs to make sure the starting fee rate is set before attempting the
next sweep.
This commit is contained in:
yyforyongyu
2025-03-20 21:57:08 +08:00
parent 6dbf4ce470
commit eea3561eea
4 changed files with 82 additions and 38 deletions

View File

@@ -1103,25 +1103,36 @@ func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
case errors.Is(err, ErrTxNoOutput): case errors.Is(err, ErrTxNoOutput):
result.Event = TxFailed result.Event = TxFailed
// When the error is due to budget being used up, we'll send a TxFailed
// so these inputs can be retried with a different group in the next
// block.
case errors.Is(err, ErrMaxPosition):
result.Event = TxFailed
// When the error is due to zero fee rate delta, we'll send a TxFailed // When the error is due to zero fee rate delta, we'll send a TxFailed
// so these inputs can be retried in the next block. // so these inputs can be retried in the next block.
case errors.Is(err, ErrZeroFeeRateDelta): case errors.Is(err, ErrZeroFeeRateDelta):
result.Event = TxFailed result.Event = TxFailed
// When the error is due to not enough inputs to cover the budget, we'll // When the error is due to budget being used up, we'll send a TxFailed
// send a TxFailed event so these inputs can be retried when the wallet // so these inputs can be retried with a different group in the next
// has more UTXOs. // block.
case errors.Is(err, ErrMaxPosition):
fallthrough
// If the tx doesn't not have enough budget, or if the inputs amounts
// are not sufficient to cover the budget, we will return a TxFailed
// event so the sweeper can handle it by re-clustering the utxos.
case errors.Is(err, ErrNotEnoughInputs), case errors.Is(err, ErrNotEnoughInputs),
errors.Is(err, ErrNotEnoughBudget): errors.Is(err, ErrNotEnoughBudget):
result.Event = TxFailed result.Event = TxFailed
// Calculate the starting fee rate to be used when retry
// sweeping these inputs.
feeRate, err := t.calculateRetryFeeRate(r)
if err != nil {
result.Event = TxFatal
result.Err = err
}
// Attach the new fee rate.
result.FeeRate = feeRate
// When there are missing inputs, we'll create a TxUnknownSpend bump // When there are missing inputs, we'll create a TxUnknownSpend bump
// result here so the rest of the inputs can be retried. // result here so the rest of the inputs can be retried.
case errors.Is(err, ErrInputMissing): case errors.Is(err, ErrInputMissing):
@@ -1832,18 +1843,33 @@ func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
return fn.Some(*bumpResult) return fn.Some(*bumpResult)
} }
// If the error is not fee related, we will return a `TxFailed` event // Return a failed event to retry the sweep.
// so this input can be retried. event := TxFailed
// Calculate the next fee rate for the retry.
feeRate, ferr := t.calculateRetryFeeRate(r)
if ferr != nil {
// If there's an error with the fee calculation, we need to
// abort the sweep.
event = TxFatal
}
// If the error is not fee related, we will return a `TxFailed` event so
// this input can be retried.
result := fn.Some(BumpResult{ result := fn.Some(BumpResult{
Event: TxFailed, Event: event,
Tx: oldTx, Tx: oldTx,
Err: err, Err: err,
requestID: r.requestID, requestID: r.requestID,
FeeRate: feeRate,
}) })
// If the tx doesn't not have enought budget, we will return a result so // If the tx doesn't not have enough budget, or if the inputs amounts
// are not sufficient to cover the budget, we will return a result so
// the sweeper can handle it by re-clustering the utxos. // the sweeper can handle it by re-clustering the utxos.
if errors.Is(err, ErrNotEnoughBudget) { if errors.Is(err, ErrNotEnoughBudget) ||
errors.Is(err, ErrNotEnoughInputs) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err) log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
return result return result
} }

View File

@@ -1134,6 +1134,7 @@ func TestCreateAnPublishFail(t *testing.T) {
// Create a test feerate and return it from the mock fee function. // Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000) feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate) m.feeFunc.On("FeeRate").Return(feerate)
m.feeFunc.On("Increment").Return(true, nil).Once()
// Create a testing monitor record. // Create a testing monitor record.
req := createTestBumpRequest() req := createTestBumpRequest()

View File

@@ -951,7 +951,9 @@ func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
} }
// markInputsPublishFailed marks the list of inputs as failed to be published. // markInputsPublishFailed marks the list of inputs as failed to be published.
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) { func (s *UtxoSweeper) markInputsPublishFailed(set InputSet,
feeRate chainfee.SatPerKWeight) {
// Reschedule sweep. // Reschedule sweep.
for _, inp := range set.Inputs() { for _, inp := range set.Inputs() {
op := inp.OutPoint() op := inp.OutPoint()
@@ -978,6 +980,15 @@ func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
// Update the input's state. // Update the input's state.
pi.state = PublishFailed pi.state = PublishFailed
log.Debugf("Input(%v): updating params: starting fee rate "+
"[%v -> %v]", op, pi.params.StartingFeeRate,
feeRate)
// Update the input using the fee rate specified from the
// BumpResult, which should be the starting fee rate to use for
// the next sweeping attempt.
pi.params.StartingFeeRate = fn.Some(feeRate)
} }
} }
@@ -1699,7 +1710,7 @@ func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
// the inputs specified by the set. // the inputs specified by the set.
// //
// TODO(yy): should we also remove the failed tx from db? // TODO(yy): should we also remove the failed tx from db?
s.markInputsPublishFailed(resp.set) s.markInputsPublishFailed(resp.set, resp.result.FeeRate)
} }
// handleBumpEventTxReplaced handles the case where the sweeping tx has been // handleBumpEventTxReplaced handles the case where the sweeping tx has been
@@ -1948,7 +1959,7 @@ func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) {
func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) { func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
// Mark the inputs as publish failed, which means they will be retried // Mark the inputs as publish failed, which means they will be retried
// later. // later.
s.markInputsPublishFailed(r.set) s.markInputsPublishFailed(r.set, r.result.FeeRate)
// Get all the inputs that are not spent in the current sweeping tx. // Get all the inputs that are not spent in the current sweeping tx.
spentInputs := r.result.SpentInputs spentInputs := r.result.SpentInputs
@@ -1982,15 +1993,9 @@ func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
continue continue
} }
log.Debugf("Input(%v): updating params: starting fee rate "+ log.Debugf("Input(%v): updating params: immediate [%v -> true]",
"[%v -> %v], immediate [%v -> true]", op, op, r.result.FeeRate, input.params.Immediate)
input.params.StartingFeeRate, r.result.FeeRate,
input.params.Immediate)
// Update the input using the fee rate specified from the
// BumpResult, which should be the starting fee rate to use for
// the next sweeping attempt.
input.params.StartingFeeRate = fn.Some(r.result.FeeRate)
input.params.Immediate = true input.params.Immediate = true
inputsToRetry = append(inputsToRetry, input) inputsToRetry = append(inputsToRetry, input)
} }

View File

@@ -232,39 +232,51 @@ func TestMarkInputsPublishFailed(t *testing.T) {
inputPublishFailed, inputSwept, inputExcluded, inputFatal, inputPublishFailed, inputSwept, inputExcluded, inputFatal,
}) })
feeRate := chainfee.SatPerKWeight(1000)
// Mark the test inputs. We expect the non-exist input and the // Mark the test inputs. We expect the non-exist input and the
// inputInit to be skipped, and the final input to be marked as // inputInit to be skipped, and the final input to be marked as
// published. // published.
s.markInputsPublishFailed(set) s.markInputsPublishFailed(set, feeRate)
// We expect unchanged number of pending inputs. // We expect unchanged number of pending inputs.
require.Len(s.inputs, 7) require.Len(s.inputs, 7)
// We expect the init input's state to stay unchanged. // We expect the init input's state to stay unchanged.
require.Equal(Init, pi := s.inputs[inputInit.OutPoint()]
s.inputs[inputInit.OutPoint()].state) require.Equal(Init, pi.state)
require.True(pi.params.StartingFeeRate.IsNone())
// We expect the pending-publish input's is now marked as publish // We expect the pending-publish input's is now marked as publish
// failed. // failed.
require.Equal(PublishFailed, pi = s.inputs[inputPendingPublish.OutPoint()]
s.inputs[inputPendingPublish.OutPoint()].state) require.Equal(PublishFailed, pi.state)
require.Equal(feeRate, pi.params.StartingFeeRate.UnsafeFromSome())
// We expect the published input's is now marked as publish failed. // We expect the published input's is now marked as publish failed.
require.Equal(PublishFailed, pi = s.inputs[inputPublished.OutPoint()]
s.inputs[inputPublished.OutPoint()].state) require.Equal(PublishFailed, pi.state)
require.Equal(feeRate, pi.params.StartingFeeRate.UnsafeFromSome())
// We expect the publish failed input to stay unchanged. // We expect the publish failed input to stay unchanged.
require.Equal(PublishFailed, pi = s.inputs[inputPublishFailed.OutPoint()]
s.inputs[inputPublishFailed.OutPoint()].state) require.Equal(PublishFailed, pi.state)
require.True(pi.params.StartingFeeRate.IsNone())
// We expect the swept input to stay unchanged. // We expect the swept input to stay unchanged.
require.Equal(Swept, s.inputs[inputSwept.OutPoint()].state) pi = s.inputs[inputSwept.OutPoint()]
require.Equal(Swept, pi.state)
require.True(pi.params.StartingFeeRate.IsNone())
// We expect the excluded input to stay unchanged. // We expect the excluded input to stay unchanged.
require.Equal(Excluded, s.inputs[inputExcluded.OutPoint()].state) pi = s.inputs[inputExcluded.OutPoint()]
require.Equal(Excluded, pi.state)
require.True(pi.params.StartingFeeRate.IsNone())
// We expect the failed input to stay unchanged. // We expect the fatal input to stay unchanged.
require.Equal(Fatal, s.inputs[inputFatal.OutPoint()].state) pi = s.inputs[inputFatal.OutPoint()]
require.Equal(Fatal, pi.state)
require.True(pi.params.StartingFeeRate.IsNone())
// Assert mocked statements are executed as expected. // Assert mocked statements are executed as expected.
mockStore.AssertExpectations(t) mockStore.AssertExpectations(t)