multi: add new method ChainArbitrator.RedispatchBlockbeat

This commit adds a new method to enable us resending the blockbeat in
`ChainArbitrator`, which is needed for the channel restore as the chain
watcher and channel arbitrator are added after the start of the chain
arbitrator.
This commit is contained in:
yyforyongyu 2024-11-14 02:04:00 +08:00
parent 4d765668cc
commit 6eb9bb1ed6
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 52 additions and 1 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
@ -286,6 +287,9 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e
ltndLog.Infof("Informing chain watchers of new restored channels")
// Create a slice of channel points.
chanPoints := make([]wire.OutPoint, 0, len(channelShells))
// Finally, we'll need to inform the chain arbitrator of these new
// channels so we'll properly watch for their ultimate closure on chain
// and sweep them via the DLP.
@ -294,8 +298,15 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e
if err != nil {
return err
}
chanPoints = append(
chanPoints, restoredChannel.Chan.FundingOutpoint,
)
}
// With all the channels restored, we'll now re-send the blockbeat.
c.chainArb.RedispatchBlockbeat(chanPoints)
return nil
}
@ -314,7 +325,7 @@ func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error {
// to ensure the new connection is created after this new link/channel
// is known.
if err := s.DisconnectPeer(nodePub); err != nil {
ltndLog.Infof("Peer(%v) is already connected, proceeding "+
ltndLog.Infof("Peer(%x) is already connected, proceeding "+
"with chan restore", nodePub.SerializeCompressed())
}

View File

@ -1371,3 +1371,43 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error {
return nil
}
// RedispatchBlockbeat resends the current blockbeat to the channels specified
// by the chanPoints. It is used when a channel is added to the chain
// arbitrator after it has been started, e.g., during the channel restore
// process.
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
// Get the current blockbeat.
beat := c.beat
// Prepare two sets of consumers.
channels := make([]chainio.Consumer, 0, len(chanPoints))
watchers := make([]chainio.Consumer, 0, len(chanPoints))
// Read the active channels in a lock.
c.Lock()
for _, op := range chanPoints {
if channel, ok := c.activeChannels[op]; ok {
channels = append(channels, channel)
}
if watcher, ok := c.activeWatchers[op]; ok {
watchers = append(watchers, watcher)
}
}
c.Unlock()
// Iterate all the copied watchers and send the blockbeat to them.
err := chainio.DispatchConcurrent(beat, watchers)
if err != nil {
log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
}
// Iterate all the copied channels and send the blockbeat to them.
err = chainio.DispatchConcurrent(beat, channels)
if err != nil {
// Shutdown lnd if there's an error processing the block.
log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
err)
}
}