From 86f28cdc1de8ffeee3ba04c074cb6f474c233979 Mon Sep 17 00:00:00 2001
From: eugene
Date: Fri, 16 Jul 2021 15:27:17 -0400
Subject: [PATCH 1/5] chainntnfs: skip conf dispatch if ntfn already
 dispatched

This mirrors the logic for the spend case and prevents a double-dispatch
scenario when combined with a later commit. Without this check, the
confirmation would linger in the buffer, which is not ideal.
---
 chainntnfs/txnotifier.go | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go
index 2e527cfa4..477d45286 100644
--- a/chainntnfs/txnotifier.go
+++ b/chainntnfs/txnotifier.go
@@ -872,10 +872,13 @@ func (n *TxNotifier) UpdateConfDetails(confRequest ConfRequest,
 func (n *TxNotifier) dispatchConfDetails(
     ntfn *ConfNtfn, details *TxConfirmation) error {
 
-    // If no details are provided, return early as we can't dispatch.
-    if details == nil {
-        Log.Debugf("Unable to dispatch %v, no details provided",
-            ntfn.ConfRequest)
+    // If there are no conf details to dispatch or if the notification has
+    // already been dispatched, then we can skip dispatching to this
+    // client.
+    if details == nil || ntfn.dispatched {
+        Log.Debugf("Skipping dispatch of conf details(%v) for "+
+            "request %v, dispatched=%v", details, ntfn.ConfRequest,
+            ntfn.dispatched)
         return nil
     }
 

From b7de0eae93e81e1b7250f48356c7c306f512cf23 Mon Sep 17 00:00:00 2001
From: eugene
Date: Fri, 16 Jul 2021 15:35:43 -0400
Subject: [PATCH 2/5] chainntnfs/neutrinonotify: make chainUpdates a buffered
 chan

Draining a queue.ConcurrentQueue is unreliable: because of its internal
structures, a filterUpdate may exist in the queue without yet being
available via ChanOut() at the moment we try to drain it. Fix this by
using a regular buffered chan, which either holds the update or does
not. Its size is set to 100 since our tests may generate quite a few
updates.
---
 chainntnfs/neutrinonotify/neutrino.go     | 21 ++++++++++-----------
 chainntnfs/neutrinonotify/neutrino_dev.go |  5 ++---
 2 files changed, 12 insertions(+), 14 deletions(-)
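Note (not part of the patch): a minimal sketch of why the non-blocking
drain idiom is sound for a plain buffered chan. A send on the chan is
visible to a receive as soon as it completes, whereas a ConcurrentQueue
may still be holding a pushed item in its internal structures before its
background goroutine exposes it on ChanOut(), so a non-blocking receive
can miss an item that was already enqueued. The helper name drainBlocks
is invented for illustration; filteredBlock is the type used in this
file.

    // drainBlocks performs a non-blocking drain of ch. Once the
    // default case fires, every send that completed before the call
    // has been observed.
    func drainBlocks(ch chan *filteredBlock) []*filteredBlock {
        var drained []*filteredBlock
        for {
            select {
            case block := <-ch:
                drained = append(drained, block)
            default:
                return drained
            }
        }
    }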
diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go
index 2320c0e6e..ef96eaa8a 100644
--- a/chainntnfs/neutrinonotify/neutrino.go
+++ b/chainntnfs/neutrinonotify/neutrino.go
@@ -62,8 +62,9 @@ type NeutrinoNotifier struct {
 
     rescanErr <-chan error
 
-    chainUpdates *queue.ConcurrentQueue
-    txUpdates    *queue.ConcurrentQueue
+    chainUpdates chan *filteredBlock
+
+    txUpdates *queue.ConcurrentQueue
 
     // spendHintCache is a cache used to query and update the latest height
     // hints for an outpoint. Each height hint represents the earliest
@@ -105,8 +106,9 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
 
         rescanErr: make(chan error),
 
-        chainUpdates: queue.NewConcurrentQueue(10),
-        txUpdates:    queue.NewConcurrentQueue(10),
+        chainUpdates: make(chan *filteredBlock, 100),
+
+        txUpdates: queue.NewConcurrentQueue(10),
 
         spendHintCache:   spendHintCache,
         confirmHintCache: confirmHintCache,
@@ -137,7 +139,6 @@ func (n *NeutrinoNotifier) Stop() error {
     close(n.quit)
     n.wg.Wait()
 
-    n.chainUpdates.Stop()
     n.txUpdates.Stop()
 
     // Notify all pending clients of our shutdown by closing the related
@@ -162,7 +163,6 @@ func (n *NeutrinoNotifier) startNotifier() error {
     // Start our concurrent queues before starting the rescan, to ensure
     // onFilteredBlockConnected and onRelavantTx callbacks won't be
     // blocked.
-    n.chainUpdates.Start()
     n.txUpdates.Start()
 
     // First, we'll obtain the latest block height of the p2p node. We'll
@@ -172,7 +172,6 @@ func (n *NeutrinoNotifier) startNotifier() error {
     startingPoint, err := n.p2pNode.BestBlock()
     if err != nil {
         n.txUpdates.Stop()
-        n.chainUpdates.Stop()
         return err
     }
     n.bestBlock.Hash = &startingPoint.Hash
@@ -251,7 +250,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
     // Append this new chain update to the end of the queue of new chain
     // updates.
     select {
-    case n.chainUpdates.ChanIn() <- &filteredBlock{
+    case n.chainUpdates <- &filteredBlock{
         hash:    header.BlockHash(),
         height:  uint32(height),
         txns:    txns,
@@ -269,7 +268,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
     // Append this new chain update to the end of the queue of new chain
     // disconnects.
     select {
-    case n.chainUpdates.ChanIn() <- &filteredBlock{
+    case n.chainUpdates <- &filteredBlock{
         hash:    header.BlockHash(),
         height:  uint32(height),
         connect: false,
@@ -413,8 +412,8 @@ out:
                     msg.errChan <- err
                 }
 
-            case item := <-n.chainUpdates.ChanOut():
-                update := item.(*filteredBlock)
+            case item := <-n.chainUpdates:
+                update := item
                 if update.connect {
                     n.bestBlockMtx.Lock()
                     // Since neutrino has no way of knowing what
diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go
index 728514aa8..d19783cce 100644
--- a/chainntnfs/neutrinonotify/neutrino_dev.go
+++ b/chainntnfs/neutrinonotify/neutrino_dev.go
@@ -63,7 +63,6 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
     )
 
     n.rescanErr = n.chainView.Start()
-    n.chainUpdates.Start()
     n.txUpdates.Start()
 
     if generateBlocks != nil {
@@ -80,8 +79,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
 loop:
     for {
         select {
-        case ntfn := <-n.chainUpdates.ChanOut():
-            lastReceivedNtfn := ntfn.(*filteredBlock)
+        case ntfn := <-n.chainUpdates:
+            lastReceivedNtfn := ntfn
             if lastReceivedNtfn.height >= uint32(syncHeight) {
                 break loop
             }

From 0d39c0799adfd5cce35cbc471ba9479c10775c7a Mon Sep 17 00:00:00 2001
From: eugene
Date: Fri, 16 Jul 2021 15:39:53 -0400
Subject: [PATCH 3/5] chainntnfs/neutrinonotify: call drainChainUpdates when
 updating filter

Moves the filter handling logic for connecting blocks into the
connectFilteredBlock method; the disconnect logic now lives in
disconnectFilteredBlock. After updating the filter, drainChainUpdates
is called, which drains everything from the chainUpdates chan and
applies each update by calling either connectFilteredBlock or
disconnectFilteredBlock. This allows callers to update their EndHeight
when performing a historical dispatch, as blocks up to this height may
not have had the filter applied.
---
 chainntnfs/neutrinonotify/neutrino.go | 173 +++++++++++++++-----------
 1 file changed, 99 insertions(+), 74 deletions(-)
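Two notes on drainChainUpdates below (illustrative, not part of the
patch). First, the ordering matters: the dispatcher applies the filter
update, then drains every filteredBlock that was queued before the new
filter took effect, and only then signals errChan, so a caller waiting
on errChan can safely re-read the best height afterwards. Second, the
bare "break" in the connect branch exits only the enclosing select, not
the for loop, so draining continues until the default case returns. A
standalone sketch of that control flow, with invented names:

    // drainLoop mirrors the drainChainUpdates control flow: "break"
    // leaves the select statement only, so the loop keeps reading
    // until the channel is empty and the default case returns.
    func drainLoop(ch chan bool, onConnect, onDisconnect func()) {
        for {
            select {
            case connect := <-ch:
                if connect {
                    onConnect()
                    break // exits the select, not the loop
                }
                onDisconnect()
            default:
                return
            }
        }
    }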
diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go
index ef96eaa8a..6e82b0b9a 100644
--- a/chainntnfs/neutrinonotify/neutrino.go
+++ b/chainntnfs/neutrinonotify/neutrino.go
@@ -293,11 +293,96 @@ func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDe
     }
 }
 
+// connectFilteredBlock is called when we receive a filteredBlock from the
+// backend. If the block is ahead of what we're expecting, we'll attempt to
+// catch up and then process the block.
+func (n *NeutrinoNotifier) connectFilteredBlock(update *filteredBlock) {
+    n.bestBlockMtx.Lock()
+    defer n.bestBlockMtx.Unlock()
+
+    if update.height != uint32(n.bestBlock.Height+1) {
+        chainntnfs.Log.Infof("Missed blocks, attempting to catch up")
+
+        _, missedBlocks, err := chainntnfs.HandleMissedBlocks(
+            n.chainConn, n.txNotifier, n.bestBlock,
+            int32(update.height), false,
+        )
+        if err != nil {
+            chainntnfs.Log.Error(err)
+            return
+        }
+
+        for _, block := range missedBlocks {
+            filteredBlock, err := n.getFilteredBlock(block)
+            if err != nil {
+                chainntnfs.Log.Error(err)
+                return
+            }
+            err = n.handleBlockConnected(filteredBlock)
+            if err != nil {
+                chainntnfs.Log.Error(err)
+                return
+            }
+        }
+    }
+
+    err := n.handleBlockConnected(update)
+    if err != nil {
+        chainntnfs.Log.Error(err)
+    }
+}
+
+// disconnectFilteredBlock is called when our disconnected filtered block
+// callback is fired. It attempts to rewind the chain to before the
+// disconnection and updates our best block.
+func (n *NeutrinoNotifier) disconnectFilteredBlock(update *filteredBlock) {
+    n.bestBlockMtx.Lock()
+    defer n.bestBlockMtx.Unlock()
+
+    if update.height != uint32(n.bestBlock.Height) {
+        chainntnfs.Log.Infof("Missed disconnected blocks, attempting" +
+            " to catch up")
+    }
+    newBestBlock, err := chainntnfs.RewindChain(n.chainConn, n.txNotifier,
+        n.bestBlock, int32(update.height-1),
+    )
+    if err != nil {
+        chainntnfs.Log.Errorf("Unable to rewind chain from height %d "+
+            "to height %d: %v", n.bestBlock.Height,
+            update.height-1, err,
+        )
+    }
+
+    n.bestBlock = newBestBlock
+}
+
+// drainChainUpdates is called after updating the filter. It reads every
+// buffered item off the chan and returns when no more are available. It is
+// used to ensure that callers performing a historical scan properly update
+// their EndHeight to scan blocks that did not have the filter applied at
+// processing time. Without this, a race condition exists that could allow a
+// spend or confirmation notification to be missed. It is unlikely this would
+// occur in a real-world scenario, and instead would manifest itself in tests.
+func (n *NeutrinoNotifier) drainChainUpdates() {
+    for {
+        select {
+        case update := <-n.chainUpdates:
+            if update.connect {
+                n.connectFilteredBlock(update)
+                break
+            }
+            n.disconnectFilteredBlock(update)
+        default:
+            return
+        }
+    }
+}
+
 // notificationDispatcher is the primary goroutine which handles client
 // notification registrations, as well as notification dispatches.
 func (n *NeutrinoNotifier) notificationDispatcher() {
     defer n.wg.Done()
-out:
+
     for {
         select {
         case cancelMsg := <-n.notificationCancels:
@@ -409,88 +494,28 @@ out:
                     chainntnfs.Log.Errorf("Unable to "+
                         "update rescan filter: %v", err)
                 }
+
+                // Drain the chainUpdates chan so the caller
+                // listening on errChan can be sure that
+                // updates after receiving the error will have
+                // the filter applied. This allows the caller
+                // to update their EndHeight if they're
+                // performing a historical scan.
+                n.drainChainUpdates()
+
+                // After draining, send the error to the
+                // caller.
                 msg.errChan <- err
 
             case item := <-n.chainUpdates:
                 update := item
                 if update.connect {
-                    n.bestBlockMtx.Lock()
-                    // Since neutrino has no way of knowing what
-                    // height to rewind to in the case of a reorged
-                    // best known height, there is no point in
-                    // checking that the previous hash matches the
-                    // the hash from our best known height the way
-                    // the other notifiers do when they receive
-                    // a new connected block. Therefore, we just
-                    // compare the heights.
-                    if update.height != uint32(n.bestBlock.Height+1) {
-                        // Handle the case where the notifier
-                        // missed some blocks from its chain
-                        // backend
-                        chainntnfs.Log.Infof("Missed blocks, " +
-                            "attempting to catch up")
-
-                        _, missedBlocks, err :=
-                            chainntnfs.HandleMissedBlocks(
-                                n.chainConn,
-                                n.txNotifier,
-                                n.bestBlock,
-                                int32(update.height),
-                                false,
-                            )
-                        if err != nil {
-                            chainntnfs.Log.Error(err)
-                            n.bestBlockMtx.Unlock()
-                            continue
-                        }
-
-                        for _, block := range missedBlocks {
-                            filteredBlock, err :=
-                                n.getFilteredBlock(block)
-                            if err != nil {
-                                chainntnfs.Log.Error(err)
-                                n.bestBlockMtx.Unlock()
-                                continue out
-                            }
-                            err = n.handleBlockConnected(filteredBlock)
-                            if err != nil {
-                                chainntnfs.Log.Error(err)
-                                n.bestBlockMtx.Unlock()
-                                continue out
-                            }
-                        }
-
-                    }
-
-                    err := n.handleBlockConnected(update)
-                    if err != nil {
-                        chainntnfs.Log.Error(err)
-                    }
-
-                    n.bestBlockMtx.Unlock()
+                    n.connectFilteredBlock(update)
                     continue
                 }
 
-                n.bestBlockMtx.Lock()
-                if update.height != uint32(n.bestBlock.Height) {
-                    chainntnfs.Log.Infof("Missed disconnected " +
-                        "blocks, attempting to catch up")
-                }
-                newBestBlock, err := chainntnfs.RewindChain(
-                    n.chainConn, n.txNotifier, n.bestBlock,
-                    int32(update.height-1),
-                )
-                if err != nil {
-                    chainntnfs.Log.Errorf("Unable to rewind chain "+
-                        "from height %d to height %d: %v",
-                        n.bestBlock.Height, update.height-1, err)
-                }
-
-                // Set the bestHeight here in case a chain rewind
-                // partially completed.
-                n.bestBlock = newBestBlock
-                n.bestBlockMtx.Unlock()
+                n.disconnectFilteredBlock(update)
 
             case txUpdate := <-n.txUpdates.ChanOut():
                 // A new relevant transaction notification has been
From de71195bcbc02697e6fe9707baac9bcd8278531d Mon Sep 17 00:00:00 2001
From: eugene
Date: Fri, 16 Jul 2021 15:47:51 -0400
Subject: [PATCH 4/5] chainntnfs/neutrinonotify: update EndHeight after filter
 update

After the error is received on the filter-update errChan, update the
EndHeight if we're performing a historical scan. If a block was mined
after the call to RegisterConf/RegisterSpend but before the filter was
updated, that block would not have had the filter applied, so a block
containing the desired conf/spend parameters would go undetected. Fix
this by ensuring the historical scan also includes this height, which
was previously excluded.
---
 chainntnfs/neutrinonotify/neutrino.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
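A worked example of the race this closes (heights are hypothetical; the
snippet restates the two hunks below, with the timeline as comments):

    // Height 100: RegisterConfirmationsNtfn(txid) prepares a
    // historical dispatch with EndHeight=100 and queues a filter
    // update to the dispatcher.
    // Height 101: a block containing txid is connected, but the old
    // filter doesn't match txid, so no conf is detected. The
    // dispatcher then applies the filter update, drains chainUpdates
    // (advancing bestBlock to 101), and signals errChan.
    //
    // Re-reading the best height after the errChan receive extends
    // the historical scan through height 101, covering the block that
    // would otherwise be missed.
    n.bestBlockMtx.RLock()
    currentHeight := uint32(n.bestBlock.Height)
    n.bestBlockMtx.RUnlock()

    ntfn.HistoricalDispatch.EndHeight = currentHeight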
diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go
index 6e82b0b9a..ace62f3b8 100644
--- a/chainntnfs/neutrinonotify/neutrino.go
+++ b/chainntnfs/neutrinonotify/neutrino.go
@@ -777,6 +777,14 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
         return ntfn.Event, nil
     }
 
+    // Grab the current best height as the height may have been updated
+    // while we were draining the chainUpdates queue.
+    n.bestBlockMtx.RLock()
+    currentHeight := uint32(n.bestBlock.Height)
+    n.bestBlockMtx.RUnlock()
+
+    ntfn.HistoricalDispatch.EndHeight = currentHeight
+
     // With the filter updated, we'll dispatch our historical rescan to
     // ensure we detect the spend if it happened in the past.
     n.wg.Add(1)
@@ -929,6 +937,14 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
         return ntfn.Event, nil
     }
 
+    // Grab the current best height as the height may have been updated
+    // while we were draining the chainUpdates queue.
+    n.bestBlockMtx.RLock()
+    currentHeight := uint32(n.bestBlock.Height)
+    n.bestBlockMtx.RUnlock()
+
+    ntfn.HistoricalDispatch.EndHeight = currentHeight
+
     // Finally, with the filter updated, we can dispatch the historical
     // rescan to ensure we can detect if the event happened in the past.
     select {

From 9f6f5e963aa0dda18cfa3180de3fa4bb5c523dd4 Mon Sep 17 00:00:00 2001
From: eugene
Date: Wed, 11 Aug 2021 15:41:59 -0400
Subject: [PATCH 5/5] docs: update release notes for 0.14

---
 docs/release-notes/release-notes-0.14.0.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md
index 94a8df3a3..e20c24059 100644
--- a/docs/release-notes/release-notes-0.14.0.md
+++ b/docs/release-notes/release-notes-0.14.0.md
@@ -190,6 +190,8 @@ mode](https://github.com/lightningnetwork/lnd/pull/5564).
 [A validation check for sane `CltvLimit` and `FinalCltvDelta` has been added
 for `REST`-initiated payments.](https://github.com/lightningnetwork/lnd/pull/5591)
 
+[A bug has been fixed with Neutrino's `RegisterConfirmationsNtfn` and `RegisterSpendNtfn` calls that would cause notifications to be missed.](https://github.com/lightningnetwork/lnd/pull/5453)
+
 ## Documentation
 
 The [code contribution guidelines have been updated to mention the new