From eaa8cdf916bbd8cbc754d36dd52ce0a7a7fe78cf Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 7 May 2018 16:27:34 -0700 Subject: [PATCH] routing/router: improve validation barrier shutdown This commit improves the shutdown of the router's pending validation tasks, by ensuring the pending tasks exit early if the validation barrier receives a shutdown request. Currently, any goroutines blocked by WaitForDependants will continue execution after a shutdown is signaled. This may lead to unnexpected behavior as the relation between updates is no longer upheld. It also has the side effect of slowing down shutdown, since we continue to process the remaining updates. To remedy this, WaitForDependants now returns an error that signals if a shutdown was requested. The blocked goroutines can exit early upon seeing this error, without also signaling completion of their task to the dependent tasks, which should will now properly wait to read the validation barrier's quit signal. --- routing/router.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/routing/router.go b/routing/router.go index 833989b88..3882eced6 100644 --- a/routing/router.go +++ b/routing/router.go @@ -637,31 +637,43 @@ func (r *ChannelRouter) networkHandler() { // A new fully validated network update has just arrived. As a // result we'll modify the channel graph accordingly depending // on the exact type of the message. - case updateMsg := <-r.networkUpdates: + case update := <-r.networkUpdates: // We'll set up any dependants, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. - validationBarrier.InitJobDependencies(updateMsg.msg) + validationBarrier.InitJobDependencies(update.msg) + r.wg.Add(1) go func() { + defer r.wg.Done() defer validationBarrier.CompleteJob() // If this message has an existing dependency, // then we'll wait until that has been fully // validated before we proceed. - validationBarrier.WaitForDependants(updateMsg.msg) + err := validationBarrier.WaitForDependants( + update.msg, + ) + if err != nil { + if err != ErrVBarrierShuttingDown { + log.Warnf("unexpected error "+ + "during validation "+ + "barrier shutdown: %v", + err) + } + return + } // Process the routing update to determine if // this is either a new update from our PoV or // an update to a prior vertex/edge we // previously accepted. - err := r.processUpdate(updateMsg.msg) - updateMsg.err <- err + err = r.processUpdate(update.msg) + update.err <- err // If this message had any dependencies, then // we can now signal them to continue. - validationBarrier.SignalDependants(updateMsg.msg) - + validationBarrier.SignalDependants(update.msg) if err != nil { return } @@ -669,8 +681,9 @@ func (r *ChannelRouter) networkHandler() { // Send off a new notification for the newly // accepted update. topChange := &TopologyChange{} - err = addToTopologyChange(r.cfg.Graph, topChange, - updateMsg.msg) + err = addToTopologyChange( + r.cfg.Graph, topChange, update.msg, + ) if err != nil { log.Errorf("unable to update topology "+ "change notification: %v", err)