Merge pull request #5453 from Crypt-iQ/neutrino_conf_0629

chainntnfs: neutrinonotify patches
Olaoluwa Osuntokun 2021-08-30 16:04:57 -07:00 committed by GitHub
commit 480a111c54
4 changed files with 136 additions and 92 deletions


@@ -62,8 +62,9 @@ type NeutrinoNotifier struct {
rescanErr <-chan error
chainUpdates *queue.ConcurrentQueue
txUpdates *queue.ConcurrentQueue
chainUpdates chan *filteredBlock
txUpdates *queue.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
@@ -105,8 +106,9 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
rescanErr: make(chan error),
chainUpdates: queue.NewConcurrentQueue(10),
txUpdates: queue.NewConcurrentQueue(10),
chainUpdates: make(chan *filteredBlock, 100),
txUpdates: queue.NewConcurrentQueue(10),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
@@ -137,7 +139,6 @@ func (n *NeutrinoNotifier) Stop() error {
close(n.quit)
n.wg.Wait()
n.chainUpdates.Stop()
n.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
@@ -162,7 +163,6 @@ func (n *NeutrinoNotifier) startNotifier() error {
// Start our concurrent queues before starting the rescan, to ensure
// onFilteredBlockConnected and onRelevantTx callbacks won't be
// blocked.
n.chainUpdates.Start()
n.txUpdates.Start()
// First, we'll obtain the latest block height of the p2p node. We'll
@@ -172,7 +172,6 @@ func (n *NeutrinoNotifier) startNotifier() error {
startingPoint, err := n.p2pNode.BestBlock()
if err != nil {
n.txUpdates.Stop()
n.chainUpdates.Stop()
return err
}
startingHeader, err := n.p2pNode.GetBlockHeader(
@@ -262,7 +261,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
// Append this new chain update to the end of the queue of new chain
// updates.
select {
case n.chainUpdates.ChanIn() <- &filteredBlock{
case n.chainUpdates <- &filteredBlock{
hash: header.BlockHash(),
height: uint32(height),
txns: txns,
@@ -281,7 +280,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
// Append this new chain update to the end of the queue of new chain
// disconnects.
select {
case n.chainUpdates.ChanIn() <- &filteredBlock{
case n.chainUpdates <- &filteredBlock{
hash: header.BlockHash(),
height: uint32(height),
connect: false,
@@ -306,11 +305,96 @@ func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDe
}
}
// connectFilteredBlock is called when we receive a filteredBlock from the
// backend. If the block is ahead of what we're expecting, we'll attempt to
// catch up and then process the block.
func (n *NeutrinoNotifier) connectFilteredBlock(update *filteredBlock) {
n.bestBlockMtx.Lock()
defer n.bestBlockMtx.Unlock()
if update.height != uint32(n.bestBlock.Height+1) {
chainntnfs.Log.Infof("Missed blocks, attempting to catch up")
_, missedBlocks, err := chainntnfs.HandleMissedBlocks(
n.chainConn, n.txNotifier, n.bestBlock,
int32(update.height), false,
)
if err != nil {
chainntnfs.Log.Error(err)
return
}
for _, block := range missedBlocks {
filteredBlock, err := n.getFilteredBlock(block)
if err != nil {
chainntnfs.Log.Error(err)
return
}
err = n.handleBlockConnected(filteredBlock)
if err != nil {
chainntnfs.Log.Error(err)
return
}
}
}
err := n.handleBlockConnected(update)
if err != nil {
chainntnfs.Log.Error(err)
}
}
// disconnectFilteredBlock is called when our disconnected filtered block
// callback is fired. It attempts to rewind the chain to before the
// disconnection and updates our best block.
func (n *NeutrinoNotifier) disconnectFilteredBlock(update *filteredBlock) {
n.bestBlockMtx.Lock()
defer n.bestBlockMtx.Unlock()
if update.height != uint32(n.bestBlock.Height) {
chainntnfs.Log.Infof("Missed disconnected blocks, attempting" +
" to catch up")
}
newBestBlock, err := chainntnfs.RewindChain(n.chainConn, n.txNotifier,
n.bestBlock, int32(update.height-1),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to rewind chain from height %d"+
"to height %d: %v", n.bestBlock.Height,
update.height-1, err,
)
}
n.bestBlock = newBestBlock
}
// drainChainUpdates is called after updating the filter. It reads every
// buffered item off the chan and returns when no more are available. It is
// used to ensure that callers performing a historical scan properly update
// their EndHeight to scan blocks that did not have the filter applied at
// processing time. Without this, a race condition exists that could allow a
// spend or confirmation notification to be missed. It is unlikely this would
// occur in a real-world scenario, and instead would manifest itself in tests.
func (n *NeutrinoNotifier) drainChainUpdates() {
for {
select {
case update := <-n.chainUpdates:
if update.connect {
n.connectFilteredBlock(update)
break
}
n.disconnectFilteredBlock(update)
default:
return
}
}
}
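The comment on drainChainUpdates describes a small protocol: update the rescan filter, drain the buffered channel, then read the best height that the registration paths further down use as their historical scan's EndHeight. Below is a minimal, self-contained sketch of that pattern with hypothetical names (block, notifier, applyFilterAndDrain); it is not code from this PR, only an illustration of why the height read after the drain is safe to use as the end point.
```go
// Sketch only: none of these names come from lnd; the shape of the logic
// mirrors drainChainUpdates and the EndHeight reads shown in this diff.
package sketch

import "sync"

type block struct {
	height uint32
}

type notifier struct {
	mu      sync.RWMutex
	best    uint32
	updates chan block // buffered, like chainUpdates above
}

// applyFilterAndDrain stands in for the dispatcher's work after a rescan
// filter update: every block already buffered on the channel is processed,
// so its height is reflected in best before the caller is answered.
func (n *notifier) applyFilterAndDrain() {
	for {
		select {
		case b := <-n.updates:
			n.mu.Lock()
			if b.height > n.best {
				n.best = b.height
			}
			n.mu.Unlock()
		default:
			// Buffer is empty; every block queued before the
			// filter update has now been accounted for.
			return
		}
	}
}

// endHeightForHistoricalScan is what a registration call reads after the
// drain: any block queued before the filter update is already counted in
// this height, so a historical scan ending here cannot miss it.
func (n *notifier) endHeightForHistoricalScan() uint32 {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.best
}
```
In the real notifier the drain runs in the dispatcher goroutine, and connectFilteredBlock/disconnectFilteredBlock take bestBlockMtx themselves, which is why RegisterSpendNtfn and RegisterConfirmationsNtfn below only need a short read-lock to pick up EndHeight.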
// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (n *NeutrinoNotifier) notificationDispatcher() {
defer n.wg.Done()
out:
for {
select {
case cancelMsg := <-n.notificationCancels:
@@ -424,88 +508,28 @@ out:
chainntnfs.Log.Errorf("Unable to "+
"update rescan filter: %v", err)
}
// Drain the chainUpdates chan so the caller
// listening on errChan can be sure that
// updates after receiving the error will have
// the filter applied. This allows the caller
// to update their EndHeight if they're
// performing a historical scan.
n.drainChainUpdates()
// After draining, send the error to the
// caller.
msg.errChan <- err
}
case item := <-n.chainUpdates.ChanOut():
update := item.(*filteredBlock)
case item := <-n.chainUpdates:
update := item
if update.connect {
n.bestBlockMtx.Lock()
// Since neutrino has no way of knowing what
// height to rewind to in the case of a reorged
// best known height, there is no point in
// checking that the previous hash matches the
hash from our best known height the way
// the other notifiers do when they receive
// a new connected block. Therefore, we just
// compare the heights.
if update.height != uint32(n.bestBlock.Height+1) {
// Handle the case where the notifier
// missed some blocks from its chain
// backend
chainntnfs.Log.Infof("Missed blocks, " +
"attempting to catch up")
_, missedBlocks, err :=
chainntnfs.HandleMissedBlocks(
n.chainConn,
n.txNotifier,
n.bestBlock,
int32(update.height),
false,
)
if err != nil {
chainntnfs.Log.Error(err)
n.bestBlockMtx.Unlock()
continue
}
for _, block := range missedBlocks {
filteredBlock, err :=
n.getFilteredBlock(block)
if err != nil {
chainntnfs.Log.Error(err)
n.bestBlockMtx.Unlock()
continue out
}
err = n.handleBlockConnected(filteredBlock)
if err != nil {
chainntnfs.Log.Error(err)
n.bestBlockMtx.Unlock()
continue out
}
}
}
err := n.handleBlockConnected(update)
if err != nil {
chainntnfs.Log.Error(err)
}
n.bestBlockMtx.Unlock()
n.connectFilteredBlock(update)
continue
}
n.bestBlockMtx.Lock()
if update.height != uint32(n.bestBlock.Height) {
chainntnfs.Log.Infof("Missed disconnected " +
"blocks, attempting to catch up")
}
newBestBlock, err := chainntnfs.RewindChain(
n.chainConn, n.txNotifier, n.bestBlock,
int32(update.height-1),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to rewind chain "+
"from height %d to height %d: %v",
n.bestBlock.Height, update.height-1, err)
}
// Set the bestHeight here in case a chain rewind
// partially completed.
n.bestBlock = newBestBlock
n.bestBlockMtx.Unlock()
n.disconnectFilteredBlock(update)
case txUpdate := <-n.txUpdates.ChanOut():
// A new relevant transaction notification has been
@@ -774,6 +798,14 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return ntfn.Event, nil
}
// Grab the current best height as the height may have been updated
// while we were draining the chainUpdates queue.
n.bestBlockMtx.RLock()
currentHeight := uint32(n.bestBlock.Height)
n.bestBlockMtx.RUnlock()
ntfn.HistoricalDispatch.EndHeight = currentHeight
// With the filter updated, we'll dispatch our historical rescan to
// ensure we detect the spend if it happened in the past.
n.wg.Add(1)
@@ -926,6 +958,14 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
return ntfn.Event, nil
}
// Grab the current best height as the height may have been updated
// while we were draining the chainUpdates queue.
n.bestBlockMtx.RLock()
currentHeight := uint32(n.bestBlock.Height)
n.bestBlockMtx.RUnlock()
ntfn.HistoricalDispatch.EndHeight = currentHeight
// Finally, with the filter updated, we can dispatch the historical
// rescan to ensure we can detect if the event happened in the past.
select {


@@ -63,7 +63,6 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
)
n.rescanErr = n.chainView.Start()
n.chainUpdates.Start()
n.txUpdates.Start()
if generateBlocks != nil {
@@ -80,8 +79,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
loop:
for {
select {
case ntfn := <-n.chainUpdates.ChanOut():
lastReceivedNtfn := ntfn.(*filteredBlock)
case ntfn := <-n.chainUpdates:
lastReceivedNtfn := ntfn
if lastReceivedNtfn.height >= uint32(syncHeight) {
break loop
}


@@ -872,10 +872,13 @@ func (n *TxNotifier) UpdateConfDetails(confRequest ConfRequest,
func (n *TxNotifier) dispatchConfDetails(
ntfn *ConfNtfn, details *TxConfirmation) error {
// If no details are provided, return early as we can't dispatch.
if details == nil {
Log.Debugf("Unable to dispatch %v, no details provided",
ntfn.ConfRequest)
// If there are no conf details to dispatch or if the notification has
// already been dispatched, then we can skip dispatching to this
// client.
if details == nil || ntfn.dispatched {
Log.Debugf("Skipping dispatch of conf details(%v) for "+
"request %v, dispatched=%v", details, ntfn.ConfRequest,
ntfn.dispatched)
return nil
}
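To see how the dispatched guard surfaces to callers, here is an illustrative consumer-side sketch. It is not part of this PR; the ChainNotifier interface, the RegisterConfirmationsNtfn signature, and the ConfirmationEvent/TxConfirmation fields are assumptions about the chainntnfs API around this release.
```go
// Illustrative consumer, not part of this PR. The interface and field
// names below are assumptions about the chainntnfs API of this era;
// verify against the actual package before relying on them.
package sketch

import (
	"errors"
	"log"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/lightningnetwork/lnd/chainntnfs"
)

// waitForConf registers a confirmation notification and blocks until it
// fires or quit is closed.
func waitForConf(notifier chainntnfs.ChainNotifier, txid *chainhash.Hash,
	pkScript []byte, numConfs, heightHint uint32,
	quit <-chan struct{}) error {

	confEvent, err := notifier.RegisterConfirmationsNtfn(
		txid, pkScript, numConfs, heightHint,
	)
	if err != nil {
		return err
	}

	select {
	case conf, ok := <-confEvent.Confirmed:
		if !ok {
			return chainntnfs.ErrChainNotifierShuttingDown
		}
		// With the dispatched guard above, conf details that are
		// re-derived (e.g. by a catch-up rescan) no longer cause a
		// second send on this channel for the same registration.
		log.Printf("confirmed at height %d", conf.BlockHeight)
		return nil

	case <-quit:
		return errors.New("shutting down")
	}
}
```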


@@ -221,6 +221,8 @@ mode](https://github.com/lightningnetwork/lnd/pull/5564).
[A validation check for sane `CltvLimit` and `FinalCltvDelta` has been added for `REST`-initiated payments.](https://github.com/lightningnetwork/lnd/pull/5591)
[A bug has been fixed with Neutrino's `RegisterConfirmationsNtfn` and `RegisterSpendNtfn` calls that would cause notifications to be missed.](https://github.com/lightningnetwork/lnd/pull/5453)
## Documentation
The [code contribution guidelines have been updated to mention the new