diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 4df08a8a0..c3ce504ec 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1365,20 +1365,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) { return } + // This is a new input, and we want to query the mempool to see if this + // input has already been spent. If so, we'll start the input with + // state Published and attach the RBFInfo. + state, rbfInfo := s.decideStateAndRBFInfo(*input.input.OutPoint()) + // Create a new pendingInput and initialize the listeners slice with // the passed in result channel. If this input is offered for sweep // again, the result channel will be appended to this slice. pi = &pendingInput{ - state: StateInit, + state: state, listeners: []chan Result{input.resultChan}, Input: input.input, params: input.params, + rbf: rbfInfo, } - // Try to find fee info for possible RBF if this input has already been - // spent. - pi = s.attachAvailableRBFInfo(pi) - s.pendingInputs[outpoint] = pi log.Tracef("input %v, state=%v, added to pendingInputs", outpoint, pi.state) @@ -1399,12 +1401,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) { pi.ntfnRegCancel = cancel } -// attachAvailableRBFInfo queries the mempool to see whether the given input -// has already been spent. If so, it will query the sweeper store to fetch the -// fee info of the spending transction, hence preparing for possible RBF. -func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput { +// decideStateAndRBFInfo queries the mempool to see whether the given input has +// already been spent. If so, the state Published will be returned, otherwise +// state Init. When spent, it will query the sweeper store to fetch the fee +// info of the spending transction, and construct an RBFInfo based on it. +// Suppose an error occurs, fn.None is returned. +func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) ( + SweepState, fn.Option[RBFInfo]) { + // Check if we can find the spending tx of this input in mempool. - txOption := s.mempoolLookup(*pi.OutPoint()) + txOption := s.mempoolLookup(op) + + // Extract the spending tx from the option. + var tx *wire.MsgTx + txOption.WhenSome(func(t wire.MsgTx) { + tx = &t + }) // Exit early if it's not found. // @@ -1412,18 +1424,13 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput { // lookup: // - for neutrino we don't have a mempool. // - for btcd below v0.24.1 we don't have `gettxspendingprevout`. - if txOption.IsNone() { - return pi + if tx == nil { + return StateInit, fn.None[RBFInfo]() } - // NOTE: we use UnsafeFromSome for here because we are sure this option - // is NOT none. - tx := txOption.UnsafeFromSome() - - // Otherwise the input is already spent in the mempool, update its - // state to StatePublished. - pi.state = StatePublished - + // Otherwise the input is already spent in the mempool, so eventually + // we will return StatePublished. + // // We also need to update the RBF info for this input. If the sweeping // transaction is broadcast by us, we can find the fee info in the // sweeper store. @@ -1436,7 +1443,7 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput { // pendingInputs. if errors.Is(err, ErrTxNotFound) { log.Warnf("Spending tx %v not found in sweeper store", txid) - return pi + return StatePublished, fn.None[RBFInfo]() } // Exit if we get an db error. @@ -1444,17 +1451,17 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput { log.Errorf("Unable to get tx %v from sweeper store: %v", txid, err) - return pi + return StatePublished, fn.None[RBFInfo]() } - // Attach the fee info and return it. - pi.rbf = fn.Some(RBFInfo{ + // Prepare the fee info and return it. + rbf := fn.Some(RBFInfo{ Txid: txid, Fee: btcutil.Amount(tr.Fee), FeeRate: chainfee.SatPerKWeight(tr.FeeRate), }) - return pi + return StatePublished, rbf } // handleExistingInput processes an input that is already known to the sweeper. diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index a4ef79695..0168d9f08 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -2399,10 +2399,10 @@ func TestUpdateSweeperInputs(t *testing.T) { require.Equal(expectedInputs, s.pendingInputs) } -// TestAttachAvailableRBFInfo checks that the RBF info is attached to the -// pending input, along with the state being marked as published, when this -// input can be found both in mempool and the sweeper store. -func TestAttachAvailableRBFInfo(t *testing.T) { +// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are +// returned based on whether this input can be found both in mempool and the +// sweeper store. +func TestDecideStateAndRBFInfo(t *testing.T) { t.Parallel() require := require.New(t) @@ -2410,16 +2410,6 @@ func TestAttachAvailableRBFInfo(t *testing.T) { // Create a test outpoint. op := wire.OutPoint{Index: 1} - // Create a mock input. - testInput := &input.MockInput{} - defer testInput.AssertExpectations(t) - - testInput.On("OutPoint").Return(&op) - pi := &pendingInput{ - Input: testInput, - state: StateInit, - } - // Create a mock mempool watcher and a mock sweeper store. mockMempool := chainntnfs.NewMockMempoolWatcher() defer mockMempool.AssertExpectations(t) @@ -2436,11 +2426,11 @@ func TestAttachAvailableRBFInfo(t *testing.T) { mockMempool.On("LookupInputMempoolSpend", op).Return( fn.None[wire.MsgTx]()).Once() - // Since the mempool lookup failed, we exepect the original pending - // input to stay unchanged. - result := s.attachAvailableRBFInfo(pi) - require.True(result.rbf.IsNone()) - require.Equal(StateInit, result.state) + // Since the mempool lookup failed, we exepect state Init and no + // RBFInfo. + state, rbf := s.decideStateAndRBFInfo(op) + require.True(rbf.IsNone()) + require.Equal(StateInit, state) // Mock the mempool lookup to return a tx three times as we are calling // attachAvailableRBFInfo three times. @@ -2451,21 +2441,19 @@ func TestAttachAvailableRBFInfo(t *testing.T) { // Mock the store to return an error saying the tx cannot be found. mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once() - // Although the db lookup failed, the pending input should have been - // marked as published without attaching any RBF info. - result = s.attachAvailableRBFInfo(pi) - require.True(result.rbf.IsNone()) - require.Equal(StatePublished, result.state) + // Although the db lookup failed, we expect the state to be Published. + state, rbf = s.decideStateAndRBFInfo(op) + require.True(rbf.IsNone()) + require.Equal(StatePublished, state) // Mock the store to return a db error. dummyErr := errors.New("dummy error") mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once() - // Although the db lookup failed, the pending input should have been - // marked as published without attaching any RBF info. - result = s.attachAvailableRBFInfo(pi) - require.True(result.rbf.IsNone()) - require.Equal(StatePublished, result.state) + // Although the db lookup failed, we expect the state to be Published. + state, rbf = s.decideStateAndRBFInfo(op) + require.True(rbf.IsNone()) + require.Equal(StatePublished, state) // Mock the store to return a record. tr := &TxRecord{ @@ -2475,18 +2463,18 @@ func TestAttachAvailableRBFInfo(t *testing.T) { mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once() // Call the method again. - result = s.attachAvailableRBFInfo(pi) + state, rbf = s.decideStateAndRBFInfo(op) - // Assert that the RBF info is attached to the pending input. + // Assert that the RBF info is returned. rbfInfo := fn.Some(RBFInfo{ Txid: tx.TxHash(), Fee: btcutil.Amount(tr.Fee), FeeRate: chainfee.SatPerKWeight(tr.FeeRate), }) - require.Equal(rbfInfo, result.rbf) + require.Equal(rbfInfo, rbf) // Assert the state is updated. - require.Equal(StatePublished, result.state) + require.Equal(StatePublished, state) } // TestMarkInputFailed checks that the input is marked as failed as expected.