diff --git a/routing/router.go b/routing/router.go index 17b5c4e72..8b9cd97da 100644 --- a/routing/router.go +++ b/routing/router.go @@ -19,6 +19,7 @@ import ( sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/amp" "github.com/lightningnetwork/lnd/batch" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/htlcswitch" @@ -290,6 +291,10 @@ type Config struct { // we need in order to properly maintain the channel graph. ChainView chainview.FilteredChainView + // Notifier is a reference to the ChainNotifier, used to grab + // the latest blocks if the router is missing any. + Notifier chainntnfs.ChainNotifier + // Payer is an instance of a PaymentAttemptDispatcher and is used by // the router to send payment attempts onto the network, and receive // their results. @@ -1140,61 +1145,40 @@ func (r *ChannelRouter) networkHandler() { // We'll ensure that any new blocks received attach // directly to the end of our main chain. If not, then - // we've somehow missed some blocks. We don't process - // this block as otherwise, we may miss on-chain - // events. + // we've somehow missed some blocks. Here we'll catch + // up the chain with the latest blocks. currentHeight := atomic.LoadUint32(&r.bestHeight) - if chainUpdate.Height != currentHeight+1 { - log.Errorf("out of order block: expecting "+ - "height=%v, got height=%v", currentHeight+1, - chainUpdate.Height) - continue - } - - // Once a new block arrives, we update our running - // track of the height of the chain tip. - blockHeight := uint32(chainUpdate.Height) - atomic.StoreUint32(&r.bestHeight, blockHeight) - log.Infof("Pruning channel graph using block %v (height=%v)", - chainUpdate.Hash, blockHeight) - - // We're only interested in all prior outputs that have - // been spent in the block, so collate all the - // referenced previous outpoints within each tx and - // input. - var spentOutputs []*wire.OutPoint - for _, tx := range chainUpdate.Transactions { - for _, txIn := range tx.TxIn { - spentOutputs = append(spentOutputs, - &txIn.PreviousOutPoint) + switch { + case chainUpdate.Height == currentHeight+1: + err := r.updateGraphWithClosedChannels( + chainUpdate, + ) + if err != nil { + log.Errorf("unable to prune graph "+ + "with closed channels: %v", err) } + + case chainUpdate.Height > currentHeight+1: + log.Errorf("out of order block: expecting "+ + "height=%v, got height=%v", + currentHeight+1, chainUpdate.Height) + + err := r.getMissingBlocks(currentHeight, chainUpdate) + if err != nil { + log.Errorf("unable to retrieve missing"+ + "blocks: %v", err) + } + + case chainUpdate.Height < currentHeight+1: + log.Errorf("out of order block: expecting "+ + "height=%v, got height=%v", + currentHeight+1, chainUpdate.Height) + + log.Infof("Skipping channel pruning since "+ + "received block height %v was already"+ + " processed.", chainUpdate.Height) } - // With the spent outputs gathered, attempt to prune - // the channel graph, also passing in the hash+height - // of the block being pruned so the prune tip can be - // updated. - chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs, - &chainUpdate.Hash, chainUpdate.Height) - if err != nil { - log.Errorf("unable to prune routing table: %v", err) - continue - } - - log.Infof("Block %v (height=%v) closed %v channels", - chainUpdate.Hash, blockHeight, len(chansClosed)) - - if len(chansClosed) == 0 { - continue - } - - // Notify all currently registered clients of the newly - // closed channels. - closeSummaries := createCloseSummaries(blockHeight, chansClosed...) - r.notifyTopologyChange(&TopologyChange{ - ClosedChannels: closeSummaries, - }) - // A new notification client update has arrived. We're either // gaining a new client, or cancelling notifications for an // existing client. @@ -1254,6 +1238,117 @@ func (r *ChannelRouter) networkHandler() { } } +// getMissingBlocks walks through all missing blocks and updates the graph +// closed channels accordingly. +func (r *ChannelRouter) getMissingBlocks(currentHeight uint32, + chainUpdate *chainview.FilteredBlock) error { + + outdatedHash, err := r.cfg.Chain.GetBlockHash(int64(currentHeight)) + if err != nil { + return err + } + + outdatedBlock := &chainntnfs.BlockEpoch{ + Height: int32(currentHeight), + Hash: outdatedHash, + } + + epochClient, err := r.cfg.Notifier.RegisterBlockEpochNtfn( + outdatedBlock, + ) + if err != nil { + return err + } + defer epochClient.Cancel() + + blockDifference := int(chainUpdate.Height - currentHeight) + + // We'll walk through all the outdated blocks and make sure we're able + // to update the graph with any closed channels from them. + for i := 0; i < blockDifference; i++ { + var ( + missingBlock *chainntnfs.BlockEpoch + ok bool + ) + + select { + case missingBlock, ok = <-epochClient.Epochs: + if !ok { + return nil + } + + case <-r.quit: + return nil + } + + filteredBlock, err := r.cfg.ChainView.FilterBlock( + missingBlock.Hash, + ) + if err != nil { + return err + } + + err = r.updateGraphWithClosedChannels( + filteredBlock, + ) + if err != nil { + return err + } + } + + return nil +} + +// updateGraphWithClosedChannels prunes the channel graph of closed channels +// that are no longer needed. +func (r *ChannelRouter) updateGraphWithClosedChannels( + chainUpdate *chainview.FilteredBlock) error { + + // Once a new block arrives, we update our running track of the height + // of the chain tip. + blockHeight := chainUpdate.Height + + atomic.StoreUint32(&r.bestHeight, blockHeight) + log.Infof("Pruning channel graph using block %v (height=%v)", + chainUpdate.Hash, blockHeight) + + // We're only interested in all prior outputs that have been spent in + // the block, so collate all the referenced previous outpoints within + // each tx and input. + var spentOutputs []*wire.OutPoint + for _, tx := range chainUpdate.Transactions { + for _, txIn := range tx.TxIn { + spentOutputs = append(spentOutputs, + &txIn.PreviousOutPoint) + } + } + + // With the spent outputs gathered, attempt to prune the channel graph, + // also passing in the hash+height of the block being pruned so the + // prune tip can be updated. + chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs, + &chainUpdate.Hash, chainUpdate.Height) + if err != nil { + log.Errorf("unable to prune routing table: %v", err) + return err + } + + log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash, + blockHeight, len(chansClosed)) + + if len(chansClosed) == 0 { + return err + } + + // Notify all currently registered clients of the newly closed channels. + closeSummaries := createCloseSummaries(blockHeight, chansClosed...) + r.notifyTopologyChange(&TopologyChange{ + ClosedChannels: closeSummaries, + }) + + return nil +} + // assertNodeAnnFreshness returns a non-nil error if we have an announcement in // the database for the passed node with a timestamp newer than the passed // timestamp. ErrIgnored will be returned if we already have the node, and diff --git a/server.go b/server.go index 8e0498f81..b8d711529 100644 --- a/server.go +++ b/server.go @@ -781,6 +781,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Graph: chanGraph, Chain: cc.ChainIO, ChainView: cc.ChainView, + Notifier: cc.ChainNotifier, Payer: s.htlcSwitch, Control: s.controlTower, MissionControl: s.missionControl,