From 6d05cb5aae87b7c6c8400a318e905350b8c4bf8e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 30 Jan 2018 20:34:40 -0800 Subject: [PATCH] routing: extract zombie pruning to distinct method --- routing/router.go | 163 +++++++++++++++++++--------------- routing/validation_barrier.go | 8 +- 2 files changed, 94 insertions(+), 77 deletions(-) diff --git a/routing/router.go b/routing/router.go index beba40ad9..0ea7ed7b5 100644 --- a/routing/router.go +++ b/routing/router.go @@ -504,6 +504,94 @@ func (r *ChannelRouter) syncGraphWithChain() error { return nil } +// pruneZombieChans is a method that will be called periodically to prune out +// any "zombie" channels. We consider channels zombies if *both* edges haven't +// been updated since our zombie horizon. We do this periodically to keep a +// health, lively routing table. +func (r *ChannelRouter) pruneZombieChans() error { + var chansToPrune []wire.OutPoint + chanExpiry := r.cfg.ChannelPruneExpiry + + log.Infof("Examining Channel Graph for zombie channels") + + // First, we'll collect all the channels which are eligible for garbage + // collection due to being zombies. + filterPruneChans := func(info *channeldb.ChannelEdgeInfo, + e1, e2 *channeldb.ChannelEdgePolicy) error { + + // We'll ensure that we don't attempt to prune our *own* + // channels from the graph, as in any case this should be + // re-advertised by the sub-system above us. + if info.NodeKey1Bytes == r.selfNode.PubKeyBytes || + info.NodeKey2Bytes == r.selfNode.PubKeyBytes { + + return nil + } + + // If *both* edges haven't been updated for a period of + // chanExpiry, then we'll mark the channel itself as eligible + // for graph pruning. + e1Zombie, e2Zombie := true, true + if e1 != nil { + e1Zombie = time.Since(e1.LastUpdate) >= chanExpiry + if e1Zombie { + log.Tracef("Edge #1 of ChannelPoint(%v) "+ + "last update: %v", + info.ChannelPoint, e1.LastUpdate) + } + } + if e2 != nil { + e2Zombie = time.Since(e2.LastUpdate) >= chanExpiry + if e2Zombie { + log.Tracef("Edge #2 of ChannelPoint(%v) "+ + "last update: %v", + info.ChannelPoint, e2.LastUpdate) + } + } + if e1Zombie && e2Zombie { + log.Debugf("ChannelPoint(%v) is a zombie, collecting "+ + "to prune", info.ChannelPoint) + + // TODO(roasbeef): add ability to delete single + // directional edge + chansToPrune = append(chansToPrune, info.ChannelPoint) + + // As we're detecting this as a zombie channel, we'll + // add this to the set of recently rejected items so we + // don't re-accept it shortly after. + r.rejectCache[info.ChannelID] = struct{}{} + } + + return nil + } + + r.rejectMtx.Lock() + err := r.cfg.Graph.ForEachChannel(filterPruneChans) + if err != nil { + r.rejectMtx.Unlock() + return fmt.Errorf("Unable to filter local zombie "+ + "chans: %v", err) + } + + log.Infof("Pruning %v Zombie Channels", len(chansToPrune)) + + // With the set zombie-like channels obtained, we'll do another pass to + // delete al zombie channels from the channel graph. + for _, chanToPrune := range chansToPrune { + log.Tracef("Pruning zombie chan ChannelPoint(%v)", chanToPrune) + + err := r.cfg.Graph.DeleteChannelEdge(&chanToPrune) + if err != nil { + r.rejectMtx.Unlock() + return fmt.Errorf("Unable to prune zombie "+ + "chans: %v", err) + } + } + r.rejectMtx.Unlock() + + return nil +} + // networkHandler is the primary goroutine for the ChannelRouter. The roles of // this goroutine include answering queries related to the state of the // network, pruning the graph on new block notification, applying network @@ -716,79 +804,8 @@ func (r *ChannelRouter) networkHandler() { // state of the known graph to filter out any zombie channels // for pruning. case <-graphPruneTicker.C: - - var chansToPrune []wire.OutPoint - chanExpiry := r.cfg.ChannelPruneExpiry - - log.Infof("Examining Channel Graph for zombie channels") - - // First, we'll collect all the channels which are - // eligible for garbage collection due to being - // zombies. - filterPruneChans := func(info *channeldb.ChannelEdgeInfo, - e1, e2 *channeldb.ChannelEdgePolicy) error { - - // We'll ensure that we don't attempt to prune - // our *own* channels from the graph, as in any - // case this should be re-advertised by the - // sub-system above us. - if info.NodeKey1.IsEqual(r.selfNode.PubKey) || - info.NodeKey2.IsEqual(r.selfNode.PubKey) { - - return nil - } - - // If *both* edges haven't been updated for a - // period of chanExpiry, then we'll mark the - // channel itself as eligible for graph - // pruning. - e1Zombie, e2Zombie := true, true - if e1 != nil { - e1Zombie = time.Since(e1.LastUpdate) >= chanExpiry - log.Tracef("Edge #1 of ChannelPoint(%v) "+ - "last update: %v", - info.ChannelPoint, e1.LastUpdate) - } - if e2 != nil { - e2Zombie = time.Since(e2.LastUpdate) >= chanExpiry - log.Tracef("Edge #2 of ChannelPoint(%v) "+ - "last update: %v", - info.ChannelPoint, e2.LastUpdate) - } - if e1Zombie && e2Zombie { - log.Infof("ChannelPoint(%v) is a "+ - "zombie, collecting to prune", - info.ChannelPoint) - - // TODO(roasbeef): add ability to - // delete single directional edge - chansToPrune = append(chansToPrune, - info.ChannelPoint) - } - - return nil - } - err := r.cfg.Graph.ForEachChannel(filterPruneChans) - if err != nil { - log.Errorf("Unable to local zombie chans: %v", err) - continue - } - - log.Infof("Pruning %v Zombie Channels", len(chansToPrune)) - - // With the set zombie-like channels obtained, we'll do - // another pass to delete al zombie channels from the - // channel graph. - for _, chanToPrune := range chansToPrune { - log.Tracef("Pruning zombie chan ChannelPoint(%v)", - chanToPrune) - - err := r.cfg.Graph.DeleteChannelEdge(&chanToPrune) - if err != nil { - log.Errorf("Unable to prune zombie "+ - "chans: %v", err) - continue - } + if err := r.pruneZombieChans(); err != nil { + log.Errorf("unable to prune zombies: %v", err) } // The router has been signalled to exit, to we exit our main diff --git a/routing/validation_barrier.go b/routing/validation_barrier.go index 7878099ef..63bdfa55e 100644 --- a/routing/validation_barrier.go +++ b/routing/validation_barrier.go @@ -116,8 +116,8 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { v.chanAnnFinSignal[shortID] = annFinCond v.chanEdgeDependencies[shortID] = annFinCond - v.nodeAnnDependencies[Vertex(msg.NodeKey1)] = annFinCond - v.nodeAnnDependencies[Vertex(msg.NodeKey2)] = annFinCond + v.nodeAnnDependencies[Vertex(msg.NodeKey1Bytes)] = annFinCond + v.nodeAnnDependencies[Vertex(msg.NodeKey2Bytes)] = annFinCond } // These other types don't have any dependants, so no further @@ -168,7 +168,7 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) { shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) signal, ok = v.chanEdgeDependencies[shortID] case *channeldb.LightningNode: - vertex := Vertex(msg.PubKey) + vertex := Vertex(msg.PubKeyBytes) signal, ok = v.nodeAnnDependencies[vertex] case *lnwire.ChannelUpdate: signal, ok = v.chanEdgeDependencies[msg.ShortChannelID] @@ -234,7 +234,7 @@ func (v *ValidationBarrier) SignalDependants(job interface{}) { // map, as if we reach this point, then all dependants have already // finished executing and we can proceed. case *channeldb.LightningNode: - delete(v.nodeAnnDependencies, Vertex(msg.PubKey)) + delete(v.nodeAnnDependencies, Vertex(msg.PubKeyBytes)) case *lnwire.NodeAnnouncement: delete(v.nodeAnnDependencies, Vertex(msg.NodeID)) case *lnwire.ChannelUpdate: