diff --git a/chainio/dispatcher.go b/chainio/dispatcher.go index 269bb1892..3f22948bd 100644 --- a/chainio/dispatcher.go +++ b/chainio/dispatcher.go @@ -49,14 +49,35 @@ type BlockbeatDispatcher struct { // quit is used to signal the BlockbeatDispatcher to stop. quit chan struct{} + + // queryHeightChan is used to receive queries on the current height of + // the dispatcher. + queryHeightChan chan *query +} + +// query is used to fetch the internal state of the dispatcher. +type query struct { + // respChan is used to send back the current height back to the caller. + // + // NOTE: This channel must be buffered. + respChan chan int32 +} + +// newQuery creates a query to be used to fetch the internal state of the +// dispatcher. +func newQuery() *query { + return &query{ + respChan: make(chan int32, 1), + } } // NewBlockbeatDispatcher returns a new blockbeat dispatcher instance. func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher { return &BlockbeatDispatcher{ - notifier: n, - quit: make(chan struct{}), - consumerQueues: make(map[uint32][]Consumer), + notifier: n, + quit: make(chan struct{}), + consumerQueues: make(map[uint32][]Consumer), + queryHeightChan: make(chan *query, 1), } } @@ -161,6 +182,18 @@ func (b *BlockbeatDispatcher) dispatchBlocks( b.log().Infof("Notified all consumers on new block "+ "in %v", time.Since(start)) + // A query has been made to fetch the current height, we now + // send the height from its current beat. + case query := <-b.queryHeightChan: + // The beat may not be set yet, e.g., during the startup + // the query is made before the block epoch being sent. + height := int32(0) + if b.beat != nil { + height = b.beat.Height() + } + + query.respChan <- height + case <-b.quit: b.log().Debugf("BlockbeatDispatcher quit signal " + "received") @@ -170,6 +203,30 @@ func (b *BlockbeatDispatcher) dispatchBlocks( } } +// CurrentHeight returns the current best height known to the dispatcher. 0 is +// returned if the dispatcher is shutting down. +func (b *BlockbeatDispatcher) CurrentHeight() int32 { + query := newQuery() + + select { + case b.queryHeightChan <- query: + + case <-b.quit: + clog.Debugf("BlockbeatDispatcher quit before query") + return 0 + } + + select { + case height := <-query.respChan: + clog.Debugf("Responded current height: %v", height) + return height + + case <-b.quit: + clog.Debugf("BlockbeatDispatcher quit before response") + return 0 + } +} + // notifyQueues notifies each queue concurrently about the latest block epoch. func (b *BlockbeatDispatcher) notifyQueues() error { // errChans is a map of channels that will be used to receive errors diff --git a/chainio/dispatcher_test.go b/chainio/dispatcher_test.go index 1cab5b998..eb1dedb6f 100644 --- a/chainio/dispatcher_test.go +++ b/chainio/dispatcher_test.go @@ -381,3 +381,60 @@ func TestNotifyQueuesError(t *testing.T) { err := b.notifyQueues() require.ErrorIs(t, err, errDummy) } + +// TestCurrentHeight asserts `CurrentHeight` returns the expected block height. +func TestCurrentHeight(t *testing.T) { + t.Parallel() + + testHeight := int32(1000) + + // Create a mock chain notifier. + mockNotifier := &chainntnfs.MockChainNotifier{} + defer mockNotifier.AssertExpectations(t) + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + mockBeat.On("Height").Return(testHeight).Once() + + // Create a mock consumer. + consumer := &MockConsumer{} + defer consumer.AssertExpectations(t) + consumer.On("Name").Return("mocker1") + + // Create one queue. + queue := []Consumer{consumer} + + // Create a new dispatcher. + b := NewBlockbeatDispatcher(mockNotifier) + + // Register the queues. + b.RegisterQueue(queue) + + // Attach the blockbeat. + b.beat = mockBeat + + // Mock the chain notifier to return a valid notifier. + blockEpochs := &chainntnfs.BlockEpochEvent{ + Cancel: func() {}, + } + mockNotifier.On("RegisterBlockEpochNtfn", + mock.Anything).Return(blockEpochs, nil).Once() + + // Start the dispatcher now should not return an error. + err := b.Start() + require.NoError(t, err) + + // Make a query on the current height and assert it equals to + // testHeight. + height := b.CurrentHeight() + require.Equal(t, testHeight, height) + + // Stop the dispatcher. + b.Stop() + + // Make a query on the current height and assert it equals to 0. + height = b.CurrentHeight() + require.Zero(t, height) +} diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index f5dadbe57..45216abd8 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -170,6 +170,10 @@ use the configured budget values for HTLCs (first level sweep) in parcticular `--sweeper.budget.deadlinehtlcratio` and `--sweeper.budget.deadlinehtlc`. +* When deciding whether `lnd` is synced to chain, the current height from the + blockbeat dispatcher is now also [taken into + consideration](https://github.com/lightningnetwork/lnd/pull/9501). + ## RPC Updates * Some RPCs that previously just returned an empty response message now at least diff --git a/rpcserver.go b/rpcserver.go index 2e17eb75c..e096d31f8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3225,28 +3225,10 @@ func (r *rpcServer) GetInfo(_ context.Context, idPub := r.server.identityECDH.PubKey().SerializeCompressed() encodedIDPub := hex.EncodeToString(idPub) - bestHash, bestHeight, err := r.server.cc.ChainIO.GetBestBlock() + // Get the system's chain sync info. + syncInfo, err := r.getChainSyncInfo() if err != nil { - return nil, fmt.Errorf("unable to get best block info: %w", err) - } - - isSynced, bestHeaderTimestamp, err := r.server.cc.Wallet.IsSynced() - if err != nil { - return nil, fmt.Errorf("unable to sync PoV of the wallet "+ - "with current best block in the main chain: %v", err) - } - - // If the router does full channel validation, it has a lot of work to - // do for each block. So it might be possible that it isn't yet up to - // date with the most recent block, even if the wallet is. This can - // happen in environments with high CPU load (such as parallel itests). - // Since the `synced_to_chain` flag in the response of this call is used - // by many wallets (and also our itests) to make sure everything's up to - // date, we add the router's state to it. So the flag will only toggle - // to true once the router was also able to catch up. - if !r.cfg.Routing.AssumeChannelValid { - routerHeight := r.server.graphBuilder.SyncedHeight() - isSynced = isSynced && uint32(bestHeight) == routerHeight + return nil, err } network := lncfg.NormalizeNetwork(r.cfg.ActiveNetParams.Name) @@ -3297,15 +3279,15 @@ func (r *rpcServer) GetInfo(_ context.Context, NumActiveChannels: activeChannels, NumInactiveChannels: inactiveChannels, NumPeers: uint32(len(serverPeers)), - BlockHeight: uint32(bestHeight), - BlockHash: bestHash.String(), - SyncedToChain: isSynced, + BlockHeight: uint32(syncInfo.bestHeight), + BlockHash: syncInfo.blockHash.String(), + SyncedToChain: syncInfo.isSynced, Testnet: isTestNet, Chains: activeChains, Uris: uris, Alias: nodeAnn.Alias.String(), Color: nodeColor, - BestHeaderTimestamp: bestHeaderTimestamp, + BestHeaderTimestamp: syncInfo.timestamp, Version: version, CommitHash: build.CommitHash, SyncedToGraph: isGraphSynced, @@ -8929,3 +8911,81 @@ func rpcInitiator(isInitiator bool) lnrpc.Initiator { return lnrpc.Initiator_INITIATOR_REMOTE } + +// chainSyncInfo wraps info about the best block and whether the system is +// synced to that block. +type chainSyncInfo struct { + // isSynced specifies whether the whole system is considered synced. + // When true, it means the following subsystems are at the best height + // reported by the chain backend, + // - wallet. + // - channel graph. + // - blockbeat dispatcher. + isSynced bool + + // bestHeight is the current height known to the chain backend. + bestHeight int32 + + // blockHash is the hash of the current block known to the chain + // backend. + blockHash chainhash.Hash + + // timestamp is the block's timestamp the wallet has synced to. + timestamp int64 +} + +// getChainSyncInfo queries the chain backend, the wallet, the channel router +// and the blockbeat dispatcher to determine the best block and whether the +// system is considered synced. +func (r *rpcServer) getChainSyncInfo() (*chainSyncInfo, error) { + bestHash, bestHeight, err := r.server.cc.ChainIO.GetBestBlock() + if err != nil { + return nil, fmt.Errorf("unable to get best block info: %w", err) + } + + isSynced, bestHeaderTimestamp, err := r.server.cc.Wallet.IsSynced() + if err != nil { + return nil, fmt.Errorf("unable to sync PoV of the wallet "+ + "with current best block in the main chain: %v", err) + } + + // Create an info to be returned. + info := &chainSyncInfo{ + isSynced: isSynced, + bestHeight: bestHeight, + blockHash: *bestHash, + timestamp: bestHeaderTimestamp, + } + + // Exit early if the wallet is not synced. + if !isSynced { + return info, nil + } + + // If the router does full channel validation, it has a lot of work to + // do for each block. So it might be possible that it isn't yet up to + // date with the most recent block, even if the wallet is. This can + // happen in environments with high CPU load (such as parallel itests). + // Since the `synced_to_chain` flag in the response of this call is used + // by many wallets (and also our itests) to make sure everything's up to + // date, we add the router's state to it. So the flag will only toggle + // to true once the router was also able to catch up. + if !r.cfg.Routing.AssumeChannelValid { + routerHeight := r.server.graphBuilder.SyncedHeight() + isSynced = uint32(bestHeight) == routerHeight + } + + // Exit early if the channel graph is not synced. + if !isSynced { + return info, nil + } + + // Given the wallet and the channel router are synced, we now check + // whether the blockbeat dispatcher is synced. + height := r.server.blockbeatDispatcher.CurrentHeight() + + // Overwrite isSynced and return. + info.isSynced = height == bestHeight + + return info, nil +}