diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index fa6812d4e..7072808f6 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -12,7 +12,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/chain" - "github.com/btcsuite/btcwallet/wtxmgr" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/queue" ) @@ -144,6 +143,7 @@ func (b *BitcoindNotifier) Start() error { b.txNotifier = chainntnfs.NewTxNotifier( uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache, + b.spendHintCache, ) b.bestBlock = chainntnfs.BlockEpoch{ @@ -259,6 +259,8 @@ out: // included in the active chain. We'll do this // in a goroutine to prevent blocking // potentially long rescans. + // + // TODO(wilmer): add retry logic if rescan fails? b.wg.Add(1) go func() { defer b.wg.Done() @@ -286,6 +288,25 @@ out: } }() + case *chainntnfs.HistoricalSpendDispatch: + // In order to ensure we don't block the caller + // on what may be a long rescan, we'll launch a + // goroutine to do so in the background. + // + // TODO(wilmer): add retry logic if rescan fails? + b.wg.Add(1) + go func() { + defer b.wg.Done() + + err := b.dispatchSpendDetailsManually(msg) + if err != nil { + chainntnfs.Log.Errorf("Rescan to "+ + "determine the spend "+ + "details of %v failed: %v", + msg.OutPoint, err) + } + }() + case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg @@ -383,7 +404,23 @@ out: b.bestBlock = newBestBlock case chain.RelevantTx: - b.handleRelevantTx(item, b.bestBlock.Height) + // We only care about notifying on confirmed + // spends, so if this is a mempool spend, we can + // ignore it and wait for the spend to appear in + // on-chain. + if item.Block == nil { + continue + } + + tx := &item.TxRecord.MsgTx + err := b.txNotifier.ProcessRelevantSpendTx( + tx, item.Block.Height, + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to "+ + "process transaction %v: %v", + tx.TxHash(), err) + } } case <-b.quit: @@ -393,55 +430,6 @@ out: b.wg.Done() } -// handleRelevantTx notifies any clients of a relevant transaction. -func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int32) { - msgTx := tx.TxRecord.MsgTx - - // We only care about notifying on confirmed spends, so in case this is - // a mempool spend, we can continue, and wait for the spend to appear - // in chain. - if tx.Block == nil { - return - } - - // First, check if this transaction spends an output - // that has an existing spend notification for it. - for i, txIn := range msgTx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an - // output which we have a registered - // notification for, then create a spend - // summary, finally sending off the details to - // the notification subscriber. - if clients, ok := b.spendNotifications[prevOut]; ok { - spenderSha := msgTx.TxHash() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &spenderSha, - SpendingTx: &msgTx, - SpenderInputIndex: uint32(i), - } - spendDetails.SpendingHeight = tx.Block.Height - - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching confirmed "+ - "spend notification for outpoint=%v "+ - "at height %v", ntfn.targetOutpoint, - spendDetails.SpendingHeight) - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure that any calls to - // Cancel will not block. This is safe to do - // since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } - delete(b.spendNotifications, prevOut) - } - } -} - // historicalConfDetails looks up whether a transaction is already included in a // block in the active chain and, if so, returns details about the confirmation. func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, @@ -717,167 +705,120 @@ type spendCancel struct { func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, outpoint) - heightHint = hint - } + // First, we'll construct a spend notification request and hand it off + // to the txNotifier. + spendID := atomic.AddUint64(&b.spendClientCounter, 1) + cancel := func() { + b.txNotifier.CancelSpend(*outpoint, spendID) } - // Construct a notification request for the outpoint and send it to the - // main event loop. - ntfn := &spendNotification{ - targetOutpoint: outpoint, - spendChan: make(chan *chainntnfs.SpendDetail, 1), - spendID: atomic.AddUint64(&b.spendClientCounter, 1), + ntfn := &chainntnfs.SpendNtfn{ + SpendID: spendID, + OutPoint: *outpoint, + PkScript: pkScript, + Event: chainntnfs.NewSpendEvent(cancel), + HeightHint: heightHint, } - select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown - case b.notificationRegistry <- ntfn: - } - - if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil { - return nil, err - } - - // The following conditional checks to ensure that when a spend - // notification is registered, the output hasn't already been spent. If - // the output is no longer in the UTXO set, the chain will be rescanned - // from the point where the output was added. The rescan will dispatch - // the notification. - txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true) + historicalDispatch, err := b.txNotifier.RegisterSpend(ntfn) if err != nil { return nil, err } - // If the output is unspent, then we'll write it to the cache with the - // given height hint. This allows us to increase the height hint as the - // chain extends and the output remains unspent. + // If the txNotifier didn't return any details to perform a historical + // scan of the chain, then we can return early as there's nothing left + // for us to do. + if historicalDispatch == nil { + return ntfn.Event, nil + } + + // We'll then request the backend to notify us when it has detected the + // outpoint as spent. + if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil { + return nil, err + } + + // In addition to the check above, we'll also check the backend's UTXO + // set to determine whether the outpoint has been spent. If it hasn't, + // we can return to the caller as well. + txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true) + if err != nil { + return nil, err + } if txOut != nil { - err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint) + // We'll let the txNotifier know the outpoint is still unspent + // in order to begin updating its spend hint. + err := b.txNotifier.UpdateSpendDetails(*outpoint, nil) if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. - chainntnfs.Log.Error("Unable to update spend hint to "+ - "%d for %v: %v", heightHint, *outpoint, err) - } - } else { - // Otherwise, we'll determine when the output was spent. - // - // First, we'll attempt to retrieve the transaction's block hash - // using the backend's transaction index. - tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) - if err != nil { - // Avoid returning an error if the transaction was not - // found to proceed with fallback methods. - jsonErr, ok := err.(*btcjson.RPCError) - if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { - return nil, fmt.Errorf("unable to query for "+ - "txid %v: %v", outpoint.Hash, err) - } + return nil, err } - var blockHash *chainhash.Hash - if tx != nil && tx.BlockHash != "" { - // If we're able to retrieve a valid block hash from the - // transaction, then we'll use it as our rescan starting - // point. - blockHash, err = chainhash.NewHashFromStr(tx.BlockHash) - if err != nil { - return nil, err - } - } else { - // Otherwise, we'll attempt to retrieve the hash for the - // block at the heightHint. - blockHash, err = b.chainConn.GetBlockHash( - int64(heightHint), - ) - if err != nil { - return nil, fmt.Errorf("unable to retrieve "+ - "hash for block with height %d: %v", - heightHint, err) - } - } + return ntfn.Event, nil + } - // We'll only scan old blocks if the transaction has actually - // been included within a block. Otherwise, we'll encounter an - // error when scanning for blocks. This can happens in the case - // of a race condition, wherein the output itself is unspent, - // and only arrives in the mempool after the getxout call. - if blockHash != nil { - // Rescan all the blocks until the current one. - startHeight, err := b.chainConn.GetBlockHeight( - blockHash, - ) - if err != nil { - return nil, err - } - - _, endHeight, err := b.chainConn.GetBestBlock() - if err != nil { - return nil, err - } - - // In order to ensure we don't block the caller on what - // may be a long rescan, we'll launch a goroutine to do - // so in the background. - b.wg.Add(1) - go func() { - defer b.wg.Done() - - err := b.dispatchSpendDetailsManually( - *outpoint, startHeight, endHeight, - ) - if err != nil { - chainntnfs.Log.Errorf("Rescan for spend "+ - "notification txout(%x) "+ - "failed: %v", outpoint, err) - } - }() + // Otherwise, we'll determine when the output was spent by scanning the + // chain. We'll begin by determining where to start our historical + // rescan. + // + // As a minimal optimization, we'll query the backend's transaction + // index (if enabled) to determine if we have a better rescan starting + // height. We can do this as the GetRawTransaction call will return the + // hash of the block it was included in within the chain. + tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) + if err != nil { + // Avoid returning an error if the transaction was not found to + // proceed with fallback methods. + jsonErr, ok := err.(*btcjson.RPCError) + if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { + return nil, fmt.Errorf("unable to query for "+ + "txid %v: %v", outpoint.Hash, err) } } - return &chainntnfs.SpendEvent{ - Spend: ntfn.spendChan, - Cancel: func() { - cancel := &spendCancel{ - op: *outpoint, - spendID: ntfn.spendID, - } + // If the transaction index was enabled, we'll use the block's hash to + // retrieve its height and check whether it provides a better starting + // point for our rescan. + if tx != nil { + // If the transaction containing the outpoint hasn't confirmed + // on-chain, then there's no need to perform a rescan. + if tx.BlockHash == "" { + return ntfn.Event, nil + } - // Submit spend cancellation to notification dispatcher. - select { - case b.notificationCancels <- cancel: - // Cancellation is being handled, drain the - // spend chan until it is closed before yielding - // to the caller. - for { - select { - case _, ok := <-ntfn.spendChan: - if !ok { - return - } - case <-b.quit: - return - } - } - case <-b.quit: - } - }, - }, nil + blockHash, err := chainhash.NewHashFromStr(tx.BlockHash) + if err != nil { + return nil, err + } + blockHeight, err := b.chainConn.GetBlockHeight(blockHash) + if err != nil { + return nil, err + } + + if uint32(blockHeight) > historicalDispatch.StartHeight { + historicalDispatch.StartHeight = uint32(blockHeight) + } + } + + // Now that we've determined the starting point of our rescan, we can + // dispatch it. + select { + case b.notificationRegistry <- historicalDispatch: + return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + } } // disaptchSpendDetailsManually attempts to manually scan the chain within the // given height range for a transaction that spends the given outpoint. If one // is found, it's spending details are sent to the notifier dispatcher, which // will then dispatch the notification to all of its clients. -func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, - startHeight, endHeight int32) error { +func (b *BitcoindNotifier) dispatchSpendDetailsManually( + historicalDispatchDetails *chainntnfs.HistoricalSpendDispatch) error { + + op := historicalDispatchDetails.OutPoint + startHeight := historicalDispatchDetails.StartHeight + endHeight := historicalDispatchDetails.EndHeight // Begin scanning blocks at every height to determine if the outpoint // was spent. @@ -890,6 +831,7 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, default: } + // First, we'll fetch the block for the current height. blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return fmt.Errorf("unable to retrieve hash for block "+ @@ -901,38 +843,30 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, "%v: %v", blockHash, err) } + // Then, we'll manually go over every transaction in it and + // determine whether it spends the outpoint in question. for _, tx := range block.Transactions { - for _, in := range tx.TxIn { - if in.PreviousOutPoint != op { + for i, txIn := range tx.TxIn { + if txIn.PreviousOutPoint != op { continue } - // If this transaction input spends the - // outpoint, we'll gather the details of the - // spending transaction and dispatch a spend - // notification to our clients. - relTx := chain.RelevantTx{ - TxRecord: &wtxmgr.TxRecord{ - MsgTx: *tx, - Hash: tx.TxHash(), - Received: block.Header.Timestamp, - }, - Block: &wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: *blockHash, - Height: height, - }, - Time: block.Header.Timestamp, - }, + // If it does, we'll construct its spend details + // and hand them over to the TxNotifier so that + // it can properly notify its registered + // clients. + txHash := tx.TxHash() + details := &chainntnfs.SpendDetail{ + SpentOutPoint: &op, + SpenderTxHash: &txHash, + SpendingTx: tx, + SpenderInputIndex: uint32(i), + SpendingHeight: int32(height), } - select { - case b.notificationRegistry <- relTx: - case <-b.quit: - return ErrChainNotifierShuttingDown - } - - return nil + return b.txNotifier.UpdateSpendDetails( + op, details, + ) } } } diff --git a/chainntnfs/bitcoindnotify/bitcoind_dev.go b/chainntnfs/bitcoindnotify/bitcoind_dev.go index 8189ecf4f..ceeeb8abd 100644 --- a/chainntnfs/bitcoindnotify/bitcoind_dev.go +++ b/chainntnfs/bitcoindnotify/bitcoind_dev.go @@ -31,6 +31,7 @@ func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has b.txNotifier = chainntnfs.NewTxNotifier( uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache, + b.spendHintCache, ) if generateBlocks != nil {