From 147c7dc94cb4d97bdaf523aa4bfa13a5e1c18824 Mon Sep 17 00:00:00 2001
From: Turtle
Date: Wed, 17 Mar 2021 22:31:41 -0400
Subject: [PATCH] router: fix 'out of order block' error by retrieving missing
 blocks in router

Fixes an issue where an 'out of order block' error occurs in the router
when a new block does not attach directly to the router's current chain
tip. When this happens, the router now uses the chain notifier to catch
up on the missed blocks and uses them to fully update the routing graph
with closed channels.

Fixes #4710, #5132
---
 routing/router.go | 197 ++++++++++++++++++++++++++++++++++------------
 server.go         |   1 +
 2 files changed, 147 insertions(+), 51 deletions(-)

diff --git a/routing/router.go b/routing/router.go
index 17b5c4e72..8b9cd97da 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -19,6 +19,7 @@ import (
 	sphinx "github.com/lightningnetwork/lightning-onion"
 	"github.com/lightningnetwork/lnd/amp"
 	"github.com/lightningnetwork/lnd/batch"
+	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/htlcswitch"
@@ -290,6 +291,10 @@ type Config struct {
 	// we need in order to properly maintain the channel graph.
 	ChainView chainview.FilteredChainView
 
+	// Notifier is a reference to the ChainNotifier, used to grab
+	// the latest blocks if the router is missing any.
+	Notifier chainntnfs.ChainNotifier
+
 	// Payer is an instance of a PaymentAttemptDispatcher and is used by
 	// the router to send payment attempts onto the network, and receive
 	// their results.
@@ -1140,61 +1145,40 @@ func (r *ChannelRouter) networkHandler() {
 			// We'll ensure that any new blocks received attach
 			// directly to the end of our main chain. If not, then
-			// we've somehow missed some blocks. We don't process
-			// this block as otherwise, we may miss on-chain
-			// events.
+			// we've somehow missed some blocks. Here we'll catch
+			// up the chain with the latest blocks.
 			currentHeight := atomic.LoadUint32(&r.bestHeight)
-			if chainUpdate.Height != currentHeight+1 {
-				log.Errorf("out of order block: expecting "+
-					"height=%v, got height=%v", currentHeight+1,
-					chainUpdate.Height)
-				continue
-			}
-
-			// Once a new block arrives, we update our running
-			// track of the height of the chain tip.
-			blockHeight := uint32(chainUpdate.Height)
-			atomic.StoreUint32(&r.bestHeight, blockHeight)
-			log.Infof("Pruning channel graph using block %v (height=%v)",
-				chainUpdate.Hash, blockHeight)
-
-			// We're only interested in all prior outputs that have
-			// been spent in the block, so collate all the
-			// referenced previous outpoints within each tx and
-			// input.
-			var spentOutputs []*wire.OutPoint
-			for _, tx := range chainUpdate.Transactions {
-				for _, txIn := range tx.TxIn {
-					spentOutputs = append(spentOutputs,
-						&txIn.PreviousOutPoint)
+			switch {
+			case chainUpdate.Height == currentHeight+1:
+				err := r.updateGraphWithClosedChannels(
+					chainUpdate,
+				)
+				if err != nil {
+					log.Errorf("unable to prune graph "+
+						"with closed channels: %v", err)
 				}
+
+			case chainUpdate.Height > currentHeight+1:
+				log.Errorf("out of order block: expecting "+
+					"height=%v, got height=%v",
+					currentHeight+1, chainUpdate.Height)
+
+				err := r.getMissingBlocks(currentHeight, chainUpdate)
+				if err != nil {
+					log.Errorf("unable to retrieve missing "+
+						"blocks: %v", err)
+				}
+
+			case chainUpdate.Height < currentHeight+1:
+				log.Errorf("out of order block: expecting "+
+					"height=%v, got height=%v",
+					currentHeight+1, chainUpdate.Height)
+
+				log.Infof("Skipping channel pruning since "+
+					"received block height %v was already "+
+					"processed.", chainUpdate.Height)
 			}
 
-			// With the spent outputs gathered, attempt to prune
-			// the channel graph, also passing in the hash+height
-			// of the block being pruned so the prune tip can be
-			// updated.
-			chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
-				&chainUpdate.Hash, chainUpdate.Height)
-			if err != nil {
-				log.Errorf("unable to prune routing table: %v", err)
-				continue
-			}
-
-			log.Infof("Block %v (height=%v) closed %v channels",
-				chainUpdate.Hash, blockHeight, len(chansClosed))
-
-			if len(chansClosed) == 0 {
-				continue
-			}
-
-			// Notify all currently registered clients of the newly
-			// closed channels.
-			closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
-			r.notifyTopologyChange(&TopologyChange{
-				ClosedChannels: closeSummaries,
-			})
-
 		// A new notification client update has arrived. We're either
 		// gaining a new client, or cancelling notifications for an
 		// existing client.
@@ -1254,6 +1238,117 @@ func (r *ChannelRouter) networkHandler() {
 	}
 }
 
+// getMissingBlocks walks through all missing blocks and updates the channel
+// graph with any channels closed in them.
+func (r *ChannelRouter) getMissingBlocks(currentHeight uint32,
+	chainUpdate *chainview.FilteredBlock) error {
+
+	outdatedHash, err := r.cfg.Chain.GetBlockHash(int64(currentHeight))
+	if err != nil {
+		return err
+	}
+
+	outdatedBlock := &chainntnfs.BlockEpoch{
+		Height: int32(currentHeight),
+		Hash:   outdatedHash,
+	}
+
+	epochClient, err := r.cfg.Notifier.RegisterBlockEpochNtfn(
+		outdatedBlock,
+	)
+	if err != nil {
+		return err
+	}
+	defer epochClient.Cancel()
+
+	blockDifference := int(chainUpdate.Height - currentHeight)
+
+	// We'll walk through all the outdated blocks and make sure we're able
+	// to update the graph with any closed channels from them.
+	for i := 0; i < blockDifference; i++ {
+		var (
+			missingBlock *chainntnfs.BlockEpoch
+			ok           bool
+		)
+
+		select {
+		case missingBlock, ok = <-epochClient.Epochs:
+			if !ok {
+				return nil
+			}
+
+		case <-r.quit:
+			return nil
+		}
+
+		filteredBlock, err := r.cfg.ChainView.FilterBlock(
+			missingBlock.Hash,
+		)
+		if err != nil {
+			return err
+		}
+
+		err = r.updateGraphWithClosedChannels(
+			filteredBlock,
+		)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// updateGraphWithClosedChannels prunes the channel graph of channels closed
+// by spends in the given block, notifying clients of the closures.
+func (r *ChannelRouter) updateGraphWithClosedChannels(
+	chainUpdate *chainview.FilteredBlock) error {
+
+	// Once a new block arrives, we update our running track of the height
+	// of the chain tip.
+	blockHeight := chainUpdate.Height
+
+	atomic.StoreUint32(&r.bestHeight, blockHeight)
+	log.Infof("Pruning channel graph using block %v (height=%v)",
+		chainUpdate.Hash, blockHeight)
+
+	// We're only interested in all prior outputs that have been spent in
+	// the block, so collate all the referenced previous outpoints within
+	// each tx and input.
+	var spentOutputs []*wire.OutPoint
+	for _, tx := range chainUpdate.Transactions {
+		for _, txIn := range tx.TxIn {
+			spentOutputs = append(spentOutputs,
+				&txIn.PreviousOutPoint)
+		}
+	}
+
+	// With the spent outputs gathered, attempt to prune the channel graph,
+	// also passing in the hash+height of the block being pruned so the
+	// prune tip can be updated.
+	chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
+		&chainUpdate.Hash, chainUpdate.Height)
+	if err != nil {
+		log.Errorf("unable to prune routing table: %v", err)
+		return err
+	}
+
+	log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
+		blockHeight, len(chansClosed))
+
+	if len(chansClosed) == 0 {
+		return nil
+	}
+
+	// Notify all currently registered clients of the newly closed channels.
+	closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
+	r.notifyTopologyChange(&TopologyChange{
+		ClosedChannels: closeSummaries,
+	})
+
+	return nil
+}
+
 // assertNodeAnnFreshness returns a non-nil error if we have an announcement in
 // the database for the passed node with a timestamp newer than the passed
 // timestamp. ErrIgnored will be returned if we already have the node, and
diff --git a/server.go b/server.go
index 8e0498f81..b8d711529 100644
--- a/server.go
+++ b/server.go
@@ -781,6 +781,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 		Graph:              chanGraph,
 		Chain:              cc.ChainIO,
 		ChainView:          cc.ChainView,
+		Notifier:           cc.ChainNotifier,
 		Payer:              s.htlcSwitch,
 		Control:            s.controlTower,
 		MissionControl:     s.missionControl,
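
The catch-up pattern in getMissingBlocks is easier to see in isolation. Below
is a minimal, self-contained Go sketch of the same idea under simplified
assumptions: blockEpoch, epochNotifier, mockNotifier, and catchUp are
hypothetical stand-ins for chainntnfs.BlockEpoch, the ChainNotifier's
RegisterBlockEpochNtfn, a real chain backend, and getMissingBlocks; they are
not lnd APIs.

package main

import (
	"fmt"
	"sync/atomic"
)

// blockEpoch is a stand-in for chainntnfs.BlockEpoch: one (height, hash)
// notification per block.
type blockEpoch struct {
	height uint32
	hash   string
}

// epochNotifier is a hypothetical stand-in for the ChainNotifier: registering
// from a past best block replays every block mined after it, in order.
type epochNotifier interface {
	registerFrom(height uint32) <-chan blockEpoch
}

// catchUp mirrors the shape of getMissingBlocks: given the router's stale
// best height and the height of the block that arrived out of order, it
// drains exactly the number of missed epochs and processes each in order.
func catchUp(n epochNotifier, bestHeight *uint32, newHeight uint32,
	process func(blockEpoch) error) error {

	current := atomic.LoadUint32(bestHeight)
	epochs := n.registerFrom(current)

	// One iteration per missed block, including the block that triggered
	// the catch-up, so the loop is bounded even if the channel stays open.
	for i := 0; i < int(newHeight-current); i++ {
		epoch, ok := <-epochs
		if !ok {
			return nil
		}
		if err := process(epoch); err != nil {
			return err
		}

		// Advance the running best height as each block is handled,
		// as updateGraphWithClosedChannels does in the patch.
		atomic.StoreUint32(bestHeight, epoch.height)
	}

	return nil
}

// mockNotifier replays a fixed chain, standing in for a real backend.
type mockNotifier struct {
	chain []blockEpoch
}

func (m *mockNotifier) registerFrom(height uint32) <-chan blockEpoch {
	ch := make(chan blockEpoch, len(m.chain))
	for _, e := range m.chain {
		if e.height > height {
			ch <- e
		}
	}
	close(ch)
	return ch
}

func main() {
	notifier := &mockNotifier{chain: []blockEpoch{
		{101, "hash-101"}, {102, "hash-102"}, {103, "hash-103"},
	}}

	// The router believes the tip is height 100, but block 103 just
	// arrived: blocks 101 and 102 were missed and must be replayed.
	bestHeight := uint32(100)
	err := catchUp(notifier, &bestHeight, 103, func(e blockEpoch) error {
		fmt.Printf("pruning graph with block %s (height=%d)\n",
			e.hash, e.height)
		return nil
	})
	if err != nil {
		fmt.Println("catch-up failed:", err)
	}
}

The key design choice the sketch illustrates: the epoch subscription is
registered from the stale best block rather than the new tip, which is what
lets the notifier replay every missed block in order, and the loop drains
exactly newHeight - current epochs, so it terminates once the router has
caught up to the block that triggered the error.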