router: fix 'out of order block error' by retrieving missing blocks in router

Fixes an issue where an out of order block error occurs in the router. When this occurs, the change uses the chain notifier to catch up on missed blocks and uses those blocks to fully update the routing graph with closed channels. Fixes #4710, #5132
This commit is contained in:
Turtle 2021-03-17 22:31:41 -04:00
parent b0f3a08f2d
commit 147c7dc94c
No known key found for this signature in database
GPG Key ID: CFBD7F29F6286C97
2 changed files with 147 additions and 51 deletions

View File

@ -19,6 +19,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/amp"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/htlcswitch"
@ -290,6 +291,10 @@ type Config struct {
// we need in order to properly maintain the channel graph.
ChainView chainview.FilteredChainView
// Notifier is a reference to the ChainNotifier, used to grab
// the latest blocks if the router is missing any.
Notifier chainntnfs.ChainNotifier
// Payer is an instance of a PaymentAttemptDispatcher and is used by
// the router to send payment attempts onto the network, and receive
// their results.
@ -1140,61 +1145,40 @@ func (r *ChannelRouter) networkHandler() {
// We'll ensure that any new blocks received attach
// directly to the end of our main chain. If not, then
// we've somehow missed some blocks. We don't process
// this block as otherwise, we may miss on-chain
// events.
// we've somehow missed some blocks. Here we'll catch
// up the chain with the latest blocks.
currentHeight := atomic.LoadUint32(&r.bestHeight)
if chainUpdate.Height != currentHeight+1 {
log.Errorf("out of order block: expecting "+
"height=%v, got height=%v", currentHeight+1,
chainUpdate.Height)
continue
}
// Once a new block arrives, we update our running
// track of the height of the chain tip.
blockHeight := uint32(chainUpdate.Height)
atomic.StoreUint32(&r.bestHeight, blockHeight)
log.Infof("Pruning channel graph using block %v (height=%v)",
chainUpdate.Hash, blockHeight)
// We're only interested in all prior outputs that have
// been spent in the block, so collate all the
// referenced previous outpoints within each tx and
// input.
var spentOutputs []*wire.OutPoint
for _, tx := range chainUpdate.Transactions {
for _, txIn := range tx.TxIn {
spentOutputs = append(spentOutputs,
&txIn.PreviousOutPoint)
switch {
case chainUpdate.Height == currentHeight+1:
err := r.updateGraphWithClosedChannels(
chainUpdate,
)
if err != nil {
log.Errorf("unable to prune graph "+
"with closed channels: %v", err)
}
case chainUpdate.Height > currentHeight+1:
log.Errorf("out of order block: expecting "+
"height=%v, got height=%v",
currentHeight+1, chainUpdate.Height)
err := r.getMissingBlocks(currentHeight, chainUpdate)
if err != nil {
log.Errorf("unable to retrieve missing"+
"blocks: %v", err)
}
case chainUpdate.Height < currentHeight+1:
log.Errorf("out of order block: expecting "+
"height=%v, got height=%v",
currentHeight+1, chainUpdate.Height)
log.Infof("Skipping channel pruning since "+
"received block height %v was already"+
" processed.", chainUpdate.Height)
}
// With the spent outputs gathered, attempt to prune
// the channel graph, also passing in the hash+height
// of the block being pruned so the prune tip can be
// updated.
chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
&chainUpdate.Hash, chainUpdate.Height)
if err != nil {
log.Errorf("unable to prune routing table: %v", err)
continue
}
log.Infof("Block %v (height=%v) closed %v channels",
chainUpdate.Hash, blockHeight, len(chansClosed))
if len(chansClosed) == 0 {
continue
}
// Notify all currently registered clients of the newly
// closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
r.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})
// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
@ -1254,6 +1238,117 @@ func (r *ChannelRouter) networkHandler() {
}
}
// getMissingBlocks catches the router up after a gap in block notifications.
// It registers a block epoch client with the notifier, seeded with the block
// found at currentHeight, and then reads chainUpdate.Height-currentHeight
// epochs from that client. Every block received this way is filtered through
// the chain view and used to prune closed channels from the graph via
// updateGraphWithClosedChannels.
//
// A nil error is returned without doing any work when chainUpdate is not ahead
// of currentHeight. Any error from looking up the block hash, registering the
// epoch client, filtering a block or pruning the graph is returned as is.
//
// NOTE(review): nil is also returned when the router is quitting or when the
// epoch channel is closed before every expected block was read, so a nil
// result doesn't guarantee that all missing blocks were processed.
func (r *ChannelRouter) getMissingBlocks(currentHeight uint32,
	chainUpdate *chainview.FilteredBlock) error {

	// Both heights are unsigned, so the subtraction further down would
	// wrap around to an enormous block count if the update isn't ahead of
	// our current height. There is nothing to catch up on in that case.
	if chainUpdate.Height <= currentHeight {
		return nil
	}

	outdatedHash, err := r.cfg.Chain.GetBlockHash(int64(currentHeight))
	if err != nil {
		return err
	}

	// Hand the notifier the last block we processed as our best known
	// block when registering.
	// NOTE(review): presumably this makes the notifier replay the blocks
	// that follow it -- confirm against the ChainNotifier implementation.
	outdatedBlock := &chainntnfs.BlockEpoch{
		Height: int32(currentHeight),
		Hash:   outdatedHash,
	}

	epochClient, err := r.cfg.Notifier.RegisterBlockEpochNtfn(
		outdatedBlock,
	)
	if err != nil {
		return err
	}
	defer epochClient.Cancel()

	blockDifference := int(chainUpdate.Height - currentHeight)

	// We'll walk through all the outdated blocks and make sure we're able
	// to update the graph with any closed channels from them.
	for i := 0; i < blockDifference; i++ {
		var (
			missingBlock *chainntnfs.BlockEpoch
			ok           bool
		)

		select {
		case missingBlock, ok = <-epochClient.Epochs:
			// The epoch channel was closed, so no further blocks
			// will be delivered.
			if !ok {
				return nil
			}

		case <-r.quit:
			return nil
		}

		// The epoch only carries the block's hash and height, so ask
		// the chain view for the filtered block contents.
		filteredBlock, err := r.cfg.ChainView.FilterBlock(
			missingBlock.Hash,
		)
		if err != nil {
			return err
		}

		err = r.updateGraphWithClosedChannels(
			filteredBlock,
		)
		if err != nil {
			return err
		}
	}

	return nil
}
// updateGraphWithClosedChannels records the height of the passed block as the
// router's best height, then prunes the channel graph using every outpoint
// spent by the block's transactions. If the prune reports any closed channels,
// all registered topology clients are notified of them. An error is returned
// only if pruning the graph fails.
func (r *ChannelRouter) updateGraphWithClosedChannels(
	chainUpdate *chainview.FilteredBlock) error {

	// The running chain tip height is bumped up front, before any pruning
	// is attempted.
	height := chainUpdate.Height
	atomic.StoreUint32(&r.bestHeight, height)

	log.Infof("Pruning channel graph using block %v (height=%v)",
		chainUpdate.Hash, height)

	// Gather the previous outpoint referenced by each input of each
	// transaction in the block; only spent outputs matter for pruning.
	var prevOuts []*wire.OutPoint
	for _, tx := range chainUpdate.Transactions {
		for idx := range tx.TxIn {
			prevOuts = append(
				prevOuts, &tx.TxIn[idx].PreviousOutPoint,
			)
		}
	}

	// Prune the graph with the collected outpoints. The block's hash and
	// height go along so that the prune tip can be updated.
	closedChans, err := r.cfg.Graph.PruneGraph(
		prevOuts, &chainUpdate.Hash, height,
	)
	if err != nil {
		log.Errorf("unable to prune routing table: %v", err)
		return err
	}

	log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
		height, len(closedChans))

	// Clients only need to hear from us when something actually closed.
	if len(closedChans) > 0 {
		r.notifyTopologyChange(&TopologyChange{
			ClosedChannels: createCloseSummaries(
				height, closedChans...,
			),
		})
	}

	return nil
}
// assertNodeAnnFreshness returns a non-nil error if we have an announcement in
// the database for the passed node with a timestamp newer than the passed
// timestamp. ErrIgnored will be returned if we already have the node, and

View File

@ -781,6 +781,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
Graph: chanGraph,
Chain: cc.ChainIO,
ChainView: cc.ChainView,
Notifier: cc.ChainNotifier,
Payer: s.htlcSwitch,
Control: s.controlTower,
MissionControl: s.missionControl,