sweep: add method handleBumpEventError and fix markInputFailed

Previously in `markInputFailed`, we'd remove all inputs under the same
group via `removeExclusiveGroup`. This is wrong as when the current
sweep fails for this input, it shouldn't affect other inputs.
This commit is contained in:
yyforyongyu 2024-04-30 19:25:06 +08:00 committed by yyforyongyu
parent 719ca5b229
commit ba238962d6
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 181 additions and 6 deletions

View File

@ -1441,11 +1441,6 @@ func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
pi.state = Failed
// Remove all other inputs in this exclusive group.
if pi.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(*pi.params.ExclusiveGroup)
}
s.signalResult(pi, Result{Err: err})
}
@ -1728,6 +1723,62 @@ func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
return nil
}
// handleBumpEventTxFatal handles the case where there's an unexpected error
// when creating or publishing the sweeping tx. In this case, the tx will be
// removed from the sweeper store and the inputs will be marked as `Failed`,
// which means they will not be retried.
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
r := resp.result
// Remove the tx from the sweeper store if there is one. Since this is
// a broadcast error, it's likely there isn't a tx here.
if r.Tx != nil {
txid := r.Tx.TxHash()
log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
// Remove the tx from the sweeper db if it exists.
if err := s.cfg.Store.DeleteTx(txid); err != nil {
return fmt.Errorf("delete tx record for %v: %w", txid,
err)
}
}
// Mark the inputs as failed.
s.markInputsFailed(resp.set, r.Err)
return nil
}
// markInputsFailed marks all inputs found in the tx as failed. It will also
// notify all the subscribers of these inputs.
func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) {
for _, inp := range set.Inputs() {
outpoint := inp.OutPoint()
input, ok := s.inputs[outpoint]
if !ok {
// It's very likely that a spending tx contains inputs
// that we don't know.
log.Tracef("Skipped marking input as failed: %v not "+
"found in pending inputs", outpoint)
continue
}
// If the input is already in a terminal state, we don't want
// to rewrite it, which also indicates an error as we only get
// an error event during the initial broadcast.
if input.terminated() {
log.Errorf("Skipped marking input=%v as failed due to "+
"unexpected state=%v", outpoint, input.state)
continue
}
s.markInputFailed(input, err)
}
}
// handleBumpEvent handles the result sent from the bumper based on its event
// type.
//
@ -1752,8 +1803,10 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
case TxReplaced:
return s.handleBumpEventTxReplaced(r)
// There's a fatal error in creating the tx, we will remove the tx from
// the sweeper db and mark the inputs as failed.
case TxFatal:
// TODO(yy): create a method to remove this input.
return s.handleBumpEventTxFatal(r)
}
return nil

View File

@ -1075,3 +1075,125 @@ func TestMonitorFeeBumpResult(t *testing.T) {
})
}
}
// TestMarkInputsFailed checks that given a list of inputs with different
// states, the method `markInputsFailed` correctly marks the inputs as failed.
func TestMarkInputsFailed(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{})
// Create testing inputs for each state.
// - inputInit specifies a newly created input. When marking this as
// published, we should see an error log as this input hasn't been
// published yet.
// - inputPendingPublish specifies an input about to be published.
// - inputPublished specifies an input that's published.
// - inputPublishFailed specifies an input that's failed to be
// published.
// - inputSwept specifies an input that's swept.
// - inputExcluded specifies an input that's excluded.
// - inputFailed specifies an input that's failed.
var (
inputInit = createMockInput(t, s, Init)
inputPendingPublish = createMockInput(t, s, PendingPublish)
inputPublished = createMockInput(t, s, Published)
inputPublishFailed = createMockInput(t, s, PublishFailed)
inputSwept = createMockInput(t, s, Swept)
inputExcluded = createMockInput(t, s, Excluded)
inputFailed = createMockInput(t, s, Failed)
)
// Gather all inputs.
set.On("Inputs").Return([]input.Input{
inputInit, inputPendingPublish, inputPublished,
inputPublishFailed, inputSwept, inputExcluded, inputFailed,
})
// Mark the test inputs. We expect the non-exist input and
// inputSwept/inputExcluded/inputFailed to be skipped.
s.markInputsFailed(set, errDummy)
// We expect unchanged number of pending inputs.
require.Len(s.inputs, 7)
// We expect the init input's to be marked as failed.
require.Equal(Failed, s.inputs[inputInit.OutPoint()].state)
// We expect the pending-publish input to be marked as failed.
require.Equal(Failed, s.inputs[inputPendingPublish.OutPoint()].state)
// We expect the published input to be marked as failed.
require.Equal(Failed, s.inputs[inputPublished.OutPoint()].state)
// We expect the publish failed input to be markd as failed.
require.Equal(Failed, s.inputs[inputPublishFailed.OutPoint()].state)
// We expect the swept input to stay unchanged.
require.Equal(Swept, s.inputs[inputSwept.OutPoint()].state)
// We expect the excluded input to stay unchanged.
require.Equal(Excluded, s.inputs[inputExcluded.OutPoint()].state)
// We expect the failed input to stay unchanged.
require.Equal(Failed, s.inputs[inputFailed.OutPoint()].state)
}
// TestHandleBumpEventTxFatal checks that `handleBumpEventTxFatal` correctly
// handles a `TxFatal` event.
func TestHandleBumpEventTxFatal(t *testing.T) {
t.Parallel()
rt := require.New(t)
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set. We are not testing `markInputFailed` here,
// so the actual set doesn't matter.
set := &MockInputSet{}
defer set.AssertExpectations(t)
set.On("Inputs").Return(nil)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a dummy tx.
tx := &wire.MsgTx{
LockTime: 1,
}
// Create a testing bump response.
result := &BumpResult{
Err: errDummy,
Tx: tx,
}
resp := &bumpResp{
result: result,
set: set,
}
// Mock the store to return an error.
store.On("DeleteTx", mock.Anything).Return(errDummy).Once()
// Call the method under test and assert the error is returned.
err := s.handleBumpEventTxFatal(resp)
rt.ErrorIs(err, errDummy)
// Mock the store to return nil.
store.On("DeleteTx", mock.Anything).Return(nil).Once()
// Call the method under test and assert no error is returned.
err = s.handleBumpEventTxFatal(resp)
rt.NoError(err)
}