From 29c24588317dd592d5ad641443097a8045c9c633 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 16 Nov 2022 04:01:38 +0800 Subject: [PATCH] routing: add method `handleNetworkUpdate` to process the network update This commit refactors the `networkHandler` to use the new method `handleNetworkUpdate`. Because the `select` is called inside a for loop, which is equivalent of firing goroutine inside range loop, it's possible that a variable used inside a previous goroutine is referencing the current one. This is now fixed by making the goroutine taking the params used for network update. --- routing/router.go | 143 +++++++++++++++++++++++----------------------- 1 file changed, 73 insertions(+), 70 deletions(-) diff --git a/routing/router.go b/routing/router.go index 8d61a25f2..0588d5122 100644 --- a/routing/router.go +++ b/routing/router.go @@ -991,6 +991,78 @@ func (r *ChannelRouter) pruneZombieChans() error { return nil } +// handleNetworkUpdate is responsible for processing the update message and +// notifies topology changes, if any. +// +// NOTE: must be run inside goroutine. +func (r *ChannelRouter) handleNetworkUpdate(vb *ValidationBarrier, + update *routingMsg) { + + defer r.wg.Done() + defer vb.CompleteJob() + + // If this message has an existing dependency, then we'll wait until + // that has been fully validated before we proceed. + err := vb.WaitForDependants(update.msg) + if err != nil { + switch { + case IsError(err, ErrVBarrierShuttingDown): + update.err <- err + + case IsError(err, ErrParentValidationFailed): + update.err <- newErrf(ErrIgnored, err.Error()) + + default: + log.Warnf("unexpected error during validation "+ + "barrier shutdown: %v", err) + update.err <- err + } + + return + } + + // Process the routing update to determine if this is either a new + // update from our PoV or an update to a prior vertex/edge we + // previously accepted. + err = r.processUpdate(update.msg, update.op...) + update.err <- err + + // If this message had any dependencies, then we can now signal them to + // continue. + allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated) + vb.SignalDependants(update.msg, allowDependents) + + // If the error is not nil here, there's no need to send topology + // change. + if err != nil { + // We now decide to log an error or not. If allowDependents is + // false, it means there is an error and the error is neither + // ErrIgnored or ErrOutdated. In this case, we'll log an error. + // Otherwise, we'll add debug log only. + if allowDependents { + log.Debugf("process network updates got: %v", err) + } else { + log.Errorf("process network updates got: %v", err) + } + + return + } + + // Otherwise, we'll send off a new notification for the newly accepted + // update, if any. + topChange := &TopologyChange{} + err = addToTopologyChange(r.cfg.Graph, topChange, update.msg) + if err != nil { + log.Errorf("unable to update topology change notification: %v", + err) + return + } + + if !topChange.isEmpty() { + r.notifyTopologyChange(topChange) + } +} + // networkHandler is the primary goroutine for the ChannelRouter. The roles of // this goroutine include answering queries related to the state of the // network, pruning the graph on new block notification, applying network @@ -1050,76 +1122,7 @@ func (r *ChannelRouter) networkHandler() { validationBarrier.InitJobDependencies(update.msg) r.wg.Add(1) - go func() { - defer r.wg.Done() - defer validationBarrier.CompleteJob() - - // If this message has an existing dependency, - // then we'll wait until that has been fully - // validated before we proceed. - err := validationBarrier.WaitForDependants( - update.msg, - ) - if err != nil { - switch { - case IsError( - err, ErrVBarrierShuttingDown, - ): - update.err <- err - - case IsError( - err, ErrParentValidationFailed, - ): - update.err <- newErrf( - ErrIgnored, err.Error(), - ) - - default: - log.Warnf("unexpected error "+ - "during validation "+ - "barrier shutdown: %v", - err) - update.err <- err - } - return - } - - // Process the routing update to determine if - // this is either a new update from our PoV or - // an update to a prior vertex/edge we - // previously accepted. - err = r.processUpdate(update.msg, update.op...) - update.err <- err - - // If this message had any dependencies, then - // we can now signal them to continue. - allowDependents := err == nil || - IsError(err, ErrIgnored, ErrOutdated) - validationBarrier.SignalDependants( - update.msg, allowDependents, - ) - if err != nil { - log.Debugf("process network updates "+ - "got: %v", err) - return - } - - // Send off a new notification for the newly - // accepted update. - topChange := &TopologyChange{} - err = addToTopologyChange( - r.cfg.Graph, topChange, update.msg, - ) - if err != nil { - log.Errorf("unable to update topology "+ - "change notification: %v", err) - return - } - - if !topChange.isEmpty() { - r.notifyTopologyChange(topChange) - } - }() + go r.handleNetworkUpdate(validationBarrier, update) // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding