From 42343184f49f362c843b9001bf7655a5923a8111 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 23 Feb 2023 17:59:44 -0800 Subject: [PATCH] lnd: hook up neutrino's rebroadcaster for full node backends Neutrino already runs this rebroadcaster in the background, so we don't need to create it again if we're running in that operating mode. --- config_builder.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/config_builder.go b/config_builder.go index 225c82b96..08b134270 100644 --- a/config_builder.go +++ b/config_builder.go @@ -11,18 +11,23 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" "github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/walletdb" proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightninglabs/neutrino" + "github.com/lightninglabs/neutrino/blockntfns" "github.com/lightninglabs/neutrino/headerfs" + "github.com/lightninglabs/neutrino/pushtx" "github.com/lightningnetwork/lnd/blockcache" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/invoices" @@ -606,6 +611,65 @@ func (d *DefaultWalletImpl) BuildWalletConfig(ctx context.Context, return partialChainControl, walletConfig, cleanUp, nil } +// proxyBlockEpoch proxies a block epoch subsections to the underlying neutrino +// rebroadcaster client. +func proxyBlockEpoch(notifier chainntnfs.ChainNotifier, +) func() (*blockntfns.Subscription, error) { + + return func() (*blockntfns.Subscription, error) { + blockEpoch, err := notifier.RegisterBlockEpochNtfn( + nil, + ) + if err != nil { + return nil, err + } + + sub := blockntfns.Subscription{ + Notifications: make(chan blockntfns.BlockNtfn, 6), + Cancel: blockEpoch.Cancel, + } + go func() { + for blk := range blockEpoch.Epochs { + ntfn := blockntfns.NewBlockConnected( + *blk.BlockHeader, + uint32(blk.Height), + ) + + sub.Notifications <- ntfn + } + }() + + return &sub, nil + } +} + +// walletReBroadcaster is a simple wrapper around the pushtx.Broadcaster +// interface to adhere to the expanded lnwallet.Rebraodcaster interface. +type walletReBroadcaster struct { + started atomic.Bool + + *pushtx.Broadcaster +} + +// newWalletReBroadcaster creates a new instance of the walletReBroadcaster. +func newWalletReBroadcaster(broadcaster *pushtx.Broadcaster) *walletReBroadcaster { + return &walletReBroadcaster{ + Broadcaster: broadcaster, + } +} + +// Start launches all goroutines the rebroadcaster needs to operate. +func (w *walletReBroadcaster) Start() error { + defer w.started.Store(true) + + return w.Broadcaster.Start() +} + +// Started returns true if the broadcaster is already active. +func (w *walletReBroadcaster) Started() bool { + return w.started.Load() +} + // BuildChainControl is responsible for creating a fully populated chain // control instance from a wallet. // @@ -641,6 +705,29 @@ func (d *DefaultWalletImpl) BuildChainControl( NetParams: *walletConfig.NetParams, } + // The broadcast is already always active for neutrino nodes, so we + // don't want to create a rebroadcast loop. + if partialChainControl.Cfg.NeutrinoCS == nil { + broadcastCfg := pushtx.Config{ + Broadcast: func(tx *wire.MsgTx) error { + cs := partialChainControl.ChainSource + _, err := cs.SendRawTransaction( + tx, true, + ) + + return err + }, + SubscribeBlocks: proxyBlockEpoch( + partialChainControl.ChainNotifier, + ), + RebroadcastInterval: pushtx.DefaultRebroadcastInterval, + } + + lnWalletConfig.Rebroadcaster = newWalletReBroadcaster( + pushtx.NewBroadcaster(&broadcastCfg), + ) + } + // We've created the wallet configuration now, so we can finish // initializing the main chain control. activeChainControl, cleanUp, err := chainreg.NewChainControl(