chainntnfs/neutrinonotify: call drainChainUpdates when updating filter

Moves filter handling logic for connecting blocks to the
connectFilteredBlock method. The disconnect logic now lives in
disconnectFilteredBlock. After updating the filter, drainChainUpdates
is called which will drain everything from the chainUpdates chan and
apply all updates by calling either connectFilteredBlock or
disconnectFilteredBlock. This will allow callers to update their
EndHeight if performing a historical dispatch, as blocks up to this
height may not have had the filter applied.
This commit is contained in:
eugene 2021-07-16 15:39:53 -04:00
parent b7de0eae93
commit 0d39c0799a
No known key found for this signature in database
GPG Key ID: 118759E83439A9B1

View File

@ -293,11 +293,96 @@ func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDe
}
}
// connectFilteredBlock is called when we receive a filteredBlock from the
// backend. If the block is ahead of what we're expecting, we'll attempt to
// catch up and then process the block.
func (n *NeutrinoNotifier) connectFilteredBlock(update *filteredBlock) {
n.bestBlockMtx.Lock()
defer n.bestBlockMtx.Unlock()
if update.height != uint32(n.bestBlock.Height+1) {
chainntnfs.Log.Infof("Missed blocks, attempting to catch up")
_, missedBlocks, err := chainntnfs.HandleMissedBlocks(
n.chainConn, n.txNotifier, n.bestBlock,
int32(update.height), false,
)
if err != nil {
chainntnfs.Log.Error(err)
return
}
for _, block := range missedBlocks {
filteredBlock, err := n.getFilteredBlock(block)
if err != nil {
chainntnfs.Log.Error(err)
return
}
err = n.handleBlockConnected(filteredBlock)
if err != nil {
chainntnfs.Log.Error(err)
return
}
}
}
err := n.handleBlockConnected(update)
if err != nil {
chainntnfs.Log.Error(err)
}
}
// disconnectFilteredBlock is called when our disconnected filtered block
// callback is fired. It attempts to rewind the chain to before the
// disconnection and updates our best block.
func (n *NeutrinoNotifier) disconnectFilteredBlock(update *filteredBlock) {
n.bestBlockMtx.Lock()
defer n.bestBlockMtx.Unlock()
if update.height != uint32(n.bestBlock.Height) {
chainntnfs.Log.Infof("Missed disconnected blocks, attempting" +
" to catch up")
}
newBestBlock, err := chainntnfs.RewindChain(n.chainConn, n.txNotifier,
n.bestBlock, int32(update.height-1),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to rewind chain from height %d"+
"to height %d: %v", n.bestBlock.Height,
update.height-1, err,
)
}
n.bestBlock = newBestBlock
}
// drainChainUpdates is called after updating the filter. It reads every
// buffered item off the chan and returns when no more are available. It is
// used to ensure that callers performing a historical scan properly update
// their EndHeight to scan blocks that did not have the filter applied at
// processing time. Without this, a race condition exists that could allow a
// spend or confirmation notification to be missed. It is unlikely this would
// occur in a real-world scenario, and instead would manifest itself in tests.
func (n *NeutrinoNotifier) drainChainUpdates() {
for {
select {
case update := <-n.chainUpdates:
if update.connect {
n.connectFilteredBlock(update)
break
}
n.disconnectFilteredBlock(update)
default:
return
}
}
}
// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (n *NeutrinoNotifier) notificationDispatcher() {
defer n.wg.Done()
out:
for {
select {
case cancelMsg := <-n.notificationCancels:
@ -409,88 +494,28 @@ out:
chainntnfs.Log.Errorf("Unable to "+
"update rescan filter: %v", err)
}
// Drain the chainUpdates chan so the caller
// listening on errChan can be sure that
// updates after receiving the error will have
// the filter applied. This allows the caller
// to update their EndHeight if they're
// performing a historical scan.
n.drainChainUpdates()
// After draining, send the error to the
// caller.
msg.errChan <- err
}
case item := <-n.chainUpdates:
update := item
if update.connect {
n.bestBlockMtx.Lock()
// Since neutrino has no way of knowing what
// height to rewind to in the case of a reorged
// best known height, there is no point in
// checking that the previous hash matches the
// the hash from our best known height the way
// the other notifiers do when they receive
// a new connected block. Therefore, we just
// compare the heights.
if update.height != uint32(n.bestBlock.Height+1) {
// Handle the case where the notifier
// missed some blocks from its chain
// backend
chainntnfs.Log.Infof("Missed blocks, " +
"attempting to catch up")
_, missedBlocks, err :=
chainntnfs.HandleMissedBlocks(
n.chainConn,
n.txNotifier,
n.bestBlock,
int32(update.height),
false,
)
if err != nil {
chainntnfs.Log.Error(err)
n.bestBlockMtx.Unlock()
continue
}
for _, block := range missedBlocks {
filteredBlock, err :=
n.getFilteredBlock(block)
if err != nil {
chainntnfs.Log.Error(err)
n.bestBlockMtx.Unlock()
continue out
}
err = n.handleBlockConnected(filteredBlock)
if err != nil {
chainntnfs.Log.Error(err)
n.bestBlockMtx.Unlock()
continue out
}
}
}
err := n.handleBlockConnected(update)
if err != nil {
chainntnfs.Log.Error(err)
}
n.bestBlockMtx.Unlock()
n.connectFilteredBlock(update)
continue
}
n.bestBlockMtx.Lock()
if update.height != uint32(n.bestBlock.Height) {
chainntnfs.Log.Infof("Missed disconnected " +
"blocks, attempting to catch up")
}
newBestBlock, err := chainntnfs.RewindChain(
n.chainConn, n.txNotifier, n.bestBlock,
int32(update.height-1),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to rewind chain "+
"from height %d to height %d: %v",
n.bestBlock.Height, update.height-1, err)
}
// Set the bestHeight here in case a chain rewind
// partially completed.
n.bestBlock = newBestBlock
n.bestBlockMtx.Unlock()
n.disconnectFilteredBlock(update)
case txUpdate := <-n.txUpdates.ChanOut():
// A new relevant transaction notification has been