From 89c4a8dfd7dffe0fc50a277cd07b7e9f67560b98 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 11 Feb 2025 14:24:20 +0800 Subject: [PATCH 1/3] chainio: add method `CurrentHeight` Add a new method `CurrentHeight` to query the current best height of the dispatcher. --- chainio/dispatcher.go | 63 ++++++++++++++++++++++++++++++++++++-- chainio/dispatcher_test.go | 57 ++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 3 deletions(-) diff --git a/chainio/dispatcher.go b/chainio/dispatcher.go index 269bb1892..3f22948bd 100644 --- a/chainio/dispatcher.go +++ b/chainio/dispatcher.go @@ -49,14 +49,35 @@ type BlockbeatDispatcher struct { // quit is used to signal the BlockbeatDispatcher to stop. quit chan struct{} + + // queryHeightChan is used to receive queries on the current height of + // the dispatcher. + queryHeightChan chan *query +} + +// query is used to fetch the internal state of the dispatcher. +type query struct { + // respChan is used to send back the current height back to the caller. + // + // NOTE: This channel must be buffered. + respChan chan int32 +} + +// newQuery creates a query to be used to fetch the internal state of the +// dispatcher. +func newQuery() *query { + return &query{ + respChan: make(chan int32, 1), + } } // NewBlockbeatDispatcher returns a new blockbeat dispatcher instance. func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher { return &BlockbeatDispatcher{ - notifier: n, - quit: make(chan struct{}), - consumerQueues: make(map[uint32][]Consumer), + notifier: n, + quit: make(chan struct{}), + consumerQueues: make(map[uint32][]Consumer), + queryHeightChan: make(chan *query, 1), } } @@ -161,6 +182,18 @@ func (b *BlockbeatDispatcher) dispatchBlocks( b.log().Infof("Notified all consumers on new block "+ "in %v", time.Since(start)) + // A query has been made to fetch the current height, we now + // send the height from its current beat. + case query := <-b.queryHeightChan: + // The beat may not be set yet, e.g., during the startup + // the query is made before the block epoch being sent. + height := int32(0) + if b.beat != nil { + height = b.beat.Height() + } + + query.respChan <- height + case <-b.quit: b.log().Debugf("BlockbeatDispatcher quit signal " + "received") @@ -170,6 +203,30 @@ func (b *BlockbeatDispatcher) dispatchBlocks( } } +// CurrentHeight returns the current best height known to the dispatcher. 0 is +// returned if the dispatcher is shutting down. +func (b *BlockbeatDispatcher) CurrentHeight() int32 { + query := newQuery() + + select { + case b.queryHeightChan <- query: + + case <-b.quit: + clog.Debugf("BlockbeatDispatcher quit before query") + return 0 + } + + select { + case height := <-query.respChan: + clog.Debugf("Responded current height: %v", height) + return height + + case <-b.quit: + clog.Debugf("BlockbeatDispatcher quit before response") + return 0 + } +} + // notifyQueues notifies each queue concurrently about the latest block epoch. func (b *BlockbeatDispatcher) notifyQueues() error { // errChans is a map of channels that will be used to receive errors diff --git a/chainio/dispatcher_test.go b/chainio/dispatcher_test.go index 1cab5b998..eb1dedb6f 100644 --- a/chainio/dispatcher_test.go +++ b/chainio/dispatcher_test.go @@ -381,3 +381,60 @@ func TestNotifyQueuesError(t *testing.T) { err := b.notifyQueues() require.ErrorIs(t, err, errDummy) } + +// TestCurrentHeight asserts `CurrentHeight` returns the expected block height. +func TestCurrentHeight(t *testing.T) { + t.Parallel() + + testHeight := int32(1000) + + // Create a mock chain notifier. + mockNotifier := &chainntnfs.MockChainNotifier{} + defer mockNotifier.AssertExpectations(t) + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + mockBeat.On("Height").Return(testHeight).Once() + + // Create a mock consumer. + consumer := &MockConsumer{} + defer consumer.AssertExpectations(t) + consumer.On("Name").Return("mocker1") + + // Create one queue. + queue := []Consumer{consumer} + + // Create a new dispatcher. + b := NewBlockbeatDispatcher(mockNotifier) + + // Register the queues. + b.RegisterQueue(queue) + + // Attach the blockbeat. + b.beat = mockBeat + + // Mock the chain notifier to return a valid notifier. + blockEpochs := &chainntnfs.BlockEpochEvent{ + Cancel: func() {}, + } + mockNotifier.On("RegisterBlockEpochNtfn", + mock.Anything).Return(blockEpochs, nil).Once() + + // Start the dispatcher now should not return an error. + err := b.Start() + require.NoError(t, err) + + // Make a query on the current height and assert it equals to + // testHeight. + height := b.CurrentHeight() + require.Equal(t, testHeight, height) + + // Stop the dispatcher. + b.Stop() + + // Make a query on the current height and assert it equals to 0. + height = b.CurrentHeight() + require.Zero(t, height) +} From 59759f861ff974509fd36dbd09b97ffbc1bed43b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 11 Feb 2025 14:55:37 +0800 Subject: [PATCH 2/3] rpcserver: check `blockbeatDispatcher` when deciding `isSynced` This commit changes `GetInfo` to include `blockbeatDispatcher`'s current state when deciding whether the system is synced to chain. Previously we check the best height against the wallet and the channel graph, we should also do this to the blockbeat dispatcher to make sure the internal consumers are also synced to the best block. --- rpcserver.go | 110 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 85 insertions(+), 25 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 2e17eb75c..e096d31f8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3225,28 +3225,10 @@ func (r *rpcServer) GetInfo(_ context.Context, idPub := r.server.identityECDH.PubKey().SerializeCompressed() encodedIDPub := hex.EncodeToString(idPub) - bestHash, bestHeight, err := r.server.cc.ChainIO.GetBestBlock() + // Get the system's chain sync info. + syncInfo, err := r.getChainSyncInfo() if err != nil { - return nil, fmt.Errorf("unable to get best block info: %w", err) - } - - isSynced, bestHeaderTimestamp, err := r.server.cc.Wallet.IsSynced() - if err != nil { - return nil, fmt.Errorf("unable to sync PoV of the wallet "+ - "with current best block in the main chain: %v", err) - } - - // If the router does full channel validation, it has a lot of work to - // do for each block. So it might be possible that it isn't yet up to - // date with the most recent block, even if the wallet is. This can - // happen in environments with high CPU load (such as parallel itests). - // Since the `synced_to_chain` flag in the response of this call is used - // by many wallets (and also our itests) to make sure everything's up to - // date, we add the router's state to it. So the flag will only toggle - // to true once the router was also able to catch up. - if !r.cfg.Routing.AssumeChannelValid { - routerHeight := r.server.graphBuilder.SyncedHeight() - isSynced = isSynced && uint32(bestHeight) == routerHeight + return nil, err } network := lncfg.NormalizeNetwork(r.cfg.ActiveNetParams.Name) @@ -3297,15 +3279,15 @@ func (r *rpcServer) GetInfo(_ context.Context, NumActiveChannels: activeChannels, NumInactiveChannels: inactiveChannels, NumPeers: uint32(len(serverPeers)), - BlockHeight: uint32(bestHeight), - BlockHash: bestHash.String(), - SyncedToChain: isSynced, + BlockHeight: uint32(syncInfo.bestHeight), + BlockHash: syncInfo.blockHash.String(), + SyncedToChain: syncInfo.isSynced, Testnet: isTestNet, Chains: activeChains, Uris: uris, Alias: nodeAnn.Alias.String(), Color: nodeColor, - BestHeaderTimestamp: bestHeaderTimestamp, + BestHeaderTimestamp: syncInfo.timestamp, Version: version, CommitHash: build.CommitHash, SyncedToGraph: isGraphSynced, @@ -8929,3 +8911,81 @@ func rpcInitiator(isInitiator bool) lnrpc.Initiator { return lnrpc.Initiator_INITIATOR_REMOTE } + +// chainSyncInfo wraps info about the best block and whether the system is +// synced to that block. +type chainSyncInfo struct { + // isSynced specifies whether the whole system is considered synced. + // When true, it means the following subsystems are at the best height + // reported by the chain backend, + // - wallet. + // - channel graph. + // - blockbeat dispatcher. + isSynced bool + + // bestHeight is the current height known to the chain backend. + bestHeight int32 + + // blockHash is the hash of the current block known to the chain + // backend. + blockHash chainhash.Hash + + // timestamp is the block's timestamp the wallet has synced to. + timestamp int64 +} + +// getChainSyncInfo queries the chain backend, the wallet, the channel router +// and the blockbeat dispatcher to determine the best block and whether the +// system is considered synced. +func (r *rpcServer) getChainSyncInfo() (*chainSyncInfo, error) { + bestHash, bestHeight, err := r.server.cc.ChainIO.GetBestBlock() + if err != nil { + return nil, fmt.Errorf("unable to get best block info: %w", err) + } + + isSynced, bestHeaderTimestamp, err := r.server.cc.Wallet.IsSynced() + if err != nil { + return nil, fmt.Errorf("unable to sync PoV of the wallet "+ + "with current best block in the main chain: %v", err) + } + + // Create an info to be returned. + info := &chainSyncInfo{ + isSynced: isSynced, + bestHeight: bestHeight, + blockHash: *bestHash, + timestamp: bestHeaderTimestamp, + } + + // Exit early if the wallet is not synced. + if !isSynced { + return info, nil + } + + // If the router does full channel validation, it has a lot of work to + // do for each block. So it might be possible that it isn't yet up to + // date with the most recent block, even if the wallet is. This can + // happen in environments with high CPU load (such as parallel itests). + // Since the `synced_to_chain` flag in the response of this call is used + // by many wallets (and also our itests) to make sure everything's up to + // date, we add the router's state to it. So the flag will only toggle + // to true once the router was also able to catch up. + if !r.cfg.Routing.AssumeChannelValid { + routerHeight := r.server.graphBuilder.SyncedHeight() + isSynced = uint32(bestHeight) == routerHeight + } + + // Exit early if the channel graph is not synced. + if !isSynced { + return info, nil + } + + // Given the wallet and the channel router are synced, we now check + // whether the blockbeat dispatcher is synced. + height := r.server.blockbeatDispatcher.CurrentHeight() + + // Overwrite isSynced and return. + info.isSynced = height == bestHeight + + return info, nil +} From 759dc2066e930d93fc0cee7a7321f69ff2ac359b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 11 Feb 2025 15:04:29 +0800 Subject: [PATCH 3/3] docs: update release notes --- docs/release-notes/release-notes-0.19.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index e70492d6f..a34f1edb6 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -165,6 +165,10 @@ use the configured budget values for HTLCs (first level sweep) in parcticular `--sweeper.budget.deadlinehtlcratio` and `--sweeper.budget.deadlinehtlc`. +* When deciding whether `lnd` is synced to chain, the current height from the + blockbeat dispatcher is now also [taken into + consideration](https://github.com/lightningnetwork/lnd/pull/9501). + ## RPC Updates * Some RPCs that previously just returned an empty response message now at least