diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 8792bbdb1..1bccbce2d 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -37,6 +37,10 @@ var ( // ErrTxNoOutput is returned when an output cannot be created during tx // preparation, usually due to the output being dust. ErrTxNoOutput = errors.New("tx has no output") + + // ErrThirdPartySpent is returned when a third party has spent the + // input in the sweeping tx. + ErrThirdPartySpent = errors.New("third party spent the output") ) // Bumper defines an interface that can be used by other subsystems for fee @@ -290,6 +294,11 @@ func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher { } } +// isNeutrinoBackend checks if the wallet backend is neutrino. +func (t *TxPublisher) isNeutrinoBackend() bool { + return t.cfg.Wallet.BackEnd() == "neutrino" +} + // Broadcast is used to publish the tx created from the given inputs. It will, // 1. init a fee function based on the given strategy. // 2. create an RBF-compliant tx and monitor it for confirmation. @@ -724,6 +733,12 @@ func (t *TxPublisher) processRecords() { // feeBumpRecords stores a map of the records which need to be bumped. feeBumpRecords := make(map[uint64]*monitorRecord) + // failedRecords stores a map of the records which has inputs being + // spent by a third party. + // + // NOTE: this is only used for neutrino backend. + failedRecords := make(map[uint64]*monitorRecord) + // visitor is a helper closure that visits each record and divides them // into two groups. visitor := func(requestID uint64, r *monitorRecord) error { @@ -738,6 +753,16 @@ func (t *TxPublisher) processRecords() { return nil } + // Check whether the inputs has been spent by a third party. + // + // NOTE: this check is only done for neutrino backend. + if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) { + failedRecords[requestID] = r + + // Move to the next record. + return nil + } + feeBumpRecords[requestID] = r // Return nil to move to the next record. @@ -768,6 +793,17 @@ func (t *TxPublisher) processRecords() { t.wg.Add(1) go t.handleFeeBumpTx(requestID, rec, currentHeight) } + + // For records that are failed, we'll notify the caller about this + // result. + for requestID, r := range failedRecords { + rec := r + + log.Debugf("Tx=%v has inputs been spent by a third party, "+ + "failing it now", r.tx.TxHash()) + t.wg.Add(1) + go t.handleThirdPartySpent(rec, requestID) + } } // handleTxConfirmed is called when a monitored tx is confirmed. It will @@ -837,6 +873,33 @@ func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord, }) } +// handleThirdPartySpent is called when the inputs in an unconfirmed tx is +// spent. It will notify the subscriber then remove the record from the maps +// and send a TxFailed event to the subscriber. +// +// NOTE: Must be run as a goroutine to avoid blocking on sending the result. +func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord, + requestID uint64) { + + defer t.wg.Done() + + // Create a result that will be sent to the resultChan which is + // listened by the caller. + // + // TODO(yy): create a new state `TxThirdPartySpent` to notify the + // sweeper to remove the input, hence moving the monitoring of inputs + // spent inside the fee bumper. + result := &BumpResult{ + Event: TxFailed, + Tx: r.tx, + requestID: requestID, + Err: ErrThirdPartySpent, + } + + // Notify that this tx is confirmed and remove the record from the map. + t.handleResult(result) +} + // createAndPublishTx creates a new tx with a higher fee rate and publishes it // to the network. It will update the record with the new tx and fee rate if // successfully created, and return the result when published successfully. @@ -953,6 +1016,66 @@ func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool { return details.NumConfirmations > 0 } +// isThirdPartySpent checks whether the inputs of the tx has already been spent +// by a third party. When a tx is not confirmed, yet its inputs has been spent, +// then it must be spent by a different tx other than the sweeping tx here. +// +// NOTE: this check is only performed for neutrino backend as it has no +// reliable way to tell a tx has been replaced. +func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash, + inputs []input.Input) bool { + + // Skip this check for if this is not neutrino backend. + if !t.isNeutrinoBackend() { + return false + } + + // Iterate all the inputs and check if they have been spent already. + for _, inp := range inputs { + op := inp.OutPoint() + + // If the input has already been spent after the height hint, a + // spend event is sent back immediately. + spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn( + &op, inp.SignDesc().Output.PkScript, inp.HeightHint(), + ) + if err != nil { + log.Criticalf("Failed to register spend ntfn: %v", err) + return false + } + + // Remove the subscription when exit. + defer spendEvent.Cancel() + + // Do a non-blocking read to see if the output has been spent. + select { + case spend, ok := <-spendEvent.Spend: + if !ok { + log.Debugf("Spend ntfn for %v canceled", op) + return false + } + + spendingTxID := spend.SpendingTx.TxHash() + + // If the spending tx is the same as the sweeping tx + // then we are good. + if spendingTxID == txid { + continue + } + + log.Warnf("Detected third party spent of output=%v "+ + "in tx=%v", op, spend.SpendingTx.TxHash()) + + return true + + // Move to the next input. + default: + } + } + + return false +} + // calcCurrentConfTarget calculates the current confirmation target based on // the deadline height. The conf target is capped at 0 if the deadline has // already been past. diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 38036934c..e2b2cfe14 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -1358,6 +1358,7 @@ func TestProcessRecords(t *testing.T) { NumConfirmations: 0, }, nil, ).Once() + m.wallet.On("BackEnd").Return("test-backend").Once() // Setup the initial publisher state by adding the records to the maps. subscriberConfirmed := make(chan *BumpResult, 1) diff --git a/sweep/interface.go b/sweep/interface.go index a6e5d2153..4b02f143c 100644 --- a/sweep/interface.go +++ b/sweep/interface.go @@ -51,4 +51,9 @@ type Wallet interface { // its transaction hash. GetTransactionDetails(txHash *chainhash.Hash) ( *lnwallet.TransactionDetail, error) + + // BackEnd returns a name for the wallet's backing chain service, + // which could be e.g. btcd, bitcoind, neutrino, or another consensus + // service. + BackEnd() string } diff --git a/sweep/mock_test.go b/sweep/mock_test.go index 146f0fb95..94b251eef 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -45,6 +45,10 @@ func newMockBackend(t *testing.T, notifier *MockNotifier) *mockBackend { } } +func (b *mockBackend) BackEnd() string { + return "mockbackend" +} + func (b *mockBackend) CheckMempoolAcceptance(tx *wire.MsgTx) error { return nil } @@ -357,6 +361,14 @@ type MockWallet struct { // Compile-time constraint to ensure MockWallet implements Wallet. var _ Wallet = (*MockWallet)(nil) +// BackEnd returns a name for the wallet's backing chain service, which could +// be e.g. btcd, bitcoind, neutrino, or another consensus service. +func (m *MockWallet) BackEnd() string { + args := m.Called() + + return args.String(0) +} + // CheckMempoolAcceptance checks if the transaction can be accepted to the // mempool. func (m *MockWallet) CheckMempoolAcceptance(tx *wire.MsgTx) error {