sweep: refactor attachAvailableRBFInfo to decideStateAndRBFInfo

Thus this method `decideStateAndRBFInfo` won't touch the state changes
of a given input.
This commit is contained in:
yyforyongyu 2024-03-20 07:31:41 +08:00
parent 6f5b7a9fd3
commit df4e51e2e0
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 53 additions and 58 deletions

View File

@ -1365,20 +1365,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
return return
} }
// This is a new input, and we want to query the mempool to see if this
// input has already been spent. If so, we'll start the input with
// state Published and attach the RBFInfo.
state, rbfInfo := s.decideStateAndRBFInfo(*input.input.OutPoint())
// Create a new pendingInput and initialize the listeners slice with // Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep // the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice. // again, the result channel will be appended to this slice.
pi = &pendingInput{ pi = &pendingInput{
state: StateInit, state: state,
listeners: []chan Result{input.resultChan}, listeners: []chan Result{input.resultChan},
Input: input.input, Input: input.input,
params: input.params, params: input.params,
rbf: rbfInfo,
} }
// Try to find fee info for possible RBF if this input has already been
// spent.
pi = s.attachAvailableRBFInfo(pi)
s.pendingInputs[outpoint] = pi s.pendingInputs[outpoint] = pi
log.Tracef("input %v, state=%v, added to pendingInputs", outpoint, log.Tracef("input %v, state=%v, added to pendingInputs", outpoint,
pi.state) pi.state)
@ -1399,12 +1401,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
pi.ntfnRegCancel = cancel pi.ntfnRegCancel = cancel
} }
// attachAvailableRBFInfo queries the mempool to see whether the given input // decideStateAndRBFInfo queries the mempool to see whether the given input has
// has already been spent. If so, it will query the sweeper store to fetch the // already been spent. If so, the state Published will be returned, otherwise
// fee info of the spending transction, hence preparing for possible RBF. // state Init. When spent, it will query the sweeper store to fetch the fee
func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput { // info of the spending transction, and construct an RBFInfo based on it.
// Suppose an error occurs, fn.None is returned.
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
SweepState, fn.Option[RBFInfo]) {
// Check if we can find the spending tx of this input in mempool. // Check if we can find the spending tx of this input in mempool.
txOption := s.mempoolLookup(*pi.OutPoint()) txOption := s.mempoolLookup(op)
// Extract the spending tx from the option.
var tx *wire.MsgTx
txOption.WhenSome(func(t wire.MsgTx) {
tx = &t
})
// Exit early if it's not found. // Exit early if it's not found.
// //
@ -1412,18 +1424,13 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput {
// lookup: // lookup:
// - for neutrino we don't have a mempool. // - for neutrino we don't have a mempool.
// - for btcd below v0.24.1 we don't have `gettxspendingprevout`. // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
if txOption.IsNone() { if tx == nil {
return pi return StateInit, fn.None[RBFInfo]()
} }
// NOTE: we use UnsafeFromSome for here because we are sure this option // Otherwise the input is already spent in the mempool, so eventually
// is NOT none. // we will return StatePublished.
tx := txOption.UnsafeFromSome() //
// Otherwise the input is already spent in the mempool, update its
// state to StatePublished.
pi.state = StatePublished
// We also need to update the RBF info for this input. If the sweeping // We also need to update the RBF info for this input. If the sweeping
// transaction is broadcast by us, we can find the fee info in the // transaction is broadcast by us, we can find the fee info in the
// sweeper store. // sweeper store.
@ -1436,7 +1443,7 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput {
// pendingInputs. // pendingInputs.
if errors.Is(err, ErrTxNotFound) { if errors.Is(err, ErrTxNotFound) {
log.Warnf("Spending tx %v not found in sweeper store", txid) log.Warnf("Spending tx %v not found in sweeper store", txid)
return pi return StatePublished, fn.None[RBFInfo]()
} }
// Exit if we get an db error. // Exit if we get an db error.
@ -1444,17 +1451,17 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput {
log.Errorf("Unable to get tx %v from sweeper store: %v", log.Errorf("Unable to get tx %v from sweeper store: %v",
txid, err) txid, err)
return pi return StatePublished, fn.None[RBFInfo]()
} }
// Attach the fee info and return it. // Prepare the fee info and return it.
pi.rbf = fn.Some(RBFInfo{ rbf := fn.Some(RBFInfo{
Txid: txid, Txid: txid,
Fee: btcutil.Amount(tr.Fee), Fee: btcutil.Amount(tr.Fee),
FeeRate: chainfee.SatPerKWeight(tr.FeeRate), FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
}) })
return pi return StatePublished, rbf
} }
// handleExistingInput processes an input that is already known to the sweeper. // handleExistingInput processes an input that is already known to the sweeper.

View File

@ -2399,10 +2399,10 @@ func TestUpdateSweeperInputs(t *testing.T) {
require.Equal(expectedInputs, s.pendingInputs) require.Equal(expectedInputs, s.pendingInputs)
} }
// TestAttachAvailableRBFInfo checks that the RBF info is attached to the // TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are
// pending input, along with the state being marked as published, when this // returned based on whether this input can be found both in mempool and the
// input can be found both in mempool and the sweeper store. // sweeper store.
func TestAttachAvailableRBFInfo(t *testing.T) { func TestDecideStateAndRBFInfo(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
@ -2410,16 +2410,6 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
// Create a test outpoint. // Create a test outpoint.
op := wire.OutPoint{Index: 1} op := wire.OutPoint{Index: 1}
// Create a mock input.
testInput := &input.MockInput{}
defer testInput.AssertExpectations(t)
testInput.On("OutPoint").Return(&op)
pi := &pendingInput{
Input: testInput,
state: StateInit,
}
// Create a mock mempool watcher and a mock sweeper store. // Create a mock mempool watcher and a mock sweeper store.
mockMempool := chainntnfs.NewMockMempoolWatcher() mockMempool := chainntnfs.NewMockMempoolWatcher()
defer mockMempool.AssertExpectations(t) defer mockMempool.AssertExpectations(t)
@ -2436,11 +2426,11 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
mockMempool.On("LookupInputMempoolSpend", op).Return( mockMempool.On("LookupInputMempoolSpend", op).Return(
fn.None[wire.MsgTx]()).Once() fn.None[wire.MsgTx]()).Once()
// Since the mempool lookup failed, we exepect the original pending // Since the mempool lookup failed, we exepect state Init and no
// input to stay unchanged. // RBFInfo.
result := s.attachAvailableRBFInfo(pi) state, rbf := s.decideStateAndRBFInfo(op)
require.True(result.rbf.IsNone()) require.True(rbf.IsNone())
require.Equal(StateInit, result.state) require.Equal(StateInit, state)
// Mock the mempool lookup to return a tx three times as we are calling // Mock the mempool lookup to return a tx three times as we are calling
// attachAvailableRBFInfo three times. // attachAvailableRBFInfo three times.
@ -2451,21 +2441,19 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
// Mock the store to return an error saying the tx cannot be found. // Mock the store to return an error saying the tx cannot be found.
mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once() mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
// Although the db lookup failed, the pending input should have been // Although the db lookup failed, we expect the state to be Published.
// marked as published without attaching any RBF info. state, rbf = s.decideStateAndRBFInfo(op)
result = s.attachAvailableRBFInfo(pi) require.True(rbf.IsNone())
require.True(result.rbf.IsNone()) require.Equal(StatePublished, state)
require.Equal(StatePublished, result.state)
// Mock the store to return a db error. // Mock the store to return a db error.
dummyErr := errors.New("dummy error") dummyErr := errors.New("dummy error")
mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once() mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
// Although the db lookup failed, the pending input should have been // Although the db lookup failed, we expect the state to be Published.
// marked as published without attaching any RBF info. state, rbf = s.decideStateAndRBFInfo(op)
result = s.attachAvailableRBFInfo(pi) require.True(rbf.IsNone())
require.True(result.rbf.IsNone()) require.Equal(StatePublished, state)
require.Equal(StatePublished, result.state)
// Mock the store to return a record. // Mock the store to return a record.
tr := &TxRecord{ tr := &TxRecord{
@ -2475,18 +2463,18 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once() mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
// Call the method again. // Call the method again.
result = s.attachAvailableRBFInfo(pi) state, rbf = s.decideStateAndRBFInfo(op)
// Assert that the RBF info is attached to the pending input. // Assert that the RBF info is returned.
rbfInfo := fn.Some(RBFInfo{ rbfInfo := fn.Some(RBFInfo{
Txid: tx.TxHash(), Txid: tx.TxHash(),
Fee: btcutil.Amount(tr.Fee), Fee: btcutil.Amount(tr.Fee),
FeeRate: chainfee.SatPerKWeight(tr.FeeRate), FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
}) })
require.Equal(rbfInfo, result.rbf) require.Equal(rbfInfo, rbf)
// Assert the state is updated. // Assert the state is updated.
require.Equal(StatePublished, result.state) require.Equal(StatePublished, state)
} }
// TestMarkInputFailed checks that the input is marked as failed as expected. // TestMarkInputFailed checks that the input is marked as failed as expected.