From 3854c1ed68a6170e394ba76485a0ab4497a5d4c2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 7 May 2018 19:10:44 -0700 Subject: [PATCH 1/4] discovery/gossiper: exit early on validation barrier quit --- discovery/gossiper.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a2e8ec9cc..90b845904 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -906,13 +906,26 @@ func (d *AuthenticatedGossiper) networkHandler() { // have thousands of goroutines active. validationBarrier.InitJobDependencies(announcement.msg) + d.wg.Add(1) go func() { + defer d.wg.Done() defer validationBarrier.CompleteJob() // If this message has an existing dependency, // then we'll wait until that has been fully // validated before we proceed. - validationBarrier.WaitForDependants(announcement.msg) + err := validationBarrier.WaitForDependants( + announcement.msg, + ) + if err != nil { + if err != routing.ErrVBarrierShuttingDown { + log.Warnf("unexpected error "+ + "during validation "+ + "barrier shutdown: %v", + err) + } + return + } // Process the network announcement to determine if // this is either a new announcement from our PoV From eaa8cdf916bbd8cbc754d36dd52ce0a7a7fe78cf Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 7 May 2018 16:27:34 -0700 Subject: [PATCH 2/4] routing/router: improve validation barrier shutdown This commit improves the shutdown of the router's pending validation tasks, by ensuring the pending tasks exit early if the validation barrier receives a shutdown request. Currently, any goroutines blocked by WaitForDependants will continue execution after a shutdown is signaled. This may lead to unexpected behavior as the relation between updates is no longer upheld. It also has the side effect of slowing down shutdown, since we continue to process the remaining updates. 
To remedy this, WaitForDependants now returns an error that signals if a shutdown was requested. The blocked goroutines can exit early upon seeing this error, without also signaling completion of their task to the dependent tasks, which will now properly wait to read the validation barrier's quit signal. --- routing/router.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/routing/router.go b/routing/router.go index 833989b88..3882eced6 100644 --- a/routing/router.go +++ b/routing/router.go @@ -637,31 +637,43 @@ func (r *ChannelRouter) networkHandler() { // A new fully validated network update has just arrived. As a // result we'll modify the channel graph accordingly depending // on the exact type of the message. - case updateMsg := <-r.networkUpdates: + case update := <-r.networkUpdates: // We'll set up any dependants, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. - validationBarrier.InitJobDependencies(updateMsg.msg) + validationBarrier.InitJobDependencies(update.msg) + r.wg.Add(1) go func() { + defer r.wg.Done() defer validationBarrier.CompleteJob() // If this message has an existing dependency, // then we'll wait until that has been fully // validated before we proceed. - validationBarrier.WaitForDependants(updateMsg.msg) + err := validationBarrier.WaitForDependants( + update.msg, + ) + if err != nil { + if err != ErrVBarrierShuttingDown { + log.Warnf("unexpected error "+ + "during validation "+ + "barrier shutdown: %v", + err) + } + return + } // Process the routing update to determine if // this is either a new update from our PoV or // an update to a prior vertex/edge we // previously accepted. - err := r.processUpdate(updateMsg.msg) - updateMsg.err <- err + err = r.processUpdate(update.msg) + update.err <- err // If this message had any dependencies, then // we can now signal them to continue. 
- validationBarrier.SignalDependants(updateMsg.msg) - + validationBarrier.SignalDependants(update.msg) if err != nil { return } @@ -669,8 +681,9 @@ func (r *ChannelRouter) networkHandler() { // Send off a new notification for the newly // accepted update. topChange := &TopologyChange{} - err = addToTopologyChange(r.cfg.Graph, topChange, - updateMsg.msg) + err = addToTopologyChange( + r.cfg.Graph, topChange, update.msg, + ) if err != nil { log.Errorf("unable to update topology "+ "change notification: %v", err) From 995e3fa85fea307608d2c9f5e35d10751312c770 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 7 May 2018 16:33:17 -0700 Subject: [PATCH 3/4] routing/validation_barrier: adds ErrVBarrierShuttingDown Adds a new error ErrVBarrierShuttingDown that is returned from WaitForDependants if the validation barrier's quit chan is closed. This allows any blocked goroutines to distinguish whether the dependent task has been completed, or if validation should be aborted entirely. --- routing/validation_barrier.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/routing/validation_barrier.go b/routing/validation_barrier.go index b0a0aa854..ce4eb98f6 100644 --- a/routing/validation_barrier.go +++ b/routing/validation_barrier.go @@ -1,12 +1,18 @@ package routing import ( + "errors" "sync" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" ) +// ErrVBarrierShuttingDown signals that the barrier has been requested to +// shutdown, and that the caller should not treat the wait condition as +// fulfilled. +var ErrVBarrierShuttingDown = errors.New("validation barrier shutting down") + // ValidationBarrier is a barrier used to ensure proper validation order while // concurrently validating new announcements for channel edges, and the // attributes of channel edges. It uses this set of maps (protected by this @@ -152,7 +158,7 @@ func (v *ValidationBarrier) CompleteJob() { // finished executing. 
This allows us a graceful way to schedule goroutines // based on any pending uncompleted dependent jobs. If this job doesn't have an // active dependent, then this function will return immediately. -func (v *ValidationBarrier) WaitForDependants(job interface{}) { +func (v *ValidationBarrier) WaitForDependants(job interface{}) error { var ( signal chan struct{} @@ -181,13 +187,13 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) { case *lnwire.AnnounceSignatures: // TODO(roasbeef): need to wait on chan ann? v.Unlock() - return + return nil case *channeldb.ChannelEdgeInfo: v.Unlock() - return + return nil case *lnwire.ChannelAnnouncement: v.Unlock() - return + return nil } v.Unlock() @@ -196,10 +202,13 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) { if ok { select { case <-v.quit: - return + return ErrVBarrierShuttingDown case <-signal: + return nil } } + + return nil } // SignalDependants will signal any jobs that are dependent on this job that From 99e7ec08952cbadb26f690f04c85a79f497efbc2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 8 May 2018 16:10:43 -0700 Subject: [PATCH 4/4] routing/validation_barrier_test: test semaphore and quit Adds test checking the basic functionality of the barrier's semaphore and behavior during shutdown. --- routing/validation_barrier_test.go | 151 +++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 routing/validation_barrier_test.go diff --git a/routing/validation_barrier_test.go b/routing/validation_barrier_test.go new file mode 100644 index 000000000..6fd3930b3 --- /dev/null +++ b/routing/validation_barrier_test.go @@ -0,0 +1,151 @@ +package routing_test + +import ( + "encoding/binary" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing" +) + +// TestValidationBarrierSemaphore checks basic properties of the validation +// barrier's semaphore wrt. enqueuing/dequeuing. 
+func TestValidationBarrierSemaphore(t *testing.T) { + const ( + numTasks = 8 + numPendingTasks = 8 + timeout = 50 * time.Millisecond + ) + + quit := make(chan struct{}) + barrier := routing.NewValidationBarrier(numTasks, quit) + + // Saturate the semaphore with jobs. + for i := 0; i < numTasks; i++ { + barrier.InitJobDependencies(nil) + } + + // Spawn additional tasks that will signal completion when added. + jobAdded := make(chan struct{}) + for i := 0; i < numPendingTasks; i++ { + go func() { + barrier.InitJobDependencies(nil) + jobAdded <- struct{}{} + }() + } + + // Check that no jobs are added while semaphore is full. + select { + case <-time.After(timeout): + // Expected since no slots open. + case <-jobAdded: + t.Fatalf("job should not have been added") + } + + // Complete jobs one at a time and verify that they get added. + for i := 0; i < numPendingTasks; i++ { + barrier.CompleteJob() + + select { + case <-time.After(timeout): + t.Fatalf("timeout waiting for job to be added") + case <-jobAdded: + // Expected since one slot opened up. + } + } +} + +// TestValidationBarrierQuit checks that pending validation tasks will return an +// error from WaitForDependants if the barrier's quit signal is canceled. +func TestValidationBarrierQuit(t *testing.T) { + const ( + numTasks = 8 + timeout = 50 * time.Millisecond + ) + + quit := make(chan struct{}) + barrier := routing.NewValidationBarrier(2*numTasks, quit) + + // Create a set of unique channel announcements that we will prep for + // validation. + anns := make([]*lnwire.ChannelAnnouncement, 0, numTasks) + for i := 0; i < numTasks; i++ { + anns = append(anns, &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), + NodeID1: nodeIDFromInt(uint64(2 * i)), + NodeID2: nodeIDFromInt(uint64(2*i + 1)), + }) + barrier.InitJobDependencies(anns[i]) + } + + // Create a set of channel updates, that must wait until their + // associated channel announcement has been verified. 
+ chanUpds := make([]*lnwire.ChannelUpdate, 0, numTasks) + for i := 0; i < numTasks; i++ { + chanUpds = append(chanUpds, &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), + }) + barrier.InitJobDependencies(chanUpds[i]) + } + + // Spawn additional tasks that will send the error returned after + // waiting for the announcements to finish. In the background, we will + // iteratively queue the channel updates, which will send back the error + // returned from waiting. + jobErrs := make(chan error) + for i := 0; i < numTasks; i++ { + go func(ii int) { + jobErrs <- barrier.WaitForDependants(chanUpds[ii]) + }(i) + } + + // Check that no jobs are added while semaphore is full. + select { + case <-time.After(timeout): + // Expected since no slots open. + case <-jobErrs: + t.Fatalf("job should not have been signaled") + } + + // Complete the first half of jobs, one at a time, verifying that they + // get signaled. Then, quit the barrier and check that all others exit + // with the correct error. + for i := 0; i < numTasks; i++ { + switch { + // First half, signal completion and task semaphore + case i < numTasks/2: + barrier.SignalDependants(anns[i]) + barrier.CompleteJob() + + // At midpoint, quit the validation barrier. + case i == numTasks/2: + close(quit) + } + + var err error + select { + case <-time.After(timeout): + t.Fatalf("timeout waiting for job to be signaled") + case err = <-jobErrs: + } + + switch { + // First half should return without failure. + case i < numTasks/2 && err != nil: + t.Fatalf("unexpected failure while waiting: %v", err) + + // Last half should return the shutdown error. + case i >= numTasks/2 && err != routing.ErrVBarrierShuttingDown: + t.Fatalf("expected failure after quitting: want %v, "+ + "got %v", routing.ErrVBarrierShuttingDown, err) + } + } +} + +// nodeIDFromInt creates a node ID by writing a uint64 to the first 8 bytes. 
+func nodeIDFromInt(i uint64) [33]byte { + var nodeID [33]byte + binary.BigEndian.PutUint64(nodeID[:8], i) + return nodeID +}