routing: add method handleNetworkUpdate to process the network update

This commit refactors the `networkHandler` to use the new method
`handleNetworkUpdate`. Because the `select` is called inside a for loop,
which is equivalent of firing goroutine inside range loop, it's possible
that a variable used inside a previous goroutine is referencing the
current one. This is now fixed by making the goroutine taking the params
used for network update.
This commit is contained in:
yyforyongyu
2022-11-16 04:01:38 +08:00
parent 26d1a41926
commit 29c2458831

View File

@@ -991,6 +991,78 @@ func (r *ChannelRouter) pruneZombieChans() error {
return nil
}
// handleNetworkUpdate is responsible for processing the update message and
// notifies topology changes, if any.
//
// NOTE: must be run inside goroutine.
func (r *ChannelRouter) handleNetworkUpdate(vb *ValidationBarrier,
update *routingMsg) {
defer r.wg.Done()
defer vb.CompleteJob()
// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(update.msg)
if err != nil {
switch {
case IsError(err, ErrVBarrierShuttingDown):
update.err <- err
case IsError(err, ErrParentValidationFailed):
update.err <- newErrf(ErrIgnored, err.Error())
default:
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
update.err <- err
}
return
}
// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
err = r.processUpdate(update.msg, update.op...)
update.err <- err
// If this message had any dependencies, then we can now signal them to
// continue.
allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
vb.SignalDependants(update.msg, allowDependents)
// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// We now decide to log an error or not. If allowDependents is
// false, it means there is an error and the error is neither
// ErrIgnored or ErrOutdated. In this case, we'll log an error.
// Otherwise, we'll add debug log only.
if allowDependents {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
}
return
}
// Otherwise, we'll send off a new notification for the newly accepted
// update, if any.
topChange := &TopologyChange{}
err = addToTopologyChange(r.cfg.Graph, topChange, update.msg)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}
if !topChange.isEmpty() {
r.notifyTopologyChange(topChange)
}
}
// networkHandler is the primary goroutine for the ChannelRouter. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
@@ -1050,76 +1122,7 @@ func (r *ChannelRouter) networkHandler() {
validationBarrier.InitJobDependencies(update.msg)
r.wg.Add(1)
go func() {
defer r.wg.Done()
defer validationBarrier.CompleteJob()
// If this message has an existing dependency,
// then we'll wait until that has been fully
// validated before we proceed.
err := validationBarrier.WaitForDependants(
update.msg,
)
if err != nil {
switch {
case IsError(
err, ErrVBarrierShuttingDown,
):
update.err <- err
case IsError(
err, ErrParentValidationFailed,
):
update.err <- newErrf(
ErrIgnored, err.Error(),
)
default:
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
update.err <- err
}
return
}
// Process the routing update to determine if
// this is either a new update from our PoV or
// an update to a prior vertex/edge we
// previously accepted.
err = r.processUpdate(update.msg, update.op...)
update.err <- err
// If this message had any dependencies, then
// we can now signal them to continue.
allowDependents := err == nil ||
IsError(err, ErrIgnored, ErrOutdated)
validationBarrier.SignalDependants(
update.msg, allowDependents,
)
if err != nil {
log.Debugf("process network updates "+
"got: %v", err)
return
}
// Send off a new notification for the newly
// accepted update.
topChange := &TopologyChange{}
err = addToTopologyChange(
r.cfg.Graph, topChange, update.msg,
)
if err != nil {
log.Errorf("unable to update topology "+
"change notification: %v", err)
return
}
if !topChange.isEmpty() {
r.notifyTopologyChange(topChange)
}
}()
go r.handleNetworkUpdate(validationBarrier, update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding