From b237dbfd742b1618e09d38a480577c76727ca05c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 22 Nov 2022 05:14:07 +0800 Subject: [PATCH] discovery: add method `handleNetworkMessages` to process messages --- discovery/gossiper.go | 129 ++++++++++++++++++++---------------------- 1 file changed, 62 insertions(+), 67 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f3c497025..5b9d025da 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1231,73 +1231,10 @@ func (d *AuthenticatedGossiper) networkHandler() { validationBarrier.InitJobDependencies(announcement.msg) d.wg.Add(1) - go func() { - defer d.wg.Done() - defer validationBarrier.CompleteJob() - - // If this message has an existing dependency, - // then we'll wait until that has been fully - // validated before we proceed. - err := validationBarrier.WaitForDependants( - announcement.msg, - ) - if err != nil { - log.Debugf("Validating network message %s got err: %v", - announcement.msg.MsgType(), err) - - if !routing.IsError( - err, - routing.ErrVBarrierShuttingDown, - routing.ErrParentValidationFailed, - ) { - - log.Warnf("unexpected error "+ - "during validation "+ - "barrier shutdown: %v", - err) - } - announcement.err <- err - return - } - - // Process the network announcement to - // determine if this is either a new - // announcement from our PoV or an edges to a - // prior vertex/edge we previously proceeded. - emittedAnnouncements, allowDependents := d.processNetworkAnnouncement( - announcement, - ) - - log.Tracef("Processed network message %s, "+ - "returned len(announcements)=%v, "+ - "allowDependents=%v", - announcement.msg.MsgType(), - len(emittedAnnouncements), - allowDependents) - - // If this message had any dependencies, then - // we can now signal them to continue. - validationBarrier.SignalDependants( - announcement.msg, allowDependents, - ) - - // If the announcement was accepted, then add - // the emitted announcements to our announce - // batch to be broadcast once the trickle timer - // ticks gain. - if emittedAnnouncements != nil && shouldBroadcast { - // TODO(roasbeef): exclude peer that - // sent. - announcements.AddMsgs( - emittedAnnouncements..., - ) - } else if emittedAnnouncements != nil { - log.Trace("Skipping broadcast of " + - "announcements received " + - "during initial graph sync") - } - - }() + go d.handleNetworkMessages( + announcement, &announcements, + validationBarrier, shouldBroadcast, + ) // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements @@ -1362,6 +1299,64 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// handleNetworkMessages is responsible for waiting for dependencies for a +// given network message and processing the message. Once processed, it will +// signal its dependants and add the new announcements to the announce batch. +// +// NOTE: must be run as a goroutine. +func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, + deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier, + shouldBroadcast bool) { + + defer d.wg.Done() + defer vb.CompleteJob() + + // If this message has an existing dependency, then we'll wait until + // that has been fully validated before we proceed. + err := vb.WaitForDependants(nMsg.msg) + if err != nil { + log.Debugf("Validating network message %s got err: %v", + nMsg.msg.MsgType(), err) + + if !routing.IsError( + err, + routing.ErrVBarrierShuttingDown, + routing.ErrParentValidationFailed, + ) { + + log.Warnf("unexpected error during validation "+ + "barrier shutdown: %v", err) + } + nMsg.err <- err + + return + } + + // Process the network announcement to determine if this is either a + // new announcement from our PoV or an edges to a prior vertex/edge we + // previously proceeded. + newAnns, allow := d.processNetworkAnnouncement(nMsg) + + log.Tracef("Processed network message %s, returned "+ + "len(announcements)=%v, allowDependents=%v", + nMsg.msg.MsgType(), len(newAnns), allow) + + // If this message had any dependencies, then we can now signal them to + // continue. + vb.SignalDependants(nMsg.msg, allow) + + // If the announcement was accepted, then add the emitted announcements + // to our announce batch to be broadcast once the trickle timer ticks + // gain. + if newAnns != nil && shouldBroadcast { + // TODO(roasbeef): exclude peer that sent. + deDuped.AddMsgs(newAnns...) + } else if newAnns != nil { + log.Trace("Skipping broadcast of announcements received " + + "during initial graph sync") + } +} + // TODO(roasbeef): d/c peers that send updates not on our chain // InitSyncState is called by outside sub-systems when a connection is