Merge pull request #3355 from wpaulino/is-graph-synced

discovery+rpc: expose graph synced status within GetInfo
This commit is contained in:
Olaoluwa Osuntokun 2019-08-12 18:22:45 -07:00 committed by GitHub
commit 4e62e8ae67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 636 additions and 508 deletions

View File

@ -3,6 +3,7 @@ package discovery
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
@ -100,6 +101,16 @@ type SyncManagerCfg struct {
// attempt a historical sync to ensure we have as much of the public channel // attempt a historical sync to ensure we have as much of the public channel
// graph as possible. // graph as possible.
type SyncManager struct { type SyncManager struct {
// initialHistoricalSyncCompleted serves as a barrier when initializing
// new active GossipSyncers. If 0, the initial historical sync has not
// completed, so we'll defer initializing any active GossipSyncers. If
// 1, then we can transition the GossipSyncer immediately. We set up
// this barrier to ensure we have most of the graph before attempting to
// accept new updates at tip.
//
// NOTE: This must be used atomically.
initialHistoricalSyncCompleted int32
start sync.Once start sync.Once
stop sync.Once stop sync.Once
@ -192,15 +203,6 @@ func (m *SyncManager) syncerHandler() {
defer m.cfg.HistoricalSyncTicker.Stop() defer m.cfg.HistoricalSyncTicker.Stop()
var ( var (
// initialHistoricalSyncCompleted serves as a barrier when
// initializing new active GossipSyncers. If false, the initial
// historical sync has not completed, so we'll defer
// initializing any active GossipSyncers. If true, then we can
// transition the GossipSyncer immediately. We set up this
// barrier to ensure we have most of the graph before attempting
// to accept new updates at tip.
initialHistoricalSyncCompleted = false
// initialHistoricalSyncer is the syncer we are currently // initialHistoricalSyncer is the syncer we are currently
// performing an initial historical sync with. // performing an initial historical sync with.
initialHistoricalSyncer *GossipSyncer initialHistoricalSyncer *GossipSyncer
@ -251,10 +253,10 @@ func (m *SyncManager) syncerHandler() {
fallthrough fallthrough
// If the initial historical sync has yet to complete, // If the initial historical sync has yet to complete,
// then we'll declare is as passive and attempt to // then we'll declare it as passive and attempt to
// transition it when the initial historical sync // transition it when the initial historical sync
// completes. // completes.
case !initialHistoricalSyncCompleted: case !m.IsGraphSynced():
s.setSyncType(PassiveSync) s.setSyncType(PassiveSync)
m.inactiveSyncers[s.cfg.peerPub] = s m.inactiveSyncers[s.cfg.peerPub] = s
@ -279,7 +281,7 @@ func (m *SyncManager) syncerHandler() {
if !attemptHistoricalSync { if !attemptHistoricalSync {
continue continue
} }
initialHistoricalSyncCompleted = false m.markGraphSyncing()
log.Debugf("Attempting initial historical sync with "+ log.Debugf("Attempting initial historical sync with "+
"GossipSyncer(%x)", s.cfg.peerPub) "GossipSyncer(%x)", s.cfg.peerPub)
@ -344,7 +346,7 @@ func (m *SyncManager) syncerHandler() {
case <-initialHistoricalSyncSignal: case <-initialHistoricalSyncSignal:
initialHistoricalSyncer = nil initialHistoricalSyncer = nil
initialHistoricalSyncSignal = nil initialHistoricalSyncSignal = nil
initialHistoricalSyncCompleted = true m.markGraphSynced()
log.Debug("Initial historical sync completed") log.Debug("Initial historical sync completed")
@ -379,7 +381,21 @@ func (m *SyncManager) syncerHandler() {
// Our HistoricalSyncTicker has ticked, so we'll randomly select // Our HistoricalSyncTicker has ticked, so we'll randomly select
// a peer and force a historical sync with them. // a peer and force a historical sync with them.
case <-m.cfg.HistoricalSyncTicker.Ticks(): case <-m.cfg.HistoricalSyncTicker.Ticks():
m.forceHistoricalSync() s := m.forceHistoricalSync()
// If we've already performed our initial historical
// sync, then we have nothing left to do.
if m.IsGraphSynced() {
continue
}
// Otherwise, we'll track the peer we've performed a
// historical sync with in order to handle the case
// where our previous historical sync peer did not
// respond to our queries and we haven't ingested as
// much of the graph as we should.
initialHistoricalSyncer = s
initialHistoricalSyncSignal = s.ResetSyncedSignal()
case <-m.quit: case <-m.quit:
return return
@ -667,3 +683,22 @@ func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
return syncers return syncers
} }
// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}
// markGraphSyncing allows us to report that the initial historical sync is
// still undergoing.
func (m *SyncManager) markGraphSyncing() {
atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 0)
}
// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}

View File

@ -185,6 +185,13 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
t.Parallel() t.Parallel()
syncMgr := newTestSyncManager(0) syncMgr := newTestSyncManager(0)
// The graph should not be considered as synced since the sync manager
// has yet to start.
if syncMgr.IsGraphSynced() {
t.Fatal("expected graph to not be considered as synced")
}
syncMgr.Start() syncMgr.Start()
defer syncMgr.Stop() defer syncMgr.Stop()
@ -198,6 +205,12 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
NumBlocks: math.MaxUint32, NumBlocks: math.MaxUint32,
}) })
// The graph should not be considered as synced since the initial
// historical sync has not finished.
if syncMgr.IsGraphSynced() {
t.Fatal("expected graph to not be considered as synced")
}
// If an additional peer connects, then another historical sync should // If an additional peer connects, then another historical sync should
// not be attempted. // not be attempted.
finalHistoricalPeer := randPeer(t, syncMgr.quit) finalHistoricalPeer := randPeer(t, syncMgr.quit)
@ -208,7 +221,14 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
// If we disconnect the peer performing the initial historical sync, a // If we disconnect the peer performing the initial historical sync, a
// new one should be chosen. // new one should be chosen.
syncMgr.PruneSyncState(peer.PubKey()) syncMgr.PruneSyncState(peer.PubKey())
// Complete the initial historical sync by transitionining the syncer to
// its final chansSynced state. The graph should be considered as synced
// after the fact.
assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer) assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
if !syncMgr.IsGraphSynced() {
t.Fatal("expected graph to be considered as synced")
}
// Once the initial historical sync has succeeded, another one should // Once the initial historical sync has succeeded, another one should
// not be attempted by disconnecting the peer who performed it. // not be attempted by disconnecting the peer who performed it.
@ -289,6 +309,58 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) {
}) })
} }
// TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement ensures that the
// sync manager properly marks the graph as synced given that our initial
// historical sync has stalled, but a replacement has fully completed.
func TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(0)
syncMgr.Start()
defer syncMgr.Stop()
// We should expect to see a QueryChannelRange message with a
// FirstBlockHeight of the genesis block, signaling that an initial
// historical sync is being attempted.
peer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(peer)
assertMsgSent(t, peer, &lnwire.QueryChannelRange{
FirstBlockHeight: 0,
NumBlocks: math.MaxUint32,
})
// The graph should not be considered as synced since the initial
// historical sync has not finished.
if syncMgr.IsGraphSynced() {
t.Fatal("expected graph to not be considered as synced")
}
// If an additional peer connects, then another historical sync should
// not be attempted.
finalHistoricalPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(finalHistoricalPeer)
finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
assertNoMsgSent(t, finalHistoricalPeer)
// To simulate that our initial historical sync has stalled, we'll force
// a historical sync with the new peer to ensure it is replaced.
syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
// The graph should still not be considered as synced since the
// replacement historical sync has not finished.
if syncMgr.IsGraphSynced() {
t.Fatal("expected graph to not be considered as synced")
}
// Complete the replacement historical sync by transitioning the syncer
// to its final chansSynced state. The graph should be considered as
// synced after the fact.
assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
if !syncMgr.IsGraphSynced() {
t.Fatal("expected graph to be considered as synced")
}
}
// TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers // TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers
// are initialized as ActiveSync until the initial historical sync has been // are initialized as ActiveSync until the initial historical sync has been
// completed. Once it does, the pending GossipSyncers should be transitioned to // completed. Once it does, the pending GossipSyncers should be transitioned to

File diff suppressed because it is too large Load Diff

View File

@ -1343,6 +1343,9 @@ message GetInfoResponse {
/// The color of the current node in hex code format /// The color of the current node in hex code format
string color = 17 [json_name = "color"]; string color = 17 [json_name = "color"];
// Whether we consider ourselves synced with the public channel graph.
bool synced_to_graph = 18 [json_name = "synced_to_graph"];
} }
message Chain { message Chain {

View File

@ -2218,6 +2218,11 @@
"color": { "color": {
"type": "string", "type": "string",
"title": "/ The color of the current node in hex code format" "title": "/ The color of the current node in hex code format"
},
"synced_to_graph": {
"type": "boolean",
"format": "boolean",
"description": "Whether we consider ourselves synced with the public channel graph."
} }
} }
}, },

View File

@ -2038,6 +2038,8 @@ func (r *rpcServer) GetInfo(ctx context.Context,
uris[i] = fmt.Sprintf("%s@%s", encodedIDPub, addr.String()) uris[i] = fmt.Sprintf("%s@%s", encodedIDPub, addr.String())
} }
isGraphSynced := r.server.authGossiper.SyncManager().IsGraphSynced()
// TODO(roasbeef): add synced height n stuff // TODO(roasbeef): add synced height n stuff
return &lnrpc.GetInfoResponse{ return &lnrpc.GetInfoResponse{
IdentityPubkey: encodedIDPub, IdentityPubkey: encodedIDPub,
@ -2055,6 +2057,7 @@ func (r *rpcServer) GetInfo(ctx context.Context,
Color: routing.EncodeHexColor(nodeAnn.RGBColor), Color: routing.EncodeHexColor(nodeAnn.RGBColor),
BestHeaderTimestamp: int64(bestHeaderTimestamp), BestHeaderTimestamp: int64(bestHeaderTimestamp),
Version: build.Version(), Version: build.Version(),
SyncedToGraph: isGraphSynced,
}, nil }, nil
} }