diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 227d9af4d..029d37570 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -62,8 +62,9 @@ type NeutrinoNotifier struct { rescanErr <-chan error - chainUpdates *queue.ConcurrentQueue - txUpdates *queue.ConcurrentQueue + chainUpdates chan *filteredBlock + + txUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -105,8 +106,9 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, rescanErr: make(chan error), - chainUpdates: queue.NewConcurrentQueue(10), - txUpdates: queue.NewConcurrentQueue(10), + chainUpdates: make(chan *filteredBlock, 100), + + txUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -137,7 +139,6 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() - n.chainUpdates.Stop() n.txUpdates.Stop() // Notify all pending clients of our shutdown by closing the related @@ -162,7 +163,6 @@ func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be // blocked. - n.chainUpdates.Start() n.txUpdates.Start() // First, we'll obtain the latest block height of the p2p node. We'll @@ -172,7 +172,6 @@ func (n *NeutrinoNotifier) startNotifier() error { startingPoint, err := n.p2pNode.BestBlock() if err != nil { n.txUpdates.Stop() - n.chainUpdates.Stop() return err } startingHeader, err := n.p2pNode.GetBlockHeader( @@ -262,7 +261,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. select { - case n.chainUpdates.ChanIn() <- &filteredBlock{ + case n.chainUpdates <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, @@ -281,7 +280,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. select { - case n.chainUpdates.ChanIn() <- &filteredBlock{ + case n.chainUpdates <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), connect: false, @@ -306,11 +305,96 @@ func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDe } } +// connectFilteredBlock is called when we receive a filteredBlock from the +// backend. If the block is ahead of what we're expecting, we'll attempt to +// catch up and then process the block. +func (n *NeutrinoNotifier) connectFilteredBlock(update *filteredBlock) { + n.bestBlockMtx.Lock() + defer n.bestBlockMtx.Unlock() + + if update.height != uint32(n.bestBlock.Height+1) { + chainntnfs.Log.Infof("Missed blocks, attempting to catch up") + + _, missedBlocks, err := chainntnfs.HandleMissedBlocks( + n.chainConn, n.txNotifier, n.bestBlock, + int32(update.height), false, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + for _, block := range missedBlocks { + filteredBlock, err := n.getFilteredBlock(block) + if err != nil { + chainntnfs.Log.Error(err) + return + } + err = n.handleBlockConnected(filteredBlock) + if err != nil { + chainntnfs.Log.Error(err) + return + } + } + } + + err := n.handleBlockConnected(update) + if err != nil { + chainntnfs.Log.Error(err) + } +} + +// disconnectFilteredBlock is called when our disconnected filtered block +// callback is fired. It attempts to rewind the chain to before the +// disconnection and updates our best block. +func (n *NeutrinoNotifier) disconnectFilteredBlock(update *filteredBlock) { + n.bestBlockMtx.Lock() + defer n.bestBlockMtx.Unlock() + + if update.height != uint32(n.bestBlock.Height) { + chainntnfs.Log.Infof("Missed disconnected blocks, attempting" + + " to catch up") + } + newBestBlock, err := chainntnfs.RewindChain(n.chainConn, n.txNotifier, + n.bestBlock, int32(update.height-1), + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to rewind chain from height %d"+ + "to height %d: %v", n.bestBlock.Height, + update.height-1, err, + ) + } + + n.bestBlock = newBestBlock +} + +// drainChainUpdates is called after updating the filter. It reads every +// buffered item off the chan and returns when no more are available. It is +// used to ensure that callers performing a historical scan properly update +// their EndHeight to scan blocks that did not have the filter applied at +// processing time. Without this, a race condition exists that could allow a +// spend or confirmation notification to be missed. It is unlikely this would +// occur in a real-world scenario, and instead would manifest itself in tests. +func (n *NeutrinoNotifier) drainChainUpdates() { + for { + select { + case update := <-n.chainUpdates: + if update.connect { + n.connectFilteredBlock(update) + break + } + n.disconnectFilteredBlock(update) + default: + return + } + } +} + // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. func (n *NeutrinoNotifier) notificationDispatcher() { defer n.wg.Done() -out: + for { select { case cancelMsg := <-n.notificationCancels: @@ -424,88 +508,28 @@ out: chainntnfs.Log.Errorf("Unable to "+ "update rescan filter: %v", err) } + + // Drain the chainUpdates chan so the caller + // listening on errChan can be sure that + // updates after receiving the error will have + // the filter applied. This allows the caller + // to update their EndHeight if they're + // performing a historical scan. + n.drainChainUpdates() + + // After draining, send the error to the + // caller. msg.errChan <- err } - case item := <-n.chainUpdates.ChanOut(): - update := item.(*filteredBlock) + case item := <-n.chainUpdates: + update := item if update.connect { - n.bestBlockMtx.Lock() - // Since neutrino has no way of knowing what - // height to rewind to in the case of a reorged - // best known height, there is no point in - // checking that the previous hash matches the - // the hash from our best known height the way - // the other notifiers do when they receive - // a new connected block. Therefore, we just - // compare the heights. - if update.height != uint32(n.bestBlock.Height+1) { - // Handle the case where the notifier - // missed some blocks from its chain - // backend - chainntnfs.Log.Infof("Missed blocks, " + - "attempting to catch up") - - _, missedBlocks, err := - chainntnfs.HandleMissedBlocks( - n.chainConn, - n.txNotifier, - n.bestBlock, - int32(update.height), - false, - ) - if err != nil { - chainntnfs.Log.Error(err) - n.bestBlockMtx.Unlock() - continue - } - - for _, block := range missedBlocks { - filteredBlock, err := - n.getFilteredBlock(block) - if err != nil { - chainntnfs.Log.Error(err) - n.bestBlockMtx.Unlock() - continue out - } - err = n.handleBlockConnected(filteredBlock) - if err != nil { - chainntnfs.Log.Error(err) - n.bestBlockMtx.Unlock() - continue out - } - } - - } - - err := n.handleBlockConnected(update) - if err != nil { - chainntnfs.Log.Error(err) - } - - n.bestBlockMtx.Unlock() + n.connectFilteredBlock(update) continue } - n.bestBlockMtx.Lock() - if update.height != uint32(n.bestBlock.Height) { - chainntnfs.Log.Infof("Missed disconnected " + - "blocks, attempting to catch up") - } - newBestBlock, err := chainntnfs.RewindChain( - n.chainConn, n.txNotifier, n.bestBlock, - int32(update.height-1), - ) - if err != nil { - chainntnfs.Log.Errorf("Unable to rewind chain "+ - "from height %d to height %d: %v", - n.bestBlock.Height, update.height-1, err) - } - - // Set the bestHeight here in case a chain rewind - // partially completed. - n.bestBlock = newBestBlock - n.bestBlockMtx.Unlock() + n.disconnectFilteredBlock(update) case txUpdate := <-n.txUpdates.ChanOut(): // A new relevant transaction notification has been @@ -774,6 +798,14 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return ntfn.Event, nil } + // Grab the current best height as the height may have been updated + // while we were draining the chainUpdates queue. + n.bestBlockMtx.RLock() + currentHeight := uint32(n.bestBlock.Height) + n.bestBlockMtx.RUnlock() + + ntfn.HistoricalDispatch.EndHeight = currentHeight + // With the filter updated, we'll dispatch our historical rescan to // ensure we detect the spend if it happened in the past. n.wg.Add(1) @@ -926,6 +958,14 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, return ntfn.Event, nil } + // Grab the current best height as the height may have been updated + // while we were draining the chainUpdates queue. + n.bestBlockMtx.RLock() + currentHeight := uint32(n.bestBlock.Height) + n.bestBlockMtx.RUnlock() + + ntfn.HistoricalDispatch.EndHeight = currentHeight + // Finally, with the filter updated, we can dispatch the historical // rescan to ensure we can detect if the event happened in the past. select { diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index 728514aa8..d19783cce 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -63,7 +63,6 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, ) n.rescanErr = n.chainView.Start() - n.chainUpdates.Start() n.txUpdates.Start() if generateBlocks != nil { @@ -80,8 +79,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, loop: for { select { - case ntfn := <-n.chainUpdates.ChanOut(): - lastReceivedNtfn := ntfn.(*filteredBlock) + case ntfn := <-n.chainUpdates: + lastReceivedNtfn := ntfn if lastReceivedNtfn.height >= uint32(syncHeight) { break loop } diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index 2e527cfa4..477d45286 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -872,10 +872,13 @@ func (n *TxNotifier) UpdateConfDetails(confRequest ConfRequest, func (n *TxNotifier) dispatchConfDetails( ntfn *ConfNtfn, details *TxConfirmation) error { - // If no details are provided, return early as we can't dispatch. - if details == nil { - Log.Debugf("Unable to dispatch %v, no details provided", - ntfn.ConfRequest) + // If there are no conf details to dispatch or if the notification has + // already been dispatched, then we can skip dispatching to this + // client. + if details == nil || ntfn.dispatched { + Log.Debugf("Skipping dispatch of conf details(%v) for "+ + "request %v, dispatched=%v", details, ntfn.ConfRequest, + ntfn.dispatched) return nil } diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md index a4bc53c87..25600ae4a 100644 --- a/docs/release-notes/release-notes-0.14.0.md +++ b/docs/release-notes/release-notes-0.14.0.md @@ -221,6 +221,8 @@ mode](https://github.com/lightningnetwork/lnd/pull/5564). [A validation check for sane `CltvLimit` and `FinalCltvDelta` has been added for `REST`-initiated payments.](https://github.com/lightningnetwork/lnd/pull/5591) +[A bug has been fixed with Neutrino's `RegisterConfirmationsNtfn` and `RegisterSpendNtfn` calls that would cause notifications to be missed.](https://github.com/lightningnetwork/lnd/pull/5453) + ## Documentation The [code contribution guidelines have been updated to mention the new