From e075817e443e03b99effa0d7e558daf242946a7d Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 22 Mar 2019 19:55:47 -0700 Subject: [PATCH] discovery: introduce GossipSyncer signal delivery of chansSynced state In this commit, we introduce another feature to the GossipSyncer in which it can deliver a signal to an external caller once it reaches its terminal chansSynced state. This is yet to be used, but will serve useful with a round-robin sync mechanism, where we wait for to finish syncing with a specific peer before moving on to the next. --- discovery/syncer.go | 29 +++++++++++++++++++++++ discovery/syncer_test.go | 50 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/discovery/syncer.go b/discovery/syncer.go index a9818ae0b..8ec322a58 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -271,6 +271,10 @@ type GossipSyncer struct { // number of queries. rateLimiter *rate.Limiter + // syncedSignal is a channel that, if set, will be closed when the + // GossipSyncer reaches its terminal chansSynced state. + syncedSignal chan struct{} + sync.Mutex quit chan struct{} @@ -470,6 +474,13 @@ func (g *GossipSyncer) channelGraphSyncer() { // This is our final terminal state where we'll only reply to // any further queries by the remote peer. case chansSynced: + g.Lock() + if g.syncedSignal != nil { + close(g.syncedSignal) + g.syncedSignal = nil + } + g.Unlock() + // If we haven't yet sent out our update horizon, and // we want to receive real-time channel updates, we'll // do so now. @@ -1049,6 +1060,24 @@ func (g *GossipSyncer) syncState() syncerState { return syncerState(atomic.LoadUint32(&g.state)) } +// ResetSyncedSignal returns a channel that will be closed in order to serve as +// a signal for when the GossipSyncer has reached its chansSynced state. +func (g *GossipSyncer) ResetSyncedSignal() chan struct{} { + g.Lock() + defer g.Unlock() + + syncedSignal := make(chan struct{}) + + syncState := syncerState(atomic.LoadUint32(&g.state)) + if syncState == chansSynced { + close(syncedSignal) + return syncedSignal + } + + g.syncedSignal = syncedSignal + return g.syncedSignal +} + // ProcessSyncTransition sends a request to the gossip syncer to transition its // sync type to a new one. // diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 3850e3641..ee5719d80 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -2140,3 +2140,53 @@ func TestGossipSyncerHistoricalSync(t *testing.T) { t.Fatalf("expected to send a lnwire.QueryChannelRange message") } } + +// TestGossipSyncerSyncedSignal ensures that we receive a signal when a gossip +// syncer reaches its terminal chansSynced state. +func TestGossipSyncerSyncedSignal(t *testing.T) { + t.Parallel() + + // We'll create a new gossip syncer and manually override its state to + // chansSynced. + _, syncer, _ := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, + ) + syncer.setSyncState(chansSynced) + + // We'll go ahead and request a signal to be notified of when it reaches + // this state. + signalChan := syncer.ResetSyncedSignal() + + // Starting the gossip syncer should cause the signal to be delivered. + syncer.Start() + + select { + case <-signalChan: + case <-time.After(time.Second): + t.Fatal("expected to receive chansSynced signal") + } + + syncer.Stop() + + // We'll try this again, but this time we'll request the signal after + // the syncer is active and has already reached its chansSynced state. + _, syncer, _ = newTestSyncer( + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, + ) + + syncer.setSyncState(chansSynced) + + syncer.Start() + defer syncer.Stop() + + signalChan = syncer.ResetSyncedSignal() + + // The signal should be delivered immediately. + select { + case <-signalChan: + case <-time.After(time.Second): + t.Fatal("expected to receive chansSynced signal") + } +}