diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 7d127ff5b..290e529bd 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -518,7 +518,7 @@ type AuthenticatedGossiper struct { chanUpdateRateLimiter map[uint64][2]*rate.Limiter // vb is used to enforce job dependency ordering of gossip messages. - vb *graph.ValidationBarrier + vb *ValidationBarrier sync.Mutex } @@ -545,7 +545,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper banman: newBanman(), } - gossiper.vb = graph.NewValidationBarrier(1000, gossiper.quit) + gossiper.vb = NewValidationBarrier(1000, gossiper.quit) gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ ChainHash: cfg.ChainHash, @@ -1543,7 +1543,7 @@ func (d *AuthenticatedGossiper) networkHandler() { // // NOTE: must be run as a goroutine. func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, - deDuped *deDupedAnnouncements, jobID graph.JobID) { + deDuped *deDupedAnnouncements, jobID JobID) { defer d.wg.Done() defer d.vb.CompleteJob() @@ -1559,12 +1559,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, log.Debugf("Validating network message %s got err: %v", nMsg.msg.MsgType(), err) - if !graph.IsError( - err, - graph.ErrVBarrierShuttingDown, - graph.ErrParentValidationFailed, - ) { - + if errors.Is(err, ErrVBarrierShuttingDown) { log.Warnf("unexpected error during validation "+ "barrier shutdown: %v", err) } @@ -2423,7 +2418,6 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, err, graph.ErrOutdated, graph.ErrIgnored, - graph.ErrVBarrierShuttingDown, ) { log.Error(err) @@ -3164,7 +3158,6 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, if graph.IsError( err, graph.ErrOutdated, graph.ErrIgnored, - graph.ErrVBarrierShuttingDown, ) { log.Debugf("Update edge for short_chan_id(%v) got: %v", diff --git a/graph/validation_barrier.go b/discovery/validation_barrier.go similarity index 97% rename from graph/validation_barrier.go rename to discovery/validation_barrier.go index 9d20bcb57..bd3bc7039 100644 --- a/graph/validation_barrier.go +++ b/discovery/validation_barrier.go @@ -1,4 +1,4 @@ -package graph +package discovery import ( "fmt" @@ -11,6 +11,13 @@ import ( "github.com/lightningnetwork/lnd/routing/route" ) +var ( + // ErrVBarrierShuttingDown signals that the barrier has been requested + // to shutdown, and that the caller should not treat the wait condition + // as fulfilled. + ErrVBarrierShuttingDown = errors.New("ValidationBarrier shutting down") +) + // JobID identifies an active job in the validation barrier. It is large so // that we don't need to worry about overflows. type JobID uint64 @@ -88,7 +95,7 @@ func NewValidationBarrier(numActiveReqs int, } // InitJobDependencies will wait for a new job slot to become open, and then -// sets up any dependent signals/trigger for the new job +// sets up any dependent signals/trigger for the new job. func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID, error) { @@ -296,8 +303,7 @@ func (v *ValidationBarrier) WaitForParents(childJobID JobID, for { select { case <-v.quit: - return NewErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") + return ErrVBarrierShuttingDown case <-jobChan: // Every time this is sent on or if it's closed, a diff --git a/graph/validation_barrier_test.go b/discovery/validation_barrier_test.go similarity index 88% rename from graph/validation_barrier_test.go rename to discovery/validation_barrier_test.go index e6224d7f0..bc58fe9ec 100644 --- a/graph/validation_barrier_test.go +++ b/discovery/validation_barrier_test.go @@ -1,11 +1,12 @@ -package graph_test +package discovery import ( "encoding/binary" + "errors" + "sync" "testing" "time" - "github.com/lightningnetwork/lnd/graph" "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/require" ) @@ -22,18 +23,38 @@ func TestValidationBarrierSemaphore(t *testing.T) { ) quit := make(chan struct{}) - barrier := graph.NewValidationBarrier(numTasks, quit) + barrier := NewValidationBarrier(numTasks, quit) + + var scidMtx sync.RWMutex + currentScid := lnwire.ShortChannelID{} // Saturate the semaphore with jobs. for i := 0; i < numTasks; i++ { - barrier.InitJobDependencies(nil) + scidMtx.Lock() + dummyUpdate := &lnwire.ChannelUpdate1{ + ShortChannelID: currentScid, + } + currentScid.TxIndex++ + scidMtx.Unlock() + + _, err := barrier.InitJobDependencies(dummyUpdate) + require.NoError(t, err) } // Spawn additional tasks that will signal completion when added. jobAdded := make(chan struct{}) for i := 0; i < numPendingTasks; i++ { go func() { - barrier.InitJobDependencies(nil) + scidMtx.Lock() + dummyUpdate := &lnwire.ChannelUpdate1{ + ShortChannelID: currentScid, + } + currentScid.TxIndex++ + scidMtx.Unlock() + + _, err := barrier.InitJobDependencies(dummyUpdate) + require.NoError(t, err) + jobAdded <- struct{}{} }() } @@ -70,12 +91,12 @@ func TestValidationBarrierQuit(t *testing.T) { ) quit := make(chan struct{}) - barrier := graph.NewValidationBarrier(2*numTasks, quit) + barrier := NewValidationBarrier(2*numTasks, quit) // Create a set of unique channel announcements that we will prep for // validation. anns := make([]*lnwire.ChannelAnnouncement1, 0, numTasks) - parentJobIDs := make([]graph.JobID, 0, numTasks) + parentJobIDs := make([]JobID, 0, numTasks) for i := 0; i < numTasks; i++ { anns = append(anns, &lnwire.ChannelAnnouncement1{ ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), @@ -91,7 +112,7 @@ func TestValidationBarrierQuit(t *testing.T) { // Create a set of channel updates, that must wait until their // associated channel announcement has been verified. chanUpds := make([]*lnwire.ChannelUpdate1, 0, numTasks) - childJobIDs := make([]graph.JobID, 0, numTasks) + childJobIDs := make([]JobID, 0, numTasks) for i := 0; i < numTasks; i++ { chanUpds = append(chanUpds, &lnwire.ChannelUpdate1{ ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), @@ -154,11 +175,12 @@ func TestValidationBarrierQuit(t *testing.T) { t.Fatalf("unexpected failure while waiting: %v", err) // Last half should return the shutdown error. - case i >= numTasks/2 && !graph.IsError( - err, graph.ErrVBarrierShuttingDown, + case i >= numTasks/2 && !errors.Is( + err, ErrVBarrierShuttingDown, ): + t.Fatalf("expected failure after quitting: want %v, "+ - "got %v", graph.ErrVBarrierShuttingDown, err) + "got %v", ErrVBarrierShuttingDown, err) } } } @@ -175,7 +197,7 @@ func TestValidationBarrierParentJobsClear(t *testing.T) { ) quit := make(chan struct{}) - barrier := graph.NewValidationBarrier(numTasks, quit) + barrier := NewValidationBarrier(numTasks, quit) sharedScid := lnwire.NewShortChanIDFromInt(0) sharedNodeID := nodeIDFromInt(0) @@ -221,8 +243,8 @@ func TestValidationBarrierParentJobsClear(t *testing.T) { childID2, err := barrier.InitJobDependencies(node1) require.NoError(t, err) - run := func(vb *graph.ValidationBarrier, childJobID graph.JobID, - job interface{}, resp chan error, start chan error) { + run := func(vb *ValidationBarrier, childJobID JobID, job interface{}, + resp chan error, start chan error) { close(start) diff --git a/graph/errors.go b/graph/errors.go index 0a1d6fd24..30996bb94 100644 --- a/graph/errors.go +++ b/graph/errors.go @@ -28,15 +28,6 @@ const ( // ErrInvalidFundingOutput is returned if the channel funding output // fails validation. ErrInvalidFundingOutput - - // ErrVBarrierShuttingDown signals that the barrier has been requested - // to shutdown, and that the caller should not treat the wait condition - // as fulfilled. - ErrVBarrierShuttingDown - - // ErrParentValidationFailed signals that the validation of a - // dependent's parent failed, so the dependent must not be processed. - ErrParentValidationFailed ) // Error is a structure that represent the error inside the graph package,